0

J'ai une configuration de dataframes avec le connecteur spark-cassandra 1.6.2. J'essaie d'effectuer quelques transformations avec cassandra. Datastax Enterprise version est 5.0.5.CassandraSourceRelation non sérialisable lors de la jonction de deux dataframes

DataFrame df1 = sparkContext 
      .read().format("org.apache.spark.sql.cassandra") 
      .options(readOptions).load() 
      .where("field2 ='XX'") 
      .limit(limitVal) 
      .repartition(partitions); 

List<String> distinctKeys = df1.getColumn("field3").collect(); 

values = some transformations to get IN query values; 

String cassandraQuery = String.format("SELECT * FROM " 
      + "table2 " 
      + "WHERE field2 = 'XX' " 
      + "AND field3 IN (%s)", values); 
DataFrame df2 = sparkContext.cassandraSql(cassandraQuery); 

String column1 = "field3"; 
String column2 = "field4"; 
List<String> columns = new ArrayList<>(); 
     columns.add(column1); 
     columns.add(column2); 
scala.collection.Seq<String> usingColumns = 
scala.collection.JavaConverters. 
collectionAsScalaIterableConverter(columns).asScala().toSeq(); 
DataFrame joined = df1.join(df2, usingColumns, "left_outer"); 

List<Row> collected = joined.collectAsList(); // doestn't work 
Long count = joined.count(); // works 

C'est le journal d'exception, ressemble à étincelle crée realation source cassandra, et il ne peut pas être sérialisé.

java.io.NotSerializableException: java.util.ArrayList$Itr 
Serialization stack: 
- object not serializable (class: 
org.apache.spark.sql.cassandra.CassandraSourceRelation, value: 
[email protected]) 
- field (class: org.apache.spark.sql.execution.datasources.LogicalRelation, 
name: relation, type: class org.apache.spark.sql.sources.BaseRelation) 
- object (class org.apache.spark.sql.execution.datasources.LogicalRelation, 
Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Filter, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Filter, Filter 
(field2#0 = XX) 
+- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Repartition, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Repartition, 
Repartition 4, true 
+- Filter (field2#0 = XX) 
+- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Join, name: left, 
type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Join, Join 
LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20))) 
:- Repartition 4, true 
: +- Filter (field2#0 = XX) 
:  +- Relation[fields] 
[email protected] 
+- Project [fields] 
+- Filter ((field2#17 = YY) && field3#18 IN (IN array)) 
    +- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.plans.logical.Project, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
- object (class org.apache.spark.sql.catalyst.plans.logical.Project, Project 
[fields] 
+- Join LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20))) 
:- Repartition 4, true 
: +- Filter (field2#0 = XX) 
:  +- Relation[fields] 
[email protected] 
+- Project [fields] 
    +- Filter ((field2#17 = XX) && field3#18 IN (IN array)) 
    +- Relation[fields] 
[email protected] 
) 
- field (class: org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, name: 
$outer, type: class org.apache.spark.sql.catalyst.trees.TreeNode) 
- object (class org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, 
<function1>) 
- field (class: 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
name: $outer, type: class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4) 
- object (class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
<function1>) 
- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: f$1, 
type: interface scala.Function1) 
- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>) 
- writeObject data (class: scala.collection.immutable.$colon$colon) 
- object (class scala.collection.immutable.$colon$colon, 
List([email protected])) 
- field (class: org.apache.spark.rdd.RDD, name: 
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[32] at 
collectAsList at RevisionPushJob.java:308) 
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, 
type: class org.apache.spark.rdd.RDD) 
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>) 
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: 
$outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1) 
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, 
<function1>) 

Est-il possible de le faire sérialisé? Pourquoi l'opération count fonctionne-t-elle mais l'opération de collecte ne fonctionne pas?

Répondre

0

le message d'erreur indiquant java.util.ArrayList$Itr être votre peu unserialzable que je pense peut-être une référence à

List<String> columns = new ArrayList<>(); 
    columns.add(column1); 
    columns.add(column2); 

Ce qui dans sa conversion implicite peut exiger la sérialisation du tableau-liste iterator? C'est la seule ArrayList que je vois donc ça peut être le coupable. Il peut également être dans le code que vous avez supprimé pour "valeurs".

Lorsque vous faites le Count il peut ignorer les informations de la colonne, ce qui vous sauve probablement mais je ne peux pas être sûr. Donc, ma suggestion de TLDR essaye de retirer des choses du code et de remplacer et de reconstruire votre code pour trouver les bits non sérialisables.