2017-07-24 1 views
-1

J'essaie mes mains sur Kafka étincelle structuré en streaming, mais obtenir une exception comme Exception dans le fil "principal" org.apache.spark.sql.AnalysisException : impossible de résoudre 'device' donné colonnes d'entrée: [valeur, décalage, partition, clé, horodatage, timestampType, rubrique];Exception dans le fil "principal" org.apache.spark.sql.AnalysisException:

Fixation mon code

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.streaming.ProcessingTime 
case class DeviceData(device: String, deviceType: String, signal: String) 

object dataset_kafka { 
    def main(args: Array[String]): Unit = { 
    val spark = SparkSession 
      .builder() 
      .appName("kafka-consumer") 
      .master("local[*]") 
      .getOrCreate() 
     import spark.implicits._ 

     spark.sparkContext.setLogLevel("WARN") 


    val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
     .option("subscribe", "test") 
     .option("startingOffsets", "earliest") 
     .load() 
     println(df.isStreaming) 
     println(df.printSchema()) 

    val ds: Dataset[DeviceData] = df.as[DeviceData] 

    val values = df.select("device").where("signal == Strong") 

    values.writeStream 
      .outputMode("append") 
      .format("console") 
      .start() 
      .awaitTermination() 


    } 
} 

Toute aide comment résoudre ce problème?

Répondre

0

flux Kafka produit toujours les champs suivants: value, offset, partition, key, timestamp, timestampType, topic. Dans votre cas, vous êtes intéressé par value, mais sachez que values are always deserialized as byte arrays, tapez cast to string est requis avant de désérialiser JSON.

Essayez le code suivant:

import org.apache.spark.sql.functions.from_json 
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
import spark.implicits._ 

val kafkaStream = 
    spark.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
    .option("subscribe", "test") 
    .option("startingOffsets", "earliest") 
    .load() 

// If you don't want to build the schema manually 
val schema = ExpressionEncoder[DeviceData]().schema 

val ds = kafkaStream.select(from_json($"value".cast("string"), schema)).as[DeviceData] 

val values = ds.filter(_.signal == "Strong").map(_.device)