Background:
The nginx access log is not written in JSON. It is shipped as-is to Kafka by Flume; Logstash then consumes the Kafka topic, converts each log line into a JSON document, and writes it to Elasticsearch, from where the data is later pulled into Prometheus. This post mainly records how the conversion is done in Logstash.
input {                                             # input section
  kafka {                                           # consume data from Kafka
    bootstrap_servers => ["192.168.1.2:9292,192.168.1.3:9292,192.168.1.4:9292"]
    #topics => "%{[@metadata][topic]}"              # use the topic carried in the Kafka metadata
    #topics_pattern => "elk-.*"                     # match topics with a regex
    topics => "pokekara_nginx_log"
    codec => plain { format => "%{message}" }       # input format; if the data were already JSON, use the json codec instead
    consumer_threads => 3                           # number of consumer threads
    decorate_events => true                         # add Kafka metadata (topic, message size, ...) to the event under a field named "kafka"
    auto_offset_reset => "latest"                   # automatically reset the offset to the latest offset
    group_id => "logstash-groups1"                  # consumer group ID; Logstash instances sharing a group_id form one consumer group
    client_id => "logstash1"                        # client ID
    fetch_max_wait_ms => "1000"                     # max time the broker blocks a fetch request when fetch_min_bytes is not yet satisfied
  }
}

filter {
  #grok {
  #  match => { "message" => "%{IPORHOST:client_ip}:%{NUMBER:client_port}\t-\t%{HTTPDUSER:ident}\t%{HTTPDATE:timestamp}\t%{IPORHOST:server_ip}:%{NUMBER:server_port}\t%{NUMBER:request_duration}\t%{NUMBER:response_duration}\t%{WORD:request_method}\t%{URIPATHPARAM:request_path}\t%{NUMBER:http_version}\t%{NUMBER:response_code}\t%{NUMBER:response_size}\t-\t%{DATA:user_agent}\t%{IPORHOST:source_ip}\t%{WORD:country}\t%{NUMBER:city_code}\t%{WORD:city_name}\t%{NUMBER:unknown_field1}\t%{NUMBER:unknown_field2}\t%{IPORHOST:nginx_ip}\t%{NUMBER:nginx_port}\t%{WORD:destination_country}\t%{WORD:destination_city}" }
  #}
  ruby {
    code => '
      keys = ["remote_addr","remote_user","time_local","upstream_addr","upstream_response_time","request_time","request","status","body_bytes_sent","http_referer","http_user_agent","http_x_forwarded_for","geoip_country_name","geoip_region","geoip_city","upstream_connect_time","upstream_header_time","server_addr","request_length","geoip2_data_country_name","geoip2_data_city_name"]
      values = event.get("message").split("\t")
      parts = values[6].split(" ")
      hmethod = parts[0]
      suri = parts[1].split("?")
      uri = suri[0]
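      # The indexing above assumes a well-formed line: values[6] corresponds to
      # the "request" key, i.e. nginx $request ("METHOD /path?query HTTP/1.1"),
      # so parts[0] is the HTTP method and parts[1] the URI; splitting the URI
      # on "?" drops the query string. A line with fewer tab-separated fields
      # leaves values[6] nil, and this code then raises.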
      # add a new key
      keys << "hmethod"
      # and the corresponding value
      values << hmethod
      keys << "uri"
      values << uri
      merged_values = Hash[keys.zip(values)]
      event.set("[newmess][request]", merged_values["request"])
      event.set("newmess", merged_values)
      #merged_values = Hash[keys.zip(values)]
      #custom_output = merged_values.map { |k, v| "#{k}:#{v}" }.join(",")
      event.remove("message")
    '
  }

  mutate {
    #split => ["[newmess][request]"," "]
    #add_field => {"method" => "%{[newmess][request][0]}"}
    #split => ["%{[newmess][request][1]}","?"]
    #add_field => {"uri" => "%{[newmess][request][1][0]}"}
    remove_field => ["[@version]","[newmess][remote_addr]","[newmess][remote_user]","[newmess][time_local]","[newmess][upstream_addr]","[newmess][request]","[newmess][geoip_country_name]","[newmess][geoip_region]","[newmess][geoip_city]","[newmess][upstream_connect_time]","[newmess][upstream_header_time]","[newmess][request_length]"]
  }
}

output {
  elasticsearch {                                   # output section: Logstash writes to Elasticsearch
    hosts => ["192.168.1.5:9200"]
    #index => "%{[fields][source]}-%{+YYYY-MM-dd}"  # take the index name directly from a field in the log (the elk- prefix is dropped)
    index => "pokekara-nginx-%{+YYYY-MM-dd}"        # fixed index name with a daily date suffix
    user => "elastic"
    password => "passwd"
    action => "create"                              # "index" mode raised an error here, so "create" is used
    #document_type => "_doc"
    #document_id => "%{[@metadata][_id]}"
  }
}

# Debug mode: print events to the console instead
#output {
#  stdout {
#    codec => rubydebug
#  }
#}
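To sanity-check the ruby filter outside Logstash, the core of its code block can be run as a plain Ruby script. The sketch below is only an approximation: the sample log line and all of its field values are made up, and the Logstash event API (event.get / event.set) is replaced by local variables. It shows how split("\t") plus Hash[keys.zip(values)] turns one tab-separated line into the newmess document, and it adds a field-count guard that the in-pipeline code does not have.

require "json"

# Field names in the order the nginx log_format is assumed to emit them
# (copied from the `keys` array in the filter above).
keys = ["remote_addr","remote_user","time_local","upstream_addr",
        "upstream_response_time","request_time","request","status",
        "body_bytes_sent","http_referer","http_user_agent","http_x_forwarded_for",
        "geoip_country_name","geoip_region","geoip_city","upstream_connect_time",
        "upstream_header_time","server_addr","request_length",
        "geoip2_data_country_name","geoip2_data_city_name"]

# Hypothetical tab-separated log line with the same 21 fields.
message = [
  "203.0.113.10", "-", "10/Jul/2023:12:00:00 +0800", "10.0.0.7:8080",
  "0.012", "0.015", "GET /api/v1/songs?id=42 HTTP/1.1", "200", "512",
  "-", "Mozilla/5.0", "-", "Japan", "13", "Tokyo", "0.001", "0.011",
  "192.168.1.10", "780", "Japan", "Tokyo"
].join("\t")

values = message.split("\t")

if values.length == keys.length && values[6]        # guard the in-pipeline code lacks
  parts   = values[6].split(" ")                    # ["GET", "/api/v1/songs?id=42", "HTTP/1.1"]
  hmethod = parts[0]                                # request method
  uri     = (parts[1] || "").split("?").first       # URI with the query string dropped

  keys   += ["hmethod", "uri"]
  values += [hmethod, uri]

  newmess = Hash[keys.zip(values)]                  # same structure the filter stores under [newmess]
  puts JSON.pretty_generate(newmess)
else
  warn "unexpected field count: #{values.length}"
end

Running the script prints the document as pretty JSON, which is roughly what ends up under the newmess field in Elasticsearch, before the mutate filter strips the sub-fields listed in remove_field.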