2016-09-19 2 views
1

Est-il possible de combiner GraphX ​​et DataFrames? Je veux pour chaque noeud dans le graphique un propre DataFrame. Je sais que GraphX ​​et DataFrame étendent RDD et les RDD imbriqués ne sont pas possibles et SparkContext n'est pas Serializable. Mais dans Spark 2.0.0, j'ai vu que SparkSession est sérialisable. J'ai essayé, mais ça ne marche toujours pas. J'ai également essayé de stocker les DataFrames globales dans un tableau. Mais je ne peux pas accéder au tableau dans un nœud de travail. Ignorer les méthodes sendMsg et fusionner:Spark combine DataFrames et GraphX ​​

object Main{ 
    def main(args: Array[String]) : Unit = {  
    val spark = SparkSession 
     .builder 
     .appName("ScalaGraphX_SQL") 
     .master("spark://home:7077") 
     .enableHiveSupport() 
     .getOrCreate() 

    val sc = spark.sparkContext 

    val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex() 

    //set array size 
    Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt) 

    //insert dataframe inside array tables 
    node_pair.collect().foreach{ case (arr,l) => { 
     val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true)) 
     val schema = StructType(fields) 
     val rows = new util.ArrayList[Row] 
     Tables.tables{l.toInt} = spark.createDataFrame(rows, schema) 
     //val f = 
     } 
    } 

    //create vertices 
    val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => { 
     (l,new TreeNode(l,false)) 
    } 
    } 

    //create edges 
    val edges : RDD[Edge[Boolean]] = node_pair 
     .filter{ case (arr,l) => arr(0).toLong != -1} 
     .map{ case (arr,l) => Edge(l,arr(0).toLong,true) 
     } 

    var init_node : TreeNode = new TreeNode(-1,false) 
    val graph = Graph(vertices,edges,init_node) 
    val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge) 

    graph_pregel.vertices.collect().foreach(v => println(v._2.index)) 
    } 

    def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = { 
    println(Tables.tables{act.index.toInt}) 
    act 
    } 

    def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = { 

    if(et.srcAttr.v){ 
     println(et.srcId + "--->" + et.dstId) 
     Iterator((et.dstId,et.srcAttr)) 
    }else{ 
     //println(et.srcId + "-/->" + et.dstId) 
     Iterator.empty 
    } 
    } 

    def merge(n1:TreeNode, n2:TreeNode): TreeNode = { 
    n1 
    } 
} 

object Tables extends Serializable{ 
    var tables : scala.Array[Dataset[Row]] = null 
} 

class TreeNode(val index:Long, var v: Boolean) extends Serializable { 
} 

Peut-être qu'il est possible d'accéder au réseau mondial avec RDD? Ou quelqu'un a une autre solution pour ce problème?

+0

Le problème n'est pas et n'a jamais été la sérialisation. Non sérialisable est juste un indice ici qui pointe vers le problème principal que l'architecture Spark ne convient pas pour le traitement imbriqué sans limiter de manière significative le modèle de programmation. Donc, juste parce que vous pouvez sérialiser 'SparkSession' (vous pouvez sérialiser' SQLContext' dans 1.x de la même manière) cela ne veut pas dire que quelque chose a changé. – zero323

Répondre

1

Veuillez jeter un coup d'œil à GraphFrames - c'est un paquet qui fournit l'API DataFrame pour GraphX. GraphFrames sera considéré pour être inclus dans Spark une fois qu'il fournira des fonctionnalités telles que le partitionnement important dans GraphX ​​et lorsque l'API sera testée de façon plus exhaustive.

Pour le problème décrit dans le commentaire ci-dessous, vous avez un dataframe avec des noeuds, à savoir: Aéroports

val airports = sqlContext.createDataFrame(List(
    ("A1", "Wrocław"), 
    ("A2", "London"), 
    ("A3", "NYC") 
)).toDF("id", "name") 

ID est unique. Vous pouvez créer un autre DataFrame, c'est-à-dire detailsDF, avec une structure telle que: ID | AirPortID | other data. Alors vous avez un-à-plusieurs et pour un aéroport (donc GraphFrame verticle) vous avez beaucoup d'entrées dans detailsDF. Maintenant, vous pouvez interroger: spark.sql("select a.name, d.id as detailID from airports a join detailsDF d on a.id = d.airportID");. Vous pouvez également avoir plusieurs colonnes dans Airports DataFrame si vous souhaitez y stocker des informations supplémentaires

+0

Merci, mais arent GraphFrames, Graphs structuré comme DataFrames? J'ai besoin d'un graphique avec DataFrames à l'intérieur des nœuds. C'est comme une table pour chaque noeud. Ou ai-je mal compris GraphFrames? –

+0

Oui et non :) Dans GraphFrames il y a une table (DataFrame) pour chaque nœud. Cependant, ce nœud peut avoir un identifiant et ensuite il peut y avoir un autre DataFrame, c'est-à-dire NodeDetails qui aura la colonne "baseNodeId". Ensuite, vous pouvez avoir plusieurs lignes pour un seul noeud –

+0

Je suis désolé, mais je ne comprends pas. N'est-ce pas un DataFrame imbriqué? Pouvez-vous me donner un petit exemple? Merci beaucoup! –