如何访问来自Airflow的响应SimpleHttpOperator GET请求
问题描述:
我正在学习Airflow并有一个简单的问题。下面是我的DAG称为dog_retriever
如何访问来自Airflow的响应SimpleHttpOperator GET请求
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
default_args = {
'owner': 'Loftium',
'depends_on_past': False,
'start_date': datetime(2017, 10, 9),
'email': '[email protected]',
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
dag = DAG('dog_retriever',
schedule_interval='@once',
default_args=default_args)
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
dag=dag)
t2 = SimpleHttpOperator(
task_id='get_breeds',
method='GET',
http_conn_id='http_default',
endpoint='api/breeds/list',
headers={"Content-Type": "application/json"},
dag=dag)
t2.set_upstream(t1)
至于测试出气流的手段,我只是在这个非常简单的http://dog.ceo API做两个GET请求一些端点。我们的目标是学习如何处理通过Airflow获取的一些数据
执行正在执行 - 我的代码成功地调用了任务t1和t2中的enpoints,我可以看到它们以正确的顺序记录在Airflow UI中基于我写的set_upstream
规则。
我无法弄清楚如何访问这2个任务的json响应。这似乎很简单,但我无法弄清楚。在SimpleHtttpOperator中,我看到了response_check的一个参数,但没有简单地打印,存储或查看json响应。
谢谢。
答
所以,因为这是SimpleHttpOperator,实际的json被推送到XCOM,你可以从那里获得它。下面是代码的行动路线:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87
你需要做的是设置xcom_push=True
,使你的第一个T1将是以下几点:
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
xcom_push=True,
dag=dag)
你应该能够找到所有JSON与return value
在XCOM中,XCOM的更多细节可以在以下网址找到:https://airflow.incubator.apache.org/concepts.html#xcoms
谢谢@Chengzhi,这个作品。虽然我认为我从现在开始简单地使用PythonOperator。 –