GraphX VertexRDD NullPointerException

I'm trying to do message passing on a graph to compute recursive features. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context:
> val newGraph = Graph(newVertices, edges)
newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...
//This is the RDD that causes the problem
> val result = newGraph.aggregateMessages[List[Double]](
{triplet => triplet.sendToDst(triplet.srcAttr)},
{(a,b) => a.zip(b).map { case (x, y) => x + y }},
{TripletFields.Src})
result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57
> result.take(1)
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0)))
So far, no problem, but when I try:
> val newGraph2 = Graph(result, edges)
newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...
> val result2 = newGraph2.aggregateMessages[List[Double]](
{triplet => triplet.sendToDst(triplet.srcAttr)},
{(a,b) => a.zip(b).map { case (x, y) => x + y }},
{TripletFields.Src})
> result2.count
I get the following (trimmed) error:
result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException
at $anonfun$2.apply(<console>:62)
at $anonfun$2.apply(<console>:62)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
at $anonfun$1.apply(<console>:61)
at $anonfun$1.apply(<console>:61)
at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
...
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
...
Caused by: java.lang.NullPointerException
at $anonfun$2.apply(<console>:62)
at $anonfun$2.apply(<console>:62)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
at $anonfun$1.apply(<console>:61)
at $anonfun$1.apply(<console>:61)
at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
... 3 more
I don't think this is a type mismatch error, because aggregateMessages returns a VertexRDD. Any ideas why I'm getting this problem?
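One possible cause worth checking, offered as a guess rather than a confirmed diagnosis: aggregateMessages only returns vertices that received at least one message, so `result` may contain fewer vertices than `edges` refers to. Graph(result, edges) fills in the missing vertices with the default attribute, which is null unless one is given, and the merge function then calls zip on a null list. The sketch below reproduces that setup on a hypothetical three-vertex toy graph and passes an explicit default. The object name, the helper names (`sumLists`, `sumFromSources`), the toy data, and the choice of a zero vector as the default are all assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import org.apache.spark.rdd.RDD

object NullDefaultSketch {
  // Element-wise sum; same logic as the merge function in the question.
  def sumLists(a: List[Double], b: List[Double]): List[Double] =
    a.zip(b).map { case (x, y) => x + y }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("null-default-sketch")
    val sc = new SparkContext(conf)

    // Hypothetical toy graph: 1 -> 2, 2 -> 3, 1 -> 3. Vertex 1 has no incoming edge.
    val vertices: RDD[(VertexId, List[Double])] = sc.parallelize(Seq(
      (1L, List(1.0, 2.0)), (2L, List(3.0, 4.0)), (3L, List(5.0, 6.0))))
    val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1)))

    // Same aggregateMessages call as in the question, wrapped for reuse.
    def sumFromSources(g: Graph[List[Double], Int]) =
      g.aggregateMessages[List[Double]](
        triplet => triplet.sendToDst(triplet.srcAttr),
        (a, b) => sumLists(a, b),
        TripletFields.Src)

    val result = sumFromSources(Graph(vertices, edges))
    // Vertex 1 received no message, so it is absent from `result`.
    println(s"vertices in result: ${result.count()} of ${vertices.count()}")

    // Assumption: a zero vector is an acceptable attribute for vertices
    // that received no message. Without defaultVertexAttr, vertex 1 would
    // be recreated with a null attribute.
    val zeros = List.fill(2)(0.0)
    val newGraph2 = Graph(result, edges, defaultVertexAttr = zeros)

    val result2 = sumFromSources(newGraph2)
    result2.collect().foreach(println)

    sc.stop()
  }
}
```

If the vertex counts differ in the real data as well, another option is newGraph.outerJoinVertices(result), substituting a default wherever the joined Option is None, which keeps every vertex of the original graph.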