0

La source de données provient de la démo Databricks Notebook: Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types! Mais quand j'essaye ce code sur mon propre ordinateur portable, j'ai toujours des erreurs.Erreur lors de l'écriture en mode autonome de Spark 2.2.0 Dataframe sur un seul noeud local Kafka

En premier lieu, charger des données JSON comme dataframe

res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string] 

scala> res2.show 
+-------------+---------+ 
|battery_level|c02_level| 
+-------------+---------+ 
|   7|  886| 
|   5|  1378| 
|   8|  917| 
|   8|  1504| 
|   8|  831| 
|   9|  1304| 
|   8|  1574| 
|   9|  1208| 
+-------------+---------+ 

de deuxième, write données à Kafka:

res2.write 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("topic", "test") 
    .save() 

Tous les suit la démo de portable ci-dessus et officiels steps

Mais erreurs montre :

scala> res2.write 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "localhost:9092") 
     .option("topic", "iot-devices") 
     .save() 
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found; 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71) 
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87) 
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:165) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472) 
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) 
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233) 
    ... 52 elided 

Je suppose qu'il pourrait être le problème Kafka, alors je teste le dataframe read de Kafka pour assurer la connectivité:

scala> val kaDF = spark.read 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "localhost:9092") 
     .option("subscribe", "iot-devices") 
     .load() 
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields] 

scala> kaDF.show 
+----+--------------------+-----------+---------+------+--------------------+-------------+ 
| key|    value|  topic|partition|offset|   timestamp|timestampType| 
+----+--------------------+-----------+---------+------+--------------------+-------------+ 
|null| [73 73 73 73 73]|iot-devices|  0|  0|2017-09-27 11:11:...|   0| 
|null|[64 69 63 6B 20 3...|iot-devices|  0|  1|2017-09-27 11:29:...|   0| 
|null|  [78 69 78 69]|iot-devices|  0|  2|2017-09-27 11:29:...|   0| 
|null|[31 20 32 20 33 2...|iot-devices|  0|  3|2017-09-27 11:30:...|   0| 
+----+--------------------+-----------+---------+------+--------------------+-------------+ 

Ainsi, le résultat montre que la lecture des données dans le sujet « IOT-dispositifs » de Kafka bootstrap.servers localhost:9092 fonctionne.

J'ai beaucoup cherché en ligne, mais je n'arrive toujours pas à le résoudre?

Est-ce que n'importe qui avec l'expérience Spark SQL peut me dire ce qui ne va pas dans ma commande?

Merci!

Répondre

1

Le message d'erreur indique clairement la source du problème:

org.apache.spark.sql.AnalysisException: 'valeur' ​​Attribut obligatoire introuvable;

Le Dataset à écrire has to have at least value column (et éventuellement key et topic) et res2 a seulement battery_level, c02_level.

Vous pouvez par exemple:

import org.apache.spark.sql.functions._ 

res2.select(to_json(struct($"battery_level", "c02_level")).alias("value")) 
    .writeStream 
    ... 
+0

Merci! 'alias (" value ")' le fait fonctionner! – karrot