2017-01-24 2 views
0

Je tente d'exécuter un cluster EMR avec une simple exécution de l'étape Spark et une erreur que je ne parviens pas à résoudre s'affiche. Le programme fonctionne quand je l'exécute localement dans Eclipse, mais pas quand je l'exécute sur un cluster EMR. Le programme essaie simplement de convertir un fichier CSV sur S3 en format Parquet.Erreur avec spark-csv sur le cluster Amazon EMR

Quand je lance dans DME, je reçois l'erreur suivante:

Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:

Je n'ai des champs sur la limite 1000000. J'ai essayé de lire à partir des emplacements s3, s3n et s3a.

import org.apache.spark.SparkSession 
    import org.apache.spark.sql.types._ 

    object TestEMR { 
     def main(args: Array[String]) { 
     val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate() 
     val schema = StructType(
      Array(
       StructField("field1", StringType ,nullable = true), 
       StructField("field2", IntegerType ,nullable = true), 
       StructField("field3", IntegerType ,nullable = true), 
       StructField("field4", TimestampType ,nullable = true), 
       StructField("field5", TimestampType ,nullable = true), 
       StructField("field6", StringType ,nullable = true), 
       StructField("field7", StringType ,nullable = true), 
       StructField("field8", StringType ,nullable = true), 
       StructField("field9", StringType ,nullable = true), 
       StructField("field10", StringType ,nullable = true), 
       StructField("field11", StringType ,nullable = true), 
       StructField("field12", StringType ,nullable = true), 
       StructField("field13", StringType ,nullable = true), 
       StructField("field14", StringType ,nullable = true), 
       StructField("field15", StringType ,nullable = true), 
       StructField("field16", StringType ,nullable = true), 
       StructField("field17", StringType ,nullable = true), 
       StructField("field18", StringType ,nullable = true), 
       StructField("field19", StringType ,nullable = true), 
       StructField("field20", StringType ,nullable = true) 
      ) 
     ) 

     val df = spark.read 
      .format("com.databricks.spark.csv") 
      .schema(schema) 
      .option("nullValue","") 
      .option("treatEmptyValuesAsNulls","true") 
      .load("s3://mybucket/input/myfile.csv") 
     df.write.mode("append").parquet("s3://mybucket/output/myfile") 
     spark.stop 
     } 
    } 

Répondre

0

sonne comme il est de ne pas trouver la fin de la ligne, donc lit en permanence jusqu'à ce qu'il atteigne cette limite de 10K caractères sur une seule ligne.

Comme on dit: vérifier que la nouvelle ligne de fichier

+0

Le fichier est correct. L'opération de chargement ne divise pas le fichier sur newline. J'ai été capable de convertir le code en sc.texfile (myfile) et il a bien lu le fichier. –

+0

intéressant. FWIW Spark 2 dispose d'un analyseur CSV intégré, ce qui signifie que vous pouvez déposer des fichiers JIRA contre l'équipe d'étincelles. Les tests d'intégration s3a que je cours fonctionnent avec un fichier .csv.gz sur s3 avec ce module –

0

Cette question est toujours ouverte à spark-csv jira. Ils ont fourni une solution de contournement comme l'utilisation de l'analyseur csv ouvert si vous n'avez pas de problème de données ou de lecture en tant que RDD, puis en créant une image de données.

val rdd = sc.textFile("file.csv") 
// Here, filtering or transformation 
//val filteredRDD = rdd.filter.. 
//val transformedRDD = rdd.map.. 

val df = new CsvParser().csvRdd(sqlContext, transformedRDD)