J'écris un programme Spark autonome qui obtient ses données de Cassandra. J'ai suivi les exemples et créé le RDD via newAPIHadoopRDD() et la classe ColumnFamilyInputFormat. Le RDD est créé, mais je reçois un NotSerializableException quand je l'appelle la méthode .groupByKey de RDD():Apache Spark avec le comportement de Cassandra
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("Test");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Job job = new Job();
Configuration jobConf = job.getConfiguration();
job.setInputFormatClass(ColumnFamilyInputFormat.class);
ConfigHelper.setInputInitialAddress(jobConf, host);
ConfigHelper.setInputRpcPort(jobConf, port);
ConfigHelper.setOutputInitialAddress(jobConf, host);
ConfigHelper.setOutputRpcPort(jobConf, port);
ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setFinish(new byte[0]);
sliceRange.setStart(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(jobConf, predicate);
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
spark.newAPIHadoopRDD(jobConf,
ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
ByteBuffer.class, SortedMap.class);
JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
System.out.println(groupRdd.count());
}
L'exception:
java.io.NotSerializableException: java.nio.HeapByteBuffer à java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1164) à java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1518) à java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1483) à java .io.ObjectOutputStream.writeOrdinaryO bject (ObjectOutputStream.java:1400) à java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1158) à java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:330) à org.apache.spark.serializer. JavaSerializationStream.writeObject (JavaSerializer.scala: 42) à org.apache.spark.storage.DiskBlockObjectWriter.write (BlockObjectWriter.scala: 179) à org.apache.sparkuler.ShuffleMapTask $$ anonfun $ runTask $ 1.apply (ShuffleMapTask.scala: 161) à org.apache.sparkuler.ShuffleMapTask $$ anonfun $ runTask $ 1.apply (ShuffleMapTask.scala: 158) à scala.collection.Iterator $ class.foreach (Iterator.scala: 727) sur org.apache.spark.InterruptibleIterator.foreach (InterruptibleIterator.scala: 28) sur org.apache.spar k.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 158) à org.apache.sparkuler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 99) à org.apache.spark.scheduler.Task.run (tâche. scala: 51) à org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 187) à java.util.concurrent.ThreadPoolExecutor $ Worker.runTask (ThreadPoolExecutor.java:895) à java. util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:918) à java.lang.Thread.run (Thread.java:662)
Ce que je suis en train de faire est de fusionner toutes les clés de la ligne colonnes en une seule entrée. Je reçois aussi la même exception lorsque je tente d'utiliser la méthode reduceByKey() comme ceci:
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, sortedMap<ByteBuffer, IColumn>>() {
public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
sortedMap.putAll(arg0);
sortedMap.putAll(arg1);
return sortedMap;
}
}
);
J'utilise:
- étincelle-bin-1.0.0 hadoop1
- Cassandra 1.2.12
- Java 1,6
est-ce que quelqu'un sait quel est le problème? Qu'est-ce qui échoue à la sérialisation?
Merci,
Shai
Salut Jacek, Merci pour votre réponse. Ma solution était exactement cela. – user3770713