2017-09-07

I created a custom accumulator, shown below. It is serializable, yet I get a java.lang.NullPointerException for this custom accumulator:

public class ABCAccumulator extends AccumulatorV2<String, Set> implements Serializable {

    Set<String> set = new HashSet();

    @Override
    public void add(String v) {
        set.add(v);
    }
}

First of all, isn't there a Spark API for creating a collection-based accumulator (such as Set, Map, etc.)? I know that CollectionAccumulator works with a List.
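For example, I know I can get a List-backed accumulator straight from the context (assuming sparkContext is the underlying org.apache.spark.SparkContext, as in the register() call below), but it keeps duplicates:

    import org.apache.spark.util.CollectionAccumulator;

    // Built-in accumulator backed by a java.util.List: duplicates are kept.
    CollectionAccumulator<String> listAcc = sparkContext.collectionAccumulator("names");

    rdd.foreach(record -> listAcc.add(record.getName()));

    List<String> names = listAcc.value(); // merged result, read on the driver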

Second, I use this accumulator to collect all the values in an RDD, as below:

ABCAccumulator acc = new ABCAccumulator();
sparkContext.register(acc);

rdd.foreach(record -> {
    acc.add(record.getName());
});

But when I run my code, I get an exception:

org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2287) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:916) 
     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351) 
     at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45) 
     at com.def.ghi.jkl.mno.ActualClass.lambda$main$ed7564e9$1(ActualClass.java:154) 
     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
     at scala.util.Try$.apply(Try.scala:192) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NullPointerException 
     at org.apache.spark.util.AccumulatorV2.copyAndReset(AccumulatorV2.scala:129) 
     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:167) 
     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136) 
     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 

Please help.


Could you please add the full stack trace? –


Added the full stack trace. –

Answer


I think this fails because ABCAccumulator does not correctly implement all of the AccumulatorV2 methods. When Spark serializes a registered accumulator, AccumulatorV2.writeReplace() calls copyAndReset(), which relies on copy() and reset(); that is exactly where the NullPointerException in your stack trace comes from.

Try something like this:

import org.apache.spark.util.AccumulatorV2;

import java.util.HashSet;
import java.util.Set;

public class ABCAccumulator extends AccumulatorV2<String, Set<String>> {

    private final Set<String> values = new HashSet<>();

    // True when the accumulator holds the zero value, i.e. no elements yet.
    @Override
    public boolean isZero() {
        return values.isEmpty();
    }

    // Must return an independent copy: Spark calls copy() (via copyAndReset())
    // when it serializes the accumulator for a task, then resets that copy.
    // Returning "this" here would let reset() wipe out the driver-side value.
    @Override
    public AccumulatorV2<String, Set<String>> copy() {
        ABCAccumulator newAcc = new ABCAccumulator();
        newAcc.values.addAll(this.values);
        return newAcc;
    }

    @Override
    public void reset() {
        values.clear();
    }

    @Override
    public void add(String v) {
        values.add(v);
    }

    // Combines the partial result computed by another task into this one.
    @Override
    public void merge(AccumulatorV2<String, Set<String>> other) {
        values.addAll(other.value());
    }

    @Override
    public Set<String> value() {
        return values;
    }
}
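With every method implemented (and copy() returning a genuine copy rather than this), registering and using the accumulator should work; a minimal usage sketch, assuming the same sparkContext and rdd as in the question:

    ABCAccumulator acc = new ABCAccumulator();
    sparkContext.register(acc, "names"); // must be registered before the action runs

    rdd.foreach(record -> acc.add(record.getName()));

    // Partial sets from each task are combined via merge(); read the result on the driver.
    Set<String> names = acc.value();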