2017-07-03 3 views
0

J'ai une structure de suivi de fichier. Dans la première colonne, c'est nodeID. Après ":" c'est un noeud qui a une connexion avec nodeID. Chaque nodeID peut avoir plusieurs connexions.Comment importer dans GraphFrame la structure de suivi de temps de texte

0: 5305811, 
1: 4798401, 
2: 7922543, 
3: 7195074, 
4: 6399935, 
5: 5697217, 
6: 5357407, 
7: 4798401, 
8: 629131,5330605,6481451,6280292,6909396,7325128, 
... 

Comment appliquer des transformations à importer dans GraphFrame?

Répondre

1
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.LongType 
import org.graphframes.GraphFrame 
import scala.util.Try 


val spark = SparkSession.builder() 
    .master("local[2]") 
    .appName("test") 
    .getOrCreate() 

spark.sparkContext.setCheckpointDir(spark.conf.getOption(s"spark.checkpointdir").getOrElse("/tmp")) 

import spark.implicits._ 

def cleanIds = udf((ids: Seq[String]) => ids.flatMap(x => Try(x.trim.toLong).toOption)) 

val ds = spark 
    .read 
    .option("mode", "PERMISSIVE") 
    .option("header", "false") 
    .option("delimiter", ":") 
    .csv("src/main/resources/connections.txt") 
    .toDF("id", "links") 
    .select(
    'id.cast(LongType), 
    cleanIds(split(trim('links), ",")).as("links")) 
    .cache() 


val vertices = ds.select('id).distinct() 

val edges = ds.select(
    'id.as("src"), 
    explode('links).as("dst") 
) 

val graphFrame = GraphFrame(vertices, edges) 

val connectedComponents = graphFrame.connectedComponents.run() 

connectedComponents 
    .groupBy('component).agg(
    collect_list(struct('id)).as("vertices") 
).show(false) 

entrée Vu comme ceci:

0: 5,6, 
1: 4, 
2: 3,4,5, 
3: 2, 
4: 2,1, 
5: 2,0, 
6: 0,7, 
10: 11,13, 
11: 12,14, 
12: 13,14, 
13: 10,12, 
14: 11,12, 

Cela va créer une trame de données de sommet qui ressemble à ceci:

+---+ 
| id| 
+---+ 
| 0| 
| 6| 
| 5| 
| 1| 
| 10| 
| 3| 
| 12| 
| 11| 
| 2| 
| 4| 
| 13| 
| 14| 
+---+ 

et bords comme celui-ci:

+---+---+ 
|src|dst| 
+---+---+ 
| 0| 5| 
| 0| 6| 
| 1| 4| 
| 2| 3| 
| 2| 4| 
| 2| 5| 
| 3| 2| 
| 4| 2| 
| 4| 1| 
| 5| 2| 
| 5| 0| 
| 6| 0| 
| 6| 7| 
| 10| 11| 
| 10| 13| 
| 11| 12| 
| 11| 14| 
| 12| 13| 
| 12| 14| 
| 13| 10| 
+---+---+ 

et connecté des composants comme celui-ci:

+---------+-----------------------------------+ 
|component|vertices       | 
+---------+-----------------------------------+ 
|0  |[[0], [6], [5], [1], [3], [2], [4]]| 
|10  |[[10], [12], [11], [13], [14]]  | 
+---------+-----------------------------------+