2017-05-15 2 views
0

J'ai besoin d'aide avec Logstash. J'utilise actuellement la configuration Logstash ci-dessous, qui fonctionne : lorsque le champ [message] contient « Token validation failed », elle envoie un e-mail signalant le problème d'authentification.

Elkstack Logstash - Comment envoyer une alerte de seuil par e-mail

input {
  # Accept events over TCP on port 5144, decoded with the "json" codec.
  # Every event from this input gets the type "nxlog-json" (the filter
  # section keys on it) and the tags "windows" and "nxlog".
  tcp {
    port  => 5144
    codec => "json"
    type  => "nxlog-json"
    tags  => ["windows", "nxlog"]
  }
} # end input

filter {
  # Only events received with type "nxlog-json" are normalised here.
  if [type] == "nxlog-json" {

    # Parse the EventTime field, interpreted in the Europe/London
    # timezone, with the date filter.
    date {
      match    => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]
      timezone => "Europe/London"
    }

    # Map the incoming field names onto the target schema: a few
    # top-level fields (user, ip, port, message, level, ...) and the rest
    # nested under [eventlog].
    mutate {
      rename => {
        "AccountName"               => "user"
        "AccountType"               => "[eventlog][account_type]"
        "ActivityId"                => "[eventlog][activity_id]"
        "Address"                   => "ip6"
        "ApplicationPath"           => "[eventlog][application_path]"
        "AuthenticationPackageName" => "[eventlog][authentication_package_name]"
        "Category"                  => "[eventlog][category]"
        "Channel"                   => "[eventlog][channel]"
        "Domain"                    => "domain"
        "EventID"                   => "[eventlog][event_id]"
        "EventType"                 => "[eventlog][event_type]"
        "File"                      => "[eventlog][file_path]"
        "Guid"                      => "[eventlog][guid]"
        "Hostname"                  => "hostname"
        "Interface"                 => "[eventlog][interface]"
        "InterfaceGuid"             => "[eventlog][interface_guid]"
        "InterfaceName"             => "[eventlog][interface_name]"
        "IpAddress"                 => "ip"
        "IpPort"                    => "port"
        "Key"                       => "[eventlog][key]"
        "LogonGuid"                 => "[eventlog][logon_guid]"
        "Message"                   => "message"
        "ModifyingUser"             => "[eventlog][modifying_user]"
        "NewProfile"                => "[eventlog][new_profile]"
        "OldProfile"                => "[eventlog][old_profile]"
        # NOTE(review): "IpPort" above and "Port" here share the target
        # field "port" - confirm they never occur on the same event.
        "Port"                      => "port"
        "PrivilegeList"             => "[eventlog][privilege_list]"
        "ProcessID"                 => "pid"
        "ProcessName"               => "[eventlog][process_name]"
        "ProviderGuid"              => "[eventlog][provider_guid]"
        "ReasonCode"                => "[eventlog][reason_code]"
        "RecordNumber"              => "[eventlog][record_number]"
        "ScenarioId"                => "[eventlog][scenario_id]"
        "Severity"                  => "level"
        "SeverityValue"             => "[eventlog][severity_code]"
        "SourceModuleName"          => "nxlog_input"
        "SourceName"                => "[eventlog][program]"
        "SubjectDomainName"         => "[eventlog][subject_domain_name]"
        "SubjectLogonId"            => "[eventlog][subject_logonid]"
        "SubjectUserName"           => "[eventlog][subject_user_name]"
        "SubjectUserSid"            => "[eventlog][subject_user_sid]"
        "System"                    => "[eventlog][system]"
        "TargetDomainName"          => "[eventlog][target_domain_name]"
        "TargetLogonId"             => "[eventlog][target_logonid]"
        "TargetUserName"            => "[eventlog][target_user_name]"
        "TargetUserSid"             => "[eventlog][target_user_sid]"
        "ThreadID"                  => "thread"
      }
    }

    # Drop the fields that are not kept in the stored document.
    mutate {
      remove_field => [
        "CurrentOrNextState", "Description", "EventReceivedTime",
        "EventTime", "EventTimeWritten", "IPVersion",
        "KeyLength", "Keywords", "LmPackageName",
        "LogonProcessName", "LogonType", "Name",
        "Opcode", "OpcodeValue", "PolicyProcessingMode",
        "Protocol", "ProtocolType", "SourceModuleType",
        "State", "Task", "TransmittedServices",
        "Type", "UserID", "Version"
      ]
    }
  }
}

output {
  # Every event is sent to the Elasticsearch node at localhost:9200.
  elasticsearch {
    hosts => ["localhost:9200"]
  }

  # In addition, any event whose message contains "Token validation failed"
  # triggers an email through the SMTP server below (port 25, TLS disabled).
  if "Token validation failed" in [message] {
    email {
      via     => "smtp"
      address => "smtp01.domain.com"
      port    => 25
      use_tls => false
      from    => "[email protected]"
      to      => "[email protected]"
      subject => "Auth Issue"
      body    => "Auth Issue"
    }
  }
} # end output

Je voudrais savoir comment faire pour que l'e-mail ne soit envoyé que si le champ message contient « Token validation failed » 10 fois en une minute. S'il y a 9 occurrences ou moins, aucun e-mail ne doit être envoyé. Quelle configuration dois-je mettre en place pour que cela fonctionne ?

Répondre

0

Il y a plusieurs façons d'y parvenir.

A. Vous pouvez utiliser X-Pack Alerting (anciennement appelé Watcher) ou ElastAlert, comme décrit dans cette réponse.

B. Vous pouvez utiliser le filtre Logstash aggregate pour suivre et compter les messages « Token validation failed », comme décrit dans cette réponse. Il vous suffit d'ajouter le filtre suivant :

# Count only the events that report a failed token validation; without this
# guard every event sharing the task id would increment the counter.
if "Token validation failed" in [message] {
    aggregate {
        # One counter per target logon id.
        task_id => "%{[eventlog][target_logonid]}"
        # Initialise the counter on the first matching event, then increment it.
        code => "map['failed_count'] ||= 0; map['failed_count'] += 1;"
        # When the task times out, its map is emitted as a new event.
        push_map_as_event_on_timeout => true
        timeout => 60 # 1 minute timeout
        timeout_tags => ['_aggregatetimeout']
        # Flag the emitted event when 10 or more failures were counted.
        timeout_code => "event.set('token_failed', event.get('failed_count') >= 10)"
    }
}

Ensuite, dans la section output, vous pouvez n'envoyer votre e-mail que lorsque la condition if [token_failed] est vraie.

C. Vous pouvez utiliser le filtre Logstash ruby pour compter et mettre en cache le nombre de fois où le message « Token validation failed » est apparu. C'est fondamentalement la même chose que B, mais en implémentant vous-même la logique en Ruby.

D. Vous pouvez utiliser le filtre Logstash metrics afin de calculer le taux d'événements contenant « Token validation failed » dans le champ message.

# Meter only the events that report a failed token validation.
if "Token validation failed" in [message] {
    metrics {
        # The meter name becomes the top-level field of the generated metric
        # event, so it must match the field tested in the output condition
        # ([Token validation failed][count]). A meter named "message" would
        # count every event and clash with the metric event's own "message".
        meter => [ "Token validation failed" ]
        rates => [ 1 ]
        # Emit one metric event per minute and reset the counter at the same
        # interval, so that [count] covers a single minute instead of growing
        # for the whole lifetime of the pipeline.
        flush_interval => 60
        clear_interval => 60
        add_tag => "metric"
    }
}

Ensuite, dans votre sortie, vous pouvez simplement utiliser les informations mesurées comme ceci :

# Send the email only for events tagged "metric" whose
# [Token validation failed][count] value is at least 10.
# The tag test comes first so the counter is only read on tagged events.
if "metric" in [tags] and [Token validation failed][count] >= 10 {
  email {
    ...
  }
}

Notez qu'avec les solutions B et C, vous ne pouvez pas lancer Logstash avec plus d'un worker (autrement dit, il faut utiliser -w 1). J'ai déposé une demande d'amélioration pour « résoudre » ce problème, mais comme l'équipe Logstash a déjà une énorme liste de tâches en attente, nous verrons ce qu'il en advient.