1. Add a configuration file, dnsquery.conf, to Logstash, as follows:
input {
  kafka {
    add_field => { "es_index_name" => "dns-query" }
    bootstrap_servers => "10.0.8.1:9092"
    topics => "dns-query"
    consumer_threads => 9
    codec => json { charset => "UTF-8" }
  }
}
filter {
  if [es_index_name] == "ex-iis" {
    grok {
      match => { "message" => "%{DATA:date_time}\s*%{IPV4:s_ip}\s*%{WORD:cs_method}\s*%{URIPATH:cs_uri_stem}\s*(?<cs_uri_query>([=-_;&\S+]*))\s*%{BASE10NUM:s_port}\s*(?<cs_username>([.\S+]*))\s*%{IPV4:c_ip}\s*(?<cs_user_agent>([=-_;&/\S+]*))\s*(?<cs_referer>([=-_;&/\S+]*))\s*%{BASE10NUM:cs_status}\s*%{BASE10NUM:cs_substatus}\s*%{BASE10NUM:cs_win32_status}\s*%{BASE10NUM:time_taken}" }
    }
    mutate {
      remove_field => ["agent.ephemeral_id", "agent.id", "_score", "_id", "agent.type", "agent.version", "log.offset"]
      remove_field => ["message"]
    }
  } else if [es_index_name] == "dns-query" {
    grok {
      match => { "message" => "(?<date_time>\d{4}/\d{1,2}/\d{1,2}\s*[0-9:]{7,8})\s*(?<ThreatID>[0-9A-Za-z]{4})\s*%{NOTSPACE:Context}\s*(?<PacketId>\S+)\s*%{NOTSPACE:UDPTCP}\s*%{NOTSPACE:SendReceive}\s*%{IP:ClientIP}\s*(?<Xid>\S+)\s*(?<QR>\S+)\s*(?<Opcode>\S+)\s*(?<Flags>(\[.*?\]))\s*(?<ResponseCode>\S+)\s*(?<QueryContent>\S+)" }
    }
    mutate {
      # Several substitutions in sequence: first replace every parenthesized label-length marker with a dot,
      # then strip the leading dot, then strip the trailing dot
      gsub => ["QueryContent","\(.*?\)",".","QueryContent","^.","","QueryContent",".$",""]
      remove_field => ["agent.ephemeral_id", "agent.id", "_score", "_id", "agent.type", "agent.version", "log.offset"]
      remove_field => ["message"]
    }
  } else {
    mutate {
      remove_field => ["beat", "@version"]
    }
  }
}
output {
  # Debug output to the console
  stdout {
    codec => rubydebug { metadata => true }
  }
}
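The stdout output above is only for debugging. Once the parsed fields look correct, events are typically shipped to Elasticsearch, and the es_index_name field added in the input can be used to route them to per-source indices. A minimal sketch of such an output block, assuming an Elasticsearch node at 10.0.8.1:9200 (hypothetical address, adjust to your cluster):

output {
  elasticsearch {
    # Hypothetical Elasticsearch address
    hosts => ["10.0.8.1:9200"]
    # Daily index named after the es_index_name field, e.g. dns-query-2023.02.01
    index => "%{es_index_name}-%{+YYYY.MM.dd}"
  }
}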
2. Start a Logstash process with this configuration file and run it in the foreground: /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/dnsquery.conf
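Before starting the pipeline, it can be worth checking that the configuration parses cleanly. Logstash's --config.test_and_exit flag validates the config file and exits without consuming anything from Kafka:

# Validate the configuration syntax only, then exit
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/dnsquery.conf --config.test_and_exit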
3. Once the clients are shipping logs normally, the parsed events will appear in the Logstash foreground output.
Grok pattern debugging:
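One way to iterate on the grok pattern without touching Kafka is a throwaway Logstash config that reads from stdin and applies only the DNS grok filter; Kibana's Dev Tools Grok Debugger is another option. The sketch below is a hypothetical helper config (grok-test.conf) that reuses the dns-query pattern from dnsquery.conf, so sample DNS debug log lines can be pasted directly into the terminal and the parsed fields inspected:

# grok-test.conf - hypothetical helper for testing the dns-query grok pattern
input { stdin { } }
filter {
  grok {
    match => { "message" => "(?<date_time>\d{4}/\d{1,2}/\d{1,2}\s*[0-9:]{7,8})\s*(?<ThreatID>[0-9A-Za-z]{4})\s*%{NOTSPACE:Context}\s*(?<PacketId>\S+)\s*%{NOTSPACE:UDPTCP}\s*%{NOTSPACE:SendReceive}\s*%{IP:ClientIP}\s*(?<Xid>\S+)\s*(?<QR>\S+)\s*(?<Opcode>\S+)\s*(?<Flags>(\[.*?\]))\s*(?<ResponseCode>\S+)\s*(?<QueryContent>\S+)" }
  }
}
output { stdout { codec => rubydebug } }

Run it with /usr/local/logstash/bin/logstash -f grok-test.conf, paste a raw DNS debug log line, and confirm that no _grokparsefailure tag appears before wiring the pattern into the Kafka pipeline.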