如何访问来自Airflow的响应SimpleHttpOperator GET请求

问题描述:

我正在学习Airflow并有一个简单的问题。下面是我的DAG称为dog_retriever如何访问来自Airflow的响应SimpleHttpOperator GET请求

import airflow 
from airflow import DAG 
from airflow.operators.http_operator import SimpleHttpOperator 
from airflow.operators.sensors import HttpSensor 
from datetime import datetime, timedelta 
import json 



default_args = { 
    'owner': 'Loftium', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 10, 9), 
    'email': '[email protected]', 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=3), 
} 

dag = DAG('dog_retriever', 
    schedule_interval='@once', 
    default_args=default_args) 

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2 = SimpleHttpOperator(
    task_id='get_breeds', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breeds/list', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2.set_upstream(t1) 

至于测试出气流的手段,我只是在这个非常简单的http://dog.ceo API做两个GET请求一些端点。我们的目标是学习如何处理通过Airflow获取的一些数据

执行正在执行 - 我的代码成功地调用了任务t1和t2中的enpoints,我可以看到它们以正确的顺序记录在Airflow UI中基于我写的set_upstream规则。

我无法弄清楚如何访问这2个任务的json响应。这似乎很简单,但我无法弄清楚。在SimpleHtttpOperator中,我看到了response_check的一个参数,但没有简单地打印,存储或查看json响应。

谢谢。

所以,因为这是SimpleHttpOperator,实际的json被推送到XCOM,你可以从那里获得它。下面是代码的行动路线:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

你需要做的是设置xcom_push=True,使你的第一个T1将是以下几点:

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    xcom_push=True, 
    dag=dag) 

你应该能够找到所有JSON与return value在XCOM中,XCOM的更多细节可以在以下网址找到:https://airflow.incubator.apache.org/concepts.html#xcoms

+0

谢谢@Chengzhi,这个作品。虽然我认为我从现在开始简单地使用PythonOperator。 –