Je le tableau suivant stocké dans Hive appelé ExampleData:Spark2 dataframe/processus RDD dans les groupes
+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1 |10:00| 20|
|1 |11:00| 21|
|2 |10:00| 24|
|2 |11:00| 24|
|2 |12:00| 20|
|3 |11:00| 24|
+--------+-----+---+
Je dois être en mesure de traiter les données par site. Malheureusement, le partitionnement par site ne fonctionne pas (il y a plus de 100 000 sites, tous avec des quantités de données relativement faibles).
Pour chaque site, je dois sélectionner la colonne du temps et de la colonne Âge séparément, et l'utiliser pour alimenter une fonction (qui idéalement, je veux courir sur les exécuteurs, pas le pilote)
J'ai J'ai une idée de comment je pense que je veux que cela fonctionne, mais cette solution ne fonctionnerait que sur le pilote, donc c'est très lent. Je dois trouver une façon de l'écrire il courrai un niveau d'exécuteur testamentaire:
// fetch a list of distinct sites and return them to the driver
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect
val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")
distinctSites.foreach(row => {
allSiteData.filter("site_id = " + row.get(0))
val times = allSiteData.select("time").collect()
val ages = allSiteData.select("ages").collect()
processTimesAndAges(times, ages)
})
def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
// do some processing
}
J'ai essayé la diffusion des distinctSites sur tous les nœuds, mais cela n'a pas été fructueuse.
Cela semble un concept si simple et pourtant j'ai passé quelques jours à étudier cela. Je suis très nouveau à Scala/Spark, alors excuses si c'est une question ridicule!
Toutes les suggestions ou conseils sont grandement appréciés.
Merci! C'était le groupByKey() qui m'a amené à l'endroit dont j'avais besoin. Très apprécié, et merci pour votre réponse rapide aussi. –