介绍
Elastic Stack 有四个主要组件:
Elasticsearch:一个分布式RESTful搜索引擎,存储所有收集的数据。 Logstash:Elastic Stack 的数据处理组件,将传入数据发送到 Elasticsearch。 Kibana:用于搜索和可视化日志的 Web 界面。 Beats:轻量级、单一用途的数据传送器,可以将数据从数百或数千台机器发送到 Logstash 或 Elasticsearch。
先决条件
yum install -y java-1.8.0-openjdk //安装jdk.18
yum install -y vim* //安装vim编辑器
yum -y install net-tools //安装net-tools工具箱
systemctl stop firewalld //关闭防火墙
systemctl disable firewalld //开机时不要自动启动防火墙
安装和配置 Elasticsearch
Elastic Stack 的所有软件包都使用 Elasticsearch 签名密钥进行签名,以保护你的系统免受软件包欺骗。将导入 Elasticsearch 公共 GPG 密钥并添加 Elastic 包源列表以安装 Elasticsearch。
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch //导入GPG公共秘钥
添加软件包仓库,使用vim编辑软件仓库文件
vim /etc/yum.repos.d/elasticsearch.repo //使用vim新建并编辑
软件包仓库版本信息如下(elasticsearch.repo内容)
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
然后安装 Elasticsearch组件
yum install -y elasticsearch
编辑Elasticsearch配置文件
vim /etc/elasticsearch/elasticsearch.yml
Elasticsearch 侦听来自 port 上各处的流量9200。需要限制对 Elasticsearch 实例的外部访问,以防止外部人员通过 REST API 读取您的数据或关闭您的 Elasticsearch 集群。找到指定的行network.host,取消注释,并将其值替换为localhost。
在55行和59行,取消注释并修改如下
network.host: localhost
http.port: 9200
保存之后启动 Elasticsearch 服务
systemctl start elasticsearch //开启 Elasticsearch 服务
systemctl enable elasticsearch //使 Elasticsearch 在每次服务器启动时自动启动
systemctl status elasticsearch //查看 Elasticsearch 状态是否正常运行
发送 HTTP 请求来测试 Elasticsearch 服务是否正在运行
curl -X GET "localhost:9200"
安装和配置 Kibana 仪表板
安装 Kibana 组件
yum install -y kibana
编辑 Kibana 配置文件
vim /etc/kibana/kibana.yml
将2行和7行的注释取消并修改如下
server.port: 5601
server.host: "0.0.0.0"
保存之后开启 Kibana 服务
systemctl start kibana //开启 Kibana 服务
systemctl enable kibana //设置开机自动启动 Kibana 服务
systemctl status kibana //查看 Kibana 服务是否正常运行
看一下端口监听
172.16.10.191:5601 //服务器IP地址+5601端口号
安装和配置 Logstash
安装 Logstash
yum install -y logstash
安装 Logstash 后继续进行配置。Logstash 的配置文件以 JSON 格式编写并驻留在/etc/logstash/conf.d目录中。在配置它时,将 Logstash 视为一个管道,它在一端接收数据,以一种或另一种方式处理它,然后将其发送到它的目的地(在这种情况下,目的地是Elasticsearch)。Logstash 管道具有两个必需元素input和output和一个可选元素filter。输入插件使用来自源的数据,过滤器插件处理数据,输出插件将数据写入目的地。
创建一个名为的配置文件02-beats-input.conf,您将在其中设置 Filebeat 输入
vim /etc/logstash/conf.d/02-beats-input.conf
插入以下input配置信息并保存。这指定了一个beats将侦听 TCP 端口的输入5044。
input {
beats {
port => 5044
}
}
保存并关闭文件。创建一个名为 的配置文件10-syslog-filter.conf,我们将在其中添加系统日志过滤器,也称为syslogs
vim /etc/logstash/conf.d/10-syslog-filter.conf
插入以下 syslog 过滤器配置。此示例系统日志配置取自Elastic 官方文档。此过滤器用于解析传入的系统日志,以使它们结构化并可供预定义的 Kibana 仪表板使用
filter {
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
pattern_definitions => {
"GREEDYMULTILINE"=> "(.|\n)*"
}
remove_field => "message"
}
date {
match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
geoip {
source => "[system][auth][ssh][ip]"
target => "[system][auth][ssh][geoip]"
}
}
else if [fileset][name] == "syslog" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
remove_field => "message"
}
date {
match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
}
完成后保存并关闭文件。最后创建一个名为的配置文件30-elasticsearch-output.conf
vim /etc/logstash/conf.d/30-elasticsearch-output.conf
插入以下output配置。本质上,此输出将 Logstash 配置为将 Beats 数据存储在localhost:9200以所使用的 Beat 命名的索引中的 Elasticsearch 中 。
output {
elasticsearch {
hosts => ["localhost:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
使用以下命令测试您的 Logstash 配置是否有语法错误
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
如果没有语法错误,会显示 Configuration OK
启动并启用 Logstash 以使配置更改生效
systemctl start logstash //启动 Logstash 服务
systemctl enable logstash //开机启动 Logstash 服务
systemctl status logstash //查看 Logstash 是否正常
安装和配置 Filebeat
Elastic Stack 使用几个称为 Beats 的轻量级数据传送器从各种来源收集数据并将它们传输到 Logstash 或 Elasticsearch。以下是 Elastic 目前提供的 Beats:
Filebeat:收集和发送日志文件。
Metricbeat:从您的系统和服务中收集指标。
Packetbeat:收集和分析网络数据。
Winlogbeat:收集 Windows 事件日志。
Auditbeat:收集 Linux 审计框架数据并监控文件完整性。
Heartbeat:通过主动探测监控服务的可用性。
安装 Filebeat
yum install -y filebeat
注意:与 Elasticsearch 一样,Filebeat 的配置文件是 YAML 格式。这意味着正确的缩进至关重要,因此请务必使用这些说明中指示的相同数量的空格。 Filebeat 支持多种输出,但通常只会将事件直接发送到 Elasticsearch 或 Logstash 以进行额外处理。这里我们将使用 Logstash 对 Filebeat 收集的数据进行额外处理。Filebeat 不需要直接向 Elasticsearch 发送任何数据,所以让我们禁用该输出。
...
#output.elasticsearch:
# Array of hosts to connect to.
#hosts: ["localhost:9200"]
...
然后,配置该output.logstash部分。取消注释这些行output.logstash:并hosts: [“localhost:5044”]删除#. 这会将 Filebeat 配置为连接到 Elastic Stack 服务器上5044的 Logstash 端口,我们之前为其指定了 Logstash 输入的端口
. . .
output.logstash:
# The Logstash hosts
hosts: ["localhost:5044"]
. . .
Filebeat 的功能可以通过Filebeat 模块进行扩展。在这我们启用系统模块
filebeat modules enable system //启用system模块
filebeat modules list //此命令查看启用和禁用模块的列表
接下来,将索引模板加载到 Elasticsearch 中。Elasticsearch 索引是具有相似特征的文档的集合。索引用一个名称标识,用于在其中执行各种操作时引用索引。创建新索引时将自动应用索引模板。
sudo filebeat setup --template -E output.logstash.enabled=false -E 'output.elasticsearch.hosts=["localhost:9200"]'
之后会输入一条 Loaded index template
随着仪表板的加载,Filebeat 连接到 Elasticsearch 以检查版本信息。要在启用 Logstash 时加载仪表板,您需要禁用 Logstash 输出并启用 Elasticsearch 输出
sudo filebeat setup -e -E output.logstash.enabled=false -E output.elasticsearch.hosts=['localhost:9200'] -E setup.kibana.host=localhost:5601
会看到如下输出
启动并启用 Filebeat
systemctl start filebeat //启动 Filebeat 服务
systemctl enable filebeat //开机启动 Filebeat 服务
systemctl status filebeat //查看 Filebeat 状态
如果正确设置了 Elastic Stack,Filebeat 将系统日志和授权日志传送到 Logstash,然后将这些数据加载到 Elasticsearch。
要验证 Elasticsearch 确实在接收此数据,请使用以下命令查询 Filebeat 索引
curl -XGET 'http://localhost:9200/filebeat-*/_search?pretty'
看到类似于的输出
最后回到 Kibana 仪表板(elk服务器:5601),点击 Discover 默认选择预定义的filebeat- * 索引模式以查看 Filebeat 数据。右边将显示过去 15 分钟内的所有日志数据。您将看到一个带有日志事件的直方图,以及下面的一些日志消息