使用airflow将文件传输到kafka
答
可能最好使用PythonOperator
来逐行处理文件。我有一个用于轮询和SFTP服务器文件的用例,当我找到一些时,我逐行处理它们,并将结果写成JSON。我不喜欢的东西解析日期为YYYY-MM-DD格式等这样的事情可能为你工作:
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
现在,它真的取决于你将如何得到要下载的文件。在我的情况下,我使用SSHHook
和GoogleCloudStorageHook
从SFTP服务器获取文件,然后将这些文件的名称传递给解析和清理csv文件的任务。我通过SFTP拉低文件,并把它们放入谷歌云存储做到这一点:
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
我知道我可以做到这一切在一个大蟒蛇可调用的,这就是我现在如何重构代码,以便在可调用。因此,它轮询SFTP服务器,提取最新的文件,并根据我的规则解析它们,所有这些都在一个Python函数中。我听说使用XCom并不理想,据推测,Airflow任务不应该彼此交流太多。
根据您的使用情况,您甚至可能想要探索Apache Nifi之类的东西,我现在也正在研究它。
你是真的将这些文件加载到文件中,还是将它们加入到文件中?气流确实支持配料/微配料,但对于流媒体来说,我的经验表明它不是太好,基本上就像_nano_-batching。我对远程主机上的CSV文件进行了大量轮询,并将它们作为批次拉入BigQuery中。 – Mike
我逐行处理它们并将每行发送到kafka。 – bsd