J'utilise le code suivant pour stocker la sortie de Spark-Streaming
à ElasticSearch
. Je veux mapper la sortie de spark-streaming au nom correct i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)
. Mais comme vous pouvez le voir actuellement, il est mappé dans ES comme _1 ou _2 etc. En outre, je veux mettre un filtre par exemple (if PlatFormName = "ubuntu" then index the data)
avant d'indexer les données dans ES. Alors, comment je fais ça?Mappage des noms de champ de la sortie de Spark-Streaming à Elastic Search
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD{ x =>
if (x.toLocalIterator.nonEmpty) {
EsSpark.saveToEs(x, "spark/ElasticSearch")
}
}
ssc.start()
ssc.awaitTermination()
Sortie en ElasticSearch:
{
"_index": "spark",
"_type": "ElasticSearch",
"_id": "AVTH0JPgzgtrAOUg77qq",
"_score": 1,
"_source": {
"_1": {
"_3": "Amiga",
"_2": "AmigaOS 1.3",
"_6": "SeaMonkey",
"_1": "Usedcar",
"_4": 0,
"_5": 0
},
"_2": 1013
}
}
En vous remerciant. J'ai mis en place votre deuxième suggestion. Pourriez-vous, s'il vous plaît, donner plus de détails sur ce que vous voulez dire dans votre première suggestion avec l'exemple que je n'ai pas compris. De plus, ce bug ne semble pas se produire lorsque vous soumettez votre travail à l'étincelle, non? – Naresh
@Naresh, le dans la première option, je faisais référence aux méthodes égales & hashCode de remplacement dans votre classe existante (si nécessaire) comme (ce fil suggère) [http://stackoverflow.com/questions/7681183/how-can- i-define-a-custom-égalité-operation-that-will-be-used-by-immutable-set]. Et oui, le bug est seulement dans spark-shell pas quand vous courez sur le cluster. –
Pourriez-vous s'il vous plaît m'aider avec ceci. Je suis coincé ici 'http: // stackoverflow.com/questions/39363586/question-en-stockage-données-de-spark-streaming-to-cassanadra' – Naresh