Je tente d'exécuter un cluster EMR avec une simple exécution de l'étape Spark et une erreur que je ne parviens pas à résoudre s'affiche. Le programme fonctionne quand je l'exécute localement dans Eclipse, mais pas quand je l'exécute sur un cluster EMR. Le programme essaie simplement de convertir un fichier CSV sur S3 en format Parquet.Erreur avec spark-csv sur le cluster Amazon EMR
Quand je lance dans DME, je reçois l'erreur suivante:
Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:
Je n'ai des champs sur la limite 1000000. J'ai essayé de lire à partir des emplacements s3, s3n et s3a.
import org.apache.spark.SparkSession
import org.apache.spark.sql.types._
object TestEMR {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()
val schema = StructType(
Array(
StructField("field1", StringType ,nullable = true),
StructField("field2", IntegerType ,nullable = true),
StructField("field3", IntegerType ,nullable = true),
StructField("field4", TimestampType ,nullable = true),
StructField("field5", TimestampType ,nullable = true),
StructField("field6", StringType ,nullable = true),
StructField("field7", StringType ,nullable = true),
StructField("field8", StringType ,nullable = true),
StructField("field9", StringType ,nullable = true),
StructField("field10", StringType ,nullable = true),
StructField("field11", StringType ,nullable = true),
StructField("field12", StringType ,nullable = true),
StructField("field13", StringType ,nullable = true),
StructField("field14", StringType ,nullable = true),
StructField("field15", StringType ,nullable = true),
StructField("field16", StringType ,nullable = true),
StructField("field17", StringType ,nullable = true),
StructField("field18", StringType ,nullable = true),
StructField("field19", StringType ,nullable = true),
StructField("field20", StringType ,nullable = true)
)
)
val df = spark.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("nullValue","")
.option("treatEmptyValuesAsNulls","true")
.load("s3://mybucket/input/myfile.csv")
df.write.mode("append").parquet("s3://mybucket/output/myfile")
spark.stop
}
}
Le fichier est correct. L'opération de chargement ne divise pas le fichier sur newline. J'ai été capable de convertir le code en sc.texfile (myfile) et il a bien lu le fichier. –
intéressant. FWIW Spark 2 dispose d'un analyseur CSV intégré, ce qui signifie que vous pouvez déposer des fichiers JIRA contre l'équipe d'étincelles. Les tests d'intégration s3a que je cours fonctionnent avec un fichier .csv.gz sur s3 avec ce module –