Est-il possible de combiner GraphX et DataFrames? Je veux pour chaque noeud dans le graphique un propre DataFrame. Je sais que GraphX et DataFrame étendent RDD et les RDD imbriqués ne sont pas possibles et SparkContext n'est pas Serializable. Mais dans Spark 2.0.0, j'ai vu que SparkSession est sérialisable. J'ai essayé, mais ça ne marche toujours pas. J'ai également essayé de stocker les DataFrames globales dans un tableau. Mais je ne peux pas accéder au tableau dans un nœud de travail. Ignorer les méthodes sendMsg et fusionner:Spark combine DataFrames et GraphX
object Main{
def main(args: Array[String]) : Unit = {
val spark = SparkSession
.builder
.appName("ScalaGraphX_SQL")
.master("spark://home:7077")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex()
//set array size
Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt)
//insert dataframe inside array tables
node_pair.collect().foreach{ case (arr,l) => {
val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true))
val schema = StructType(fields)
val rows = new util.ArrayList[Row]
Tables.tables{l.toInt} = spark.createDataFrame(rows, schema)
//val f =
}
}
//create vertices
val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => {
(l,new TreeNode(l,false))
}
}
//create edges
val edges : RDD[Edge[Boolean]] = node_pair
.filter{ case (arr,l) => arr(0).toLong != -1}
.map{ case (arr,l) => Edge(l,arr(0).toLong,true)
}
var init_node : TreeNode = new TreeNode(-1,false)
val graph = Graph(vertices,edges,init_node)
val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge)
graph_pregel.vertices.collect().foreach(v => println(v._2.index))
}
def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = {
println(Tables.tables{act.index.toInt})
act
}
def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = {
if(et.srcAttr.v){
println(et.srcId + "--->" + et.dstId)
Iterator((et.dstId,et.srcAttr))
}else{
//println(et.srcId + "-/->" + et.dstId)
Iterator.empty
}
}
def merge(n1:TreeNode, n2:TreeNode): TreeNode = {
n1
}
}
object Tables extends Serializable{
var tables : scala.Array[Dataset[Row]] = null
}
class TreeNode(val index:Long, var v: Boolean) extends Serializable {
}
Peut-être qu'il est possible d'accéder au réseau mondial avec RDD? Ou quelqu'un a une autre solution pour ce problème?
Le problème n'est pas et n'a jamais été la sérialisation. Non sérialisable est juste un indice ici qui pointe vers le problème principal que l'architecture Spark ne convient pas pour le traitement imbriqué sans limiter de manière significative le modèle de programmation. Donc, juste parce que vous pouvez sérialiser 'SparkSession' (vous pouvez sérialiser' SQLContext' dans 1.x de la même manière) cela ne veut pas dire que quelque chose a changé. – zero323