2016-10-11 1 views
0

Je l'indice suivant:Index ElasticSearch Résultats Utilisation Logstash

POST /cars/transactions/_bulk 
{ "index": {}} 
{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" } 
{ "index": {}} 
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" } 
{ "index": {}} 
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" } 
{ "index": {}} 
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" } 
{ "index": {}} 
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" } 
{ "index": {}} 
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" } 
{ "index": {}} 
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" } 
{ "index": {}} 
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" } 

Et j'effectue la recherche suivante:

GET /cars/transactions/_search 
{ 
    "size" : 0, 
    "aggs" : { 
     "popular_colors" : { 
      "terms" : { 
       "field" : "color" 
      } 
     } 
    } 
} 

La réponse que je reçois est la suivante:

{ 
    "took": 2, 
    "timed_out": false, 
    "_shards": { 
    "total": 5, 
    "successful": 5, 
    "failed": 0 
    }, 
    "hits": { 
    "total": 8, 
    "max_score": 0, 
    "hits": [] 
    }, 
    "aggregations": { 
    "popular_colors": { 
     "doc_count_error_upper_bound": 0, 
     "sum_other_doc_count": 0, 
     "buckets": [ 
     { 
      "key": "red", 
      "doc_count": 4 
     }, 
     { 
      "key": "blue", 
      "doc_count": 2 
     }, 
     { 
      "key": "green", 
      "doc_count": 2 
     } 
     ] 
    } 
    } 
} 

Ma question est, comment puis-je ré-indexer ce document dans un index différent?

J'ai essayé:

input { 
    elasticsearch { 
    hosts => "localhost" 
    index => "cars" 
    query => '{ 
    "size" : 0, 
    "aggs" : { 
     "popular_colors" : { 
      "terms" : { 
       "field" : "color" 
      } 
     } 
    } 
}' 
    size => 500 
    scroll => "5m" 
    docinfo => true 
    } 
} 

Mais ça ne marche pas parce que le search_type du plug-in est scan et il ne supporte pas l'agrégation.

J'ai aussi essayé:

input { 
file { 
    path => "C:\ELK-STACK\logstash-2.3.4\bin\out.json" 
    start_position => "beginning" 
    codec => json_lines } 
    } 

Lorsque la teneur en out.json est:

{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":8,"max_score":1.0,"hits":[{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7l","_score":1.0,"_source":{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7m","_score":1.0,"_source":{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7p","_score":1.0,"_source":{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7o","_score":1.0,"_source":{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7n","_score":1.0,"_source":{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7q","_score":1.0,"_source":{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7r","_score":1.0,"_source":{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }},{"_index":"cars","_type":"transactions","_id":"AVexGB7_99OIq3MORm7s","_score":1.0,"_source":{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }}]}}

Mais il n'a produit aucune sortie après

Réglages: Par défaut travailleurs des pipelines: 8

principal Pipeline a

Je suppose que cela est parce que le fichier JSON est pas préparé pour le plug-in JSON et que je dois faire un peu de préparation (comme l'utilisation de l'API Java), mais je voudrais éviter que, si possible, .

Merci!

Répondre

0

Comme vous l'avez remarqué, le plug-in d'entrée elasticsearch ne supporte pas les agrégations. Il est possible d'utiliser le plugin d'entrée http_poller afin d'envoyer une requête d'agrégation à Elasticsearch à intervalles réguliers (ou seulement une fois par jour). Puis, en utilisant une sortie elasticsearch, vous pouvez à nouveau envoyer les agrégations résultantes à ES.

La configuration va essentiellement comme ceci (notez que la requête à ES d'agrégation doit être envoyé et URL encodée utilisant le source=... parameter).

input { 
    http_poller { 
    urls => { 
     test1 => 'http://localhost:9200/cars/transactions/_search?source=%7B%22size%22%3A0%2C%22aggs%22%3A%7B%22popular_colors%22%3A%7B%22terms%22%3A%7B%22field%22%3A%22color%22%7D%7D%7D%7D' 
    } 
    # checking once per day 
    interval => 86400 
    codec => "json" 
    } 
} 
filter { 
} 
output { 
    elasticsearch { 
    hosts => ["localhost:9200"] 
    index => "my_aggs" 
    } 
}