2017-08-23 1 views
0

J'utilise logstash et elasticsearch pour collecter des tweets en utilisant le plugin Twitter. Mon problème est que je reçois un document de la part de Twitter et que je voudrais faire un pré-traitement avant d'indexer mon document. Disons que je présente à la suite de documents de twitter:Comment pré-traiter un document avant l'indexation?

{ 
    "tweet": { 
     "tweetId": 1025, 
     "tweetContent": "Hey this is a fake document for stackoverflow #stackOverflow #elasticsearch", 
     "hashtags": ["stackOverflow", "elasticsearch"], 
     "publishedAt": "2017 23 August", 
     "analytics": { 
      "likeNumber": 400, 
      "shareNumber": 100, 
     } 
    }, 
    "author":{ 
     "authorId": 819744, 
     "authorAt": "the_expert", 
     "authorName": "John Smith", 
     "description": "Haha it's a fake description" 
    } 
} 

maintenant sur ce document que Twitter me envoie, je voudrais générer deux documents: le premier sera indexé sur Twitter/Tweet/1025:

# The id for this document should be the one from tweetId `"tweetId": 1025` 
{ 
    "content": "Hey this is a fake document for stackoverflow #stackOverflow #elasticsearch", # this field has been renamed 
    "hashtags": ["stackOverflow", "elasticsearch"], 
    "date": "2017/08/23", # the date has been formated 
    "shareNumber": 100 # This field has been flattened 
} 

Le second sera indexé sur Twitter/auteur/819744:

# The id for this document should be the one from authorId `"authorId": 819744 ` 
{ 
    "authorAt": "the_expert", 
    "description": "Haha it's a fake description" 
} 

J'ai défini ma sortie comme suit:

output { 
    stdout { codec => dots } 
    elasticsearch { 
    hosts => [ "localhost:9200" ] 
    index => "twitter" 
    document_type => "tweet" 
    } 
} 

Comment puis-je traiter les informations de Twitter?

EDIT:

donc mon dossier complet de configuration devrait ressembler à:

input { 
    twitter { 
     consumer_key => "consumer_key" 
     consumer_secret => "consumer_secret" 
     oauth_token => "access_token" 
     oauth_token_secret => "access_token_secret" 
     keywords => [ "random", "word"] 
     full_tweet => true 
     type => "tweet" 
    } 
} 
filter { 
    clone { 
    clones => ["author"] 
    } 
    if([type] == "tweet") { 
    mutate { 
     remove_field => ["authorId", "authorAt"] 
    } 
    } else { 
    mutate { 
     remove_field => ["tweetId", "tweetContent"] 
    } 
    } 
} 
output { 
    stdout { codec => dots } 
    if [type] == "tweet" { 
    elasticsearch { 
     hosts => [ "localhost:9200" ] 
     index => "twitter" 
     document_type => "tweet" 
     document_id => "%{[tweetId]}" 
    } 
    } else { 
    elasticsearch { 
     hosts => [ "localhost:9200" ] 
     index => "twitter" 
     document_type => "author" 
     document_id => "%{[authorId]}" 
    } 
    } 
} 

Répondre

2

Vous pouvez utiliser le plug-in de filtre clone sur logstash.

Avec un fichier de configuration de logstash exemple qui prend une entrée JSON de stdin et montre simplement la sortie sur la sortie standard:

input { 
    stdin { 
    codec => json 
    type => "tweet" 
    } 
} 
filter { 
    mutate { 
     add_field => { 
     "tweetId" => "%{[tweet][tweetId]}" 
     "content" => "%{[tweet][tweetContent]}" 
     "date" => "%{[tweet][publishedAt]}" 
     "shareNumber" => "%{[tweet][analytics][shareNumber]}" 
     "authorId" => "%{[author][authorId]}" 
     "authorAt" => "%{[author][authorAt]}" 
     "description" => "%{[author][description]}" 
     } 
    } 
    date { 
     match => ["date", "yyyy dd MMMM"] 
     target => "date" 
    } 
    ruby { 
     code => ' 
     event.set("hashtags", event.get("[tweet][hashtags]")) 
    ' 
    } 
    clone { 
     clones => ["author"] 
    } 
    mutate { 
     remove_field => ["author", "tweet", "message"] 
    } 
    if([type] == "tweet") { 
     mutate { 
     remove_field => ["authorId", "authorAt", "description"] 
     } 
    } else { 
     mutate { 
     remove_field => ["tweetId", "content", "hashtags", "date", "shareNumber"] 
     } 
    } 
} 
output { 
    stdout { 
    codec => rubydebug 
    } 
} 

En utilisant comme entrée:

{"tweet": { "tweetId": 1025, "tweetContent": "Hey this is a fake document", "hashtags": ["stackOverflow", "elasticsearch"], "publishedAt": "2017 23 August","analytics": { "likeNumber": 400, "shareNumber": 100 } }, "author":{ "authorId": 819744, "authorAt": "the_expert", "authorName": "John Smith", "description": "fake description" } } 

vous obtiendrez ces deux documents:

{ 
      "date" => 2017-08-23T00:00:00.000Z, 
     "hashtags" => [ 
     [0] "stackOverflow", 
     [1] "elasticsearch" 
    ], 
      "type" => "tweet", 
     "tweetId" => "1025", 
     "content" => "Hey this is a fake document", 
    "shareNumber" => "100", 
    "@timestamp" => 2017-08-23T20:36:53.795Z, 
     "@version" => "1", 
      "host" => "my-host" 
} 
{ 
    "description" => "fake description", 
      "type" => "author", 
     "authorId" => "819744", 
    "@timestamp" => 2017-08-23T20:36:53.795Z, 
     "authorAt" => "the_expert", 
     "@version" => "1", 
      "host" => "my-host" 
} 

Vous pouvez également utiliser un script ruby ​​pour aplatir les champs, un nd puis utiliser renommer sur muter, si nécessaire. Si vous voulez que elasticsearch utilise authorId et tweetId, au lieu de l'ID par défaut, vous pouvez probablement configurer la sortie elasticsearch avec document_id.

output { 
    stdout { codec => dots } 
    if [type] == "tweet" { 
    elasticsearch { 
     hosts => [ "localhost:9200" ] 
     index => "twitter" 
     document_type => "tweet" 
     document_id => "%{[tweetId]}" 
    } 
    } else { 
    elasticsearch { 
     hosts => [ "localhost:9200" ] 
     index => "twitter" 
     document_type => "tweet" 
     document_id => "%{[authorId]}" 
    } 
    } 
} 
+0

Je modifie ma question pour vous montrer le fichier de configuration global. Comment gérer le renommage d'un champ ou l'aplatissement d'un champ? – mel

+0

J'ai édité ma réponse. J'espère que ça marche pour toi. – Imma

+0

Ouais ça ne fonctionne qu'une seule question avant de valider la réponse: add_field semble forcer mon tableau de hashtags. Existe-t-il un moyen de résoudre ce problème et d'avoir '[" stackOverflow "," elasticsearch "]' au lieu de '" stackOverflow, elasticsearch "' J'ai essayé add_tag ​​mais je n'ai pas semblé travailler comme add_field – mel