J'essaye d'écrire une paire rdd à Elastic Search sur Elastic Cloud sur la version 2.4.0
. J'utilise le plugin elasticsearch-spark_2.10-2.4.0
pour écrire sur ES. Voici le code que je utilise pour écrire ES:l'écriture rdd de spark à Elastic Search échoue
def predict_imgs(r):
import json
out_d = {}
out_d["pid"] = r["pid"]
out_d["other_stuff"] = r["other_stuff"]
return (r["pid"], json.dumps(out_d))
res2 = res1.map(predict_imgs)
es_write_conf = {
"es.nodes" : image_es,
#"es.port" : "9243",
"es.resource" : "index/type",
"es.nodes.wan.only":"True",
"es.write.operation":"upsert",
"es.mapping.id":"product_id",
"es.nodes.discovery" : "false",
"es.net.http.auth.user": "username",
"es.net.http.auth.pass": "pass",
"es.input.json": "true",
"es.http.timeout":"1m",
"es.scroll.size":"10",
"es.batch.size.bytes":"1mb",
"es.http.retries":"1",
"es.batch.size.entries":"5",
"es.batch.write.refresh":"False",
"es.batch.write.retry.count":"1",
"es.batch.write.retry.wait":"10s"}
res2.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
L'erreur que je reçois est la suivante:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
La partie intéressante est que cela fonctionne quand je fais une prise sur les quelques premiers éléments sur RDD2 puis faire une nouvelle RDD hors de lui et d'écrire à ES, il fonctionne parfaitement:
x = sc.parallelize([res2.take(1)])
x.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
J'utilise Elastic cloud (Offering nuage élastique Recherche) et Databricks (nuage de Finger d'Apache Spark) Se pourrait-il que ES ne soit pas capable de suivre l'écriture de Spark en ES? J'ai augmenté notre taille Elastic Cloud de 2 Go de RAM à 8 Go de RAM.
Y a-t-il des config recommandés pour le es_write_conf
que j'ai utilisé ci-dessus? Tout autre confs
que vous pouvez penser? La mise à jour vers ES 5.0 aide-t-elle?
Toute aide est appréciée. J'ai lutté avec ça pendant quelques jours maintenant. Je vous remercie.