CassandraSourceRelation not serializable when joining two DataFrames

I have a DataFrame setup with the spark-cassandra connector 1.6.2 and I am trying to perform some transformations with Cassandra. The DataStax Enterprise version is 5.0.5.
DataFrame df1 = sparkContext
        .read().format("org.apache.spark.sql.cassandra")
        .options(readOptions).load()
        .where("field2 = 'XX'")
        .limit(limitVal)
        .repartition(partitions);

// Collect the distinct join keys on the driver. DataFrame has no getColumn();
// select the column and map each Row to its String value instead.
List<String> distinctKeys = df1.select("field3").distinct()
        .javaRDD().map(row -> row.getString(0)).collect();

// values = some transformations to get the IN query values
// (see the sketch after this code block)
String cassandraQuery = String.format("SELECT * FROM "
+ "table2 "
+ "WHERE field2 = 'XX' "
+ "AND field3 IN (%s)", values);
DataFrame df2 = sparkContext.cassandraSql(cassandraQuery);
String column1 = "field3";
String column2 = "field4";
List<String> columns = new ArrayList<>();
columns.add(column1);
columns.add(column2);
scala.collection.Seq<String> usingColumns =
        scala.collection.JavaConverters
                .collectionAsScalaIterableConverter(columns)
                .asScala().toSeq();
DataFrame joined = df1.join(df2, usingColumns, "left_outer");
List<Row> collected = joined.collectAsList(); // doesn't work: fails with NotSerializableException
Long count = joined.count(); // works
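For context, the elided step above just builds the comma-separated value list for the IN clause from the collected keys. A minimal sketch of that step (a simplification, not the exact production code):

// Minimal sketch (not the exact production code): quote each collected key
// and join with commas to form the values for the IN (...) clause.
StringBuilder inValues = new StringBuilder();
for (String key : distinctKeys) {
    if (inValues.length() > 0) {
        inValues.append(", ");
    }
    inValues.append("'").append(key).append("'");
}
String values = inValues.toString();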
Here is the exception log. It looks like Spark creates the Cassandra source relation, and it cannot be serialized.
java.io.NotSerializableException: java.util.ArrayList$Itr
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.cassandra.CassandraSourceRelation, value:
org.apache.spark.sql.cassandra.CassandraSourceRelation@...)
- field (class: org.apache.spark.sql.execution.datasources.LogicalRelation,
name: relation, type: class org.apache.spark.sql.sources.BaseRelation)
- object (class org.apache.spark.sql.execution.datasources.LogicalRelation,
Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Filter, name:
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Filter, Filter
(field2#0 = XX)
+- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Repartition, name:
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Repartition,
Repartition 4, true
+- Filter (field2#0 = XX)
+- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Join, name: left,
type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Join, Join
LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
: +- Filter (field2#0 = XX)
: +- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
+- Project [fields]
+- Filter ((field2#17 = YY) && field3#18 IN (IN array))
+- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Project, name:
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Project, Project
[fields]
+- Join LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
: +- Filter (field2#0 = XX)
: +- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
+- Project [fields]
+- Filter ((field2#17 = XX) && field3#18 IN (IN array))
+- Relation[fields]
org.apache.spark.sql.cassandra.CassandraSourceRelation@...
)
- field (class: org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, name:
$outer, type: class org.apache.spark.sql.catalyst.trees.TreeNode)
- object (class org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4,
<function1>)
- field (class:
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9,
name: $outer, type: class
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4)
- object (class
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9,
<function1>)
- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: f$1,
type: interface scala.Function1)
- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon,
List(...))
- field (class: org.apache.spark.rdd.RDD, name:
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[32] at
collectAsList at RevisionPushJob.java:308)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer,
type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name:
$outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12,
<function1>)
Is there a way to make it serializable? And why does the count operation work while the collect operation does not?
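For reference, one workaround I am considering is to avoid the driver-built IN query and read table2 through the same DataFrame source, letting Spark join the two DataFrames directly. A sketch, assuming a hypothetical table2ReadOptions map pointing at table2 (I have not verified that this sidesteps the exception):

// Sketch only: read table2 via the same Cassandra source instead of issuing
// a cassandraSql IN query built on the driver. table2ReadOptions is a
// hypothetical options map (keyspace/table settings for table2).
DataFrame df2Direct = sparkContext
        .read().format("org.apache.spark.sql.cassandra")
        .options(table2ReadOptions)
        .load()
        .where("field2 = 'XX'");

DataFrame joinedDirect = df1.join(df2Direct, usingColumns, "left_outer");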