2017-08-22 1 views
1

Je suis nouveau dans le système éco BigData et je commence à le faire.Lisez depuis Kafka et écrivez à hdfs dans le parquet

J'ai lu plusieurs articles sur la lecture d'un sujet de kafka utilisant la diffusion d'étincelles mais j'aimerais savoir s'il est possible de lire à partir de kafka en utilisant un étincelle au lieu de diffuser? Si oui, pourriez-vous m'aider à signaler certains articles ou extraits de code qui me permettent de démarrer?

Ma deuxième partie de la question est d'écrire en format hdfs en parquet. Une fois que je lis de Kafka, je suppose que j'aurai un rdd. Convertissez ce fichier rdd en une base de données, puis écrivez la trame de données sous forme de fichier parquet. Est-ce la bonne approche?

Toute aide appréciée.

Merci

Répondre

1

Pour la lecture des données de Kafka et de l'écriture à HDFS, en format parquet, en utilisant traitement par lots Spark au lieu de streaming, vous pouvez utiliser Spark Structured Streaming.

Le streaming structuré est un moteur de traitement de flux évolutif et tolérant aux pannes, basé sur le moteur Spark SQL. Vous pouvez exprimer votre calcul en continu de la même manière que vous exprimeriez un calcul par lots sur des données statiques. Le moteur Spark SQL se chargera de l'exécuter de manière incrémentielle et continue et de mettre à jour le résultat final au fur et à mesure de l'arrivée des données en continu. Vous pouvez utiliser l'API Dataset/DataFrame dans Scala, Java, Python ou R pour exprimer des agrégations en continu, des fenêtres d'événements, des jointures de flux à lot, etc. Le calcul est exécuté sur le même moteur SQL Spark optimisé. Enfin, le système garantit une tolérance aux pannes de bout en bout, une fois pour toutes, grâce aux points de contrôle et aux journaux d'écriture immédiate. En bref, Structured Streaming fournit un traitement de flux de bout en bout rapide, évolutif, tolérant aux pannes et de bout en bout, sans que l'utilisateur n'ait à se préoccuper de la diffusion en continu.

Il est fourni avec Kafka en tant que Source intégrée, c'est-à-dire que nous pouvons interroger les données de Kafka. Il est compatible avec les versions de courtier Kafka 0.10.0 ou plus.

Pour extraire les données de Kafka en mode de traitement par lots, vous pouvez créer un dataset/DataFrame pour une plage définie de décalages.

// Subscribe to 1 topic defaults to the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to multiple topics, specifying explicit Kafka offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1,topic2") 
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") 
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to a pattern, at the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribePattern", "topic.*") 
    .option("startingOffsets", "earliest") 
    .option("endingOffsets", "latest") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

Chaque ligne de la source a le schéma suivant:

| Column   | Type   | 
|:-----------------|--------------:| 
| key    |  binary | 
| value   |  binary | 
| topic   |  string | 
| partition  |   int | 
| offset   |   long | 
| timestamp  |   long | 
| timestampType |   int | 

Maintenant, pour écrire des données HDFS en format parquet, le code suivant peut être écrit:

df.write.parquet("hdfs://data.parquet") 

Pour en savoir plus informations sur Spark Structured Streaming + Kafka, s'il vous plaît se référer au guide suivant - Kafka Integration Guide

J'espère que ça aide!

+0

Est-ce cette réponse est utile? – himanshuIIITian

+1

Merci Himanshu, c'était utile. On dirait que cela nécessite Spark 2.2, est-il un autre moyen de le faire dans les versions inférieures de spark comme 2.0. – Henosis