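As data moves from source to storage, Logstash filters parse each event, identify named fields to give it structure, and convert those fields into a common format, so the data is easier and faster to analyze and turn into business value. For example:
Use grok to derive structure from unstructured data
Use geoip to derive geographic coordinates from an IP address
Use useragent to extract the operating system, device type, and similar details from the request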
3.1 grok plugin
3.1.1 Why grok?
Suppose we want to parse the following unstructured log line into structured JSON:
120.27.74.166 - - [30/Dec/2019:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"
Doing this with plain regular expressions requires a very complex pattern, for example:
\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*
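3.1.2 What problem does grok solve?
grok is essentially a library of named regular expressions. It ships with many built-in patterns that can be used directly, so you compose a match expression from pattern names instead of writing the raw regex yourself.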
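The "named regular expressions" idea can be made concrete with a minimal Python sketch. It expands a grok-style expression into an ordinary regex with named groups, where a reference written as %{PATTERN:field} becomes a group named field that wraps the regex for PATTERN. The pattern table and the names grok_compile and grok_match are hypothetical. They are much simpler than the real grok pattern library and only illustrate the substitution step.

```python
import re

# Hypothetical, heavily simplified pattern table; the real grok library
# defines far more thorough patterns (IPORHOST, HTTPDATE, QS, ...).
PATTERNS = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "WORD": r"\w+",
    "NUMBER": r"\d+(?:\.\d+)?",
    "NOTSPACE": r"\S+",
}

# Matches %{NAME} and %{NAME:field}
GROK_REF = re.compile(r"%\{(\w+)(?::(\w+))?\}")


def grok_compile(expr):
    """Replace each %{NAME:field} with a named group wrapping PATTERNS[NAME]."""
    def expand(m):
        name, field = m.group(1), m.group(2)
        body = PATTERNS[name]  # unknown pattern names raise KeyError
        # %{NAME} without a field name matches but does not capture.
        return f"(?P<{field}>{body})" if field else f"(?:{body})"
    return re.compile(GROK_REF.sub(expand, expr))


def grok_match(expr, text):
    """Return the captured fields, or an empty dict if the text does not match."""
    m = grok_compile(expr).match(text)
    return m.groupdict() if m else {}


print(grok_match("%{IP:clientip} %{WORD:verb} %{NOTSPACE:request}",
                 "10.0.0.1 GET /index.html"))
# {'clientip': '10.0.0.1', 'verb': 'GET', 'request': '/index.html'}
```

The surrounding text of the expression (spaces, brackets) stays literal regex, which is also why grok expressions can mix patterns with plain regex syntax.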
Introduction to grok:
https://www.elastic.co/cn/blog/do-you-grok-grok
Online grok debugger for testing patterns:
http://grokdebug.herokuapp.com/
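3.1.3 grok syntax
A grok pattern reference has the form %{SYNTAX:SEMANTIC}. SYNTAX is the name of a pattern (for example IP or NUMBER), and SEMANTIC is the field name under which the matched text is stored.
3.1.4 grok example
Example: use the built-in COMBINEDAPACHELOG pattern to parse Nginx access log lines into JSON fields.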
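input {
  http {
    port => 5656    # accept events over HTTP on port 5656
  }
}
filter {
  grok {
    match => {
      # parse the message field with the built-in combined log pattern
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
}
output {
  stdout {
    codec => rubydebug    # pretty-print each event to the console
  }
}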
Sample output:
{
    "auth" => "-",
    "request" => "/fonts/icons/icon.woff",
    "timestamp" => "30/Oct/2021:10:53:18 +0800",
    "bytes" => "43852",
    "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
    "host" => "10.0.0.1",
    "@version" => "1",
    "headers" => {
        "request_method" => "POST",
        "http_host" => "10.0.0.151:5656",
        "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
        "request_path" => "/",
        "content_length" => "269",
        "http_version" => "HTTP/1.1"
    },
    "response" => "200",
    "clientip" => "10.0.0.1",
    "httpversion" => "1.1",
    "ident" => "-",
    "@timestamp" => 2021-10-30T10:18:38.505Z,
    "agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38\"",
    "message" => "10.0.0.1 - - [30/Oct/2021:10:53:18 +0800] \"GET /fonts/icons/icon.woff HTTP/1.1\" 200 43852 \"http://elk.bertwu.net/css/style.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38\" \"-\" \"-\"",
    "verb" => "GET"
}
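The following Python sketch shows what %{COMBINEDAPACHELOG} extracts. It uses a single hand-written regex with named groups that yields the same field names as the sample output above. This is a simplified approximation written for this example, not the actual pattern definition, which is assembled from smaller grok patterns and accepts more variations.

```python
import re

# Simplified stand-in for %{COMBINEDAPACHELOG}. As in the output above,
# referrer and agent keep their surrounding double quotes.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)


def parse_combined(line):
    """Return the named fields, or {} when the line is not in combined format."""
    m = COMBINED.match(line)
    return m.groupdict() if m else {}


line = ('10.0.0.1 - - [30/Oct/2021:10:53:18 +0800] '
        '"GET /fonts/icons/icon.woff HTTP/1.1" 200 43852 '
        '"http://elk.bertwu.net/css/style.css" '
        '"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38" "-" "-"')
fields = parse_combined(line)
print(fields["clientip"], fields["verb"], fields["request"], fields["response"])
# 10.0.0.1 GET /fonts/icons/icon.woff 200
```

The regex is anchored only at the start, so trailing extras such as the final "-" "-" in this log line are ignored, much as the grok match above ignores them.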
3.2 geoip plugin
geoip plugin: looks up location information for an IP address, such as latitude/longitude and city name, to support geographic analysis.
input {
  http {
    port => 5656
  }
}
filter {
  grok {
    match => {
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
  geoip {
    source => "clientip"    # look up location data for the clientip field
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Result: send a POST request to the server whose log line contains a public IP address. The geoip part of the output:
"geoip" => {
    "country_code2" => "CN",
    "timezone" => "Asia/Shanghai",
    "location" => {
        "lon" => 104.0667,
        "lat" => 30.6667
    },
    "region_name" => "Sichuan",
    "country_code3" => "CN",
    "region_code" => "SC",
    "continent_code" => "AS",
    "longitude" => 104.0667,
    "country_name" => "China",
    "latitude" => 30.6667,
    "ip" => "112.192.179.108"
},
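Conceptually, geoip looks the address up in a MaxMind GeoLite2 database and nests what it finds under the geoip field. When nothing is found, it adds the _geoip_lookup_failure tag, which also appears in the mutate outputs later in this article. The Python sketch below mimics that flow with a hypothetical one-entry in-memory table built from the sample result above. GEO_DB and geoip() are illustrative names, not part of Logstash, and a real database is keyed by address ranges rather than single addresses.

```python
import ipaddress

# Hypothetical stand-in for the GeoLite2 database; the single /32 entry
# reuses values from the sample output above.
GEO_DB = {
    ipaddress.ip_network("112.192.179.108/32"): {
        "country_name": "China",
        "country_code2": "CN",
        "continent_code": "AS",
        "region_name": "Sichuan",
        "region_code": "SC",
        "timezone": "Asia/Shanghai",
        "latitude": 30.6667,
        "longitude": 104.0667,
        "location": {"lat": 30.6667, "lon": 104.0667},
    },
}


def geoip(event, source="clientip", target="geoip"):
    """Attach location data for event[source]; tag the event if nothing is found."""
    try:
        ip = ipaddress.ip_address(event.get(source, ""))
    except ValueError:  # missing field or not an IP address
        ip = None
    if ip is not None:
        for network, info in GEO_DB.items():
            if ip in network:
                event[target] = {**info, "ip": str(ip)}
                return event
    event.setdefault("tags", []).append("_geoip_lookup_failure")
    return event


print(geoip({"clientip": "112.192.179.108"})["geoip"]["region_name"])  # Sichuan
print(geoip({"clientip": "10.0.0.1"})["tags"])  # ['_geoip_lookup_failure']
```

A private address such as 10.0.0.1 is not in the table, so it ends up tagged instead of enriched. That is why the test above needs a public IP.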
3.3 The fields option
The geoip output contains more than we need; the fields option keeps only the fields you choose:
input {
  http {
    port => 5656
  }
}
filter {
  grok {
    match => {
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
  geoip {
    source => "clientip"
    # keep only these geoip fields
    fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Output:
"geoip" => {
    "country_code2" => "CN",
    "timezone" => "Asia/Shanghai",
    "longitude" => 104.0667,
    "country_name" => "China",
    "latitude" => 30.6667,
    "continent_code" => "AS"
},
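The visible effect of fields is a projection of the geoip hash onto the listed keys. The Python sketch below applies the fields list from the configuration above to the full geoip result from section 3.2 and reproduces the trimmed output. It filters an already complete dict, which need not be how the plugin implements the option internally, and select_fields is a hypothetical helper.

```python
# Full geoip result copied from the sample output in section 3.2.
FULL_GEOIP = {
    "country_code2": "CN",
    "timezone": "Asia/Shanghai",
    "location": {"lon": 104.0667, "lat": 30.6667},
    "region_name": "Sichuan",
    "country_code3": "CN",
    "region_code": "SC",
    "continent_code": "AS",
    "longitude": 104.0667,
    "country_name": "China",
    "latitude": 30.6667,
    "ip": "112.192.179.108",
}

# The fields option from the configuration above.
FIELDS = ["country_name", "country_code2", "timezone",
          "longitude", "latitude", "continent_code"]


def select_fields(geo, fields):
    """Keep only the requested keys, skipping any the lookup did not return."""
    return {key: geo[key] for key in fields if key in geo}


print(select_fields(FULL_GEOIP, FIELDS))
# {'country_name': 'China', 'country_code2': 'CN', 'timezone': 'Asia/Shanghai',
#  'longitude': 104.0667, 'latitude': 30.6667, 'continent_code': 'AS'}
```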
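3.4 date plugin
date plugin: parses a date string into a timestamp value, which then replaces the @timestamp field or is written to another field you specify.
(Date filter reference, including time zone settings: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html)
match: array; the first element is the source field, followed by one or more date formats to try
target: string; the field that receives the parsed value, default @timestamp
timezone: string; the time zone to apply when the date string itself carries no offset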
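The three options can be mirrored in Python to sanity-check a date format before deploying it. The sketch makes the following assumptions:
- The Joda-style pattern dd/MMM/yyyy:HH:mm:ss Z used in this section's configuration is translated by hand into the strptime format %d/%b/%Y:%H:%M:%S %z.
- A fixed UTC+8 offset stands in for Asia/Shanghai, which currently has no daylight saving time.
- Parse failures add the _dateparsefailure tag, the date filter's default failure tag.

date_filter and as_logstash are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset standing in for the "Asia/Shanghai" zone.
SHANGHAI = timezone(timedelta(hours=8))

# strptime equivalent of the Joda pattern "dd/MMM/yyyy:HH:mm:ss Z".
NGINX_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def date_filter(event, source="timestamp", target="@timestamp",
                fmt=NGINX_FORMAT, default_tz=SHANGHAI):
    """Parse event[source] and store it in event[target] as a UTC datetime."""
    try:
        parsed = datetime.strptime(event[source], fmt)
    except (KeyError, TypeError, ValueError):
        event.setdefault("tags", []).append("_dateparsefailure")
        return event
    if parsed.tzinfo is None:
        # Only strings without an explicit offset fall back to default_tz.
        parsed = parsed.replace(tzinfo=default_tz)
    event[target] = parsed.astimezone(timezone.utc)
    return event


def as_logstash(dt):
    """Render a UTC datetime in the style rubydebug uses for timestamps."""
    return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


event = date_filter({"timestamp": "30/Oct/2021:10:53:18 +0800"}, target="nginx_date")
print(as_logstash(event["nginx_date"]))  # 2021-10-30T02:53:18.000Z
```

Because the Nginx timestamp already contains +0800, the timezone setting has no effect on it. The fallback matters only for strings without an offset, such as 2019-12-28 03:18:31.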
input {
  http {
    port => 5656
  }
}
filter {
  grok {
    match => {
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
  geoip {
    source => "clientip"
    fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"]
  }
  # Parse the request time in timestamp (e.g. 30/Dec/2019:11:59:18 +0800).
  # Writing it into @timestamp (the default target) lets later time-based
  # filtering use the request time; this example writes it to nginx_date instead.
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "nginx_date"
    timezone => "Asia/Shanghai"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Output (excerpt). nginx_date holds the request time converted to UTC, so 10:53:18 +0800 becomes 02:53:18Z:
    "auth" => "-",
    "request" => "/fonts/icons/icon.woff",
    "timestamp" => "30/Oct/2021:10:53:18 +0800",
    "bytes" => "43852",
    "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
    "nginx_date" => 2021-10-30T02:53:18.000Z,
3.5 useragent plugin
useragent plugin: parses the user-agent field of a request to extract the browser, device, operating system, and similar information.
input {
  http {
    port => 5656
  }
}
filter {
  grok {
    match => {
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
  geoip {
    source => "clientip"
    fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "nginx_date"
    timezone => "Asia/Shanghai"
  }
  # parse the agent field
  useragent {
    source => "agent"        # field to read the user agent from
    target => "useragent"    # store the parsed results under this field
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Output (excerpt):
{
    "auth" => "-",
    "request" => "/fonts/icons/icon.woff",
    "timestamp" => "30/Oct/2021:10:53:18 +0800",
    "bytes" => "43852",
    "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
    "useragent" => {
        "os" => "Windows",
        "patch" => "4638",
        "build" => "",
        "name" => "Chrome",
        "minor" => "0",
        "os_name" => "Windows",
        "device" => "Other",
        "major" => "95"
    },
    ...
}
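Real user-agent parsing relies on a large, ordered list of regular expressions; the Logstash plugin is built on the ua-parser project's rules. The deliberately tiny Python sketch below recognizes only Chrome and Firefox plus two operating systems. It just shows how name, major, minor, and patch are cut out of a version string such as Chrome/95.0.4638.54. parse_useragent and its rules are hypothetical, and device detection is omitted (it always reports "Other").

```python
import re

# Hypothetical, deliberately tiny rule set.
BROWSER = re.compile(r"(Firefox|Chrome)/(\d+)\.(\d+)(?:\.(\d+))?")
OS_RULES = [
    ("Windows", re.compile(r"Windows NT")),
    ("Mac OS X", re.compile(r"Mac OS X")),
]


def parse_useragent(ua):
    """Extract browser name/version and OS; unknown parts become "Other"."""
    result = {"name": "Other", "major": "", "minor": "", "patch": "",
              "os": "Other", "device": "Other"}
    m = BROWSER.search(ua)
    if m:
        result["name"], result["major"], result["minor"] = m.group(1, 2, 3)
        result["patch"] = m.group(4) or ""
    for os_name, rule in OS_RULES:  # first matching rule wins
        if rule.search(ua):
            result["os"] = os_name
            break
    return result


agent = ('"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
         '(KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38"')
print(parse_useragent(agent))
# {'name': 'Chrome', 'major': '95', 'minor': '0', 'patch': '4638', 'os': 'Windows', 'device': 'Other'}
```

Like the output above, the sketch reports this Edge user agent as Chrome, because it has no rule for the Edg/ token. Rule order and coverage are what a real rule set gets right.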
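3.6 mutate plugin
mutate performs general operations on fields, such as type conversion, deletion, replacement, and updates. Commonly used options:
remove_field: delete fields
split: split a string into an array (similar to picking columns with awk)
add_field: add fields
convert: convert a field's type
gsub: regex-based string replacement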
3.6.1 remove_field
Use mutate to delete fields you no longer need, such as headers, message, and agent:
filter {
  ...
  # mutate: delete fields
  mutate {
    remove_field => ["headers", "message", "agent"]
  }
  ...
}
3.6.2 split
mutate's split option cuts a string field into an array; here | is the separator.
Test data (提交订单 means "submit order"): 5607|提交订单|2019-12-28 03:18:31
...
filter {
  ...
  mutate {
    split => { "message" => "|" }
  }
}
...
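Output. The _grokparsefailure and _geoip_lookup_failure tags come from the grok and geoip filters in the elided part of the pipeline, which cannot parse this line: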
{
    "@timestamp" => 2021-10-30T12:40:15.202Z,
    "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
    "message" => [
        [0] "5607",
        [1] "提交订单",
        [2] "2019-12-28 03:18:31"
    ],
    "host" => "10.0.0.1",
    "@version" => "1",
    "headers" => {
        "request_method" => "POST",
        "http_host" => "10.0.0.151:5656",
        "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
        "request_path" => "/",
        "content_length" => "52",
        "http_version" => "HTTP/1.1"
    }
}
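3.6.3 add_field
mutate's add_field can store the split values in new, named fields, which makes later statistics and analysis easier. %{[message][0]} refers to the first element of the message array: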
...
filter {
  mutate {
    split => { "message" => "|" }
    # copy the split values into named fields
    add_field => {
      "UserID" => "%{[message][0]}"
      "Action" => "%{[message][1]}"
      "Date" => "%{[message][2]}"
    }
  }
}
...
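The %{[message][0]} syntax is a field reference. Each bracketed segment selects a key of a hash or an index of an array, and a reference that cannot be resolved is left in the text unchanged. A minimal Python sketch of that lookup, combined with the split step, follows. sprintf and mutate_split_add are hypothetical names. Only the bracketed %{[a][b]} form is handled, not %{field} or date formats such as %{+YYYY.MM.dd}.

```python
import re

# %{[a][b]...} with one or more bracketed path segments
REF = re.compile(r"%\{((?:\[[^\]]+\])+)\}")
SEGMENT = re.compile(r"\[([^\]]+)\]")


def sprintf(template, event):
    """Replace %{[field][index]} references with values from the event."""
    def resolve(m):
        value = event
        for key in SEGMENT.findall(m.group(1)):
            try:
                value = value[int(key)] if isinstance(value, list) else value[key]
            except (KeyError, IndexError, ValueError, TypeError):
                return m.group(0)  # unresolved: keep the reference as written
        return str(value)  # scalars only; arrays/hashes are not formatted like Logstash
    return REF.sub(resolve, template)


def mutate_split_add(event, field, sep, add_field):
    """Split event[field] on sep, then add fields built from templates."""
    event[field] = event[field].split(sep)
    for name, template in add_field.items():
        event[name] = sprintf(template, event)
    return event


event = mutate_split_add(
    {"message": "5607|提交订单|2019-12-28 03:18:31"}, "message", "|",
    {"UserID": "%{[message][0]}", "Action": "%{[message][1]}", "Date": "%{[message][2]}"},
)
print(event["UserID"], event["Action"], event["Date"])  # 5607 提交订单 2019-12-28 03:18:31
```

Note that the added values are strings. Turning UserID into a number is the job of convert, covered next.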
Output:
{
    "Date" => "2019-12-28 03:18:31",
    "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:46:37.558Z,
    "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
    "message" => [
        [0] "5607",
        [1] "提交订单",
        [2] "2019-12-28 03:18:31"
    ],
    "UserID" => "5607",
    "host" => "10.0.0.1",
    "@version" => "1",
    "headers" => {
        "request_method" => "POST",
        "http_host" => "10.0.0.151:5656",
        "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
        "request_path" => "/",
        "content_length" => "37",
        "http_version" => "HTTP/1.1"
    }
}
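3.6.4 convert
mutate's convert option changes a field's type; supported types include integer, float, and string.
Within a single mutate block, the options run in a fixed order, and add_field and remove_field are applied last, after operations such as convert and split. To convert a field that add_field creates, put convert in a second mutate block:
...
filter {
  mutate {
    split => { "message" => "|" }
    # copy the split values into named fields
    add_field => {
      "UserID" => "%{[message][0]}"
      "Action" => "%{[message][1]}"
      "Date" => "%{[message][2]}"
    }
    # remove fields that are no longer needed
    remove_field => ["headers", "message"]
  }
  # separate block: UserID already exists when this convert runs
  mutate {
    convert => {
      "UserID" => "integer"
      "Action" => "string"
      "Date" => "string"
    }
  }
}
...
The output below was captured with convert inside the first mutate block. Because convert ran before UserID existed, UserID is still the string "5607". With the separate block above, it becomes an integer.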
{
    "Date" => "2019-12-28 03:18:31",
    "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:52:38.695Z,
    "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
    "UserID" => "5607",
    "host" => "10.0.0.1",
    "@version" => "1"
}
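In this output, UserID is still the string "5607" even though convert asks for an integer. The mutate documentation gives a fixed processing order within one block (among the options used here, convert runs before split). The common options add_field and remove_field are applied only after the block's own operations. The Python sketch below models just that ordering for the four options in this section. mutate is a hypothetical function, and add_field is simplified to take (field, index) pairs instead of %{...} templates.

```python
# Simplified type converters for the "integer", "float" and "string" targets;
# Logstash's own conversion rules may be more lenient (e.g. for "3.9").
CONVERTERS = {"integer": int, "float": float, "string": str}


def mutate(event, convert=None, split=None, add_field=None, remove_field=()):
    """Apply options in mutate's order: convert, split, then add/remove_field.

    add_field is simplified: {"new": ("source", index)} copies
    event["source"][index] into event["new"] as a string.
    """
    for field, kind in (convert or {}).items():
        if field in event:  # fields that do not exist yet are skipped
            event[field] = CONVERTERS[kind](event[field])
    for field, sep in (split or {}).items():
        if isinstance(event.get(field), str):
            event[field] = event[field].split(sep)
    for name, (source, index) in (add_field or {}).items():
        event[name] = str(event[source][index])
    for field in remove_field:
        event.pop(field, None)
    return event


ADD = {"UserID": ("message", 0), "Action": ("message", 1), "Date": ("message", 2)}
raw = "5607|提交订单|2019-12-28 03:18:31"

# One mutate block: convert runs before add_field has created UserID.
one = mutate({"message": raw}, convert={"UserID": "integer"},
             split={"message": "|"}, add_field=ADD, remove_field=["message"])

# Two blocks: the second block sees the UserID created by the first.
two = mutate({"message": raw}, split={"message": "|"}, add_field=ADD,
             remove_field=["message"])
two = mutate(two, convert={"UserID": "integer"})

print(repr(one["UserID"]), repr(two["UserID"]))  # '5607' 5607
```

Chaining two calls models chaining two mutate blocks, which is the usual way to control ordering when one option depends on the result of another.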
From: https://blog.51cto.com/u_13236892/8072868