首页 > 其他分享 >Logstash关于日志清洗记录学习及应用

Logstash关于日志清洗记录学习及应用

时间:2023-05-15 18:13:57浏览次数:35  
标签:mm MM ip Logstash field json 清洗 日志 match

logstash日志清洗:

日志清洗任务,给了我乱七八糟资料之后看的一脸懵,最后决定自己动手整理整理这样有头绪点,记录一下自己学习过程。其中涉及到一些ip内容的我都改为ip,其他保持一致,欢迎大家一起讨论,我也是刚开始学习这个,如有写的不对请多指教。
一、学习阶段
1、首先理解logstash中各个语法:
事件:Logstash 每读取一次数据的行为叫做事件。
1)**Input:**输入插件,允许一个特定的事件源可以读取到 Logstash 管道中。
支持多种数据源:stdin,file,tcp/udp,jdbc,redis,syslog,beat。本使用手册中介绍beat的配置
2)**Filter:**过滤插件,logstash中最常用,也是最复杂的插件,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式。
date:日志解析
grok:正则匹配解析
dissect:分隔符解析
mutate:对字段做处理,比如重命名、删除、替换等
json:按照json解析字段内容到指定字段中
geoip:增加地理位置数据
ruby:利用ruby代码来动态修改logstash event

3)Output:输出插件,每个数据流经过input和filter后,最终要由output输出
常用输出插件有:stdout,Elasticsearch,Redis,File,TCP,本实用教程中介绍标准控制台输出stdout和Elasticsearch输出

2、Grok:将非结构化事件数据分析到字段中
Grok原理:使用的是正则表达式来对字段进行拆分。
match:匹配某个字段,根据正则表达式的规则拆分字符串

3、dissect:基于分隔符原理解析数据,解决grok解析时消耗过多cpu资源问题
使用分隔符将非结构化事件数据提取到字段中,解析过滤器不适用正则表达式,速度非常快,但是如果数据的结构因行而异,grok过滤更合适
应用:有一定局限性,主要使用与每行格式相似且分隔符明确简单的场景
语法:字段(field)和分隔符(delimiter)


4、mutate部分

convert :实现字段类型的转换,类型为hash,仅支持转换为integer、float、string和boolean
gsub:对字段内容进行替换,类型为数组,每3项为一个替换配置 * split:将字符串切割为数组
join:将数组拼接为字符串
merge:将两个数组合并为一个数组,字符串会被转为1个元素的数组进行操作
rename:字段重命名
update/replace:更新字段内容,区别于update只在字段存在时生效,而replace在字段不存在时会执行新增字段的操作

 

5、json:将字段内容为json格式的数据进行解析

source =>“message” #要解析的字段名
target =>“msg_json”
#解析后的存储字段,默认和message同级别

6、geoip:根据ip地址提供对应的地域信息,比如经纬度、城市名等,方便进行地理数据分析
7、ruby:最灵活的插件,可以和ruby语言来随心所欲的修改logstash event对象
8、条件判断:使用条件来决定filter和outup处理特定事件,条件支持if、else if 、else语句,可以嵌套

比较:相等==、!=、<、>、<=、>=
正则:=(匹配正则)、!(不匹配正则)
包含:in(包含)、not
in(不包含)
布尔:and(与),or(或)、nand(非与)、xor(非或)
一元运算符:!(取反)、()(复合表达式)、!() (对复合表达式结果取反)

二、应用阶段
1、首先根据需要提取的字段在es上对grok部分进行正则编写
2、编写单独测试
日志内容:
1)有json嵌套

2021-07-30T10:57:03.572999+08:00 120.0.0.1 ||json{"fieldValues": {"lid":20761143179600942,"cvendor":"micsoft","cproduct":"Server 4.0 ","cdevname":"WIN-N3LDQJCFQD8","ieventpri":3,"cresult":"success","cdevip":"ip","model":"event","ccolletorip":"ip","isport":49935,"cdevtype":"/server/windows","cregex":"OS_Windows_100","cprogram":"Microsoft-Windows-Security-Auditing","loccurtime":1627611741000,"coperation":"detect","ccollectsource":"ip:UDP:514","corilog":"<190>Jul 30 10:22:21 csw Server 4.0 \tSecurity\tWIN-N3LDQJCFQD8\tMicrosoft-Windows-Security-Auditing\tNone\tUnknown User\tFri Jul 30 10:22:21 2021\t5\t5158\t\" Windows筛选平台已允许绑定本地端口。应用程序信息:\t进程ID:\t\t664\t应用程序名称:\t\\device\\harddiskvolume2\\windows\\system32\\svchost.exe网络信息:\t源地址:\t\t0.0.0.0\t源端口:\t\t49935\t协议:\t\t17筛选器信息:\t筛选器运行时ID:\t0\t层名称:\t\t%%14608\t层运行时},"sid":"20761143179600942"}#015

相对应清洗应该是这样:

input {
  stdin {
  }
}
filter{
        grok {
              match => { "message" => "(?<datetime>.*?) .*? \|\|json(?<jsonLog>.*)#" }
              }
        json {
              source => "jsonLog"
              }
        mutate {
              remove_field => ["fieldValues",'@version','host',"jsonLog"]
              rename => {
                           "[fieldValues][cdevname]" => "device_name"
                           "[fieldValues][cdevip]" => "ip"
                       }
              add_field => {"[handle_time]" => "%{+YYYY.MM.dd.HH.mm.ss}"}
               }
       date{
              match => ["datetime", "yyyy-MM-dd HH:mm:ss"]
              target => "datetime"
              timezone => "Etc/UTC"
            }
      }
output {
    stdout{
        codec => rubydebug {
        metadata => true
        }
    }
}

  2)日志内容:

2021-07-30T10:43:06+08:00 LDB_DBA DBFW: CEF:ip:514|发生时间:2021-07-30 10:43:00|服务器IP:ip内容|服务器端口:9901|数据库实例名:mysql|数据库名:shjs-cdrms|数据库版本:4000|客户端IP:ip内容|客户端端口:33986|客户端MAC:005056887A82|客户端工具:MySQL Connector/J|客户端操作系统用户:N/A|应用用户:N/A|数据库用户:ROOT|风险类型:N/A|风险级别:N/A|引擎动作:放行|规则名称:N/A|操作类型:SELECT |访问对象:ACT_RU_TIMER_JOB|响应时间:4042|执行结果:成功|影响行数:0|SQL语句:SELECT RES.*FROM ACT_RU_TIMER_JOB RES WHERE SCOPE_TYPE_='cmmn' AND DUEDATE_<='2021-07-30 10:56:37.094' AND LOCK_OWNER_ IS NULL LIMIT 1 OFFSET 0

input {
  stdin {
  }
}
filter{
    grok {
           match => {
               "message" => "(?<datetime>.*?) .*?\|客户端IP:(?<客户端IP>.*?)\|.*?\|SQL语句:(?<SQL语句>.*)"
                     }
         }
          mutate {
             remove_field => ['@version','host']
             add_field => {"[handle_time]" => "%{+YYYY.MM.dd.HH.mm.ss}"}
             }
     date{
        match => ["datetime", "yyyy-MM-dd HH:mm:ss"]
        target => "datetime"
        timezone => "Etc/UTC"
            }
      }
output {
    stdout{
        codec => rubydebug {
        metadata => true
        }
    }
}

  3、单独测试通过之后合并一起

input {
  stdin {
  }
}

filter{
       if "ip" in [message]{
        grok {
        match => { "message" => "(?<datetime>.*?) .*? \|\|json(?<jsonLog>.*)#" }
              }
        json {
              source => "jsonLog"
              }
        mutate {
              remove_field => ["fieldValues",'@version','host',"jsonLog"]
              rename => {
                           "[fieldValues][cdevname]" => "device_name"
                           "[fieldValues][cdevip]" => "ip"
                       }
              add_field => {"[handle_time]" => "%{+YYYY.MM.dd.HH.mm.ss}"}
               }
        date{
              match => ["datetime", "yyyy-MM-dd HH:mm:ss"]
              target => "datetime"
              timezone => "Etc/UTC"
               }
          }
        else if "LDB_DBA" in [message] {
        grok {
           match => {
               "message" => "(?<datetime>.*?) .*?\|客户端IP:(?<客户端IP>.*?)\|.*?\|SQL语句:(?<SQL语句>.*)"
                     }
         }
          mutate {
             remove_field => ['@version','host']
             add_field => {"[handle_time]" => "%{+YYYY.MM.dd.HH.mm.ss}"}
             }
       date{
             match => ["datetime", "yyyy-MM-dd HH:mm:ss"]
             target => "datetime"
             timezone => "Etc/UTC"
            }
      }

output {
    stdout{
        codec => rubydebug {
        metadata => true
        }
    }
}

  

 

标签:mm,MM,ip,Logstash,field,json,清洗,日志,match
From: https://www.cnblogs.com/xiedy001/p/17402710.html

相关文章

  • elk日志收集之rsyslog软连接监控文件深度坑
    业务中通过rsyslog监控本地文件收集了一些redis和mc的慢日志,推到elk集群分析,这些日志一天一个文件,每晚零点5分通过计划任务用软连接的方式将新的文件固定到指定文件下,但是最近发现日志丢了很多,分析中发现了一个深坑,先说下现有的配置:rsyslog的配置如下,监控固定的文件:local6.*......
  • Tomcat日志大
    #LicensedtotheApacheSoftwareFoundation(ASF)underoneormore#contributorlicenseagreements.SeetheNOTICEfiledistributedwith#thisworkforadditionalinformationregardingcopyrightownership.#TheASFlicensesthisfiletoYouundertheA......
  • JDBC学习日志五,JDBC操作事务
    事务:要么成功,要么失败ACID原则原子性:要么全部完成,要么都不完成一致性:最终的结果总数据不发生改变隔离性:多个进程互不干扰持久性:数据一旦提交不可逆,持久化到数据库模拟转账事务//模拟转账publicclassTransferDemo{publicstaticvoidmain(String[]args)th......
  • Java日志体系
    转载:https://juejin.cn/post/6905026199722917902前言对于一个应用程序来说日志记录是必不可少的一部分。线上问题追踪,基于日志的业务逻辑统计分析等都离不日志。java领域存在多种日志框架,目前常用的日志框架包括Log4j1,Log4j2,CommonsLogging,Slf4j,Logback,Jul。但是在我们的......
  • JDBC学习日志四,PreparedStatement
    PreparedStatement可以防止sql注入问题,效率更高先进行预编译sql,将要设置的字段值使用占位符本质:预编译会将传递进来的参数包裹成字符,而单引号会被转义字符转换为空内容,有效的防止sql注入的问题CRUD--SELECTStringsql="select*fromuserswhereid=?";st=......
  • python-flask 技能点使用-03 请求钩子实现审计日志
    场景分析     使用pythonflask开发web系统,该系统是基于用户认证鉴权的web系统,系统中涉及到关键数据的操作,因此需要针对业务操作进行记录(也就是审计日志),便于管理员后期查看,在基于java的Spring系列框架中我们可以借助于AOP面向切面的编程来完成,在使用Flask时可以借助......
  • JDBC学习日志二,第一个JDBC程序与JDBC对象解释
    第一步,创建user表,测试数据库CREATEDATABASE`jdbcstudy`/*!40100DEFAULTCHARACTERSETutf8mb3*//*!80016DEFAULTENCRYPTION='N'*/usejdbcstudy;CREATETABLE`users`(`id`intNOTNULL,`NAME`varchar(40)DEFAULTNULL,`PASSWORD`varchar(40......
  • MySQL学习日志十四,数据库的备份
    数据库备份必要性1.保证重要数据不丢失2.数据转移3.MySQL数据库备份方法mysqldump备份工具1.数据库管理工具,如SQLyog2.直接拷贝数据库文件和相关配置文件3.mysqldump客户端作用:转储数据库搜集数据库进行备份将数据转移到另一个SQL服务器,不一定是MySQL服务器--导......
  • 小知识:设置archive_lag_target参数强制日志切换
    为客户测试一个ADG场景问题,发现测试环境的日志切换频率过低,总是需要定期手工切换,这非常影响测试心情。实际上,可以设置archive_lag_target参数强制日志切换。比如设置:altersystemsetarchive_lag_target=1800;这样即使库没任何压力,半小时也会切换一次日志。该设置同时也适......
  • MySQL学习日志十三,索引
    一、索引的作用1.提高查询速度2.确保数据的唯一性3.可以加速表和表之间的连接,实现表与表之间的参照完整性4.使用分组和排序子句进行数据检索时,可以显著减少分组和排序的时间5.全文检索字段进行搜索优化.二、分类1.主键索引(PrimaryKey)2.唯一索引(Unique)3.常规......