首页 > 其他分享 >logstash详解

logstash详解

时间:2024-10-16 19:32:18浏览次数:1  
标签:7.17 hs Logstash 详解 root logstash es

logstash详解

文章目录

Logstash

官网

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的存储库中。

官网



核心概念

Pipeline

  • 包含了input—filter—output三个阶段的处理流程
  • 插件生命周期管理
  • 队列管理

Logstash Event

  • 数据在内部流转时的具体表现形式。数据在input 阶段被转换为Event,在 output被转化成目标格式数据
  • Event 其实是一个Java Object,在配置文件中,可以对Event 的属性进行增删改查

Codec (Code / Decode)

  • 编码与解码。将原始数据decode成Event; 将Event encode成目标数据

在这里插入图片描述



数据传输原理

  1. 数据采集与输入

    Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据。

  2. 实时解析和数据转换

    通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中。

  3. 存储与数据导出

    Logstash提供多种输出选择,可以将数据发送到指定的地方。

Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件,input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理。

在这里插入图片描述



安装

需要jdk环境

官方文档

下载并解压logstash

下载地址 , 选择7.17.3版本

#下载Logstash
#windows
https://artifacts.elastic.co/downloads/logstash/logstash-7.17.3-windows-x86_64.zip
#linux
https://artifacts.elastic.co/downloads/logstash/logstash-7.17.3-linux-x86_64.tar.gz
  • 1
  • 2
  • 3
  • 4
  • 5



测试:运行最基本的logstash管道

[root@hs-es-node1 ~]# cd logstash-7.17.3
#-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试    stdin { }表示的是标准控制台
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
  • 1
  • 2
  • 3



Logstash配置文件结构

参考:https://www.elastic.co/guide/en/logstash/7.17/configuration.html

Logstash的管道配置文件对每种类型的插件都提供了一个单独的配置部分,用于处理管道事件。

[root@hs-es-node1 logstash-7.17.3]# vim logstash-demo.conf
input {
  stdin { }			# 控制台输入
}
filter {
  grok {		# grok 正则匹配解析
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {		# 日期解析 
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"]}  			# 输出到本机的es中
  stdout { codec => rubydebug }			# stdout表示控制台,使用 rubydebug 输出详细格式
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

每个配置部分可以包含一个或多个插件。例如,指定多个filter插件,Logstash会按照它们在配置文件中出现的顺序进行处理。

#运行 通过-f指定配置文件  也可以加上-t参数 检查配置文件是否存在语法错误
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f logstash-demo.conf -t
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f logstash-demo.conf
  • 1
  • 2
  • 3




在这里插入图片描述

在这里插入图片描述



output部分做以下修改

output {
  elasticsearch { 
  	hosts => ["localhost:9200"]
  	index => "hs_test"		# 指定数据写入到哪一个index索引中
  	user => "elastic"
    password => "123456"
  }  			
  stdout { codec => rubydebug }			
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述

在这里插入图片描述



Input Plugins

官方文档

这里列举出来很多的插件,直接参考官方文档的案例即可

在这里插入图片描述



一个 Pipeline可以有多个input插件

  • Stdin / File

  • Beats / Log4J /Elasticsearch / JDBC / Kafka /Rabbitmq /Redis

  • JMX/ HTTP / Websocket / UDP / TCP

  • Google Cloud Storage / S3

  • Github / Twitter



Output Plugins

官方文档

在这里插入图片描述

将Event发送到特定的目的地,是 Pipeline 的最后一个阶段。

常见 Output Plugins:

  • Elasticsearch
  • Email / Pageduty
  • Influxdb / Kafka / Mongodb / Opentsdb / Zabbix
  • Http / TCP / Websocket



Codec Plugins

官方文档

将原始数据decode成Event; 将Event encode成目标数据

内置的Codec Plugins:

  • Line / Multiline
  • JSON / Avro / Cef (ArcSight Common Event Format)
  • Dots / Rubydebug



Codec Plugin测试

# 输入的一行line的情况
# 如下图一所示
bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
# 使用json进行编码,如果我还是就输出一个hushang字符串就会警告 如下图二所示
bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

在这里插入图片描述



Codec Plugin —— Multiline 多行输入的情况

设置参数:

  • pattern: 设置行匹配的正则表达式 正则表达式基本语法

  • what : 如果匹配成功,那么匹配行属于上一个事件还是下一个事件

    • previous / next
  • negate : 是否对pattern结果取反

    • true / false
# 多行数据,异常。这些几行数据应该是要当做一条数据来看的,不能把他们拆分成多条数据。
Exception in thread "main" java.lang.NullPointerException
        at com.example.myproject.Book.getTitle(Book.java:16)
        at com.example.myproject.Author.getBookTitles(Author.java:25)
        at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
  • 1
  • 2
  • 3
  • 4
  • 5
[root@hs-es-node1 logstash-7.17.3]# vim config/multiline-exception.conf
input {
  stdin {
    codec => multiline {
      pattern => "^\s"			# 匹配空格   \s 是匹配所有空白符,包括换行     \S 非空白符,不包括换行。
      what => "previous"		# 如果匹配成功,匹配行属于上一个事件
    }
  }
}
filter {}
output {
  stdout { codec => rubydebug }		# 控制台输出
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
#执行管道
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f config/multiline-exception.conf
  • 1
  • 2

在这里插入图片描述



Filter Plugins

官方文档

Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换

  • date: 日期解析

  • dissect: 分割符解析

  • grok: 正则匹配解析

  • mutate: 对字段做各种操作

    • convert : 类型转换
    • gsub : 字符串替换
    • split /join /merge: 字符串切割,数组合并字符串,数组合并数组
    • rename: 字段重命名
    • update / replace: 字段内容更新替换
    • remove_field: 字段删除
  • ruby: 利用Ruby 代码来动态修改Event



Logstash Queue

设置logstash处理的数据是否要进行持久化,默认是内存机制。

  • In Memory Queue

    进程Crash,机器宕机,都会引起数据的丢失

  • Persistent Queue

    机器宕机,数据也不会丢失; 数据保证会被消费; 可以替代 Kafka等消息队列缓冲区的作用

[root@hs-es-node1 logstash-7.17.3]# vim config/pipelines.yml
queue.type: persisted (默认是memory)
queue.max_bytes: 4gb
  • 1
  • 2
  • 3



案例 Logstash导入csv数据到ES

测试数据集下载:https://grouplens.org/datasets/movielens/

title这一列中括号中保存的是年份,genres这一列可以转换为数组

在这里插入图片描述

准备logstash-movie.conf配置文件

input file插件详情filter csv插件详情filter mutate插件详情output elasticsearch插件

input {
	file {
		path => "/root/movies.csv"			# 必选项,指定文件路径
		start_position => "beginning"		# 非必选项
		sincedb_path => "/dev/null"			# 非必选项
	}
}
filter {
	csv {
		separator => ","					# 非必选项 定义列分隔符值。如果未指定,则默认为逗号
		columns => ["id","content","genre"]	# 定义一个列名列表。如果没有配置列,或者没有指定足够的列,默认的列名是“column1”、“column2”等。
	}
	mutate {
		split => { "genre" => "|" }			# 对genre列使用 | 分割
		remove_field => ["path", "host","@timestamp","message"]		# 移除一些不需要的字段
	}
	mutate {
		split => { "content" => "("}  			# 使用 ( 进行分割
		add_field => { "title" => "%{[content][0]}"}		# 添加两个字段  取值分别为分割后数组对应位置的值
    	add_field => { "year" => "%{[content][1]}"}
	}
	mutate {
		convert => {
			"year"  => "integer"
		}
		strip => ["year"] 		# 去除头尾空格
		remove_field => ["path", "host","@timestamp","message","content"]  # 移除不需要的字段
	}
}
output {
	elasticsearch {
        hosts => "http://localhost:9200"
        index => "movies"
        document_id => "%{id}"		# 指定文档id, %{}取字段中的值  id为字段名
        user => "elastic"
     	password => "123456"
    }
 	stdout {}		# 控制台也输出看看
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44



运行logstash

# 检查配置文件是否存在语法错误 如果没问题会输出  Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -t -f logstash-movie.conf
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f logstash-movie.conf
  • 1
  • 2
  • 3
  • -t 或 --config.test_and_exit : 检查配置文件的有效语法,然后退出。
  • –config.reload.automatic: 启用自动配置加载,就是配置文件更改后会自动加载更改后的配置

运行结果,因为我在最后添加了stdout {}同时在控制台输出,输入结果如下图所示

在这里插入图片描述



案例 同步数据库数据到Elasticsearch

需求: 将数据库中的数据同步到ES,借助ES的全文搜索,提高搜索速度

  • 需要把新增用户信息同步到Elasticsearch中
  • 用户信息Update 后,需要能被更新到Elasticsearch
  • 支持增量更新
  • 用户注销后,不能被ES所搜索到



实现思路

  • 基于canal同步数据(全量更新)

  • 借助JDBC Input Plugin将数据从数据库读到Logstash (增量更新) 官方文档

    • 需要自己提供所需的 JDBC Driver;

    • JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler,其扩展了 Cron,使用 Cron 的语法可以完成任务的触发;

    • JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新;



JDBC Input Plugin实现步骤

拷贝jdbc依赖到logstash-7.17.3/drivers目录下

[root@hs-es-node1 logstash-7.17.3]# mkdir drivers
[root@hs-es-node1 logstash-7.17.3]# ll ./drivers/
total 2088
-rw-r--r--. 1 root root 2134905 Aug 16 13:16 mysql-connector-java-8.0.15.jar
  • 1
  • 2
  • 3
  • 4



准备mysql-demo.conf配置文件

[root@hs-es-node1 logstash-7.17.3]# vim mysql-demo.conf
  • 1
input {
    jdbc {
    	jdbc_driver_library => "/root/logstash-7.17.3/drivers/mysql-connector-java-8.0.15.jar"
    	jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://192.168.5.49:3306/test?serverTimezone=UTC&useSSL=false&useUnicode=true"
        jdbc_user => "root"
        jdbc_password => "1234"
        #启用追踪,如果为true,则需要指定tracking_column
    	use_column_value => true
    	#指定追踪的字段,
    	tracking_column => "update_time"
        #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
    	tracking_column_type => "timestamp"
        # 记录最后一次运行的结果
    	record_last_run => true
    	# 上面运行结果的保存位置  这个文件中保存着最后一次查询,update_time字段的值,下一次查询就可以用这个值来做where条件
    	last_run_metadata_path => "jdbc-position.txt"
    <span class="token comment"># 因为我上面使用的是timestamp追踪字段类型,sql_last_value这里开始的值是1970-01-01 00:00:00</span>
    <span class="token comment"># 之后sql_last_value 的值就是最后一次查询 追踪字段的值,这个值保存在上方定义的 jdbc-position.txt文件中</span>
    statement <span class="token operator">=</span><span class="token operator">&gt;</span> <span class="token string">"SELECT * from sys_user where update_time &gt;:sql_last_value"</span>
    <span class="token comment"># cron 定时  每秒执行一次</span>
    schedule <span class="token operator">=</span><span class="token operator">&gt;</span> <span class="token string">" * * * * * *"</span>
<span class="token punctuation">}</span>

}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "user"
document_id => "%{id}" # 指定文档id, %{}取字段中的值 id为字段名
user => "elastic"
password => "123456"
}
# 控制台也输出看看
stdout {
codec => rubydebug
}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40



运行logstash

# -t 检查配置文件是否存在语法错误
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -t -f mysql-demo.conf 
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f mysql-demo.conf 
  • 1
  • 2
  • 3

因为我数据表里面实现就有两条数据,所以这里就直接查询出来了

在这里插入图片描述

之后每隔1秒执行一次查询语句,即使重启之后这个where查询条件的时间还是保存了,因为我们上方配置文件中配置了 记录最后一次运行的结果

在这里插入图片描述

数据表中事先就存在两条数据

在这里插入图片描述

在这里插入图片描述



测试新增

INSERT INTO `test`.`sys_user`(`id`, `username`, `password`, `name`, `description`, `status`, `create_time`, `update_time`) VALUES (1819212905845895170, 'deimkf_test', '$2a$10$jyQQSxxVp', 'test', 'test', 1, NOW(), NOW());
  • 1

可以看到,这里之后的查询时间就使用的最新的

在这里插入图片描述



测试修改

UPDATE `test`.`sys_user` SET `username` = 'deimkf_update',  `update_time` = now() WHERE `id` = 1819212905845895170;
  • 1

在这里插入图片描述
在这里插入图片描述



测试逻辑删除

# 修改status字段的值
UPDATE `test`.`sys_user` SET `status` = 0,  `update_time` = now() WHERE `id` = 1819212905845895170;
  • 1
  • 2

在这里插入图片描述

此时,直接ES查询是还能查询到数据的

在这里插入图片描述

#ES中查询
# 创建 alias,只显示没有被标记 deleted的用户,也就是status=1的用户
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "user",
        "alias": "sys_user",
        "filter": {			# 这里就过滤掉了status为0的数据
          "term": {
            "status": 1
          }
        }
      }
    }
  ]
}
# 根据别名查询
GET /sys_user/_search
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在这里插入图片描述

原文链接:https://blog.csdn.net/qq_44027353/article/details/141267038

标签:7.17,hs,Logstash,详解,root,logstash,es
From: https://www.cnblogs.com/sunny3158/p/18470610

相关文章

  • Sharding-JDBC标准模式详解
    Sharding-JDBC标准模式详解一.为什么要使用标准模式?Sharding-JDBC的标准模式就配置而言比inline模式繁琐多了,那为什么要使用标准模式呢Sharding-JDBC作为ApacheShardingSphere生态中的一款轻量级Java框架,提供了数据分片、读写分离、分布式事务和数据库治理等核心功......
  • 【Golang】Go 语言中的 time 包详解:全面掌握时间处理与应用
    在Go语言中,time包提供了强大的时间处理功能,适用于各种场景:获取当前时间、格式化和解析时间、计算时间间隔、设置定时器、处理超时等。在开发过程中,熟练掌握time包能够帮助我们轻松处理时间相关的操作,尤其是定时任务、超时控制等非常常见的功能。文章目录一、`time.......
  • 强大灵活的文件上传库:FilePond 详解
    文件上传是Web开发中常见的功能,尤其是对于图片、视频、文档等大文件的处理,如何既保证用户体验,又兼顾安全和性能,是每位开发者关心的问题。在这样的背景下,FilePond作为一款灵活强大的文件上传库,逐渐在前端开发者中脱颖而出。FilePond提供了优雅的用户界面和全面的功能,同时兼容多......
  • Vulhub Bob: 1.0.1靶机详解
    项目地址https://download.vulnhub.com/bob/Bob_v1.0.1.ova实验过程开启靶机虚拟机使用nmap进行主机发现,获取靶机IP地址nmap192.168.47.1-254根据对比可知Bob:1.0.1的一个ip地址为192.168.47.173扫描Bob:1.0.1的操作系统,端口及对应服务nmap-A-p-192.168.......
  • 【数据结构】时间、空间复杂度详解
    大家有没有遇到过,为什么有些程序跑得飞快,而有些程序却慢得让人抓狂?我们可能都是这样认为的:他写的程序效率高等等,确实如此。但这背后隐藏着两个重要的概念:时间复杂度和空间复杂度。它们就像程序的“效率指标”,帮助我们评估程序的性能。一、时间复杂度:衡量速度的尺子简单来说,时......
  • (接上篇问题回答)OWASP Top 10 漏洞详解:基础知识、面试常问问题与实际应用
    1.SQL注入面试常见问题什么是SQL注入? SQL注入是一种网络安全漏洞,攻击者通过向SQL查询插入恶意代码,来干扰应用程序的数据库查询,导致未授权的数据访问或数据操纵。如何防止SQL注入? 防止SQL注入的方法包括:使用预编译的SQL语句(PreparedStatements)。使用ORM工具。严格验证和......
  • [整理]C#反射(Reflection)详解
    [整理]C#反射(Reflection)详解本人理解:装配件:Assembly(程序集)晚绑定:后期绑定MSDN:反射(C#编程指南)-----------------原文如下--------1、什么是反射2、命名空间与装配件的关系3、运行期得到类型信息有什么用4、如何使用反射获取类型5、如何根据类型来动态创建对象6、......
  • 前端新手教程:HTML、CSS 和 JavaScript 全面详解及实用案例
    一、引言在当今数字化的时代,前端开发扮演着至关重要的角色,它决定了用户与网页和应用程序交互的体验。HTML、CSS和JavaScript作为前端开发的核心技术,分别负责网页的结构、样式和交互。本教程将为前端新手全面深入地介绍HTML、CSS和JavaScript的知识点,并通过实用案例帮助......
  • Nuxt.js 应用中的 modules:done 事件钩子详解
    title:Nuxt.js应用中的modules:done事件钩子详解date:2024/10/16updated:2024/10/16author:cmdragonexcerpt:modules:done是Nuxt.js中一个重要的生命周期钩子,在Nuxt应用初始化期间触发。该钩子允许开发者在用户定义的模块安装完成后执行特定操作,如初始化后续配......
  • 《纪元1800》遭遇dll丢失问题无法启动:msvcr71.dll丢失详解与定制化解决方案
    《纪元1800》是一款非常受欢迎的城市建设和经济策略游戏,但有时玩家可能会遇到msvcr71.dll丢失的问题,导致游戏无法启动。msvcr71.dll是MicrosoftVisualC++运行库的一部分,负责支持许多应用程序的运行。以下是对msvcr71.dll丢失问题的详细解释及定制化解决方案。问题原......