使用Spark将文本文件导出到PostgreSQL - 自动化
问题描述:
我试图使用Spark将文本文件导出到Postgres数据库。我正在使用下面的一段代码来导出单个文本文件。我在同一个文件夹中有近200个文本文件,每个文本文件具有相同的结构。不幸的是,一年的价值不是我的输入文件的一部分,因此我很难编码它。使用Spark将文本文件导出到PostgreSQL - 自动化
我希望一次上传所有这些文件,但不知道该怎么做,有人有什么建议吗?
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt")
splits = lines.map(lambda l: l.split(","))
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870)))
schemaBabies = sqlContext.createDataFrame(raw_data)
schemaBabies.registerTempTable("raw_data")
df = sqlContext.sql("select * from raw_data")
pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX"
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver","mode":"append"}
df.write.jdbc(url = pgurl ,table = "EDW.raw_data",properties=properties)
答
让我们假设你的数据是这样的:
import csv
import tempfile
import os
out = tempfile.mkdtemp()
data = [
("1870", [("Jane Doe", "F", 3)]),
("1890", [("John Doe", "M", 1)]),
]
for year, rows in data:
with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw:
csv.writer(fw).writerows(rows)
开始PySpark会议或提交脚本传递正确spark-csv
到--packages
参数,负载数据与指定模式:
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("gender", StringType(), True),
StructField("count", LongType(), True)
])
df = (sqlContext.read.format("com.databricks.spark.csv")
.schema(schema)
.load(out))
提取物年从文件名中写入:
from pyspark.sql.functions import input_file_name, regexp_extract
df_with_year = (df.withColumn(
"year",
regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int")))
df_with_year.show()
## +--------+------+-----+----+
## | name|gender|count|year|
## +--------+------+-----+----+
## |John Doe| M| 1|1890|
## |Jane Doe| F| 3|1870|
## +--------+------+-----+----+
df_with_year.write.jdbc(...)
重要:在Spark < 2.0中,此方法依赖于不在Python和JVM之间传递数据。它将无法与Python UDF或DataFrame.rdd.map
工作。
我确实根据您的输入对我的代码进行了一些更改,我可以将所有200多个文本文件加载到数据库中。真的很感激你的帮助。 – ytasfeb15