How to write a JDBC Sink for Spark Structured Streaming [SparkException: Task not serializable]?

I need a JDBC sink for my Spark Structured Streaming DataFrame. At the moment, as far as I know, the DataFrame API has no streaming write to JDBC (neither in PySpark nor in Scala; current Spark version 2.2.0). The only suggestion I found was to write my own ForeachWriter Scala class based on this article. So I modified a simple word count example from here by adding a custom ForeachWriter class and tried to write the stream to PostgreSQL. The stream of words is generated manually from the console (using NetCat: nc -lk -p 9999) and read by Spark from the socket.
Unfortunately, I am getting a "Task not serializable" error.
APACHE_SPARK_VERSION = 2.1.0, using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
My Scala code:
//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.
import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJDBC")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()
import spark.implicits._
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}
val url="jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user="<user name>"
val pwd="<pass>"
val writer = new JDBCSink(url, user, pwd)
import org.apache.spark.sql.streaming.ProcessingTime
val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()
query.awaitTermination()
Error message:
ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
- field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
- object (class $line21.$read$$iw$$iw, [email protected])
- field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
- object (class $line21.$read$$iw, [email protected])
- field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
- object (class $line21.$read, [email protected])
- field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
- object (class $line25.$read$$iw, [email protected])
- field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
- object (class $line25.$read$$iw$$iw, [email protected])
- field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
- object (class $line25.$read$$iw$$iw$JDBCSink, [email protected])
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink, [email protected])
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 25 more
How can I make it work?
SOLUTION
(Thanks to all, special thanks to @zsxwing for a straightforward solution):
- Save the JDBCSink class to a file.
- In spark-shell, load the class from that file, e.g. using
scala> :load <path_to_a_JDBCSink.scala_file>
- Finally, enter the rest of the code with
scala> :paste
without the JDBCSink class definition.
Have you tried making the 'connection' and 'statement' properties @transient?
Thanks @Vitaliy Kotlyarenko. I just tried with '@transient var connection' and '@transient var statement', but unfortunately I got the same error. – Lukiz
How are you running the code? I believe you pasted it into 'spark-shell', right? If so, it won't work (as you have experienced) because it closes over some non-serializable objects.
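
To illustrate the last comment, here is a minimal sketch in plain Scala, with no Spark involved, of how a class nested inside a wrapper can drag a non-serializable field along with it. The names ShellLine, NonSerializableQuery, NestedSink and TopLevelSink are hypothetical stand-ins: ShellLine plays the role of the $line21.$read$$iw wrapper from the serialization stack, and NonSerializableQuery plays the role of the StreamExecution held in its query field. It uses plain Java serialization, which is what the JavaSerializer frames in the stack trace rely on.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for a non-serializable object such as StreamExecution.
class NonSerializableQuery

// Hypothetical stand-in for a spark-shell line wrapper: Serializable itself,
// but holding a non-serializable field (like `query` in the stack trace).
class ShellLine extends Serializable {
  val query = new NonSerializableQuery

  // Nested class: each instance keeps a hidden reference to its enclosing ShellLine.
  class NestedSink(val url: String) extends Serializable {
    // Using an outer member makes sure the compiler keeps that outer reference.
    def describe: String = s"$url (query: $query)"
  }
}

// Top-level class, as if compiled from its own file: no enclosing instance.
class TopLevelSink(val url: String) extends Serializable

object SerializationDemo {
  // Java-serializes `obj` and returns the number of bytes written, or the failure.
  def serializedSize(obj: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val line = new ShellLine
    println(serializedSize(new line.NestedSink("jdbc:postgresql://host/db")))
    println(serializedSize(new TopLevelSink("jdbc:postgresql://host/db")))
  }
}
```

Under these assumptions, the nested sink should fail with a java.io.NotSerializableException naming NonSerializableQuery, while the top-level sink serializes fine. That is consistent with the trace above, where JDBCSink reaches query through its $outer field, and with why loading JDBCSink from its own file avoids the error.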