3

J'ai besoin d'un récepteur jdbc pour ma trame de données en continu à structure étincelante. Pour le moment, autant que je sache, l'API de DataFrame n'a pas d'écriture en jdbc (ni dans pyspark ni dans scala (version actuelle de l'étincelle 2.2.0)). La seule suggestion que j'ai trouvé était d'écrire ma propre classe scala ForeachWriter basée sur this article. J'ai donc modifié un simple exemple de compte de mots à partir de here en ajoutant une classe ForeachWriter personnalisée et j'ai essayé d'écrire en postgress. Le flux de mots est généré manuellement à partir de la console (en utilisant NetCat: nc -lk -p 9999) et lu par spark à partir du socket.Comment écrire JDBC Sink pour Spark Structured Streaming [SparkException: Tâche non sérialisable]?

Malheureusement, j'obtiens une erreur "Tâche non sérialisable".

APACHE_SPARK_VERSION = 2.1.0 En utilisant la version Scala 2.11.8 (Java HotSpot (TM) 64 bits serveur VM, Java 1.8.0_112)

Mon code Scala:

//Spark context available as 'sc' (master = local[*], app id = local-1501242382770). 
//Spark session available as 'spark'. 

import java.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

val spark = SparkSession 
    .builder 
    .master("local[*]") 
    .appName("StructuredNetworkWordCountToJDBC") 
    .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar") 
    .getOrCreate() 

import spark.implicits._ 

val lines = spark.readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", 9999) 
    .load() 

val words = lines.as[String].flatMap(_.split(" ")) 

val wordCounts = words.groupBy("value").count() 

class JDBCSink(url: String, user:String, pwd:String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]{ 
    val driver = "org.postgresql.Driver" 
    var connection:java.sql.Connection = _ 
    var statement:java.sql.Statement = _ 

    def open(partitionId: Long, version: Long):Boolean = { 
     Class.forName(driver) 
     connection = java.sql.DriverManager.getConnection(url, user, pwd) 
     statement = connection.createStatement 
     true 
    } 

    def process(value: org.apache.spark.sql.Row): Unit = {   
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " + 
          "VALUES ('" + value(0) + "'," + value(1) + ");") 
    } 

    def close(errorOrNull:Throwable):Unit = { 
     connection.close 
    } 
} 

val url="jdbc:postgresql://<mypostgreserver>:<port>/<mydb>" 
val user="<user name>" 
val pwd="<pass>" 
val writer = new JDBCSink(url, user, pwd) 

import org.apache.spark.sql.streaming.ProcessingTime 

val query=wordCounts 
    .writeStream 
    .foreach(writer) 
    .outputMode("complete") 
    .trigger(ProcessingTime("25 seconds")) 
    .start() 

query.awaitTermination() 

Message d'erreur :

ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error 
org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923) 
     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) 
     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244) 
     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177) 
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution 
Serialization stack: 
     - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED]) 
     - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery) 
     - object (class $line21.$read$$iw$$iw, [email protected]) 
     - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw) 
     - object (class $line21.$read$$iw, [email protected]) 
     - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw) 
     - object (class $line21.$read, [email protected]) 
     - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read) 
     - object (class $line25.$read$$iw, [email protected]) 
     - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw) 
     - object (class $line25.$read$$iw$$iw, [email protected]) 
     - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw) 
     - object (class $line25.$read$$iw$$iw$JDBCSink, [email protected]) 
     - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter) 
     - object (class org.apache.spark.sql.execution.streaming.ForeachSink, [email protected]) 
     - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink) 
     - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
     ... 25 more 

Comment le faire fonctionner?

SOLUTION

(Merci à tous, thaks spéciaux à @zsxwing pour une solution simple):

  1. classe Enregistrer JDBCSink dans un fichier.
  2. Dans la charge d'étincelle charger une classe f.eg. using scala> :load <path_to_a_JDBCSink.scala_file>
  3. Enfin le code scala> :paste sans définition de classe JDBCSink.
+0

avez-vous essayé de faire les propriétés 'connection' et' statement' @transient? –

+0

Merci @Vitaliy Kotlyarenko. Je viens d'essayer avec '@transient var connection' et' @transient var statement' mais j'ai malheureusement reçu la même erreur. – Lukiz

+0

Comment exécutez-vous le code? Je crois que vous l'avez collé à «spark-shell», n'est-ce pas? Si tel est le cas, cela ne fonctionnera pas (comme vous l'avez expérimenté) car il se ferme sur certains objets non-sérialisables. –

Répondre

3

Définissez simplement JDBCSink dans un fichier séparé plutôt que de le définir comme une classe interne qui peut capturer la référence externe.

+0

Beaucoup, merci beaucoup !!! Cela a résolu le problème! Je vais mettre à jour ma question avec votre solution si cela ne vous dérange pas. – Lukiz

0

On dirait que le délinquant est ici l'utilisation de import spark.implicits._ dans la classe JDBCSink:

  • JDBCSink doit être sérialisable
  • En ajoutant cette importation, vous faites référence JDBCSink la non-sérialisable SparkSession qui est ensuite sérialisé avec (techniquement, SparkSession extends Serializable, mais il n'est pas destiné à être désérialisé sur les nœuds de travail)

Les bonnes nouvelles: vous n'utilisez pas cette importation, donc si vous juste supprimez il, cela devrait fonctionner.

0

Dans le cas où quelqu'un rencontre cela dans un classeur interactif, cette solution fonctionne également:

Au lieu de sauver la classe JDBCSink dans un fichier séparé, vous pouvez également déclarer tout comme un paquet séparé (« cellule incluse dans le paquet ») dans le même classeur et importez ce paquet dans la cellule où vous l'utilisez. Bien décrit ici https://docs.databricks.com/user-guide/notebooks/package-cells.html