2017-04-25 3 views
0

J'ai la classe suivante qui charge un fichier CSV sans en-tête à l'aide de l'API de données Spark. Le problème que j'ai est que je ne peux pas obtenir le SparkSession pour accepter un schéma StructType qui devrait définir chaque colonne. Dataframe résultante est colonnes unamed de type StringApache Spark Dataset API - N'accepte pas le schéma StructType

public class CsvReader implements java.io.Serializable { 

public CsvReader(StructType builder) { 
     this.builder = builder; 
    } 
private StructType builder; 

SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local"); 
// create Spark Context 
SparkContext context = new SparkContext(conf); 
// create spark Session 
SparkSession sparkSession = new SparkSession(context); 

Dataset<Row> df = sparkSession 
     .read() 
     .format("com.databricks.spark.csv") 
     .option("header", false) 
     //.option("inferSchema", true) 
     .schema(builder) 
     .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg 

public void printSchema() { 
    System.out.println(builder.length()); 
    df.printSchema(); 
} 

public void printData() { 
    df.show(); 
} 

public void printMeters() { 
    df.select("meter").show(); 
} 

public void printMeterCountByGeocode_result() { 
    df.groupBy("geocode_result").count().show(); 
} 

public Dataset getDataframe() { 
      return df; 
} 

} 

résultant schéma dataframe est:

root 
|-- _c0: string (nullable = true) 
|-- _c1: string (nullable = true) 
|-- _c2: string (nullable = true) 
|-- _c3: string (nullable = true) 
|-- _c4: string (nullable = true) 
|-- _c5: string (nullable = true) 
|-- _c6: string (nullable = true) 
|-- _c7: string (nullable = true) 
|-- _c8: string (nullable = true) 
|-- _c9: string (nullable = true) 
|-- _c10: string (nullable = true) 
|-- _c11: string (nullable = true) 
|-- _c12: string (nullable = true) 
|-- _c13: string (nullable = true) 

Débogueur montre que le StrucType 'Builder' est correctement défini:

0 = {[email protected]} "StructField(geocode_result,DoubleType,false)" 
1 = {[email protected]} "StructField(meter,StringType,false)" 
2 = {[email protected]} "StructField(orig_easting,StringType,false)" 
3 = {[email protected]} "StructField(orig_northing,StringType,false)" 
4 = {[email protected]} "StructField(temetra_easting,StringType,false)" 
5 = {[email protected]} "StructField(temetra_northing,StringType,false)" 
6 = {[email protected]} "StructField(orig_address,StringType,false)" 
7 = {[email protected]} "StructField(orig_postcode,StringType,false)" 
8 = {[email protected]} "StructField(postcode_easting,StringType,false)" 
9 = {[email protected]} "StructField(postcode_northing,StringType,false)" 
10 = {[email protected]} "StructField(distance_calc_method,StringType,false)" 
11 = {[email protected]} "StructField(distance,StringType,false)" 
12 = {[email protected]} "StructField(geocoded_address,StringType,false)" 
13 = {[email protected]} "StructField(geocoded_postcode,StringType,false)" 

ce que je fais faux? Toute aide massivement appréciée!

Répondre

2

Définissez la variable Dataset<Row> df et déplacez le bloc de code pour lire le fichier CSV à l'intérieur de la méthode getDataframe() comme ci-dessous.

private Dataset<Row> df = null; 

public Dataset getDataframe() { 
    df = sparkSession 
     .read() 
     .format("com.databricks.spark.csv") 
     .option("header", false) 
     //.option("inferSchema", true) 
     .schema(builder) 
     .load("src/main/java/resources/test.csv"); //TODO: CMD line arg 
     return df; 
} 

Maintenant vous pouvez l'appeler comme ci-dessous.

CsvReader cr = new CsvReader(schema); 
    Dataset df = cr.getDataframe(); 
    cr.printSchema(); 

Je vous suggère de revoir votre classe. Une option est que vous pouvez passer df, à d'autres méthodes, en tant que paramètre. Si vous utilisez Spark 2.0, vous n'avez pas besoin de SparkConf. Veuillez vous référer à documentation pour créer SparkSession.

+0

Excellent! Merci beaucoup! –

0

Vous devriez mettre votre df dans la fonction constructeur si vous voulez l'initialiser par builder. Ou vous pouvez le mettre dans une fonction membre.