2017-08-08 1 views
0

J'ai une instance Filebeat qui envoie les journaux d'accès Apache à Logstash. pipeline Logstash transforme le fichier et charge les champs traités dit (field1, field2 & field3) à elastic search à un indice indexA. Le flux est simple travail &. Voici mon pipeline.confLogstash génère différents champs pour différents index de recherche élastique

input{ 
    beats{ 
     port => "5043" 
    } 
} 
filter 
{ 

    grok 
    { 
     patterns_dir => ["/usr/share/logstash/patterns"] 
     match =>{ "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}(\"%{DATA:[referrer]}\")?(\"%{DATA:[user_agent]}\")?", 
        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
       } 
     remove_field => "@version" 
     remove_field => "beat" 
     remove_field => "input_type" 
     remove_field => "source" 
     remove_field => "type" 
     remove_field => "tags" 
     remove_field => "http_version" 
     remove_field => "@timestamp" 
     remove_field => "message" 
    } 
    mutate 
    { 
     add_field => { "field1" => "%{access_time}" } 
     add_field => { "field2" => "%{host}" } 
     add_field => { "field3" => "%{read_timestamp}" } 
    } 
} 
output { 
    elasticsearch{ 
     hosts => ["localhost:9200"] 
     index => "indexA" 
    } 
} 

Maintenant ce que je veux faire est d'ajouter trois autres champs field4 et field5 et les ajouter à un index séparé nommé indexB. Ainsi, à la fin indexA détient field1 field2 et field3 tout indexB détient field4 et field5

Jusqu'à présent, c'est le pipeline.conf modifié, qui ne semble pas fonctionner.

input{ 
    beats{ 
     port => "5043" 
    } 
} 
filter 
{ 

    grok 
    { 
     patterns_dir => ["/usr/share/logstash/patterns"] 
     match =>{ "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}(\"%{DATA:[referrer]}\")?(\"%{DATA:[user_agent]}\")?", 
        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
       } 
     remove_field => "@version" 
     remove_field => "beat" 
     remove_field => "input_type" 
     remove_field => "type" 
     remove_field => "http_version" 
     remove_field => "@timestamp" 
     remove_field => "message" 
    } 
    mutate 
    { 
     add_field => { "field1" => "%{access_time}" } 
     add_field => { "field2" => "%{host}" } 
     add_field => { "field3" => "%{read_timestamp}" } 
    } 
} 
output { 
    elasticsearch{ 
     hosts => ["localhost:9200"] 
     index => "indexA" 
    } 
} 
filter 
{ 
    mutate 
    { 
     add_field => { "field4" => "%{source}" } 
     add_field => { "field5" => "%{tags}" } 
     remove_field => "field1" 
     remove_field => "field2" 
     remove_field => "field3" 
    } 
} 
output { 
    elasticsearch{ 
     hosts => ["localhost:9200"] 
     index => "indexB" 
    } 
} 

Quelqu'un peut-il s'il vous plaît indiquer où je me trompe ou une alternative à la solution.

Répondre

1

Vous devez dupliquer vos événements à l'aide du clone filter. Ensuite, vous pouvez ajouter les champs désirés à chaque événement respectif et les enfoncer dans deux indices ES différents:

input{ 
    beats{ 
     port => "5043" 
    } 
} 
filter 
{ 

    grok 
    { 
     patterns_dir => ["/usr/share/logstash/patterns"] 
     match =>{ "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}(\"%{DATA:[referrer]}\")?(\"%{DATA:[user_agent]}\")?", 
        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
       } 
     remove_field => "@version" 
     remove_field => "beat" 
     remove_field => "input_type" 
     remove_field => "type" 
     remove_field => "http_version" 
     remove_field => "@timestamp" 
     remove_field => "message" 
    } 
    clone { 
     clones => ["log1", "log2"] 
    } 
    if [type] == "log1" { 
     mutate 
     { 
      add_field => { "field1" => "%{access_time}" } 
      add_field => { "field2" => "%{host}" } 
      add_field => { "field3" => "%{read_timestamp}" } 
     } 
    } else { 
     mutate 
     { 
      add_field => { "field4" => "%{source}" } 
      add_field => { "field5" => "%{tags}" } 
     } 
    } 
} 
output { 
    if [type] == "log1" { 
     elasticsearch{ 
      hosts => ["localhost:9200"] 
      index => "indexA" 
     } 
    } else { 
     elasticsearch{ 
      hosts => ["localhost:9200"] 
      index => "indexB" 
     } 
    } 
} 
+0

Ceci aide mon cas parfaitement. Merci d'avoir modifié le code. –

+0

il a aidé impressionnant, heureux! – Val