首页 > 其他分享 >ELK 7.17.5 集群部署及使用

ELK 7.17.5 集群部署及使用

时间:2022-12-29 12:31:23浏览次数:67  
标签:ELK filebeat 7.17 log kafka nginx 集群 172.16 elasticsearch


文章目录

  • ​​一、ElasticSearch 安装​​
  • ​​1.elasticsearch 单节点安装​​
  • ​​2.elasticsearch 分布式集群安装​​
  • ​​3.elasticsearch 配置身份认证​​
  • ​​二、Elasticsearch cerebro 展示工具安装​​
  • ​​三、Kibana 安装​​
  • ​​四、Filebeat 安装(EFK 架构)​​
  • ​​1.Filebeat 的基础使用:​​
  • ​​2.filebeat 收集 nginx​​
  • ​​2.1.安装 nginx​​
  • ​​2.2.安装 filebeat​​
  • ​​2.2.1.收集 nginx 原生日志,推送给 es​​
  • ​​2.2.2.1.filebeat 收集单个文件​​
  • ​​2.2.2.2.filebeat 收集多个文件​​
  • ​​注意事项:​​
  • ​​2.2.2.3.kibana 展示 nginx 日志​​
  • ​​2.2.2.收集 nginx json 日志,推送给 es​​
  • ​​2.2.3.使用内置模块收集 nginx 日志 -- 不好用,生产不建议使用...​​
  • ​​2.2.4.收集 nginx 指定字段信息,忽略其他​​
  • ​​3.filebeat 收集 tomcat​​
  • ​​3.1.使用内置模块收集 tomcat 日志 -- 不好用,生产不建议使用...​​
  • ​​3.2.收集 tomcat 原生日志​​
  • ​​3.3.收集 tomcat json 日志​​
  • ​​3.4.收集 tomcat 多行匹配​​
  • ​​4.filebeat 收集 nginx 日志保存到本地​​
  • ​​五、Logstash 安装(ELFK 架构)​​
  • ​​1.单节点/分布式集群安装logstash​​
  • ​​2.修改 logstash 的配置文件​​
  • ​​3.logstash filter grok插件根据正则取出想要的字段​​
  • ​​4.logstash filter date插件修改写入时间​​
  • ​​5.filebeat 收集 nginx,tomcat日志推送给logstash,logstash发送es​​
  • ​​六、Kibana 自定义 dashboard​​
  • ​​1.统计PV(指标)​​
  • ​​2.统计客户端IP(指标)​​
  • ​​3.统计web下载带宽(指标)​​
  • ​​4.访问页面统计(水平条形图)​​
  • ​​5.IP的Top 5统计(饼图)​​
  • ​​6.统计后端IP服务访问高的Top 5(圆环图)​​
  • ​​7.最终效果图​​
  • ​​七、Kafka部署(ELKFK架构)​​
  • ​​1.kafka 单节点部署​​
  • ​​1.1.zookeeper 单节点​​
  • ​​1.2.kafka 单节点​​
  • ​​1.3.filebeat 收集 nginx 日志发送给kafka,logstash消费kafka消息发送给es,Kibana最终展示​​
  • ​​2.kafka 分布式集群部署​​
  • ​​2.1.zookeeper 集群部署​​
  • ​​2.2.kafka 集群部署​​
  • ​​2.3.filebeat 收集 tomcat 日志发送给kafka,logstash消费kafka消息发送给es,Kibana最终展示​​
  • ​​2.4.filebeat收集 nginx,tomcat,mysql-slow日志发送 kafka,logstash grok 分析 nginx,发送给es,kibana展示​​

环境

IP

ElasticSearch、Logstash、Kafka、Zookeeper

172.16.3.226/21

ElasticSearch、Logstash、Kafka、Zookeeper

172.16.3.227/21

ElasticSearch、Logstash、Kafka、Zookeeper

172.16.3.228/21

Kibana、FileBeat、Nginx、Tomcat

172.16.4.184/21

​https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.5-x86_64.rpm​

​https://artifacts.elastic.co/downloads/kibana/kibana-7.17.5-x86_64.rpm​

​https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.5-x86_64.rpm​

​https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-x86_64.rpm​

​https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz​

​https://downloads.apache.org/kafka/3.2.1/kafka_2.12-3.2.1.tgz​

​https://114-233-226-9.d.123pan.cn:30443/123-676/913c4533/1811652132-0/913c45332b22860b096217d9952c2ea4?v=3&t=1662523894&s=ac456641406e505eab6019bc617d3e28&i=d3a74ca9&filename=jdk-8u333-linux-x64.tar.gz&d=c1e7e2f9​

一、ElasticSearch 安装

1.elasticsearch 单节点安装

  • 3.226 机器上操作:
1、yum localinstall elasticsearch-7.17.5-x86_64.rpm -y

2、cd /etc/elasticsearch

3、备份一下 elasticsearch 默认配置文件
cp elasticsearch.yml{,.bak}
# systemctl cat elasticsearch 有兴趣可以查一下elasticsearch的启动信息

4、修改 elasticsearch 配置文件
egrep -v "^#|^$" elasticsearch.yml
cluster.name: chinaedu-elk
node.name: chinaedu-elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 172.16.3.226
http.port: 9200
discovery.seed_hosts: ["172.16.3.226"]

相关参数说明:
cluster.name:
集群名称,若不指定,则默认是"elasticsearch",⽇志⽂件的前缀也是集群名称。
node.name:
指定节点的名称,可以⾃定义,推荐使⽤当前的主机名,要求集群唯⼀。
path.data:
数据路径。
path.logs:
⽇志路径
network.host:
ES服务监听的IP地址
http.port:
ES服务对外暴露的端口
discovery.seed_hosts:
服务发现的主机列表,对于单点部署⽽⾔,主机列表和"network.host"字段配置相同
即可。

5、启动 elasticsearch
systemctl daemon-reload
systemctl start elasticsearch

2.elasticsearch 分布式集群安装

  • 3.226 操作
  • 在这需要注意所有主机都安装一下​​elasticsearch​
yum localinstall elasticsearch-7.17.5-x86_64.rpm -y

1、修改 elasticsearch 配置
cp /etc/elasticsearch/elasticsearch.yml{,.bak}
egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: chinaedu-elk
node.name: elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["172.16.3.226","172.16.3.227","172.16.3.228"]
cluster.initial_master_nodes: ["172.16.3.226","172.16.3.227","172.16.3.228"]

温馨提示:
"node.name"各个节点配置要区分清楚,建议写对应的主机名称。

2、将 3.226 上的elasticsearch配置文件同步到其他主机
scp /etc/elasticsearch/elasticsearch.yml [email protected]:/etc/elasticsearch/
scp /etc/elasticsearch/elasticsearch.yml [email protected]:/etc/elasticsearch/

3、3.227 配置
...
node.name: elk227

4、3.228 配置
...
node.name: elk228

5、所有节点启用 elasticsearch
# 在启动之前先删除 elasticsearch 生成的数据
rm -rf /var/{log,lib}/elasticsearch/* /tmp/*
systemctl daemon-reload
systemctl start elasticsearch

6、启动完成后可以验证一下elasticsearch 是否正常
curl 127.0.0.1:9200
curl 127.0.0.1:9200/_cat/nodes?v

3.elasticsearch 配置身份认证

elasticsearch7 中开始免费了账号密码认证功能,下面是xpack方式开启集群密码认证
1、在es的任一节点下生成p12文件,在es目录下执行命令
/usr/share/elasticsearch/bin/elasticsearch-certutil ca -out /etc/elasticsearch/cert/elastic-certificates.p12 -pass ""

2、生成p12文件后,将p12文件复制到其他节点的机器中,尽量保持p12的目录路径一致
scp -r /etc/elasticsearch/cert/ [email protected]:/etc/elasticsearch/cert/
scp -r /etc/elasticsearch/cert/ [email protected]:/etc/elasticsearch/cert/

3、所有主机修改 elastic-certificates.p12 权限以及属组
chown root.elasticsearch /etc/elasticsearch/cert/ -R && chmod 660 /etc/elasticsearch/cert/*

4、在所有节点的es config 目录下的elasticsearch.yml 文件新增如下配置(注意p12文件的目录路径):
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: p12文件的绝对目录路径
xpack.security.transport.ssl.truststore.path: p12文件的绝对目录路径

5、重启所有es
systemctl daemon-reload
systemctl restart elasticsearch

6、配置/自动生成密码,es中默认有5个用户
- 随机生成 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto
Changed password for user apm_system
PASSWORD apm_system = BKZDPuXJI2LCLkhueRLr

Changed password for user kibana_system
PASSWORD kibana_system = 8dOH6NAG6We7gtSMatgG

Changed password for user kibana
PASSWORD kibana = 8dOH6NAG6We7gtSMatgG

Changed password for user logstash_system
PASSWORD logstash_system = XrRbfLgxFYS8tvHPgaGh

Changed password for user beats_system
PASSWORD beats_system = DyOfdQ7XQWLcAtuZ99yV

Changed password for user remote_monitoring_user
PASSWORD remote_monitoring_user = i50tI88A8JS82i89n72A

Changed password for user elastic
PASSWORD elastic = wk9KI8qgCo5IDm2BLino

- 手动配置 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
会提示每个密码都要输入两遍

二、Elasticsearch cerebro 展示工具安装

​​其他安装方式可点击这里​​

  • 这里演示 Kubernetes 安装
kind: Deployment
apiVersion: apps/v1
metadata:
name: cerebro
labels:
k8s.kuboard.cn/name: cerebro
spec:
replicas: 1
selector:
matchLabels:
k8s.kuboard.cn/name: cerebro
template:
metadata:
labels:
k8s.kuboard.cn/name: cerebro
spec:
containers:
- name: cerebro
image: lmenezes/cerebro:latest
imagePullPolicy: IfNotPresent
restartPolicy: Always
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: cerebro-nginx
spec:
ports:
- name: yerc7y
protocol: TCP
port: 9000
targetPort: 9000
selector:
k8s.kuboard.cn/name: cerebro
type: NodePort

三、Kibana 安装

1、yum localinstall kibana-7.17.5-x86_64.rpm -y
2、cp /etc/kibana/kibana.yml{,.bak}
3、修改 Kibana 配置
egrep -v "^*#|^$" kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "8dOH6NAG6We7gtSMatgG"
i18n.locale: "zh-CN"
4、启动 Kibana
systemctl daemon-reload
systemctl start kibana.service
  • 浏览器访问Kibana: http://172.16.4.184:5601/

四、Filebeat 安装(EFK 架构)

1.Filebeat 的基础使用:


2.filebeat 收集 nginx

2.1.安装 nginx

1、配置 Nginx Yum源
yum install yum-utils -y
cat > /etc/yum.repos.d/nginx.repo << 'EOF'
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true

[nginx-mainline]
name=nginx mainline repo
baseurl=http://nginx.org/packages/mainline/centos/$releasever/$basearch/
gpgcheck=1
enabled=0
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
EOF

2、安装 Nginx
yum-config-manager --enable nginx-mainline
yum install nginx -y

3、启动 Nginx
systemctl start nginx

2.2.安装 filebeat

yum -y localinstall filebeat-7.17.5-x86_64.rpm 
cp /etc/filebeat/filebeat.yml{,.bak}
2.2.1.收集 nginx 原生日志,推送给 es
2.2.2.1.filebeat 收集单个文件
cat /etc/filebeat/filebeat.yml  # 收集单个日志
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: my-filestream-id
paths:
- /var/log/nginx/*.log
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-%{+yyyy.MM.dd}"

setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖

systemctl start filebeat.service

- 访问几次 Nginx服务产生一些日志;;;
curl 127.0.0.1

查看 elasticsearch 是否有nginx-access索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access"
2.2.2.2.filebeat 收集多个文件
cat /etc/filebeat/filebeat.yml    # 收集多个日志
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 新建 tags 字段可以用于判断
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 新建 tags 字段可以用于判断

output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖

systemctl start filebeat.service

- 访问几次 Nginx服务产生一些日志;;;
curl 127.0.0.1

查看 elasticsearch 是否有nginx-access、nginx-error索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | egrep "nginx-access|nginx-error"
注意事项:
7.17.5版本可能遇到的问题:
(1)input源配置⼀旦超过4个,写⼊ES时,就可能会复现出部分数据⽆法写⼊的问题;
有两种解决⽅案:
⽅案⼀: 拆成多个filebeat实例。运⾏多个filebeat实例时需要指定数据路径"--path.data"。
filebeat -e -c ~/config/23-systemLog-to-es.yml --path.data /tmp/filebeat

⽅案⼆: ⽇志聚合思路解决问题。
1)部署服务
yum -y install rsyslog

2)修改配置⽂件
vim /etc/rsyslog.conf
...
$ModLoad imtcp
$InputTCPServerRun 514
...
*.* /var/log/oldboyedu.log

3)重启服务并测试
systemctl restart rsyslog
logger "1111"
2.2.2.3.kibana 展示 nginx 日志

ELK 7.17.5 集群部署及使用_elasticsearch

ELK 7.17.5 集群部署及使用_elasticsearch_02

ELK 7.17.5 集群部署及使用_kafka_03

2.2.2.收集 nginx json 日志,推送给 es
1、修改 nginx 输出格式
vim /etc/nginx/nginx.conf
log_format oldboyedu_nginx_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"SendBytes":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"uri":"$uri",'
'"domain":"$host",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"tcp_xff":"$proxy_protocol_addr",'
'"http_user_agent":"$http_user_agent",'
'"status":"$status"}';

access_log /var/log/nginx/access.log oldboyedu_nginx_json;


2、定义Filebeat配置文件识别json格式
cat /etc/filebeat/filebeat.yaml
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-json-id
paths:
- /var/log/nginx/access.log
tags: ["access"]
# 以JSON格式解析message字段的内容
parsers:
- ndjson:
keys_under_root: true

output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-json-%{+yyyy.MM.dd}"


3、启动 Filebeat


4、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-json"
2.2.3.使用内置模块收集 nginx 日志 – 不好用,生产不建议使用…
1、还原 Nginx 日志默认配置;;;


2、cat /etc/filebeat/filebeat.yml
filebeat.config.modules:
# 指定模块配置文件路径,${path.config} 代表 /etc/filebeat
path: ${path.config}/modules.d/nginx.yml
# 是否开启热加载功能
reload.enabled: true

output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-modlues-%{+yyyy.MM.dd}"

setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true


3、filebeat -c filebeat.yml modules list # 查看支持的模块
4、filebeat -c filebeat.yml modules enable nginx # 启用 Nginx 模块
# 5、filebeat -c filebeat.yml modules disable nginx # 禁用 Nginx 模块


6、修改 nginx 模块配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/nginx.yml
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log"]
error:
enabled: false
var.paths: ["/var/log/nginx/error.log"]
ingress_controller:
enabled: false

7、启动Filebeat

8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-modlues"
2.2.4.收集 nginx 指定字段信息,忽略其他
1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 新建 tags 字段可以用于判断
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 新建 tags 字段可以用于判断
include_lines: ['\[error\]'] # 收集包含[error]字段的信息

output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖

2、启动filebeat

ELK 7.17.5 集群部署及使用_nginx_04

3.filebeat 收集 tomcat

3.1.使用内置模块收集 tomcat 日志 – 不好用,生产不建议使用…
1、这里安装 tomcat 步骤忽略


2、配置 Tomcat beat文件
egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.config.modules:
path: ${path.config}/modules.d/tomcat.yml
reload.enabled: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "tomcat-modlues-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true


3、filebeat -c filebeat.yml modules list # 查看支持的模块
4、filebeat -c filebeat.yml modules enable tomcat # 启用 tomcat 模块
# 5、filebeat -c filebeat.yml modules disable tomcat # 禁用 tomcat 模块


6、修改 Tomcat 模块配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/tomcat.yml
- module: tomcat
log:
enabled: true
var.input: file
var.paths:
- /data/logs/tomcat/catalina.out


7、启动 Filebea


8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "tomcat-modlues"
3.2.收集 tomcat 原生日志
1、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-tomcat-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true


2、启动 Filebeat
systemctl enable filebeat

3、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "catalina.out-tomcat"
3.3.收集 tomcat json 日志
  • 这里就不做测试了,原理跟Nginx json是一样的。
3.4.收集 tomcat 多行匹配
1、修改 server.xml 模拟 tomcat 报错
164 <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
165 prefix="localhost_access_log" suffix=".txt"
166 pattern="%h %l %u %t "%r" %s %b" />
167
168 </Host111111> # 在/Host后面新增一些内容


2、多启动几次 tomcat 生成一些报错日志,然后将 server.xml 配置还原,再起启动

3、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
parsers:
- multiline:
# 指定多行匹配的类型,可选值为"pattern","count"
type: pattern
# 指定匹配模式,匹配以2个数字开头的
pattern: '^\d{2}'
# 下面两个参数,参考官方架构图即可;
# https://www.elastic.co/guide/en/beats/filebeat/7.17/multiline-examples.html
negate: true
match: after
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-error-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true

4.filebeat 收集 nginx 日志保存到本地

1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["firewalld"]

output.file:
# 文件保存的路径
path: "/tmp/filebeat"
# 本地保存的文件名字
filename: filebeat-nginx-access.log
# 指定文件的滚动大小,默认为20M
rotate_every_kb: 102400
# 指定保存文件个数,默认是7个,有效值默为2-1024个
number_of_files: 7
# 指定文件的权限
permissions: 0600

2、启动Filebeat


ll /tmp/filebeat/
总用量 8
-rw------- 1 root root 5209 8月 26 15:25 filebeat-nginx-access.log

五、Logstash 安装(ELFK 架构)

1.单节点/分布式集群安装logstash

1、安装logstash
yum localinstall logstash-7.17.5-x86_64.rpm -y

2、创建软连接,在全局下可以执行logstash命令
ln -sv /usr/share/logstash/bin/logstash /usr/local/bin

2.修改 logstash 的配置文件

(1)编写配置⽂件
cat > conf.d/01-stdin-to-stdout.conf <<'EOF'
input {
stdin {}
}

output {
stdout {}
}
EOF

(2)检查配置⽂件语法
logstash -tf conf.d/01-stdin-to-stdout.conf

(3)启动logstash实例
logstash -f conf.d/01-stdin-to-stdout.conf

3.logstash filter grok插件根据正则取出想要的字段

注释: Nginx 输出日志的格式:
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'"$status" "$body_bytes_sent" "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'"$request_length" "$request_time" '
'"$host" "$upstream_addr" "$upstream_status" '
'"$upstream_response_length" "$upstream_response_time"';

access_log /var/log/nginx/access.log main;

(1)filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true

output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]

(2)logstash配置:
input {
beats {
port => 5044
}
}

filter {
grok {
# 参考文档: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
# 正则模式可参考: https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "(%{NUMBER:upstream_status}|-)" "(%{NUMBER:upstream_response_length}|-)" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}

mutate {
# 参考文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段转换成相应对数据类型.
convert => [
"bytes", "integer", # 转换成int类型,这样就可以对字段进行算术运算,如果不转换则默认是字符串类型。
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}

mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}
}

output {
#stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}

4.logstash filter date插件修改写入时间

input {
beats {
port => 5044
}
}

filter {
grok {
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}
mutate {
# 参考文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段转换成相应对数据类型.
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}

mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}

#参考文档: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html
date {
# 匹配时间字段并解析"timestamp"
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}

output {
#stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}

5.filebeat 收集 nginx,tomcat日志推送给logstash,logstash发送es

(1)Filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access # 定义索引名称
fields_under_root: true # 把fields设置为顶级字段,否则elasticsearch无法识别。

- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-access # 定义索引名称
fields_under_root: true # 把fields设置为顶级字段,否则elasticsearch无法识别。

output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]


(2)logstash配置:
input {
beats {
port => 5044
}
}

output {
# stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}" # [type_index] 获取Filebeat设置的名称
user => "elastic"
password => "chinaedu"
}
}

ELK 7.17.5 集群部署及使用_elastic_05

六、Kibana 自定义 dashboard

1.统计PV(指标)

Page View(简称:"PV")
⻚⾯访问或点击量。

kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)选择指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数:计数
显示名称: 空
(8)保存到库
标题:lms-saas 总访问量

2.统计客户端IP(指标)

客户端IP:
通常指的是访问Web服务器的客户端IP地址,但要注意,客户端IP数量并不难代表UV。

kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数: 唯⼀计数
选择字段: clientip.keyword
显示名称: 空
(8)保存到库
标题:lms-saas IP

3.统计web下载带宽(指标)

带宽:
统计nginx返回给客户端⽂件⼤⼩的字段进⾏累计求和。

kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数: 求和
选择字段: bytes
显示名称: 空
值格式:字节(1024)
(8)保存到库
标题:lms-saas 总流量

4.访问页面统计(水平条形图)

访问资源统计:
对URI的访问次数统计。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)⽔平条形图
(6)选择索引模式(例如"nginx-access-*")
(7)"垂直轴"
选择函数:排名最前值
字段: request.keyword
值数目:5
排名依据:访问量
排名方向:降序
高级:取消"将其他值分为其他"
显示名称: 空
(8)"水平轴"
聚合: 计数
显示名称: 空

5.IP的Top 5统计(饼图)

IP的TopN统计:
统计访问量的客户端IP最⼤的是谁。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)饼图
(6)切片依据:
选择函数:排名最前
选择字段:client.keyword
高级:取消"将其他值分为其他"
显示名称: 空
(7)大小调整依据:
选择函数:计数
(8)保存到库:
标题:lms-saas 客户端IP top5

6.统计后端IP服务访问高的Top 5(圆环图)

IP的TopN统计:
统计访问量的客户端IP最⼤的是谁。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)圆环图
(6)切片依据:
选择函数:排名最前值
选择字段:upstream_addr.keyword
高级:取消"将其他值分为其他"
显示名称: 空
(7)大小调整依据:
选择函数:计数
(8)保存到库:
标题:lms-saas upstream Top5

7.最终效果图

ELK 7.17.5 集群部署及使用_elastic_06

七、Kafka部署(ELKFK架构)

  • 注意:这里我是单独部署的zookeeper,没有用kafka内置zookeeper。
  • 如果想用 kafka 内置的zookeeper则​​可以参考这篇文章​​

1.kafka 单节点部署

1.1.zookeeper 单节点

(1)解压 zookeeper 软件包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/

(2)创建环境变量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile

(3)创建zookeeper配置文件
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5

(4)启动zookeeper节点
zkServer.sh start
zkServer.sh status # 查看zk服务的状态信息
zkServer.sh stop
zkServer.sh restart

zookeeper配置文件解释:

dataDir ZK数据存放目录。.
dataLogDir ZK日志存放目录。
clientPort 客户端连接ZK服务的端口。
tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit 允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
syncLimit Leader与Follower之间发送消息时,请求和应答时间⻓度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。

1.2.kafka 单节点

(1)解压 kafka 软件包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/

(2)配置环境变量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

(3)修改kafka配置文件
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

(4)启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 关闭Kafka服务

(5)验证kafka节点,是否正常工作
1.启动生产者
kafka-console-producer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092
>AAAAAAAA
>BBBBBBB
>CCCCCCCC

2、启动消费者
kafka-console-consumer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092 --from-beginning
AAAAAAAA
BBBBBBB
CCCCCCCC

温馨提示:
"--topic":要生成消息的主题id。
"--bootstrap-server":指定kafka节点的地址跟端口
"--from-beginning":代表从该topic的最开始位置读取数据,若不加该参数,则默认从topic的末尾读取。

kafka配置文件解释:

​ broker.id 每个server需要单独配置broker id,如果不配置系统会自动配置。需要和上一步ID一致
​ listeners 监听地址,格式PLAINTEXT://IP:端口。
​ num.network.threads 接收和发送网络信息的线程数。
​ num.io.threads 服务器用于处理请求的线程数,其中可能包括磁盘I/O。
​ socket.send.buffer.bytes 套接字服务器使用的发送缓冲区(SO_SNDBUF)
​ socket.receive.buffer.bytes 套接字服务器使用的接收缓冲区(SO_RCVBUF)
​ socket.request.max.bytes 套接字服务器将接受的请求的最大大小(防止OOM)
​ log.dirs 日志文件目录。
​ num.partitions partition数量。
​ num.recovery.threads.per.data.dir 在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。
​ offsets.topic.replication.factor 偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。
​ log.retention.hours 日志文件删除之前保留的时间(单位小时),默认168
​ log.segment.bytes 单个日志文件的大小,默认1073741824
​ log.retention.check.interval.ms 检查日志段以查看是否可以根据保留策略删除它们的时间间隔。
​ zookeeper.connect ZK主机地址,如果zookeeper是集群则以逗号隔开。
​ zookeeper.connection.timeout.ms 连接到Zookeeper的超时时间。

1.3.filebeat 收集 nginx 日志发送给kafka,logstash消费kafka消息发送给es,Kibana最终展示

(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true

output.kafka:
hosts: ["172.16.3.226:9092"]
topic: "log"

(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092"
}
}

output {
stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}

(3)Kibana展示,参考下图:

ELK 7.17.5 集群部署及使用_nginx_07

2.kafka 分布式集群部署

2.1.zookeeper 集群部署

(1)解压 zookeeper 软件包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/

(2)创建环境变量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile

(3)创建zookeeper配置文件
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.16.3.226:2888:3888
server.2=172.16.3.227:2888:3888
server.3=172.16.3.228:2888:3888

(3)创建data、log目录
mkdir -p /tmp/zookeeper /var/log/zookeeper
echo 1 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.226机器
echo 2 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.227机器
echo 3 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.228机器

(5)启动zookeeper节点
zkServer.sh start
zkServer.sh status # 查看zk服务的状态信息
zkServer.sh stop
zkServer.sh restart

2.2.kafka 集群部署

(1)解压 kafka 软件包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/

(2)配置环境变量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

(3)修改kafka配置文件
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181,172.16.3.227:2181,172.16.3.228:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

(4)227 配置
...
broker.id=227
listeners=PLAINTEXT://172.16.3.227:9092

(5)228 配置
...
broker.id=228
listeners=PLAINTEXT://172.16.3.228:9092

(6)启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 关闭Kafka服务

(7)验证kafka是否是集群模式:
zkCli.sh ls /brokers/ids | grep "^\["
[226, 227, 228]

2.3.filebeat 收集 tomcat 日志发送给kafka,logstash消费kafka消息发送给es,Kibana最终展示

(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true

output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"

(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}

output {
stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}

(3)Kibana展示,参考下图:

ELK 7.17.5 集群部署及使用_elastic_08

zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

(4)227 配置

broker.id=227
listeners=PLAINTEXT://172.16.3.227:9092

(5)228 配置

broker.id=228
listeners=PLAINTEXT://172.16.3.228:9092

(6)启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 关闭Kafka服务

(7)验证kafka是否是集群模式:
zkCli.sh ls /brokers/ids | grep “^[”
[226, 227, 228]

### 2.3.filebeat 收集 tomcat 日志发送给kafka,logstash消费kafka消息发送给es,Kibana最终展示

```shell
(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true

output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"

(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}

output {
stdout {}

elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}

(3)Kibana展示,参考下图:

ELK 7.17.5 集群部署及使用_elk_09

2.4.filebeat收集 nginx,tomcat,mysql-slow日志发送 kafka,logstash grok 分析 nginx,发送给es,kibana展示

(1)filebeat配置文件:
filebeat.inputs:
- type: filestream
enabled: true
id: nginx-access-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true

- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true

- type: filestream
enabled: true
id: mysql-slowlog-id
paths:
- /data/mysql/logs/slowquery.log
fields:
type_index: mysql-slowlog
fields_under_root: true
parsers:
- multiline:
type: pattern
pattern: '^# Time: '
negate: true
match: after

output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"

(2)logstash配置文件:
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}

filter {
if "nginx" in [type_index] {
grok {
match => ['message', '%{IPV4:remote_addr} - ?(%{DATA:remote_user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:http_method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:response}" "(?:%{NUMBER:bytes}|-)" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"']
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}

mutate {
remove_field => [ "msg" , "message" ]
}
}
}

output {
# stdout {}

if "nginx" in [type_index] {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "logstash-%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
else {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
}

(3)通过Kibana查看es是否收集到日志

ELK 7.17.5 集群部署及使用_elasticsearch_10


ELK 7.17.5 集群部署及使用_elasticsearch_11


标签:ELK,filebeat,7.17,log,kafka,nginx,集群,172.16,elasticsearch
From: https://blog.51cto.com/u_14440843/5976697

相关文章