首页 > 其他分享 >Logstash 学习使用

Logstash 学习使用

时间:2022-11-03 11:14:05浏览次数:66  
标签:index Logstash 学习 add 使用 output message type logstash

一、概要

1.1、官网地址

https://www.elastic.co/cn/logstash/

1.2、介绍

Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。也就是一个采集-------> 过滤------> 输出的过程,从哪里采集,采集后到哪里去,中间要不要过滤掉一下东西。

1.3、input输入

input 就是Logstash的入口,数据首先需要经过这一步。它支持采集很多输入选择,比如syslog、redis、file、es、exec、log4j、jdbc、kafka等。

1.4、filter过滤

filter是个负责筛选的部分,数据从源传输到输出的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。它支持很多过滤库,如下:

 

 

 

.5、输出

Elasticsearch 是首选输出方向,但也支持很多选择(与输入差不多),可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

 

1.6、应用场景

Logstash最常用于ELK(elasticsearch + logstash + kibane)中作为日志收集器使用。可同步mysql数据到es等。这里我的案例是订阅kafka到es的过称。

二、安装

2.1、下载地址

Download Logstash Free | Get Started Now | Elastic​www.elastic.co/cn/downloads/logstash

2.2、启动

logstash是基于JDK的,所以必须先安装Jdk环境

在安装目录下执行命令./bin/logstash,但一般需要指定配置文件,例如:

nohup ./bin/logstash -f conf.conf &

 

 

 

logstash参数配置

input配置:

file:读取文件

input {
file{
path => ["/var/log/*.log","/var/log/message"]
type => "system"
start_position => "beginning"
}
}

start_position 代表logstash初次采集文件内容的位置,。取值:beginning或者end。end代表每次都是从文件尾部开始。
start_position:logstash从什么位置读取文件数据,默认是结束的位置,也就是说logstash会以类似tail -f的形式运行。
如果需要导入原始数据,需要把这个设定为:"beginnning",logstash就从头开始读取.

stdin:标准输入

input {
stdin {
add_filed =>{"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}

input {
stdin {
type => "web"
}
}
filter{
if[type] == "web"{
gork{
match => ["message",%{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca{
nagios_status => "1"
}
}else {
elasticsearch {
}
}
}

filter配置:

data:时间处理

%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删除这个字段保留自己的字段,而是应该用filter/date转换后删除自己的字段.

filter {
grok {
match => ["message","%{HTTPDATE:logdate}"]
}
date {
match => ["logstash","dd/MMM/yyyy:HH:mm:ss Z"]
}
}
时区偏移量使用Z.

output配置:

elasticsearch:

output {
elasticsearch {
hosts => ["192.168.0.2:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
flush_size => 20000
idle_flush_time => 10
sniffing => true
template_overwrite => true
}
}

?logstash在有多个conf文件的情况下,进入es的数据会重复,几个conf数据就会重复几次.
!output段顺序执行,没有对日志type进行判断的各插件配置都会全部执行一次.

output {
if [type] == "nginxaccess" {
elasticsearch { }
}
}

email:发送邮件

output {
email {
to => "[email protected],[email protected]"
cc => "[email protected]"
via => "smtp"
subject => "Warning: %{title}"
options => {
smtpIporHost => "localhost",
port => 25,
domain => 'localhost.localdomain',
userName => nil,
password => nil,
authenticationType => nil, # (plain, login and cram_md5)
starttls => true
}
htmlbody => ""
body => ""
attachments => ["/path/to/filename"]
}
}

注意:option参数在logstash2.0以后已经被移除.

output {
email {
port => "25"
address => "smtp.126.com"
username => "[email protected]"
password => ""
authentication => "plain"
use_tls => true
from => "[email protected]"
subject => "Warning: %{title}"
to => "[email protected]"
via => "smtp"
body => "%{message}"
}
}

file:保存成文件

output {
file {
path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz"
message_format => "%{message}"
gzip => true
}
}

  

三、简单配置使用

3.1、简单配置(控制台输出,只要用于调试)

创建一个test.conf的文件

input{ 
    stdin {}
} 
output { 
    stdout {}
}

  启动

./bin/logstash -f ./test.conf

  启动后,在控制台输入什么就会发现它message也输出内容,其他字段是它模版自带的

# 输入
sfghfghfgh
​
# 输出
{
       "message" => "sfghfghfgh",
          "host" => "iZ2ze7b12m7993dl7fyi22Z",
    "@timestamp" => 2021-08-11T03:07:34.719Z,
      "@version" => "1"
}

  

3.2、简单配置添加过滤器

在刚才的文件里加入一个过滤器,输入字母的小写转化为大写

我的心得:修改数据后不能使用新的字段接收,是重新赋值覆盖旧的数据而已。
input{ 
    stdin {}
} 
filter {
   mutate {
         uppercase => [ "message" ]
       
    }
}
output { 
    stdout { }
}
# 输入
asdf
​
# 输出
{
       "message" => "ASDF",
          "host" => "iZ2ze7b12m7993dl7fyi22Z",
    "@timestamp" => 2021-08-11T03:07:34.719Z,
      "@version" => "1"
}

  

3.3、读取文件

input{ 
    file {
      # 收集的文件,多个用,隔开,用绝对路径
      path => [ "/data/test/*.log"]
      start_position => "end"
  }
} 
filter {
   mutate {
          add_field => { "add_msg" => "%{[message]}" }
    }
}
output { 
    stdout { }
}

  start_position 代表logstash初次采集文件内容的位置,。取值:beginning或者end。end代表每次都是从文件尾部开始。add_field为添加输出字段

如果想输出到文件里,那么就是

output {
  file {
      path =>  "/data/test2/test.log"
      codec => line { format => "格式化后数据: %{message}"}
  }
}

  这是一个简单的一个输出,codec 定义输出内容格式,默认为json_lines(已json格式输出)。

3.4、从文件输出到es

input{ 
    file {
      # 收集的文件,多个用,隔开,用绝对路径
      path => [ "/data/test/*.log"]
      start_position => "end"
  }
} 
output {
   elasticsearch {
     hosts => ["xxx:9200"]
     index => "log-%{+YYYY-MM}"
   }
}

  

3.5、与Redis、Kafka集成

 
input {
  kafka {
    #kafaka服务地址
    bootstrap_servers => "xx:9092"
    #订阅的topic名
    topics => ["logTopic"]
  }
}
output {
  redis {
    host => "127.0.0.1"
    port => 6379
    key => "test"
    # key的类型,list或者channel
    data_type => "list"
  }
}

  

3.6、总结

前面讲的主要都是demo级别的,作为理解,然后在升级到实战,这过程需要查看官方文档

四、logstash集成es,自定义模版字段

4.1、准备工作

kafka,Elaseicsearch 和 logstash,es和logstash版本尽量保持一致。

4.2、logstash配置

这三个中,其他两个比较好处理,所以先解决logstash,直接放配置文件
 
input {
  kafka {
    #kafaka服务地址
    bootstrap_servers => "xxx:9092"
    topics => ["logTopic"]
  }
}
​
filter {
  mutate{
        # 将message内容用|切割
        split => ["message","|"]    
        # 添加字段
        add_field => {"moduleName" => "%{[message][0]}"}
        add_field => {"value" => "%{[message][1]}"} 
        add_field => {"url" => "%{[message][2]}"}
        add_field => {"param" => "%{[message][3]}"}
        add_field => {"className" => "%{[message][4]}"}
        add_field => {"methodName" => "%{[message][5]}"}
        add_field => {"returnResult" => "%{[message][6]}"}
        add_field => {"elapsedTime" => "%{[message][7]}"}
        add_field => {"errorMsg" => "%{[message][8]}"}
        
        
        remove_field => "message"
        remove_field => "@version"
   }
}
​
output {
   elasticsearch {
     hosts => ["xxx:9200"]
     # 在es的索引
     index => "log_record-%{+YYYY-MM}"
   }
  #打印,调试的时候用,默认为rubydebug
  stdout { codec => json }
}

  

注意点就是message的内容,将内容推到fafka时就规定好格式,然后使用分隔符进行切割,赋值到其他字段上。 也可以使用其他方式,比如json解析的方式,logstash支持json解析赋值,这里我使用了分隔符切割的方式。

 

4.3、kafka内容

在推送到kafka上就规定格式使用 | 来分割,因为到logstash订阅是我使用|来分割。

 

4.4、elasticsearch 模版配置

是用logstash传输到es会使用默认的一套模版,具体是用哪个模版是更具索引名来确定的。

  • 查看模版信息
#GET请求方式  
es地址:9200/_template/logstash
  • 添加新的模版
注意这里的索引名称和logstash在es创建的索引名要有联系才行。注意我是用的是log_record*
 # PUT请求方式 , log_record 为模版名称
 http://123.56.97.130:9200/_template/log_record

{
    "template": "log_record*",
    "order": 1,
    "version": 1,
    "index_patterns": [
        "*"
    ],
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "@timestamp": {
                "type": "date",
                "format": "dateOptionalTime",
                "doc_values": true
            },
            "className": {
                "index": false,
                "type": "text"
            },
            "createTime": {
                "format": "strict_date_optional_time||epoch_millis",
                "type": "date",
                "index": "false"
            },
            "elapsedTime": {
                "type": "long",
                "index": true
            },
            "methodName": {
                "index": false,
                "type": "text"
            },
            "moduleName": {
                "index": false,
                "type": "text"
            },
            "param": {
                "search_analyzer": "ik_smart",
                "analyzer": "ik_max_word",
                "store": true,
                "type": "text",
                "index": "true",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    }
                }
            },
            "returnResult": {
                "type": "text",
                "search_analyzer": "ik_max_word",
                "analyzer": "ik_max_word",
                "index": true
            },
            "url": {
                "type": "text",
                "index": false
            },
            "value": {
                "search_analyzer": "ik_max_word",
                "analyzer": "ik_max_word",
                "store": true,
                "type": "text",
                "index": true
            },
            "errorMsg": {
                "search_analyzer": "ik_max_word",
                "analyzer": "ik_max_word",
                "store": true,
                "type": "text",
                "index": true
            }
        }
    }
}

  

配完模版后,就可以测试了

 

五、总结

logstash对接其他组件是非常方便的,但灵活使用就稍微有些麻烦,因为需要根据场景定制化一些字段或者过滤一些数据这才是难点。这次的案例也只是我收集日志的一次分享,希望能帮助到大家!

 

标签:index,Logstash,学习,add,使用,output,message,type,logstash
From: https://www.cnblogs.com/Jeffrey1172417122/p/16853761.html

相关文章

  • js库收集任务 个人收集使用
    具有库分类:https://www.cnblogs.com/suanyunyan/p/16144405.html常用js库汇总:http://wiki.i-fanr.com/2021/04/01/%E5%B8%B8%E7%94%A8js%E5%BA%93%E6%B1%87%E6%80%BB/前......
  • Django生成静态页面和使用缓存
       生成并使用静态页面importosfromdjango.shortcutsimportrenderfromdjango.template.loaderimportrender_to_stringdefmy_view(request):co......
  • stream的具体使用
    将字符串12,45,89转成List/***将字符串12,45,89转成List<Long>*/@Testpublicvoidtest2(){Stringstr="fghj,48,drftyguhji,,";List<Long>ids......
  • ECharts学习笔记
    目录ECharts学习笔记原文链接ECharts配置语法第一步:创建HTML页面第二步:为ECharts准备一个具备高宽的DOM容器第三步:设置配置信息图形示例饼图柱状图下钻柱状图E......
  • 学习vue3(五)(插槽slot,Teleport传送组件,keep-alive缓存组件,依赖注入Provide / Inject)
    插槽slot插槽就是子组件中的提供给父组件使用的一个占位符,用<slot></slot>表示,父组件可以在这个占位符中填充任何模板代码,如HTML、组件等,填充的内容会替换子组件的<slot......
  • 使用深度强化学习改进POMDP
    论文提出一种ADRQN架构来增强在部分可观测领域的学习表现,架构的特点在于同时考虑动作和观测作为模型的输入。如下图中的模型所示,我们的动作和观测在经过相关的维度变换之......
  • springboot 线程池的使用
    线程池可以用于解决单线程干某件事情比较慢的问题AsyncConfigurer:通过实现AsyncConfigurer自定义线程池,包含异常处理实现AsyncConfigurer接口对异常线程池更加细粒度的控制......
  • cloud stream 使用案例
    实现生产者发送消息,消费者接收消息的demo在你的项目中加入<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-ra......
  • java 简单使用线程池
    定义一个ThreadPoolExecutorprivatefinalBlockingQueue<Runnable>taskQueue=newLinkedBlockingDeque<>();privatefinalThreadPoolExecutorexecutor=newThreadPo......
  • springboot javax.servlet.Filter使用
    请求拦截器优点:1、拦截非法请求重定向2、验证用户token下面是demo程序,有问题的可以在评论区留言@WebFilter(filterName="authenticationFilter",urlPatterns={"/user/*......