首页 > 其他分享 >Logstash Filter插件

Logstash Filter插件

时间:2023-10-28 21:04:29浏览次数:31  
标签:grok 插件 http timestamp Filter 18 message Logstash geoip

数据从源传输到存储的过程中,Logstash 的 filter过滤器能够解析各个事件,识别已命名的字段结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值;
利用 Grok 从非结构化数据中派生出结构
利用 geoip 从 IP 地址分析出地理坐标
利用 useragent 从 请求中分析操作系统、设备类型等

Logstash Filter插件_css

3.1 Grok插件

3.1.1 grok如何出现的?

我们希望将如下非结构化的数据解析成 json 结构化数据格式
120.27.74.166 - - [30/Dec/2019:11:59:18+0800] "GET / HTTP/1.1" 302 154 "-""Mozilla/5.0 (Macintosh; Intel Mac OS X10_14_1) Chrome/79.0.3945.88Safari/537.36"

需要使用非常复杂的正则表达式;
\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\
[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:
\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\
[(\w+)]\s([^:]+:\s\w+
\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*

3.1.2 grok解决什么问题

grok其实是带有名字的正则表达式集合。grok 内置了很多 pattern 可以直接使用;
grok介绍
https://www.elastic.co/cn/blog/do-you-grok-grok
grok语法生成器
http://grokdebug.herokuapp.com/

3.1.3 grok语法示意图

Logstash Filter插件_字段_02

3.1.4 grok语法示例

grok示例:使用 grok pattern 将 Nginx 日志格式化为 json 格式;
input {
        http {
                port => 5656
        }
}
filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
}
output {

        stdout {
                codec => rubydebug
        }
}
 
结果示例
{
           "auth" => "-",
        "request" => "/fonts/icons/icon.woff",
      "timestamp" => "30/Oct/2021:10:53:18 +0800",
          "bytes" => "43852",
       "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
           "host" => "10.0.0.1",
       "@version" => "1",
        "headers" => {
         "request_method" => "POST",
              "http_host" => "10.0.0.151:5656",
            "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/",
         "content_length" => "269",
           "http_version" => "HTTP/1.1"
    },
       "response" => "200",
       "clientip" => "10.0.0.1",
    "httpversion" => "1.1",
          "ident" => "-",
     "@timestamp" => 2021-10-30T10:18:38.505Z,
          "agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38\"",
        "message" => "10.0.0.1 - - [30/Oct/2021:10:53:18 +0800] \"GET /fonts/icons/icon.woff HTTP/1.1\" 200 43852 \"http://elk.bertwu.net/css/style.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36 Edg/95.0.1020.38\" \"-\" \"-\"",
           "verb" => "GET"
}

3.2 geoip插件

geoip 插件:根据 ip 地址提供的对应地域信息,比如经纬度、城市名等、方便进行地理数据分析;
input {
        http {
                port => 5656
        }
}
filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
        geoip {
                source => "clientip" #提取clientip字段,获取地域信息
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

结果分析: 对服务器发送 POST 请求,提供一个公网 ip 地址;
 "geoip" => {
         "country_code2" => "CN",
              "timezone" => "Asia/Shanghai",
              "location" => {
            "lon" => 104.0667,
            "lat" => 30.6667
        },
           "region_name" => "Sichuan",
         "country_code3" => "CN",
           "region_code" => "SC",
        "continent_code" => "AS",
             "longitude" => 104.0667,
          "country_name" => "China",
              "latitude" => 30.6667,
                    "ip" => "112.192.179.108"
    },

3.3 fields字段

输出内容太多,可以通过 fileds 选项选择自己需要的信息;
input {
        http {
                port => 5656
        }
}
filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
        geoip {
                source => "clientip"
                fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"] # 提取想要的字段
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

结果
"geoip" => {
         "country_code2" => "CN",
              "timezone" => "Asia/Shanghai",
             "longitude" => 104.0667,
          "country_name" => "China",
              "latitude" => 30.6667,
        "continent_code" => "AS"
    },

3.4 Date插件

date插件:将日期字符串解析为日志类型。然后替换@timestamp 字段或指定的其他字段。
(datazone时间:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html)
match 类型为数组,用于指定日期匹配的格式,可以以此指定多种日期格式
target 类型为字符串,用于指定赋值的字段名,默认是 @timestamp
timezone 类型为字符串,用于指定时区域

input {
        http {
                port => 5656
        }
}
filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
        geoip {
                source => "clientip"
                fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"]
        }
# date处理时间 将timestamp 请求的时间覆盖写入时间 30/Dec/2019:11:59:18 +0800   后期需要通过@timestamp作为时间过滤器
        date {
                match => ["timestamp" , "dd/MMM/yyyy:HH:mm:ss Z"]
                target => "nginx_date"
                timezone => "Asia/Shanghai"
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

结果
           "auth" => "-",
        "request" => "/fonts/icons/icon.woff",
      "timestamp" => "30/Oct/2021:10:53:18 +0800",
          "bytes" => "43852",
       "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
     "nginx_date" => 2021-10-30T02:53:18.000Z,

3.5 useragent插件

useragent插件:根据请求中的 user-agent 字段,解析出浏览器设备、操作系统等信息;
input {
        http {
                port => 5656
        }
}
filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
        geoip {
                source => "clientip"
                fields => ["country_name","country_code2","timezone","longitude","latitude","continent_code"]
        }
        date {
                match => ["timestamp" , "dd/MMM/yyyy:HH:mm:ss Z"]
                target => "nginx_date"
                timezone => "Asia/Shanghai"
        }
        #提取agent字段,进行解析
        useragent {
                source => "agent" # 从哪个字段提取
                target => "useragent" # 重命名为新的字段
        }
}

output {
        stdout {
                codec => rubydebug
        }
}

结果
{
           "auth" => "-",
        "request" => "/fonts/icons/icon.woff",
      "timestamp" => "30/Oct/2021:10:53:18 +0800",
          "bytes" => "43852",
       "referrer" => "\"http://elk.bertwu.net/css/style.css\"",
      "useragent" => {
             "os" => "Windows",
          "patch" => "4638",
          "build" => "",
           "name" => "Chrome",
          "minor" => "0",
        "os_name" => "Windows",
         "device" => "Other",
          "major" => "95"
    },

3.6 mutate 插件

mutate 主要是对字段进行、类型转换、删除、替换、更新等操作;
remove_field 删除字段
split 字符串切割(awk取列)
add_field 添加字段
convert 类型转换
gsub 字符串替换

3.6.1 remove_field

mutate 删除无用字段,比如:headers、message、agent
filter {
...
#mutate 删除操作
mutate {
	remove_field => ["headers","message", "agent"]
	}
...
}

3.6.2 split

mutate 中的 split 字符切割, 指定 | 为字段分隔符。
测试数据:5607|提交订单|2019-12-28 03:18:31
...
filter {
	...
	mutate {
        split => { "message" => "|" }
        }
}
...

结果展示
{
    "@timestamp" => 2021-10-30T12:40:15.202Z,
          "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
       "message" => [
        [0] "5607",
        [1] "提交订单",
        [2] "2019-12-28 03:18:31"
    ],
          "host" => "10.0.0.1",
      "@version" => "1",
       "headers" => {
         "request_method" => "POST",
              "http_host" => "10.0.0.151:5656",
            "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/",
         "content_length" => "52",
           "http_version" => "HTTP/1.1"
    }
}

3.6.3 add_field

mutate 中 add_field,可以将分割后的数据创建出新的字段名称。便于以后的统计和分析
...
filter {
        mutate {
                split => { "message" => "|" }
                #将分割后的字段添加到指定的字段名称
                add_field => {
                        "UserID" => "%{[message][0]}"
                        "Action" => "%{[message][1]}"
                        "Date" => "%{[message][2]}"
                }
        }
}
...

结果展示
{
          "Date" => "2019-12-28 03:18:31",
        "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:46:37.558Z,
          "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
       "message" => [
        [0] "5607",
        [1] "提交订单",
        [2] "2019-12-28 03:18:31"
    ],
        "UserID" => "5607",
          "host" => "10.0.0.1",
      "@version" => "1",
       "headers" => {
         "request_method" => "POST",
              "http_host" => "10.0.0.151:5656",
            "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/",
         "content_length" => "37",
           "http_version" => "HTTP/1.1"
    }
}

3.6.4 convert

mutate 中的 convert类型转换。 支持转换integer、float、string等类型;
...
filter {
        mutate {
                split => { "message" => "|" }
                #将分割后的字段添加到指定的字段名称
                add_field => {
                        "UserID" => "%{[message][0]}"
                        "Action" => "%{[message][1]}"
                        "Date" => "%{[message][2]}"
                }
                #对新添加字段进行格式转换
                convert => {
                        "UserID" => "integer"
                        "Action" => "string"
                        "Date" => "string"
                }
                #移除无用的字段
                remove_field => ["headers","message"]
        }
}
...


{
          "Date" => "2019-12-28 03:18:31",
        "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:52:38.695Z,
          "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
        "UserID" => "5607",
          "host" => "10.0.0.1",
      "@version" => "1"
}

标签:grok,插件,http,timestamp,Filter,18,message,Logstash,geoip
From: https://blog.51cto.com/u_13236892/8072868

相关文章

  • PyQt5简介及Designer、Pyuic插件安装
    PyQt5简介及Designer、Pyuic插件安装swallowsonny关注IP属地:湖北0.6972019.08.0712:36:33字数765阅读15,117英文参考文档中文参考文档简介PyQt5是什么Qt是一组跨平台的c++库,实现了访问现代桌面和移动系统许多方面的高级api。这些包括定位和定位服务、多媒体、......
  • C# Webapi Filter 过滤器 - 生命周期钩子函数 - Exception Filter 基础
    什么是Filter?1.切面编程机制,在ASP.NETCore特定的位置执行我们自定义的代码;2.ASP.NETCore中的Filter五种类型,Authorization,filter,resourcefilter,actionfilter,exceptionfitler,resultfilter;3.所有的筛选器都有异步和同步两种版本;eg:IActionFilter,IAsyncActionF......
  • 文心一言+pycharm添加自己的单词本插件
    学习教程:https://yiyan.baidu.com/developer/doc#Dllaifmrc我的单词本插件:必坑:python的版本一定要在3.7以上 ......
  • vs code markdown mermaid预览插件安装
    安装预览插件预览指令使用control+shift+p效果......
  • Ubuntu18.04下安装私人网盘服务NextCloud插件
    一、在线安装插件1.1:浏览器打开NextCloud访问地址1.2:使用管理员账号登录,进入应用管理1.3:找到自己需要的应用,点击安装和启用二、离线安装插件2.1:在应用商店找到需要的应用Allapps-AppStore-NextcloudTheNextcloudAppStore-Uploadyourappsandinstallnewappsontoyo......
  • vue 使用filter 把无限极分类遍历为树形结构
    <scriptsetuplang="ts">interfacelistType{id:numberurl:string}constdata=[{id:1,url:'/_nuxt/assets/images/america.png'},{id:2,url:'/_nuxt/assets/image......
  • Logstash input插件
    input插件用于指定输入源,一个pipeline可以有多个input插件,我们主要围绕下面几个input插件进行介绍stdinfilebeatkafkahttp2.1stdin插件从标准输入读取数据,从标准输出中输出内容cat/etc/logstash/conf.d/stdin_logstash.conf#从终端中输入,输出到中端input{ stdi......
  • Springboot+Mybatis+Mybatisplus 框架中增加自定义分页插件和sql 占位符修改插件
    一、Springboot简介springboot是当下最流行的web框架,SpringBoot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置,让这些框架集成在一起变得更加简单,简化了我......
  • Netfilter日志记录
    iptables-traw-IPREROUTING-ptcp--dport80-jLOG#iptables-traw-IPREROUTING-ptcp--dport80-jLOG--log-level3--log-prefix"ipt-err:" 可以指定log级别日志级别可通过syslog定义进行查看。另外LOG目标还可指定参数:–log-tcp-sequence,–log-tcp-options,–......
  • 金蝶KIS VB插件 老单据如何插入多行值,老单获取基础资料内码、代码、名称
    转自:https://blog.csdn.net/ssyyll/article/details/16804273WhileNotrs.EOF '填充对应的行 Withm_BillTransfer '如果超过两行以上的值,需要先用 .BillForm.InsertRow '插入一行 .SetGridTextLRow,dicFieldEntry("FItemID"),rs("FNumber") .SetGridText......