1

j'ai une obligation de valider une opération d'acquisition, Bassically, j'ai deux grands fichiers dans HDFS, on est Avro formaté (fichiers ingérés), un autre est en parquet formaté (fichier consolidé).La meilleure façon de gagner en performance lorsque vous faites une jointure comptage en utilisant étincelle et scala

fichier Avro a ce schéma:

nom de fichier, date, compter, afield1, afield2, afield3, afield4, afield5, afield6, ... afieldN

fichier Parquet a ce schéma:

fileName, anotherField1, anotherField1, anotherField2, anotherFiel3, anotherField14, ..., anotherFieldN

Si j'essaie de charger les deux fichiers dans un DataFrame, puis d'utiliser une jointure naïve, le travail sur ma machine locale prend plus de temps. que 24 heures!, ce qui est unacept capable.

ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count() 

Quelle est la meilleure façon d'y parvenir? ¿Déposer des colonnes depuis le DataFrame avant de faire le join-where-count? ¿Calculer les comptes par dataframe puis rejoindre et additionner?

PD

Je lisais sur la technique joint-carte côté, mais il semble que cette technique fonctionnerait pour moi s'il y avait un petit fichier capable de tenir en mémoire vive, mais je ne peux pas assurer que, donc, je aimerait savoir quel est le moyen préféré de la communauté pour y parvenir.

http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/

+0

est-ce que vous ne pouvez pas calculer les comptes par trame de données, puis vous joindre et faire une somme? – mtoto

+0

Je suppose que je peux, @mtoto, mais, d'abord, je voudrais savoir quelle est la meilleure façon d'y parvenir. En fait, j'ai l'exécution de cette phrase ingestedDF.join (consolidatedDF) .où ($ "filename" === $ "fileName"). Count() afin de connaître le nombre. Lorsque le travail est terminé, je vais essayer votre suggestion. Comment écrire ce code? – aironman

+0

Vous ne savez pas quelle est la question: Voulez-vous seulement connaître le nombre de noms de fichiers communs dans les deux ensembles de données? ou la différence? – maasg

Répondre

1

J'aborder ce problème en dépouillant les données que le terrain, je suis intéressé par (filename), ce qui rend un ensemble unique du nom de fichier avec la source, il vient de (l'ensemble de données d'origine) . À ce stade, les deux ensembles de données intermédiaires ont le même schéma, nous pouvons donc les unir et compter. Cela devrait être des ordres de grandeur plus rapides que l'utilisation d'un join sur les données complètes.

// prepare some random dataset 
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish")) 
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap")) 

val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data") 
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data") 

// select only the column we are interested in and tag it with the source. 
// Lets make it distinct as we are only interested in the unique file count 
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct 
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct 

// union both dataframes 
val union = df1Filenames.union(df2Filenames).toDF("filename","source") 

// let's count the occurrences of filename, by using a groupby operation 
val occurrenceCount = union.groupBy("filename").count 

// we're interested in the count of those files that appear in both datasets (with a count of 2) 
occurrenceCount.filter($"count"===2).count 
+0

je dois le prouver. Je posterai les chiffres de ma solution naïve (pas encore terminée) confrontés à votre solution. – aironman

+1

@aironman Quereis apostar una cerveza? :-) – maasg

+1

@aironman le voir aussi sous forme de bloc-notes: https://gist.github.com/maasg/824e60cc522deada0986169dae733549 – maasg