2016-11-11 4 views
3

J'essaye d'écrire une paire rdd à Elastic Search sur Elastic Cloud sur la version 2.4.0. J'utilise le plugin elasticsearch-spark_2.10-2.4.0 pour écrire sur ES. Voici le code que je utilise pour écrire ES:l'écriture rdd de spark à Elastic Search échoue

def predict_imgs(r): 
    import json 
    out_d = {} 
    out_d["pid"] = r["pid"] 
    out_d["other_stuff"] = r["other_stuff"] 

    return (r["pid"], json.dumps(out_d)) 

res2 = res1.map(predict_imgs) 

es_write_conf = { 
"es.nodes" : image_es, 
#"es.port" : "9243", 
"es.resource" : "index/type", 
"es.nodes.wan.only":"True", 
"es.write.operation":"upsert", 
"es.mapping.id":"product_id", 
"es.nodes.discovery" : "false", 
"es.net.http.auth.user": "username", 
"es.net.http.auth.pass": "pass", 
"es.input.json": "true", 
"es.http.timeout":"1m", 
"es.scroll.size":"10", 
"es.batch.size.bytes":"1mb", 
"es.http.retries":"1", 
"es.batch.size.entries":"5", 
"es.batch.write.refresh":"False", 
"es.batch.write.retry.count":"1", 
"es.batch.write.retry.wait":"10s"} 

res2.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

L'erreur que je reçois est la suivante:

Py4JJavaError: An error occurred while calling  z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 

La partie intéressante est que cela fonctionne quand je fais une prise sur les quelques premiers éléments sur RDD2 puis faire une nouvelle RDD hors de lui et d'écrire à ES, il fonctionne parfaitement:

x = sc.parallelize([res2.take(1)]) 
x.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

J'utilise Elastic cloud (Offering nuage élastique Recherche) et Databricks (nuage de Finger d'Apache Spark) Se pourrait-il que ES ne soit pas capable de suivre l'écriture de Spark en ES? J'ai augmenté notre taille Elastic Cloud de 2 Go de RAM à 8 Go de RAM.

Y a-t-il des config recommandés pour le es_write_conf que j'ai utilisé ci-dessus? Tout autre confs que vous pouvez penser? La mise à jour vers ES 5.0 aide-t-elle?

Toute aide est appréciée. J'ai lutté avec ça pendant quelques jours maintenant. Je vous remercie.

Répondre

2

Il ressemble à un problème avec les calculs de pyspark, pas nécessairement processus d'épargne elasticsearch. Assurez-vous vos RDD sont OK par:

  1. Performing count() sur RDD1 (à "matérialiser" résultats)
  2. scène count() sur RDD2

Si le nombre sont OK, essayez avec les résultats de la mise en cache avant d'enregistrer en ES :

res2.cache() 
res2.count() # to fill the cache 
res2.saveAsNewAPIHadoopFile(... 

Il le problème persiste, essayez de regarder exécuteurs morts stderr et stdout (vous pouvez les trouver sur l'onglet exécuteurs dans S ParkUI).

J'ai également remarqué la très petite taille de lot en es_write_conf, essayez de l'augmenter à 500 ou 1000 pour obtenir de meilleures performances.