Logstash Production Development Notes

This article covers Logstash operations in production development, the problems encountered, and how they were handled.

Date: 2023-04-24
Logstash version: 7.8.1

I. Basic Development
A simple startup script
#!/bin/sh
pjname=test1
conf_file=../conf/${pjname}.conf
data_path=../path/${pjname}
log_file=../log/${pjname}.log
rm -rf ${data_path}
rm -rf ${log_file}
nohup /opt/app/logstash/bin/logstash -f ${conf_file} --path.data=${data_path} >> ${log_file} &
Field filtering
- drop
if '"project":"workWeiXin"' not in [message] {
drop {}
}
Parsing nested JSON
- ruby
filter {
ruby {
path => "/home/streaming/kafkaToAds/wsy/kafka2Oracle_cover.rb"
script_params => {"message" => "%{message}"}
remove_field => ["message"]
}
}
- kafka2Oracle_cover.rb
def register(params)
@message = params["message"]
end
def filter(event)
require 'json'
txt = event.get('message')
begin
obj1 = JSON.parse(txt)
txt2 = obj1["CONTANT"]
type1 = obj1['MSG_TYPE']
type2 = obj1['MSG_SUBTYPE']
ensure
# note: a return inside ensure also swallows any JSON parse error,
# so malformed messages simply fall through to the final "return []"
if type1 == 'PROD_RESERV_ORDER' and type1 != nil
if type2 == 'ADD_ORDER' and type2 != nil
txt2.each do |k,v|
if v != ''
event.set(k,v)
end
end
event.set("MSG_TYPR",type1)
event.set("MSG_SUBTYPE",type2)
return [event]
end
end
return []
end
end
Time conversion
1. The @timestamp field is 8 hours off (timezone issue)
- The date filter below can also be used to overwrite @timestamp with a business-time field
ruby {
code => "event.set('timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] #匹配timestamp字段
target => "@timestamp" #将匹配到的数据写到@timestamp字段中
}
Index template (mapping)
- zhfx_recent.json
curl -u elastic:123456 -H "Content-Type: application/json" -XPUT node01:7029/_template/recent_tem -d '
{
"index_patterns": ["recent*"],
"order":0,
"settings":{
"number_of_shards":1,
"number_of_replicas":15
},
"mappings":{
"dynamic":"strict",
"_source":{"enabled":true},
"properties":{
"tradedate":{"type":"integer"},
"recent":{"type":"keyword"},
"end_date":{"type":"keyword"}
}
}
}
'
II. Program Development and Deployment
1. The conf configuration file (the core)
I split it into three parts: input, filter, and output. The real plugin bodies follow section by section.
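For orientation, this is only a placeholder skeleton of how the three parts sit together in one .conf file; comments stand in for the real plugins shown below:
input {
# one or more inputs: stdin, file, kafka, ...
}
filter {
# optional processing: csv, json, ruby, mutate, date, ...
}
output {
# one or more outputs: stdout, elasticsearch, ...
}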
input
- Read the file to the end and then exit (used in production for offline CSV files; see the redirect example right after this block)
input {
stdin{}
}
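Because the input is stdin, the pipeline stops as soon as standard input reaches EOF, so redirecting the CSV file into Logstash gives a run-to-completion job. A minimal invocation, using paths borrowed from elsewhere in this document:
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/qw_details_csv.conf < /home/es/details_cp.csv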
- Read a CSV file (for testing)
input {
file {
path => "/home/es/details_cp.csv"
start_position => beginning
}
}
- Read from Kafka (real-time production use)
input {
kafka {
bootstrap_servers => ["node01:9092,node02:9092,node03:9092"] # Kafka brokers
auto_offset_reset => "latest" # earliest = consume from the beginning, latest = consume from the current offset
group_id => "qw_details_topic_20220427_2357"
consumer_threads => 3 # number of consumer threads
topics => ["event_topic"]
# tuning parameters
fetch_max_wait_ms => "1000"
fetch_min_bytes => "1000000"
}
}
filter
- Filtering real-time data
filter {
ruby {
path => "/home/common/wsy/logstash_project/rb/qw_details_kafka2es.rb"
script_params => {"message" => "%{message}"}
remove_field => ["message"]
}
ruby {
code => "event.set('time',Time.at((event.get('time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
}
mutate {
copy => {
"time" => "copytime"
}
}
date {
match => ["copytime","YYYY-MM-dd HH:mm:ss"]
target => "copytime"
}
ruby {
code => "event.set('date',Time.at((event.get('copytime').to_i)).strftime('%Y%m%d'))"
}
mutate {
convert => ["browse_time","integer"]
convert => ["date","integer"]
}
mutate {
remove_field => ["@timestamp","shop_id","open_id","path","shop_name","mobile","union_id","user_type","strict_dynamic_mapping_exception","host","tags","@version","$screen_height","$network_type","$os","$browser_version","$manufacturer","$os_version","$lib_version","$country","$city","$timezone_offset","$ip","title","$screen_width","platform_type","$is_first_day","studio_name","$app_id","$lib","$is_login_id","$province","$model","$latest_scene","$browser","product_name","$latest_share_url_path", "$latest_share_distinct_id", "$share_distinct_id","$latest_share_method","$is_first_time","$latest_share_depth","$scene","$share_url_path","$share_depth","$share_method","$url_path", "$url_query","copytime","$referrer","$title","$url","$app_version","$brand","$app_version","$url"]
}
}
- Filtering offline data (using the offline file qw_details.csv as the example)
filter {
csv {
separator => "|$|"
}
ruby {
code => "event.set('check_in_time',Time.at((event.get('check_in_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
}
ruby {
code => "event.set('check_out_time',Time.at((event.get('check_out_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
}
date {
match => ["date","YYYY-MM-dd HH:mm:ss"]
target => "date"
}
ruby {
code => "event.set('date',Time.at((event.get('date').to_i)).strftime('%Y%m%d'))"
}
mutate {
convert => {"corp_id" => "string"}
convert => {"qw_staff_id" => "string"}
convert => {"studio_id" => "string"}
convert => {"event" => "string"}
convert => {"page_name" => "string"}
convert => {"seq" => "string"}
convert => {"date" => "integer"}
}
mutate {
remove_field => ["message","@timestamp","path","host","@version"]
}
}
output
- Output to the console (generally for testing)
output {
stdout { codec => rubydebug }
}
- Output to ES
- Offline: read CSV into ES, started each day with a date parameter
output { elasticsearch { hosts => ["node01:7920"] index => "qw_details-${date}" user => "elastic" password => "123456" } }
- Real-time: read from Kafka into ES, runs continuously, indices are created automatically as needed
output { elasticsearch { hosts => ["node01:7920"] index => "qw_details-%{date}" user => "elastic" password => "123456" } }
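Note the difference between the two index settings: ${date} is resolved from an environment variable when Logstash starts (the offline startup script exports it beforehand), whereas %{date} is a sprintf reference resolved per event from the date field computed in the filter stage. The offline form therefore works roughly like this:
export date=20220428
/home/common/wsy/logstash-7.8.1/bin/logstash -f qw_details_csv.conf < qw_details.csv
# documents are written to the index qw_details-20220428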
2. The rb filter script (the core processing)
Requirement: filter condition
project = 'workWeiXin'
and
(
(page_name in ('资讯','产品') and event in ('jhsd_resource_detail','$MPShare'))
or (page_name='首页' and event='$MPShare')
or event = '$MPLaunch'
)
- kafka2es_qw_details.rb (filtering of the real-time Kafka data)
def register(params)
@message = params["message"]
end
def filter(event)
require 'json'
txt = event.get('message')
begin
obj1 = JSON.parse(txt)
txt2 = obj1['properties']
type1 = obj1['event']
type2 = obj1['properties']['page_name']
time = obj1['time']
type3 = obj1['project']
ensure
if type3 == 'workWeiXin' and ( (type1 == '$MPLaunch' and type1 != nil) or
(type1 == '$MPShare' and type2 == '首页' and type2 != nil and type1 != nil) or
( (type1 == 'jhsd_resource_detail' or type1 == '$MPShare' ) and (type2 == '资讯' or type2 == '产品' ) and type2 != nil and type1 != nil ) )
txt2.each do |k,v|
if v != ''
event.set(k,v)
end
end
event.set('event',type1)
event.set('time',time)
return [event]
end
return []
end
end
3. Accumulated common problems
1. jps does not show the ES process
Symptom:
bash: jps: command not found ...
1. jps ships with the JDK, so it depends on which Java installation is actually in use.
2. Find out where your Java is installed: whereis java
$which java
/usr/bin/java
Follow the /usr/bin/java symlink; it turns out to point to yet another symlink:
$ls -lrt /usr/bin/java
lrwxrwxrwx 1 root root 22 Jul 27 11:43 /usr/bin/java -> /etc/alternatives/java
Follow it one more time to find the final location:
$ls -lrt /etc/alternatives/java
lrwxrwxrwx 1 root root 35 Jul 27 11:43 /etc/alternatives/java -> /usr/java/jdk1.8.0_111/jre/bin/java
That final path is the JDK location currently in use:
/usr/java/jdk1.8.0_111/ (that is mine; use your own)
Then add one line to .bashrc:
export JAVA_HOME=<your Java installation path>
In production it turned out to be the system's own JDK; here we switch the symlink to point at the JDK we want ES to use.
- Add the Java environment variables to /etc/profile and run source /etc/profile:
# edit the environment variables
vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_172
export PATH=:$PATH:$JAVA_HOME/bin
# reload
source /etc/profile
# set up the symlink (run in the directory where the java link should live, e.g. /usr/bin)
ln -s /opt/jdk1.8.0_172/bin/java java
2. Converting a date to a timestamp
Logs usually carry a time like "2020-09-04 10:08:08"; how do we convert it to milliseconds, e.g. "1598609188959"?
date{
match => ["requestTimestamp","YYYY-MM-dd HH:mm:ss"]
target =>"requestTimestamp"
}
ruby{
code => "event.set('requestTimestamp',event.get('requestTimestamp').to_i*1000)"
}
The above handles the "YYYY-MM-dd HH:mm:ss" format; what about "YYYY-MM-dd HH:mm:ss.SSS"?
date{
match => ["requestTimestamp","YYYY-MM-dd HH:mm:ss.SSS"]
target =>"requestTimestamp"
}
ruby{
code => "event.set('requestTimestamp',(event.get('requestTimestamp').to_f.round(3)*1000).to_i)"
}
3. Converting a timestamp to a date
ruby {
code => "event.set('check_in_time',Time.at((event.get('check_in_time').to_i/1000)).strftime('%Y-%m-%d %H:%m:%S'))"
}
4、字段切割
- Slice the field tradedate: 20211105, taking the first four characters (the year): 2021
grok {
match => {"tradedate" => "^(?<tradedate_year>.{4})"}
}
- Slice the field tradedate: 20211105, taking the last two characters (the day): 05
grok {
match => {"tradedate" => "(?<tradedate_day>(?<=......)(.{2}))"}
}
- Slice the field tradedate: 20211105, keeping only the first six characters (year and month): 202111
mutate {
gsub => ["tradedate","(?<=......)(.*)",""]
}
5. Offline installation of Logstash plugins
Run the following command to install:
./bin/logstash-plugin install file:///root/logstash-output-jdbc.zip
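The offline zip itself is normally built beforehand on a machine that does have internet access; with a stock Logstash install this looks roughly like the following (the plugin name is just the one from the example above, and the plugin must already be installed on that machine):
# on a machine with internet access
bin/logstash-plugin prepare-offline-pack --output=/root/logstash-output-jdbc.zip logstash-output-jdbc
# copy the zip to the offline host, then install it as shown above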
6. The 8-hour offset in @timestamp
ruby{
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby{
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
7. Substring extraction in shell
date=$1
var=$date
echo $date
date=${var:0:6}
echo $date
8. Efficiency
After adding the drop below, remove the rb script entirely and replace it with standard filter plugins.
In this scenario, the data we actually need is less than one in ten thousand of the stream.
Analyze the data first, and discard unwanted events as early as possible.
Add this to the filter conditions:
if "workWeiXin" not in [message] {
drop {}
}
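Following that advice, here is a sketch (not the exact production config) of the same workWeiXin filter built from standard plugins instead of the rb script. It uses the json filter plus native conditionals, and it does not reproduce the rb script's extra step of flattening [properties] into top-level fields:
filter {
if '"project":"workWeiXin"' not in [message] {
drop {}   # cheap substring check before any JSON parsing
}
json {
source => "message"   # parse the raw JSON payload into event fields
}
if [event] == "$MPLaunch"
or ([event] == "$MPShare" and [properties][page_name] == "首页")
or ([event] in ["jhsd_resource_detail", "$MPShare"] and [properties][page_name] in ["资讯", "产品"]) {
mutate { remove_field => ["message"] }   # keep the event, drop the raw payload
} else {
drop {}
}
}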
9. Backfilling the current day's data when something is wrong
1. Consume Kafka and keep only the records that match the day's conditions:
/opt/app/kafka/bin/kafka-console-consumer.sh --topic qw_details_topic --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning |grep --line-buffered '"project":"workWeiXin"' > wsy.json
/opt/app/kafka/bin/kafka-console-consumer.sh --topic qw_details_topic --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning |grep --line-buffered '16496' >wsy_bs_1.json
2. Read the dumped file back in to backfill; my data carries a document_id, so re-importing simply overwrites (see the output sketch after the input block below).
input {
file {
path => "/home/es/details_cp.csv"
start_position => beginning
}
}
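A sketch of the matching output: writing with an explicit document_id makes the backfill idempotent, so re-running it overwrites documents instead of duplicating them. The id field name "seq" here is only an assumption; use whatever uniquely identifies a record.
output {
elasticsearch {
hosts => ["node01:7920"]
index => "qw_details-%{date}"
document_id => "%{seq}"   # assumed unique-id field; re-import overwrites the same _id
user => "elastic"
password => "123456"
}
}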
III. Scripts and Scheduling
1. Startup scripts
- Check that the config file is valid:
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/qw_details_kafka2es.conf --config.test_and_exit
Real-time startup script
Requirement: one startup script per real-time table; one shared startup script for the offline table imports.
Real-time detail table (qw_details) startup script
- Detail table (qw_details), reading from Kafka
Script name: qw_details_kafka_start.sh
Run: ./qw_details_kafka_start.sh
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/qw_details_kafka2es.conf --path.data=/home/common/wsy/logstash_project/path/qw_details_kafka2es > /home/common/wsy/logstash_project/log/qw_details_kafka2es.out 2>&1 &
Offline startup script
- Detail table (qw_details), full aggregation table (qw_agg), daily line chart (qw_line_day), yesterday's time-slot line chart (qw_line_lastday); all read from CSV files
- Requirement: a single script with unified parameters
Script name: qw_csv_start.sh
Run: ./qw_csv_start.sh index date
- index
- qw_details
- qw_agg
- qw_line_day
- qw_line_lastday
- date: the specific date
Example: ./qw_csv_start.sh qw_agg 20220428
index=$1
date=$2
export date=$date
filepath="/home/common/wsy/data/${index}.csv"
rm -rf /home/common/wsy/logstash_project/path/${index}_csv/
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/${index}_csv.conf < $filepath --path.data=/home/common/wsy/logstash_project/path/${index}_csv > /home/common/wsy/logstash_project/log/${index}_csv.out 2>&1 &
2. Mapping template script
Example: adding an index template
Script name: qw_agg_mapping.json
Permissions: chmod 755 qw_agg_mapping.json
Deployment path: /home/common/wsy/logstash_project/mapping
Run: $ ./qw_agg_mapping.json
curl -u elastic:123456 -H "Content-Type: application/json" -XPUT node01:7920/_template/qw_agg_tem -d '
{
"index_patterns" : {"qw_agg*"},
"order" : 1,
"settings" : {
"number_of_shards" : 1,
"number_of_replicas": 1
},
"mappings" : {
"dynamic" : "strict",
"_source" : {"enabled" : true},
"properties" : {
"corp_id": {"type": "keyword"}
}
}
}
'
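To check that the template was registered, it can be read back from the same node (same credentials as above):
curl -u elastic:123456 -XGET "node01:7920/_template/qw_agg_tem?pretty"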
3. Alias scheduling scripts
Option 1
The offline table imports need a daily job that takes a date parameter and updates the aliases:
Full aggregation table (qw_agg)
Daily line chart (qw_line_day)
Yesterday's time-slot line chart (qw_line_lastday)
Example: adding and removing an alias
To remove, change add to remove
Script name: qw_details_alias_add.json
Deployment path: /home/common/wsy/logstash_project/alias
Permissions: chmod 755 qw_details_alias_add.json
Run: $ ./qw_details_alias_add.json 20220428
date=$1
echo "The date is ${date} "
export date=$date
curl -u elastic:123456 -H "Content-Type: application/json" -XPOST node01:7920/_aliases -d '
{
"actions": [
{
"add": {
"index": "qw_details-'${date}'",
"alias": "qw_details"
}
}
]
}
'
Option 2 (final version)
Management asked for one script covering all four aliases, taking the index and the date as parameters.
Script name: qiwei-index.sh
Usage: qiwei-index.sh mode index date
- mode
- add (add the alias)
- remove (remove the alias)
- index
- qw_details
- qw_agg
- qw_line_day
- qw_line_lastday
- date: the specific date
Example: ./qiwei-index.sh remove qw_details 20220428
Deployment path: /home/common/wsy/logstash_project/alias
mode=$1
index=$2
date=$3
curl -u elastic:123456 -H "Content-Type: application/json" -XPOST node01:7920/_aliases -d '
{
"actions": [
{
"'${mode}'": {
"index": "‘${index}’-'${date}'",
"alias": "‘${index}’"
}
}
]
}
'
4. Index lifecycle scripts
https://www.cnblogs.com/wei325/p/16101258.html
https://blog.csdn.net/wangshuminjava/article/details/106430552
Reference case 1
#!/bin/bash
# set up the cron job (adjust the script path as needed)
#crontab -e
#0 1 * * * /bin/bash /opt/data/zabbixclean.sh
# retention in days (delete indices older than 7 days)
time=7
# ES host and port
IP=XXXXXXX
# username and password (used as a prefix in the URLs below)
PASSWORD=XXXXXXXXX
# log file
log=/opt/data/zabbixclean.log
top=`curl -s -XGET "http://$PASSWORD$IP/_cat/indices/?v"|grep uint|wc -l`
echo `date` >>$log
for((time=$time;$time<$top;time++));
do
date=`date -d "$time days ago" +%Y-%m-%d`
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/uint_$date &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/dbl_$date &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/log_$date &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/text_$date &>/dev/null
curl -H "Content-Type:application/json" -XDELETE http://$PASSWORD$IP/str_$date &>/dev/null
if [ $? -eq 0 ];then
echo "The $date index clean completed" >>$log
fi
done
Reference case 2
#!/bin/bash
# the date one week ago
DATA=`date -d "1 week ago" +%Y%m%d`
echo "Starting cleanup of index $DATA"
# current date
time=`date`
# delete data from 7 days ago
curl --user account:pwd -XGET "http://your_ip:9200/_cat/indices/?v"|grep $DATA
if [ $? == 0 ];then
curl --user account:pwd -XDELETE "http://your_ip:9200/*_${DATA}"
echo "Cleaned index $DATA at $time!"
fi
This script is quite simple; a quick walkthrough:
1. The first line is the shebang, nothing to add.
2. Decide how old an index must be before it is deleted. Here it is one week (7 days), so DATA is set to the date one week before now.
3. The next two lines are only for logging and are optional.
4. Check whether an index matching the week-old date exists. Fill in your ES username, password, and the IP of the ES host; if you have not set a username and password they can be omitted.
5. If the index exists, delete it and log the result.
Finally, save the script and set up a cron job like this:
0 1 * * * sh /data/shscript/ES-index-clear.sh > /data/shscript/log/es-index-clear.log
This runs the deletion script at 1 a.m. every day and writes its output to the log file.
Test 1
#!/bin/bash
curl -u elastic:123456 -H "Content-Type:application/json" -XDELETE http://node01:7920/qw_details_`date -d "7 days ago" +%Y-%m-%d` &>/dev/null
Test 2
Script name: Index_lifecycle.sh
#!/bin/bash
# the date 7 days ago
DATA=`date -d "7 day ago" +%Y%m%d`
log=/opt/project/qiWei_project/kafka2ES/log/Index_lifecycle.log
echo "Starting cleanup of index $DATA" >>$log
# current date
time=`date`
# delete data from 7 days ago
curl --user elastic:123456 -H "Content-Type:application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA
if [ $? == 0 ];then
curl --user elastic:123456 -H "Content-Type:application/json" -XDELETE http://node01:7920/qw_details-$DATA
echo "Cleaned index qw_details-$DATA at $time!" >>$log
fi
Production
#!/bin/bash
# current date
time=`date`
# the date 7 days ago
DATA_qw_details=`date -d "7 days ago" +%Y%m%d`
# delete data from 7 days ago
curl -u elastic:123456 -H "Content-Type: application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_details
if [ $? == 0 ];then
echo "Starting cleanup of index qw_details-$DATA_qw_details"
curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_details-$DATA_qw_details
echo "Cleaned index qw_details-$DATA_qw_details at $time!"
fi
DATA_qw_agg=`date -d "7 days ago" +%Y%m%d`
# delete data from 7 days ago
curl -u elastic:123456 -H "Content-Type: application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_agg
if [ $? == 0 ];then
echo "Starting cleanup of index qw_agg-$DATA_qw_agg"
curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_agg-$DATA_qw_agg
echo "Cleaned index qw_agg-$DATA_qw_agg at $time!"
fi
DATA_qw_line_day=`date -d "7 days ago" +%Y%m%d`
# delete data from 7 days ago
curl -u elastic:123456 -H "Content-Type: application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_line_day
if [ $? == 0 ];then
echo "Starting cleanup of index qw_line_day-$DATA_qw_line_day"
curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_line_day-$DATA_qw_line_day
echo "Cleaned index qw_line_day-$DATA_qw_line_day at $time!"
fi
DATA_qw_line_lastday=`date -d "7 days ago" +%Y%m%d`
# delete data from 7 days ago
curl -u elastic:123456 -H "Content-Type: application/json" -XGET "http://node01:7920/_cat/indices/?v"|grep $DATA_qw_line_lastday
if [ $? == 0 ];then
echo "Starting cleanup of index qw_line_lastday-$DATA_qw_line_lastday"
curl -u elastic:123456 -H "Content-Type: application/json" -XDELETE http://node01:7920/qw_line_lastday-$DATA_qw_line_lastday
echo "Cleaned index qw_line_lastday-$DATA_qw_line_lastday at $time!"
fi
- Cron entry:
0 1 * * * sh /home/common/wsy/logstash_project/shell/Index_lifecycle.sh > /home/common/wsy/logstash_project/log/Index_lifecycle.log
5. Scheduling reference
Server: 105.219/common
Program deployment directory:
/home/common/wsy/logstash_project
Starting the programs
- Real-time
bin/qw_details_kafka_start.sh
- Offline
bin/qw_csv_start.sh <index> <date>
Example:
/home/common/wsy/logstash_project/bin/qw_csv_start.sh qw_agg 20220504
Aliases
Real-time and offline aliases can be changed with the same script.
Script name: qiwei-index.sh
Usage: qiwei-index.sh mode index date
- mode
- add (add the alias)
- remove (remove the alias)
- index
- qw_details
- qw_agg
- qw_line_day
- qw_line_lastday
- date: the specific date
Example: ./qiwei-index.sh remove qw_details 20220428
Deployment path: /home/common/wsy/logstash_project/alias
6. Script changes for new requirements
Per the requirements, the scripts above still need the following changes:
1. Aliases: one script should drive all the alias updates, the scheduler only passes a date, and aliases outside the lifecycle window are removed as well.
2. Startup: one script should launch all the offline startup scripts and delete the old indices.
The original qw_csv_start.sh
index=$1
date=$2
export date=$date
filepath="/home/common/wsy/data/${index}.csv"
rm -rf /home/common/wsy/logstash_project/path/${index}_csv/
/home/common/wsy/logstash-7.8.1/bin/logstash -f /home/common/wsy/logstash_project/conf/${index}_csv.conf < $filepath --path.data=/home/common/wsy/logstash_project/path/${index}_csv > /home/common/wsy/logstash_project/log/${index}_csv.out 2>&1 &
Changed version
Pass in the month:
qiwei_csv_start.sh 202205
#!/bin/bash
cd /home/common/wsy/logstash_project/bin
date=$1
sh qw_csv_start.sh qw_details $date
sh qw_csv_start.sh qw_agg $date
sh qw_csv_start.sh qw_line_day $date
sh qw_csv_start.sh qw_line_lastday $date
Add document_id
1. Remove the redirection and add echo $?
2. Put the alias update and the deletion of historical indices into one script with unified parameters; a rough sketch follows.
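A sketch of that combined script, not the final production version: given only a date, it repoints all four aliases to that day's indices, removes the aliases that have aged out, and deletes the old indices. The 7-day window is an assumption taken from the lifecycle scripts above, and the date arithmetic assumes GNU date.
#!/bin/bash
date=$1
old_date=$(date -d "${date} 7 days ago" +%Y%m%d)   # assumes GNU date
cd /home/common/wsy/logstash_project/alias
for index in qw_details qw_agg qw_line_day qw_line_lastday; do
./qiwei-index.sh add    ${index} ${date}       # point the alias at the new index
./qiwei-index.sh remove ${index} ${old_date}   # drop the alias outside the lifecycle window
curl -u elastic:123456 -XDELETE "http://node01:7920/${index}-${old_date}"   # delete the aged-out index itself
done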
IV. Common Query Commands
1. List indices
curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/_cat/indices?v
2. Query a single index
curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/index_name/_search
3. View an index mapping
curl -u elastic:sjjsZS9470_201 -XGET http://10.189.145.201:7920/index_name/_mapping?pretty
4. View a single index's aliases
http://node01:7920/qw_details-20220411/_alias/*