2017-10-16 15 views
0

Utilisation de spark java J'ai créé une image sur un fichier source de délimiteur de virgule. Dans le fichier source, si la dernière colonne contient une valeur vide, son erreur arrayindexoutofbound de lancement. Ci-dessous est un exemple de données et de code. Je peux gérer cette erreur parce qu'il y a beaucoup de chance d'obtenir des valeurs vides dans la dernière colonne. En dessous de l'exemple de données 4ème ligne provoquant problème.Spark SQL Dataframe-java.lang.ArrayIndexOutOfBoundsException erreur

Données-échantillon

1, viv, chn, 34

2, homme, GNT, 56

3, anu, jeu de mots, 22

** 4, raj, coup , *

JavaRDD<String> dataQualityRDD = spark.sparkContext().textFile(inputFile, 1).toJavaRDD(); 
    String schemaString = schemaColumns; 
    List<StructField> fields = new ArrayList<>(); 
    for (String fieldName : schemaString.split(" ")) { 
     StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); 
     fields.add(field); 
    } 
    StructType schema = DataTypes.createStructType(fields); 

    JavaRDD<Row> rowRDD = dataQualityRDD.map((Function<String, Row>) record -> { 
       // String[] attributes = record.split(attributes[0], attributes[1].trim()); 
       Object[] items = record.split(fileSplit); 

       // return RowFactory.create(attributes[0], attributes[1].trim()); 
         return RowFactory.create(items); 
      }); 




    } 
} 

Répondre

1

je étincelle 2.0 et a pu lire le csv sans exception:

les lignes que vous avez
 SparkSession spark = SparkSession.builder().config("spark.master", "local").getOrCreate(); 
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); 

    JavaRDD<Row> csvRows = spark.read().csv("resources/csvwithnulls.csv").toJavaRDD(); 

    StructType schema = DataTypes.createStructType(
      new StructField[] { new StructField("id", DataTypes.StringType, false, Metadata.empty()), 
        new StructField("fname", DataTypes.StringType, false, Metadata.empty()), 
        new StructField("lname", DataTypes.StringType, false, Metadata.empty()), 
        new StructField("age", DataTypes.StringType, false, Metadata.empty()) }); 

    Dataset<Row> newCsvRows = spark.createDataFrame(csvRows, schema); 
    newCsvRows.show(); 

Utilisé exactement et il a bien fonctionné: voir la sortie:

enter image description here

enter image description here

+0

ref: https://medium.com/@mrpowers/sparks-treatment -of-empty-strings-et-null-values-in-csv-files-80748893451f – skvyas

+0

Basé sur le fichier source, je transmettrai une requête sql qui générera dynamiquement par une autre requête job.sql m'aidera à connaître les informations de colonne avec erreur enregistrements. – vivman

+0

Je suis désolé, si j'ai mal compris votre question. Mais, dans les deux cas, vous pouvez créer un ensemble de données comme ci-dessus et ensuite passer votre requête que vous avez déjà dans l'un des paramètres je pense? D'autres transformations/actions peuvent-elles être effectuées en enregistrant l'ensemble de données en tant que table temporaire? Donc, en un mot, vous devez maintenir le fichier source à la requête + autre carte params ou quelque chose? – skvyas