2017-10-19 5 views
0

Je suis très nouveau pour Scala et étincelle et je ne sais pas comment commencer.Spark Scala Supprimer les lignes dans un RDD basé sur les colonnes d'un autre RDD

J'ai une RDD qui ressemble à ceci:

1,2,3,11 
2,1,4,12 
1,4,5,13 
3,5,6,12 

Un autre qui ressemble à ceci:

2,1 
1,2 

Je veux filtrer la première RDD telle qu'elle supprimera toutes les lignes qui sont correspondant les deux premières colonnes du deuxième RDD. La sortie devrait ressembler à:

1,4,5,13 
3,5,6,12 

Répondre

1
// input rdds 
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,3,12), (1,4,5,13), (3,5,6,12))) 
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1))) 

// manipulate the 2 rdds as a key, val pair 
// the key of the first rdd is a tuple pair of first two fields, the val contains all the fields 
// the key of the second rdd is a tuple of first two fields, the val is just null 
// then we could perform joins on their key 
val rdd1_key = rdd1.map(record => ((record._1, record._2), record)) 
val rdd2_key = rdd2.map(record => (record, null)) 

// 1. perform left outer join, the record become (key, (val1, val2)) 
// 2. filter, keep those records which do not have a join 
// if there is no join, val2 will be None, otherwise val2 will be null, which is the value we hardcoded from previous step 
// 3. get val1 
rdd1_key.leftOuterJoin(rdd2_key) 
    .filter(record => record._2._2 == None) 
    .map(record => record._2._1) 
    .collect().foreach(println(_)) 

// result 
(1,4,5,13) 
(3,5,6,12) 

Merci

1

Personnellement, je préfère dataframe/dataset comme ils sont optimisés formes de rdd et avec plus fonctions et similaires encastrables aux bases de données traditionnelles.

Voici la façon dataframe:

La première étape serait de convertir à la fois de la rdds à dataframes

import sqlContext.implicits._ 
val df1 = rdd1.toDF("col1", "col2", "col3", "col4") 
val df2 = rdd2.toDF("col1", "col2") 

Deuxième étape serait d'ajouter une nouvelle column en dataframe2 pour condition de filtrage vérification

import org.apache.spark.sql.functions._ 
val tempdf2 = df2.withColumn("check", lit("check")) 

Et la dernière étape serait de join les deux dataframes, filter et drop les inutiles rows et columns.

val finalDF = df1.join(tempdf2, Seq("col1", "col2"), "left") 
          .filter($"check".isNull) 
          .drop($"check") 

Vous devriez avoir finale dataframe comme

+----+----+----+----+ 
|col1|col2|col3|col4| 
+----+----+----+----+ 
|3 |5 |6 |12 | 
|1 |4 |5 |13 | 
+----+----+----+----+ 

Maintenant, vous pouvez convertir en rdd en utilisant finalDF.rdd ou vous pouvez continuer votre traitement ultérieur avec dataframe lui-même. Je souhaite que la réponse est utile