2017-01-09

GraphX VertexRDD NullPointerException

I am trying to pass messages over a graph to compute recursive features. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context:

> val newGraph = Graph(newVertices, edges) 

newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@... 

//This is the RDD that causes the problem 
> val result = newGraph.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57 

> result.take(1) 
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0))) 

So far, no problem, but when I try

> val newGraph2 = Graph(result, edges) 

newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@... 

> val result2 = newGraph2.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

> result2.count 

I get the following (trimmed) error:

result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
... 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
... 
Caused by: java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    ... 3 more 

I don't think this is a type-mismatch error, since aggregateMessages returns a VertexRDD. Any ideas why I am getting this problem?

Answer


aggregateMessages does not return every vertex in the graph, only those that receive at least one message. The NullPointerException is caused by edges in the graph pointing at vertices missing from that set, combined with the absence of a default vertex value in the graph definition.
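One way to avoid the null attributes (a sketch, untested here; `defaultAttr` is a name introduced for illustration, and the zero list assumes your feature vectors all have four elements, matching the sample output above) is to pass a default vertex attribute to the `Graph` constructor, so vertices that received no message still get a non-null `List[Double]`:

```scala
import org.apache.spark.graphx.{Graph, TripletFields}

// Vertices present in `edges` but absent from `result` will be
// assigned this attribute instead of null.
val defaultAttr = List(0.0, 0.0, 0.0, 0.0)

val newGraph2 = Graph(result, edges, defaultAttr)

// The second aggregation now finds a non-null srcAttr on every triplet.
val result2 = newGraph2.aggregateMessages[List[Double]](
  triplet => triplet.sendToDst(triplet.srcAttr),
  (a, b) => a.zip(b).map { case (x, y) => x + y },
  TripletFields.Src)
```

Note that `zip` on the zero-filled default still produces a list of the right length, so the merge function keeps working; if your vertices can carry lists of different lengths, you would need a length-safe merge instead.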