2016-05-19 1 views
1

J'utilise le code suivant pour stocker la sortie de Spark-Streaming à ElasticSearch. Je veux mapper la sortie de spark-streaming au nom correct i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count). Mais comme vous pouvez le voir actuellement, il est mappé dans ES comme _1 ou _2 etc. En outre, je veux mettre un filtre par exemple (if PlatFormName = "ubuntu" then index the data) avant d'indexer les données dans ES. Alors, comment je fais ça?Mappage des noms de champ de la sortie de Spark-Streaming à Elastic Search

val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_) 

      val pageCounts = realTimeAgg.map  
      pageCounts.foreachRDD{ x => 
        if (x.toLocalIterator.nonEmpty) {  
         EsSpark.saveToEs(x, "spark/ElasticSearch") 
        } 
       } 

      ssc.start() 
      ssc.awaitTermination() 

Sortie en ElasticSearch:

{ 
      "_index": "spark", 
      "_type": "ElasticSearch", 
      "_id": "AVTH0JPgzgtrAOUg77qq", 
      "_score": 1, 
      "_source": { 
       "_1": { 
        "_3": "Amiga", 
        "_2": "AmigaOS 1.3", 
        "_6": "SeaMonkey", 
        "_1": "Usedcar", 
        "_4": 0, 
        "_5": 0 
       }, 
       "_2": 1013 
      } 
     } 

Répondre

1

Les clés de documents de recherche élastique sont _1, _2, etc. parce que vous stockez un PairRDD avec (Tuple6, Long) types de données.

Pour conserver les clés, vous devez utiliser la classe de cas comme clé.

val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_) 

Je suppose que la classe de l'objet x est une classe de cas et que vous souhaitez utiliser tous les champs de cette classe pour faire la réduction (par exemple pour le contrôle de l'égalité des 2 instances de classe de cas). Si tous les champs de cette classe ne sont pas la clé naturelle de la classe à utiliser pour l'égalité, alors vous avez deux options -

  1. surchargent equals et hashCode pour votre classe de cas
  2. Créer une autre classe de cas qui a seulement la champs clés (les champs que vous avez utilisés dans votre ligne - (x.key, x.os, x.platform, x.mobile, x.browser)) et mapper à cette classe de cas au lieu d'un Tuple dans les premières lignes. map {x => ...}.

Vous pouvez ajouter un filtre avant d'écrire dans ElasticSearch.

pageCounts.foreachRDD { x => 
         if (x.toLocalIterator.nonEmpty) { 
          val y = x.filter(z => z._1.platform == "ubuntu")  
          EsSpark.saveToEs(y, "spark/ElasticSearch") 
        } 
       } 

PS. Si vous testez la paire RDD avec (classe de cas, Long) classe de cas comme une clé comme je l'ai suggéré lines.map (x => (x, 1)) reduceByKey (_ + _). Il existe un bogue spécifiquement lié à Spark Shell: les classes de cas ne fonctionnent pas comme des classes de clés correctement pour les opérations de réduction - jira issue

+0

En vous remerciant. J'ai mis en place votre deuxième suggestion. Pourriez-vous, s'il vous plaît, donner plus de détails sur ce que vous voulez dire dans votre première suggestion avec l'exemple que je n'ai pas compris. De plus, ce bug ne semble pas se produire lorsque vous soumettez votre travail à l'étincelle, non? – Naresh

+0

@Naresh, le dans la première option, je faisais référence aux méthodes égales & hashCode de remplacement dans votre classe existante (si nécessaire) comme (ce fil suggère) [http://stackoverflow.com/questions/7681183/how-can- i-define-a-custom-égalité-operation-that-will-be-used-by-immutable-set]. Et oui, le bug est seulement dans spark-shell pas quand vous courez sur le cluster. –

+0

Pourriez-vous s'il vous plaît m'aider avec ceci. Je suis coincé ici 'http: // stackoverflow.com/questions/39363586/question-en-stockage-données-de-spark-streaming-to-cassanadra' – Naresh