2017-07-23 2 views
1

J'ai fait le DAG suivant dans le flux d'air où j'exécute un ensemble de EMRSteps pour exécuter mon pipeline.Airflow EMR exécuter l'étape de capteur

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 07, 20, 10, 00), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 5, 
    'retry_delay': timedelta(minutes=2), 
} 

dag = DAG('dag_import_match_hourly', 
     default_args=default_args, 
     description='Fancy Description', 
     schedule_interval=timedelta(hours=1), 
     dagrun_timeout=timedelta(hours=2)) 

try: 
    merge_s3_match_step = EmrAddStepsOperator(
     task_id='merge_s3_match_step', 
     job_flow_id=cluster_id, 
     aws_conn_id='aws_default', 
     steps=create_step('Merge S3 Match'), 
     dag=dag 
    ) 

    mapreduce_step = EmrAddStepsOperator(
     task_id='mapreduce_match_step', 
     job_flow_id=cluster_id, 
     aws_conn_id='aws_default', 
     steps=create_step('MapReduce Match Hourly'), 
     dag=dag 
    ) 

    merge_hdfs_step = EmrAddStepsOperator(
     task_id='merge_hdfs_step', 
     job_flow_id=cluster_id, 
     aws_conn_id='aws_default', 
     steps=create_step('Merge HDFS Match Hourly'), 
     dag=dag 
    ) 

    ## Sensors 
    check_merge_s3 = EmrStepSensor(
     task_id='watch_merge_s3', 
     job_flow_id=cluster_id, 
     step_id="{{ task_instance.xcom_pull('merge_s3_match_step', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    check_mapreduce = EmrStepSensor(
     task_id='watch_mapreduce', 
     job_flow_id=cluster_id, 
     step_id="{{ task_instance.xcom_pull('mapreduce_match_step', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    check_merge_hdfs = EmrStepSensor(
     task_id='watch_merge_hdfs', 
     job_flow_id=cluster_id, 
     step_id="{{ task_instance.xcom_pull('merge_hdfs_step', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    mapreduce_step.set_upstream(merge_s3_match_step) 
    merge_s3_match_step.set_downstream(check_merge_s3) 

    mapreduce_step.set_downstream(check_mapreduce) 

    merge_hdfs_step.set_upstream(mapreduce_step) 
    merge_hdfs_step.set_downstream(check_merge_hdfs) 

except AirflowException as ae: 
    print ae.message 

Le DAG fonctionne très bien, mais je voudrais utiliser les capteurs pour vous assurer que je vais exécuter l'étape suivante si et seulement si le DME emploi a été correctement. J'ai essayé peu de choses mais aucune d'entre elles ne fonctionne. Le code ci-dessus ne fait pas le travail correctement. Est-ce que quelqu'un sait comment utiliser l'EMRSensorStep pour atteindre mon objectif?

+0

Ces capteurs personnalisés sont-ils écrits? Je suis en train de configurer le débit d'air et l'emr et j'ai besoin d'un moyen de vérifier l'état des étapes dans le cluster. – luckytaxi

+1

Non ce sont les capteurs que vous pouvez trouver dans le paquet '' 'contrib''' - voir ici [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/sensors/emr_step_sensor.py ] (https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/sensors/emr_step_sensor.py) – davideberdin

+0

Thx ... je ne savais pas qu'il y en avait d'autres dans le répertoire 'contrib' . – luckytaxi

Répondre

1

Il ressemble à vos tâches de EmrStepSensor doivent définir des dépendances correctes, par exemple, check_mapreduce, si vous voulez attendre check_mapreduce pour terminer, l'étape suivante devrait être merge_hdfs_step.set_upstream(check_mapreduce) ou check_mapreduce.set_downstream(merge_hdfs_step). Donc, ce serait TaskA >> SensorA >> TaskB >> SensorB >> TaskC >> SensorC, essayez d'utiliser cette façon de configurer les dépendances

+0

Cela a totalement fonctionné, merci! – davideberdin