Pour la lecture des données de Kafka et de l'écriture à HDFS, en format parquet, en utilisant traitement par lots Spark au lieu de streaming, vous pouvez utiliser Spark Structured Streaming.
Le streaming structuré est un moteur de traitement de flux évolutif et tolérant aux pannes, basé sur le moteur Spark SQL. Vous pouvez exprimer votre calcul en continu de la même manière que vous exprimeriez un calcul par lots sur des données statiques. Le moteur Spark SQL se chargera de l'exécuter de manière incrémentielle et continue et de mettre à jour le résultat final au fur et à mesure de l'arrivée des données en continu. Vous pouvez utiliser l'API Dataset/DataFrame dans Scala, Java, Python ou R pour exprimer des agrégations en continu, des fenêtres d'événements, des jointures de flux à lot, etc. Le calcul est exécuté sur le même moteur SQL Spark optimisé. Enfin, le système garantit une tolérance aux pannes de bout en bout, une fois pour toutes, grâce aux points de contrôle et aux journaux d'écriture immédiate. En bref, Structured Streaming fournit un traitement de flux de bout en bout rapide, évolutif, tolérant aux pannes et de bout en bout, sans que l'utilisateur n'ait à se préoccuper de la diffusion en continu.
Il est fourni avec Kafka en tant que Source intégrée, c'est-à-dire que nous pouvons interroger les données de Kafka. Il est compatible avec les versions de courtier Kafka 0.10.0 ou plus.
Pour extraire les données de Kafka en mode de traitement par lots, vous pouvez créer un dataset/DataFrame pour une plage définie de décalages.
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Chaque ligne de la source a le schéma suivant:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
Maintenant, pour écrire des données HDFS en format parquet, le code suivant peut être écrit:
df.write.parquet("hdfs://data.parquet")
Pour en savoir plus informations sur Spark Structured Streaming + Kafka, s'il vous plaît se référer au guide suivant - Kafka Integration Guide
J'espère que ça aide!
Est-ce cette réponse est utile? – himanshuIIITian
Merci Himanshu, c'était utile. On dirait que cela nécessite Spark 2.2, est-il un autre moyen de le faire dans les versions inférieures de spark comme 2.0. – Henosis