1.背景
1.1 简介
Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助业务做解析,丰富,转换和缓冲来自各种来源的数据。Logstash 是一个数据流引擎
- 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
- 在几分钟内建立数据流管道
- 具有水平可扩展及韧性且具有自适应缓冲
- 不可知的数据源
- 具有 200 多个集成和处理器的插件生态系统
- 使用 Elastic Stack 监视和管理部署
Logstash 几乎可以摄入各种类别的数据
它可以摄入日志,文件,指标或者网路真实数据。经过 Logstash 的处理,变为可以使用的 Web Apps 可以消耗的数据,也可以存储于数据中心,或变为其它的流式数据。Logstash 相关概念
- Logstash 实例是一个正在运行的 Logstash 进程。建议在 Elasticsearch 的单独主机上运行 Logstash,以确保两个组件有足够的计算资源可用。
- 管道(pipeline)是配置为处理给定工作负载的插件集合。一个 Logstash 实例可以运行多个管道。(彼此独立)
- 输入插件(input plugins)用于从给定的源系统中提取或接收数据。 Logstash 参考指南中提供了支持的输入插件列表:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
- 过滤器插件(filter plugin)用于对传入事件应用转换和丰富。 Logstash 参考指南中提供了支持的过滤器插件列表:Filter plugins | Logstash Reference [8.3] | Elastic
- 输出插件(output plugin)用于将数据加载或发送到给定的目标系统。 Logstash 参考指南中提供了支持的输出插件列表:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
1.2 学习参考
- Logstash官方文档:《Logstash官方文档》
- 中国社区官方博客:《Logstash入门教程》
- 其他参考技术博客:《通过Logstash实现mysql数据定时增量同步到ES》
- Logstash解析:《解析插件-Grok》
1.3 本例测试版本
[root@dev1613 study]# sudo -u logstash ../bin/logstash --version Using bundled JDK: /opt/logstash/jdk logstash 7.12.1
2.功能应用
2.1 基础测试
输入测试命令,../bin为当前执行命令所在文件夹,与logstash安装后bin的相对目录位置。sudo -u logstash ../bin/logstash -e 'input { stdin { } } output { stdout {} }'
执行命令后,输出结果如图:
2.2 Logstash解析日志文件
最原始的 Log 数据,经过 Logstash 的处理,可以把非结构化的数据变成结构化的数据。甚至可以使用 Logstash 强大的 Filter 来对数据继续加工。最终将加工后的数据存储下来,用于分析和搜索。日志原始内容
2022-07-06 18:48:37.453 ERROR 14677 --- [ dispatcher 108] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104506000&endTime=1657104512000&refetch=false>: socket timeout 2022-07-06 18:48:44.439 ERROR 14677 --- [ dispatcher 109] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104513000&endTime=1657104519000&refetch=false>: socket timeout 2022-07-06 18:48:51.514 ERROR 14677 --- [ dispatcher 110] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104520000&endTime=1657104526000&refetch=false>: socket timeout
Logstash配置文件
编写日志解析配置文件,并解析时间,错误级别,错误行,错误信息。提取出来变为结构化数据。编写配置文件如下: 配置相关节点参考官方文档:《plugins-inputs-file》input { file { path => "/opt/logstash/study/outlog.log" start_position => "beginning" stat_interval => "3" type => "sentinel-log" } } filter { grok { match => ["message","%{TIMESTAMP_ISO8601:datetime} %{LOGLEVEL:loglevel} %{NUMBER:textid} %{GREEDYDATA:errormsg}"] } json { source => "request" } } output { stdout { codec => rubydebug } }
Grok日志解析在线测试
基于elastic在线网页,可编写解析日志测试demo。日志解析结构化输出
运行命令:sudo -u logstash ../bin/logstash -f study-file-es.conf 运行logstash加载配置文件命令,启动测试输出结构化内容如下:2.3 Logstash-数据库同步
本例将MySql数据表中的数据,基于修改时间同步到es数据存储中心。基础数据内容
数据源-mysql数据表建表语句:CREATE TABLE `study_logstash_es` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增主键', `study_code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '编码', `study_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名称', `study_tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '标签', `study_level` smallint NOT NULL DEFAULT '0' COMMENT '等级,如1,2,3', `is_delete` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0 未删除 1 删除', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `operate_user` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作人', PRIMARY KEY (`id`), UNIQUE KEY `uniq_study_code` (`study_code`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='学习-logstash同步msql数据到es';目标源-es索引创建脚本:
PUT /study_logstash_es { "settings": { "index": { "number_of_shards": 1, "number_of_replicas": 1 } }, "mappings": { "properties": { "id": { "type": "integer" }, "study_code": { "type": "text" }, "study_name": { "type": "text" }, "operate_user": { "type": "text" }, "study_tag": { "type": "keyword" }, "is_delete": { "type": "integer" }, "study_level": { "type": "integer" }, "mark_time": { "type": "date", "format": "epoch_millis" }, "update_time": { "type": "date" } } } }
Logstash配置文件
本例测试的数据库地址,es地址,已经基于xxx脱敏。更多jdbc的配置,请参考官方文档:《plugins-inputs-jdbc》。 jdbc_driver_library:为mysql连接包,可在Maven上下载,下载地址参考:《mysql-connector-java.jar 包下载》。input { jdbc { jdbc_driver_library => "/opt/logstash/study/mysql-connector-java-8.0.30.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://xxx.xxx.xx.x:3306/study_database?serverTimezone=Asia/Shanghai&allowMultiQueries=true&characterEncoding=utf-8" jdbc_user => "root" jdbc_password => "xxxxx" jdbc_paging_enabled => true jdbc_page_size => "2" use_column_value => true tracking_column => "mark_time" tracking_column_type => "numeric" schedule => "* * * * *" statement => "SELECT id,study_code,study_name,study_tag,study_level,operate_user,update_time,UNIX_TIMESTAMP(update_time) as mark_time from study_logstash_es where UNIX_TIMESTAMP(update_time)>:sql_last_value AND update_time < NOW()" } } output{ elasticsearch{ hosts => ["xxx.xxx.16.4:9200","xxx.xxx.16.xx:9200","192.xxx.xx.xx:9200"] index => "study_logstash_es" timeout => 300 user => "xxx" password => "xxxxx" } }
数据同步es
运行命令:sudo -u logstash ../bin/logstash -f study-mysql-es.conf 运行logstash加载配置文件命令,启动运行日志,es同步的数据如下: es数据查询如下:2.4 Logstash-kafka消息同步
Logstash的输入项可以监听kafka消息,消费消息记录。input { kafka { bootstrap_servers => "xxx.xxx.xx.4:9092,xxx.xxx.16.4:9093,xxx.xxx.16.4:9094" #kafka服务器地址 topics => "xxxlog" # batch_size => 5 codec => "json" group_id => "logstash" consumer_threads => 3 } } filter { # 丢弃所有的header请求 if [request][method] == "HEAD" { drop { } } # 因为[request][querystring]这个玩意中的字段类型可能不一样,所以全部干成字符串 ruby { code => "event.set('[request][querystring]', event.get('[request][querystring]').to_s) if event.get('[request][querystring]')" } if [request][uri] =~ "^/ucenter-admin-view/v3(.*)" { mutate { add_field => { "log_source" => "用户中心管理后台" } add_field => { "log_source_id" => "1" } } } else if [request][uri] =~ "^/ucenter-org-view/v3/(.*)" { mutate { add_field => { "log_source" => "用户中心工作台" } add_field => { "log_source_id" => "2" } } } else if [request][uri] =~ "^/safety-admin-api(.*)" { mutate { add_field => { "log_source" => "安全管理平台" } add_field => { "log_source_id" => "3" } } } else{ mutate { add_field => { "log_source" => "其他" } add_field => { "log_source_id" => "0" } } } grok { match => { "[request][uri]" => "%{URIPATH:[request][path]}" } named_captures_only => false } } output{ # stdout { # codec => json # } elasticsearch{ hosts => ["xxx.xxx.xx.4:9200","xxx.xxx.16.13:9200","xxx.xxx.16.14:9200"] index => "apisixlog" timeout => 300 user => "elastic" password => "HApn2xCJMuRlg0UOIV0P" }
3.总结
- Logstash基于 输入(inputs),过滤器(filters)和输出(outputs)可以方便快捷的处理数据,将一些非结构化数据,处理为结构化数据。Logstash支持数据中转,数据同步等场景的应用。本例只是简要测试,在实际业务使用时,可基于某一个输入插件/输出插件参考官方文档,结合项目使用。
- Logstash收集大量日志时,存在耗内存的情况,建议参考官方推荐的FileBeat模式。详情参考文档:《开源日志管理方案 ELK 和 EFK 的区别》,《通过Filebeat把日志传入到Elasticsearch》。
- Logstash在配置文件调整后,启动命令,可能出现如下报错:
- Logstash启动命名常用如下:
sudo -u logstash ../bin/logstash -f study-file-es.conf 表示当前窗口启动,关闭或退出命令行时,logstash实例关闭。 sudo -u logstash ../bin/logstash -f study-file-es.conf --config.reload.automatic 表示当前窗口启动,配置文件变化时,不用重新启动实例,可自动加载。关闭或退出命令行时,logstash实例关闭。 sudo -u logstash ../bin/logstash -f study-mysql-es.conf & test.out --config.reload.automatic 表示后台启动,关闭退出命令,实例在后台一直运行。 ps -ef|grep logstash kill-9 进程号, 关闭对应的实例
- Logstash运行日志查看