2017-10-13 2 views
1

J'ai deux dataframes ayant chacun un tableau [String] comme colonne. Pour chaque entrée dans une base de données, j'ai besoin de trouver des sous-ensembles, le cas échéant, dans l'autre base de données. Un exemple est ici:Apache Spark - Recherche de groupes/listes/ensembles de sous-ensembles

DF1:

---------------------------------------------------- 
      id : Long | labels : Array[String] 
--------------------------------------------------- 
     10    | [label1, label2, label3] 
     11    | [label4, label5] 
     12    | [label6, label7] 

DF2:

---------------------------------------------------- 
     item : String | labels : Array[String] 
--------------------------------------------------- 
     item1   | [label1, label2, label3, label4, label5] 
     item2   | [label4, label5] 
     item3   | [label4, label5, label6, label7] 

Après l'opération de sous-ensemble je l'ai décrit, le rendement attendu o/p devrait être

DF3:

---------------------------------------------------- 
     item : String | id : Long 
--------------------------------------------------- 
     item1   | [10, 11] 
     item2   | [11] 
     item3   | [11, 12] 

C'est g garanti que le DF2 aura toujours des sous-ensembles correspondants dans DF1, il n'y aura donc pas d'éléments restants.

Quelqu'un peut-il aider s'il vous plaît avec la bonne approche ici? Cela ressemble à chaque élément dans DF2, j'ai besoin de scanner DF1 et de faire une opération de sous-ensemble (ou de soustraction) sur la 2ème colonne jusqu'à ce que je trouve tous les sous-ensembles et épuiser les étiquettes dans cette ligne. " des champs. Comment faire cela de manière compacte et efficace? Toute aide est grandement appréciée. De manière réaliste, je peux avoir 100s d'éléments dans DF1 et 1000s d'éléments dans DF2.

Répondre

0

Je ne connais aucun moyen d'effectuer ce type d'opération de manière efficace. Cependant, voici une solution possible en utilisant UDF ainsi que la jointure cartésienne.

Le UDF prend deux séquences et vérifie si toutes les chaînes de la première existe dans la seconde:

val matchLabel = udf((array1: Seq[String], array2: Seq[String]) => { 
    array1.forall{x => array2.contains(x)} 
}) 

Pour utiliser jointure cartésienne, il doit être activé comme il est informatiquement cher.

val spark = SparkSession.builder.getOrCreate() 
spark.conf.set("spark.sql.crossJoin.enabled", true) 

Les deux dataframes sont reliés entre eux en utilisant le UDF. Ensuite, l'image résultante est groupée par la colonne item pour collecter une liste de tous les identifiants. En utilisant la même DF1 et DF2 comme dans la question:

val DF3 = DF2.join(DF1, matchLabel(DF1("labels"), DF2("labels"))) 
    .groupBy("item") 
    .agg(collect_list("id").as("id")) 

Le résultat est le suivant:

+-----+--------+ 
| item|  id| 
+-----+--------+ 
|item3|[11, 12]| 
|item2| [11]| 
|item1|[10, 11]| 
+-----+--------+ 
+1

grâce à la solution .. cela a fonctionné comme un charme. Comme prévu, pas optimal, mais fonctionne fonctionnellement pour la validation de l'algorithme qui m'intéresse –