1

Je le tableau suivant stocké dans Hive appelé ExampleData:Spark2 dataframe/processus RDD dans les groupes

+--------+-----+---| 
|Site_ID |Time |Age| 
+--------+-----+---| 
|1  |10:00| 20| 
|1  |11:00| 21| 
|2  |10:00| 24| 
|2  |11:00| 24| 
|2  |12:00| 20| 
|3  |11:00| 24| 
+--------+-----+---+ 

Je dois être en mesure de traiter les données par site. Malheureusement, le partitionnement par site ne fonctionne pas (il y a plus de 100 000 sites, tous avec des quantités de données relativement faibles).

Pour chaque site, je dois sélectionner la colonne du temps et de la colonne Âge séparément, et l'utiliser pour alimenter une fonction (qui idéalement, je veux courir sur les exécuteurs, pas le pilote)

J'ai J'ai une idée de comment je pense que je veux que cela fonctionne, mais cette solution ne fonctionnerait que sur le pilote, donc c'est très lent. Je dois trouver une façon de l'écrire il courrai un niveau d'exécuteur testamentaire:

// fetch a list of distinct sites and return them to the driver 
//(if you don't, you won't be able to loop around them as they're not on the executors) 
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10") 
.collect 

val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData") 

distinctSites.foreach(row => { 
    allSiteData.filter("site_id = " + row.get(0)) 
    val times = allSiteData.select("time").collect() 
    val ages = allSiteData.select("ages").collect() 
    processTimesAndAges(times, ages) 
}) 

def processTimesAndAges(times: Array[Row], ages: Array[Row]) { 
    // do some processing 
} 

J'ai essayé la diffusion des distinctSites sur tous les nœuds, mais cela n'a pas été fructueuse.

Cela semble un concept si simple et pourtant j'ai passé quelques jours à étudier cela. Je suis très nouveau à Scala/Spark, alors excuses si c'est une question ridicule!

Toutes les suggestions ou conseils sont grandement appréciés.

Répondre

1

L'API RDD fournit un certain nombre de fonctions qui peuvent être utilisées pour effectuer des opérations dans des groupes commençant par repartitionAndSortWithinPartitions et se terminant par un certain nombre de méthodes * byKey (combineByKey, groupByKey, reduceByKey, etc.).

Exemple:

rdd.map(tup => ((tup._1, tup._2, tup._3), tup)). 
    groupByKey(). 
    forEachPartition(iter => doSomeJob(iter)) 

En dataframe vous pouvez utiliser des fonctions d'agrégation, classe GroupedData fournit un certain nombre de méthodes pour les fonctions les plus courantes, y compris le nombre, max, min, valeur moyenne et la somme

Exemple:

val df = sc.parallelize(Seq(
     (1, 10.3, 10), (1, 11.5, 10), 
     (2, 12.6, 20), (3, 2.6, 30)) 
    ).toDF("Site_ID ", "Time ", "Age") 

df.show() 

+--------+-----+---+ 
|Site_ID |Time |Age| 
+--------+-----+---+ 
|  1| 10.3| 10| 
|  1| 11.5| 10| 
|  2| 12.6| 20| 
|  3| 2.6| 30| 
+--------+-----+---+ 


    df.groupBy($"Site_ID ").count.show 

+--------+-----+ 
|Site_ID |count| 
+--------+-----+ 
|  1| 2| 
|  3| 1| 
|  2| 1| 
+--------+-----+ 

note: Comme vous l'avez mentionné que la solution est très lent, vous devez utiliser la partition, dans la partition de gamme de cas est une bonne option.

+0

Merci! C'était le groupByKey() qui m'a amené à l'endroit dont j'avais besoin. Très apprécié, et merci pour votre réponse rapide aussi. –