2017-09-15 2 views
0

J'ai un DataFrame avec N Attributs (Atr1, Atr2, Atr3, ..., AtrN) et une instance individuelle avec le même [1..N-1 ] attributs, sauf le Nième. Je veux vérifier s'il y a une instance dans le DataFrame avec les mêmes valeurs pour les attributs [1..N-1] de l'instance, et si elle existe une occurrence de cette instance, mon but est d'obtenir l'instance dans le DataFrame avec les attributs [1..N].La recherche d'une instance dans une base de données dans Pyspark avec un filtre prend trop de temps

Par exemple, si j'ai:

Instance: 

[Row(Atr1=u'A', Atr2=u'B', Atr3=24)] 

Dataframe: 

+------+------+------+------+ 
| Atr1 | Atr2 | Atr3 | Atr4 | 
+------+------+------+------+ 
| 'C' | 'B' | 21 | 'H' | 
+------+------+------+------+ 
| 'D' | 'B' | 21 | 'J' | 
+------+------+------+------+ 
| 'E' | 'B' | 21 | 'K' | 
+------+------+------+------+ 
| 'A' | 'B' | 24 | 'I' | 
+------+------+------+------+ 

Je veux obtenir la 4ème ligne de la trame de données aussi avec la valeur de ATR4.

Je l'ai essayé avec « filtre() » méthode comme ceci:

df.filter("Atr1 = 'C' and Atr2 = 'B', and Atr3 = 24").take(1) 

Et je reçois le résultat que je voulais, mais il a fallu beaucoup de temps. Donc, ma question est la suivante: y a-t-il un moyen de faire la même chose, mais en moins de temps?

Merci!

+0

Un peu plus d'informations serait utile ici. En particulier: combien de temps cela prend-il? Combien de temps voulez-vous prendre? Quelle est la taille du cluster/matériel sur lequel vous l'utilisez? En général, quelle que soit la taille de votre cluster, il y aura des frais généraux en cas d'opération d'étincelle car une étincelle doit distribuer vos données et votre code au cluster, puis rassembler les résultats. Faire quelque chose de simple comme ça dans pyspark ne sera jamais aussi rapide que de faire quelque chose d'aussi simple en python sur votre machine locale. –

Répondre

0

Vous pouvez utiliser le hachage sensible à la localité (minhashLSH) pour trouver le voisin le plus proche et vérifier s'il est identique ou non. Comme vos données contiennent des chaînes, vous devez les traiter avant d'appliquer LSH. Nous allons utiliser le module fonction de pyspark ml

Commencez avec stringIndexing et onehotencoding

df= spark.createDataFrame([('C','B',21,'H'),('D','B',21,'J'),('E','c',21,'K'),('A','B',24,'J')], ["attr1","attr2","attr3","attr4"]) 


for col_ in ["attr1","attr2","attr4"]: 

    stringIndexer = StringIndexer(inputCol=col_, outputCol=col_+"_") 
    model = stringIndexer.fit(df) 
    df = model.transform(df) 
    encoder = OneHotEncoder(inputCol=col_+"_", outputCol="features_"+col_, dropLast = False) 
    df = encoder.transform(df) 


df = df.drop("attr1","attr2","attr4","attr1_","attr2_","attr4_") 
df.show() 


+-----+--------------+--------------+--------------+ 
|attr3|features_attr1|features_attr2|features_attr4| 
+-----+--------------+--------------+--------------+ 
| 21| (4,[2],[1.0])| (2,[0],[1.0])| (3,[1],[1.0])| 
| 21| (4,[0],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])| 
| 21| (4,[3],[1.0])| (2,[1],[1.0])| (3,[2],[1.0])| 
| 24| (4,[1],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])| 
+-----+--------------+--------------+--------------+ 

Ajouter id et monterons caractéristiques vecteurs

from pyspark.sql.functions import monotonically_increasing_id 

df = df.withColumn("id", monotonically_increasing_id()) 
df.show() 


assembler = VectorAssembler(inputCols = ["features_attr1", "features_attr2", "features_attr4", "attr3"] 
          , outputCol = "features") 
df_ = assembler.transform(df) 
df_ = df_.select("id", "features") 
df_.show() 


+----------+--------------------+ 
|  id|   features| 
+----------+--------------------+ 
|   0|(10,[2,4,7,9],[1....| 
|   1|(10,[0,4,6,9],[1....| 
|8589934592|(10,[3,5,8,9],[1....| 
|8589934593|(10,[1,4,6,9],[1....| 
+----------+--------------------+ 

Créez votre modèle minHashLSH et recherche pour les plus proches voisins

mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345) 
model = mh.fit(df_) 
model.transform(df_) 
key = df_.select("features").collect()[0]["features"] 
model.approxNearestNeighbors(df_, key, 1).collect() 

sortie

[Row(id=0, features=SparseVector(10, {2: 1.0, 4: 1.0, 7: 1.0, 9: 21.0}), hashes=[DenseVector([-1272095496.0])], distCol=0.0)]