2017-10-16 11 views
1

Quelle est la meilleure approche pour diffuser des fichiers CSV sur un sujet kafka en utilisant le flux d'air?Diffuser des fichiers dans kafka en utilisant le flux d'air

Écrire un opérateur personnalisé pour le flux d'air?

+0

êtes-vous vraiment _streaming_ les fichiers ou les _batching_ eux? Airflow supporte vraiment bien le batch/micro-batching mais pour le streaming, mon expérience montre que ce n'est pas génial, ça fonctionne comme _nano_-batching. Je fais beaucoup d'interrogation pour les fichiers CSV sur des hôtes distants et les stocke dans BigQuery en tant que lots. – Mike

+0

Je les traite ligne par ligne et envoie chaque ligne à kafka. – bsd

Répondre

1

Probablement préférable d'utiliser le PythonOperator pour traiter les fichiers ligne par ligne. J'ai un cas d'utilisation où je sondage et le serveur SFTP pour les fichiers et quand j'en trouve, je les traite ligne par ligne, en écrivant les résultats en JSON. Je fais des choses comme les dates parse dans un format AAAA-MM-JJ, etc. Quelque chose comme cela pourrait fonctionner pour vous:

def csv_file_to_kafka(**context): 

    f = '/path/to/downloaded/csv_file.csv' 
    csvfile = open(f, 'r') 
    reader = csv.DictReader(csvfile) 

    for row in reader: 
     """ 
     Send the row to Kafka 
     """ 
    return 

csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

Maintenant, il est vraiment à vous comment vous obtiendrez les fichiers à télécharger. Dans mon cas, j'utilise le SSHHook et GoogleCloudStorageHook pour obtenir des fichiers à partir d'un serveur SFTP, puis passer les noms des fichiers à une tâche qui analyse et nettoie les fichiers CSV. Je le fais en tirant les fichiers vers le bas de SFTP et de les mettre dans Google Cloud Storage:

""" 
HOOKS: Connections to external systems 
""" 
def sftp_connection(): 
    """ 
    Returns an SFTP connection created using the SSHHook 
    """ 
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection') 
    ssh_client = ssh_hook.get_conn() 
    return ssh_client.open_sftp() 
def gcs_connection(): 
    """ 
    Returns an GCP connection created using the GoogleCloudStorageHook 
    """ 
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection') 

""" 
PYTHON CALLABLES: Called by PythonOperators 
""" 
def get_files(**context): 
    """ 
    Looks at all files on the FTP server and returns a list files. 
    """ 
    sftp_client = sftp_connection() 
    all_files = sftp_client.listdir('/path/to/files/') 
    files = [] 

    for f in all_files: 
     files.append(f) 

    return files 

def save_files(**context): 
    """ 
    Looks to see if a file already exists in GCS. If not, the file is downloaed 
    from SFTP server and uploaded to GCS. A list of 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='get_files') 

    sftp_client = sftp_connection() 
    gcs = gcs_connection() 
    new_files = [] 
    new_outcomes_files = [] 
    new_si_files = [] 

    new_files = process_sftp_files(files, gcs, sftp_client) 

    return new_files 

def csv_file_to_kafka(**context): 
    """ 
    Untested sample parse csv files and send to kafka 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='save_files') 
    for f in new_files: 
     csvfile = open(f, 'r') 
     reader = csv.DictReader(csvfile) 

     for row in reader: 
      """ 
      Send the row to Kafka 
      """ 
    return 

get_files = PythonOperator(
    task_id='get_files', 
    python_callable=get_files, 
    dag=dag 
) 
save_files = PythonOperator(
    task_id='save_files', 
    python_callable=save_files, 
    dag=dag 
) 
csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

Je sais que je pouvais faire tout cela dans un grand python appelable, voilà comment je suis refactorisation le code maintenant de sorte que dans l'appelable. Il interroge donc le serveur SFTP, récupère les derniers fichiers et les analyse en fonction de mes règles en une seule fonction python. J'ai entendu dire que l'utilisation de XCom n'est pas idéale, les tâches Airflow ne sont pas supposées communiquer trop entre elles, supposément. En fonction de votre cas d'utilisation, vous pourriez même vouloir explorer quelque chose comme Apache Nifi, je suis en train d'examiner cela maintenant aussi.