2014-06-25 3 views
4

J'écris un programme Spark autonome qui obtient ses données de Cassandra. J'ai suivi les exemples et créé le RDD via newAPIHadoopRDD() et la classe ColumnFamilyInputFormat. Le RDD est créé, mais je reçois un NotSerializableException quand je l'appelle la méthode .groupByKey de RDD():Apache Spark avec le comportement de Cassandra

public static void main(String[] args) { 
    SparkConf sparkConf = new SparkConf(); 
    sparkConf.setMaster("local").setAppName("Test"); 
    JavaSparkContext ctx = new JavaSparkContext(sparkConf); 

    Job job = new Job(); 
    Configuration jobConf = job.getConfiguration(); 
    job.setInputFormatClass(ColumnFamilyInputFormat.class); 

    ConfigHelper.setInputInitialAddress(jobConf, host); 
    ConfigHelper.setInputRpcPort(jobConf, port); 
    ConfigHelper.setOutputInitialAddress(jobConf, host); 
    ConfigHelper.setOutputRpcPort(jobConf, port); 
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true); 
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner"); 
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner"); 

    SlicePredicate predicate = new SlicePredicate(); 
    SliceRange sliceRange = new SliceRange(); 
    sliceRange.setFinish(new byte[0]); 
    sliceRange.setStart(new byte[0]); 
    predicate.setSlice_range(sliceRange); 
    ConfigHelper.setInputSlicePredicate(jobConf, predicate); 

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd = 
    spark.newAPIHadoopRDD(jobConf, 
    ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class), 
    ByteBuffer.class, SortedMap.class); 

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey(); 
    System.out.println(groupRdd.count()); 
} 

L'exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer à java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1164) à java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1518) à java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1483) à java .io.ObjectOutputStream.writeOrdinaryO bject (ObjectOutputStream.java:1400) à java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1158) à java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:330) à org.apache.spark.serializer. JavaSerializationStream.writeObject (JavaSerializer.scala: 42) à org.apache.spark.storage.DiskBlockObjectWriter.write (BlockObjectWriter.scala: 179) à org.apache.sparkuler.ShuffleMapTask $$ anonfun $ runTask $ 1.apply (ShuffleMapTask.scala: 161) à org.apache.sparkuler.ShuffleMapTask $$ anonfun $ runTask $ 1.apply (ShuffleMapTask.scala: 158) à scala.collection.Iterator $ class.foreach (Iterator.scala: 727) sur org.apache.spark.InterruptibleIterator.foreach (InterruptibleIterator.scala: 28) sur org.apache.spar k.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 158) à org.apache.sparkuler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 99) à org.apache.spark.scheduler.Task.run (tâche. scala: 51) à org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 187) à java.util.concurrent.ThreadPoolExecutor $ Worker.runTask (ThreadPoolExecutor.java:895) à java. util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:918) à java.lang.Thread.run (Thread.java:662)

Ce que je suis en train de faire est de fusionner toutes les clés de la ligne colonnes en une seule entrée. Je reçois aussi la même exception lorsque je tente d'utiliser la méthode reduceByKey() comme ceci:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, sortedMap<ByteBuffer, IColumn>>() { 
     public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0, 
      SortedMap<ByteBuffer, IColumn> arg1) throws Exception { 
      SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator()); 
      sortedMap.putAll(arg0); 
      sortedMap.putAll(arg1); 
      return sortedMap; 
     } 
    } 
); 

J'utilise:

  • étincelle-bin-1.0.0 hadoop1
  • Cassandra 1.2.12
  • Java 1,6

est-ce que quelqu'un sait quel est le problème? Qu'est-ce qui échoue à la sérialisation?

Merci,
Shai

Répondre

4

Votre problème est causé probablement en essayant de serialise ByteBuffers. Ils ne sont pas sérialisables et vous devez les convertir en tableaux d'octets avant de produire RDD.

Vous devriez essayer pilote officiel DataStax Cassandra pour Spark qui est disponible here

+0

Salut Jacek, Merci pour votre réponse. Ma solution était exactement cela. – user3770713

Questions connexes