2016-04-13 2 views
2

Je dois parcourir le contenu d'un DF avec diverses instructions SELECT dans une boucle foreach, en écrivant la sortie dans des fichiers texte. Toute instruction SELECT dans la boucle foreach renvoie une exception NullPointerException. Je suis incapable de voir pourquoi c'est. Une instruction SELECT à l'intérieur d'une boucle "for" ne renvoie pas cette erreur.Spark scala: SELECT dans une boucle foreach renvoie java.lang.NullPointerException

Ceci est le cas de test.

// step 1 of 6: create the table and load two rows 
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1  varchar(4) 
,username varchar(5) 
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""") 

// step 2 of 6: confirm that the data is queryable 
vc.sql("SELECT * FROM TEST1").show() 
+----+--------+-------+ 
| c1|username|numeric| 
+----+--------+-------+ 
|col1| USER1|  0| 
|col1| USER2|  1| 
+----+--------+-------+ 

// Step 3 of 6: create a dataframe for the table 
var df=vc.sql("""SELECT * FROM TEST1""") 


// step 4 of 6: create a second dataframe that we will use as a loop iterator 
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """) 

// step 5 of 6: first foreach loop works ok: 
df_usernames.foreach(t => 
    { 
     println("(The First foreach works ok: loop iterator t is " + t(0).toString()) 
    } 
) 
(The First foreach works ok: loop iterator t is USER1 
(The First foreach works ok: loop iterator t is USER2 

// step 6 of 6: second foreach with any embedded SQL returns an error 
df_usernames.foreach(t => 
    { 
     println("(The second foreach dies: loop iterator t is " +  t(0).toString()) 
     vc.sql("""SELECT c1 FROM TEST1""").show() 
    } 
)  
The second foreach dies: loop iterator t is USER1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158  in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 

Répondre

1

Cela ne peut pas être fait. Vous ne pouvez pas commencer requête SQL à l'intérieur foreach sans appeler recueillir d'abord

>>> df_usernames.collect.foreach(
... lambda x: sqlContext.sql("""SELECT c1 FROM TEST1""").show()) 
+1

Ce n'est pas la même pensée foreach que celui de l'OP utilisait et ce n'est pas une bonne pratique de recueillir sans connaître le cardinalité et la taille des données. Il ne sera pas à l'échelle si vous avez laissé dire 2M utilisateurs par exemple. – eliasah

+0

Y at-il un moyen d'y parvenir sans utiliser collect? Pour chaque "ligne" dans le RDD, j'ai besoin de comparer avec les données existantes (que je peux charger depuis SparkSession.sql). – KangarooWest