一、概要
1.1、官网地址
https://www.elastic.co/cn/logstash/
1.2、介绍
Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。也就是一个采集-------> 过滤------> 输出的过程,从哪里采集,采集后到哪里去,中间要不要过滤掉一下东西。
1.3、input输入
input 就是Logstash的入口,数据首先需要经过这一步。它支持采集很多输入选择,比如syslog、redis、file、es、exec、log4j、jdbc、kafka等。
1.4、filter过滤
filter是个负责筛选的部分,数据从源传输到输出的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。它支持很多过滤库,如下:
.5、输出
Elasticsearch 是首选输出方向,但也支持很多选择(与输入差不多),可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
1.6、应用场景
Logstash最常用于ELK(elasticsearch + logstash + kibane)中作为日志收集器使用。可同步mysql数据到es等。这里我的案例是订阅kafka到es的过称。
二、安装
2.1、下载地址
Download Logstash Free | Get Started Now | Elasticwww.elastic.co/cn/downloads/logstash2.2、启动
logstash是基于JDK的,所以必须先安装Jdk环境
在安装目录下执行命令./bin/logstash,但一般需要指定配置文件,例如:
nohup ./bin/logstash -f conf.conf &
logstash参数配置
input配置: file:读取文件 input { file{ path => ["/var/log/*.log","/var/log/message"] type => "system" start_position => "beginning" } } start_position 代表logstash初次采集文件内容的位置,。取值:beginning或者end。end代表每次都是从文件尾部开始。 start_position:logstash从什么位置读取文件数据,默认是结束的位置,也就是说logstash会以类似tail -f的形式运行。 如果需要导入原始数据,需要把这个设定为:"beginnning",logstash就从头开始读取. stdin:标准输入 input { stdin { add_filed =>{"key" => "value"} codec => "plain" tags => ["add"] type => "std" } } input { stdin { type => "web" } } filter{ if[type] == "web"{ gork{ match => ["message",%{COMBINEDAPACHELOG}] } } } output { if "_grokparsefailure" in [tags] { nagios_nsca{ nagios_status => "1" } }else { elasticsearch { } } } filter配置: data:时间处理 %{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删除这个字段保留自己的字段,而是应该用filter/date转换后删除自己的字段. filter { grok { match => ["message","%{HTTPDATE:logdate}"] } date { match => ["logstash","dd/MMM/yyyy:HH:mm:ss Z"] } } 时区偏移量使用Z. output配置: elasticsearch: output { elasticsearch { hosts => ["192.168.0.2:9200"] index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" flush_size => 20000 idle_flush_time => 10 sniffing => true template_overwrite => true } } ?logstash在有多个conf文件的情况下,进入es的数据会重复,几个conf数据就会重复几次. !output段顺序执行,没有对日志type进行判断的各插件配置都会全部执行一次. output { if [type] == "nginxaccess" { elasticsearch { } } } email:发送邮件 output { email { to => "[email protected],[email protected]" cc => "[email protected]" via => "smtp" subject => "Warning: %{title}" options => { smtpIporHost => "localhost", port => 25, domain => 'localhost.localdomain', userName => nil, password => nil, authenticationType => nil, # (plain, login and cram_md5) starttls => true } htmlbody => "" body => "" attachments => ["/path/to/filename"] } } 注意:option参数在logstash2.0以后已经被移除. output { email { port => "25" address => "smtp.126.com" username => "[email protected]" password => "" authentication => "plain" use_tls => true from => "[email protected]" subject => "Warning: %{title}" to => "[email protected]" via => "smtp" body => "%{message}" } } file:保存成文件 output { file { path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz" message_format => "%{message}" gzip => true } }
三、简单配置使用
3.1、简单配置(控制台输出,只要用于调试)
创建一个test.conf的文件
input{ stdin {} } output { stdout {} }
启动
./bin/logstash -f ./test.conf
启动后,在控制台输入什么就会发现它message也输出内容,其他字段是它模版自带的
# 输入 sfghfghfgh # 输出 { "message" => "sfghfghfgh", "host" => "iZ2ze7b12m7993dl7fyi22Z", "@timestamp" => 2021-08-11T03:07:34.719Z, "@version" => "1" }
3.2、简单配置添加过滤器
在刚才的文件里加入一个过滤器,输入字母的小写转化为大写
我的心得:修改数据后不能使用新的字段接收,是重新赋值覆盖旧的数据而已。
input{ stdin {} } filter { mutate { uppercase => [ "message" ] } } output { stdout { } } # 输入 asdf # 输出 { "message" => "ASDF", "host" => "iZ2ze7b12m7993dl7fyi22Z", "@timestamp" => 2021-08-11T03:07:34.719Z, "@version" => "1" }
3.3、读取文件
input{ file { # 收集的文件,多个用,隔开,用绝对路径 path => [ "/data/test/*.log"] start_position => "end" } } filter { mutate { add_field => { "add_msg" => "%{[message]}" } } } output { stdout { } }
start_position 代表logstash初次采集文件内容的位置,。取值:beginning或者end。end代表每次都是从文件尾部开始。add_field为添加输出字段
如果想输出到文件里,那么就是
output { file { path => "/data/test2/test.log" codec => line { format => "格式化后数据: %{message}"} } }
这是一个简单的一个输出,codec 定义输出内容格式,默认为json_lines(已json格式输出)。
3.4、从文件输出到es
input{ file { # 收集的文件,多个用,隔开,用绝对路径 path => [ "/data/test/*.log"] start_position => "end" } } output { elasticsearch { hosts => ["xxx:9200"] index => "log-%{+YYYY-MM}" } }
3.5、与Redis、Kafka集成
input { kafka { #kafaka服务地址 bootstrap_servers => "xx:9092" #订阅的topic名 topics => ["logTopic"] } } output { redis { host => "127.0.0.1" port => 6379 key => "test" # key的类型,list或者channel data_type => "list" } }
3.6、总结
前面讲的主要都是demo级别的,作为理解,然后在升级到实战,这过程需要查看官方文档
四、logstash集成es,自定义模版字段
4.1、准备工作
kafka,Elaseicsearch 和 logstash,es和logstash版本尽量保持一致。
4.2、logstash配置
这三个中,其他两个比较好处理,所以先解决logstash,直接放配置文件
input { kafka { #kafaka服务地址 bootstrap_servers => "xxx:9092" topics => ["logTopic"] } } filter { mutate{ # 将message内容用|切割 split => ["message","|"] # 添加字段 add_field => {"moduleName" => "%{[message][0]}"} add_field => {"value" => "%{[message][1]}"} add_field => {"url" => "%{[message][2]}"} add_field => {"param" => "%{[message][3]}"} add_field => {"className" => "%{[message][4]}"} add_field => {"methodName" => "%{[message][5]}"} add_field => {"returnResult" => "%{[message][6]}"} add_field => {"elapsedTime" => "%{[message][7]}"} add_field => {"errorMsg" => "%{[message][8]}"} remove_field => "message" remove_field => "@version" } } output { elasticsearch { hosts => ["xxx:9200"] # 在es的索引 index => "log_record-%{+YYYY-MM}" } #打印,调试的时候用,默认为rubydebug stdout { codec => json } }
注意点就是message的内容,将内容推到fafka时就规定好格式,然后使用分隔符进行切割,赋值到其他字段上。 也可以使用其他方式,比如json解析的方式,logstash支持json解析赋值,这里我使用了分隔符切割的方式。
4.3、kafka内容
在推送到kafka上就规定格式使用 | 来分割,因为到logstash订阅是我使用|来分割。
4.4、elasticsearch 模版配置
是用logstash传输到es会使用默认的一套模版,具体是用哪个模版是更具索引名来确定的。
- 查看模版信息
#GET请求方式
es地址:9200/_template/logstash
- 添加新的模版
注意这里的索引名称和logstash在es创建的索引名要有联系才行。注意我是用的是log_record*
# PUT请求方式 , log_record 为模版名称 http://123.56.97.130:9200/_template/log_record
{ "template": "log_record*", "order": 1, "version": 1, "index_patterns": [ "*" ], "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "properties": { "@timestamp": { "type": "date", "format": "dateOptionalTime", "doc_values": true }, "className": { "index": false, "type": "text" }, "createTime": { "format": "strict_date_optional_time||epoch_millis", "type": "date", "index": "false" }, "elapsedTime": { "type": "long", "index": true }, "methodName": { "index": false, "type": "text" }, "moduleName": { "index": false, "type": "text" }, "param": { "search_analyzer": "ik_smart", "analyzer": "ik_max_word", "store": true, "type": "text", "index": "true", "fields": { "keyword": { "type": "keyword" } } }, "returnResult": { "type": "text", "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "index": true }, "url": { "type": "text", "index": false }, "value": { "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "store": true, "type": "text", "index": true }, "errorMsg": { "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "store": true, "type": "text", "index": true } } } }
配完模版后,就可以测试了
五、总结
logstash对接其他组件是非常方便的,但灵活使用就稍微有些麻烦,因为需要根据场景定制化一些字段或者过滤一些数据这才是难点。这次的案例也只是我收集日志的一次分享,希望能帮助到大家!
标签:index,Logstash,学习,add,使用,output,message,type,logstash From: https://www.cnblogs.com/Jeffrey1172417122/p/16853761.html