J'ai fait le DAG suivant dans le flux d'air où j'exécute un ensemble de EMRSteps pour exécuter mon pipeline.Airflow EMR exécuter l'étape de capteur
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 07, 20, 10, 00),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=2),
}
dag = DAG('dag_import_match_hourly',
default_args=default_args,
description='Fancy Description',
schedule_interval=timedelta(hours=1),
dagrun_timeout=timedelta(hours=2))
try:
merge_s3_match_step = EmrAddStepsOperator(
task_id='merge_s3_match_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('Merge S3 Match'),
dag=dag
)
mapreduce_step = EmrAddStepsOperator(
task_id='mapreduce_match_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('MapReduce Match Hourly'),
dag=dag
)
merge_hdfs_step = EmrAddStepsOperator(
task_id='merge_hdfs_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('Merge HDFS Match Hourly'),
dag=dag
)
## Sensors
check_merge_s3 = EmrStepSensor(
task_id='watch_merge_s3',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('merge_s3_match_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
check_mapreduce = EmrStepSensor(
task_id='watch_mapreduce',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('mapreduce_match_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
check_merge_hdfs = EmrStepSensor(
task_id='watch_merge_hdfs',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('merge_hdfs_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
mapreduce_step.set_upstream(merge_s3_match_step)
merge_s3_match_step.set_downstream(check_merge_s3)
mapreduce_step.set_downstream(check_mapreduce)
merge_hdfs_step.set_upstream(mapreduce_step)
merge_hdfs_step.set_downstream(check_merge_hdfs)
except AirflowException as ae:
print ae.message
Le DAG fonctionne très bien, mais je voudrais utiliser les capteurs pour vous assurer que je vais exécuter l'étape suivante si et seulement si le DME emploi a été correctement. J'ai essayé peu de choses mais aucune d'entre elles ne fonctionne. Le code ci-dessus ne fait pas le travail correctement. Est-ce que quelqu'un sait comment utiliser l'EMRSensorStep pour atteindre mon objectif?
Ces capteurs personnalisés sont-ils écrits? Je suis en train de configurer le débit d'air et l'emr et j'ai besoin d'un moyen de vérifier l'état des étapes dans le cluster. – luckytaxi
Non ce sont les capteurs que vous pouvez trouver dans le paquet '' 'contrib''' - voir ici [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/sensors/emr_step_sensor.py ] (https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/sensors/emr_step_sensor.py) – davideberdin
Thx ... je ne savais pas qu'il y en avait d'autres dans le répertoire 'contrib' . – luckytaxi