UDF从Spark SQL中的路径中仅提取文件名
问题描述:
在Apache Spark中有input_file_name函数,我用它将新列添加到Dataset中,并将其与当前正在处理的文件的名称相加。UDF从Spark SQL中的路径中仅提取文件名
问题是我想以某种方式定制此函数以仅返回文件名,在S3上省略它的完整路径。
现在,我在第二步骤中使用地图功能做更换的路径:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
但我想使用类似
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
答
在斯卡拉:
#register udf
spark.udf
.register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)
#use the udf to get last token(filename) in full path
val initialDs = spark.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name))
编辑:在Ja VA按照评论
#register udf
spark.udf()
.register("get_only_file_name", (String fullPath) -> {
int lastIndex = fullPath.lastIndexOf("/");
return fullPath.substring(lastIndex, fullPath.length - 1);
}, DataTypes.StringType);
import org.apache.spark.sql.functions.input_file_name
#use the udf to get last token(filename) in full path
Dataset<Row> initialDs = spark.read()
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name()));
+1
谢谢,它做到了! – cingulata
+0
@ Anandj.Kadhi:我知道回复的时间非常晚,请检查一次更新。 – mrsrinivas
'.withColumn( “input_file_name”,get_only_file_name(input_file_name))'。这里'get_only_file_name'是udf。 – mrsrinivas