Python Apache Beam Pipeline状态API调用
问题描述:
我们目前有一个Python Apache Beam管道工作并能够在本地运行。我们现在正在使管道运行在Google Cloud Dataflow上,并且完全自动化,但是Dataflow/Apache Beam的流水线监控存在限制。Python Apache Beam Pipeline状态API调用
目前,Cloud Dataflow有两种方法可以通过它们的UI界面或命令行中的gcloud来监控您的管道状态。这两种解决方案都不适用于完全自动化的解决方案,我们可以考虑无损文件处理。
看着阿帕奇Beam的github上,他们有一个文件,internal/apiclient.py,显示有用于找工作的状态的功能,get_job。
我们发现get_job的一个实例是runners/dataflow_runner.py。
最终目标是使用此API来获取我们自动触发运行的一个或多个作业的状态,以确保它们最终都通过管道成功处理。
任何人都可以向我们解释在运行我们的管道(p.run()
)之后如何使用此API?我们不明白response = runner.dataflow_client.get_job(job_id)
的runner
来自哪里。
如果有人可以提供更大的理解我们如何在设置/运行我们的管道时访问此API调用,那将是非常好的!
答
我最终只是摆弄了代码,发现了如何获得工作细节。我们的下一步是看看是否有办法获得所有工作的清单。
# start the pipeline process
pipeline = p.run()
# get the job_id for the current pipeline and store it somewhere
job_id = pipeline.job_id()
# setup a job_version variable (either batch or streaming)
job_version = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
# setup "runner" which is just a dictionary, I call it local
local = {}
# create a dataflow_client
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version)
# get the job details from the dataflow_client
print local['dataflow_client'].get_job(job_id)