首页 > 其他分享 >filebeat+kafka_logstash+es进行日志分析

filebeat+kafka_logstash+es进行日志分析

时间:2023-05-06 10:56:50浏览次数:59  
标签:filebeat 配置 logstash 日志 kafka yml

filebeat+kafka_logstash+es进行日志分析

目录

 


回到顶部

一. 将安装包上传至目标服务器(即日志所在的服务器)

就是我提供的安装包filebeat-7.6.2-linux-x86_64.tar.gz
将安装包上传至/opt目录下

回到顶部

二. 解压安装

进入/opt目录

cd /opt
tar -zxvf ./ filebeat-7.6.2-linux-x86_64.tar.gz
mv filebeat-7.6.2-linux-x86_64 filebeat-7.6.2
回到顶部

三. 配置filebeat

1. 配置采集日志到logstash,这种配置适用于日志量较小的场景,Filebeat---> logstash,logstash直接解析filebeat

1.1 修改配置文件
cp filebeat.yml filebeat_logstash.yml
vim filebeat_logstash.yml
#配置filebeat_logstash.yml
#Inputs配置:
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/lemo/*.log 

image

这里注意启动filebeat进程的用户要有日志文件的读权限,不然会报错权限不够

配置output.logstash:
image

这里hosts指定的logstash是logstash主机的ip或主机名,指定主机名是filebeat本地要配置host映射,端口需指定为logstash服务器未被占用的端口.

1.2 启动filebeat
./filebeat -e -c ./filebeat_logstash.yml
后台启动:
nohup ./filebeat -e -c ./filebeat_logstash.yml &

2. 配置采集日志至kafka,filebeat---> kafka,再通过logstash消费kafka数据进行日志解析

2.1 配置filebeat:
cp filebeat.yml filebeat_kafka.yml
vim filebeat_kafka.yml

filebeat.inputs跟上面一样:
image
配置output:

output.kafka:
# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
     hosts: ["hadoop101:9092", "hadoop102:9092", "hadoop103:9092"]
  #
  #     # message topic selection + partitioning
     topic: 'first'
     partition.round_robin:
       reachable_only: false

     required_acks: 1
     compression: gzip
     max_message_bytes: 1000000

image

2.2 启动filebeat
./filebeat -e -c ./filebeat_kafka.yml
后台启动:
nohup ./filebeat -e -c ./filebeat_kafka.yml &
回到顶部

四. 配置logstash

这里以配置filebeat->kafka->logstash->es方式
在logstash中添加以下配置:

input{
    kafka {
         bootstrap_servers => "hadoop101:9092,hadoop102:9092,hadoop103:9092"
         client_id => "test"
         group_id  => "test"
         topics => ["first"]
         auto_offset_reset => "earliest"
         auto_commit_interval_ms => "1000"
         decorate_events => true
             }
}
filter{
   json {
       source => "message"
   }
   grok {
       match => {
          "message" => "\[%{GREEDYDATA:time}\]-%{GREEDYDATA:level}-\[biz:%{GREEDYDATA:biz}\]\[sys:%{NUMBER:sys}\]\[%{GREEDYDATA:message}\]"
       }
   }
}
output{
    elasticsearch {
        hosts => ["hadoop101:9200","hadoop102:9200","hadoop103:9200"]
        index => "test-%{+YYYY.MM.dd}"
    }
}

保存后启动logstash

./logstash -f 指定配置文件运行

发送日志测试:

echo "[2022-06-06 15:50:39.313]-INFO-[biz:983397519587282944][sys:983397519587282945][com.phfund.prit.query.controller.CommonQueryController-43][请求参数:{"sqlId":"CalendarTaskInfoIPO","endDate":"20220610","type":"","startDate":"20220606"}]" >> /home/lemo/test.log

查看kibana界面:
image

 

标签:filebeat,配置,logstash,日志,kafka,yml
From: https://www.cnblogs.com/gaoyuechen/p/17376576.html

相关文章

  • Kafka基础阶段与集群搭建详细教程
    Kafka第一天课堂笔记一.Kafka简介1.1消息队列消息队列——用于存放消息的组件程序员可以将消息放入到队列中,也可以从消息队列中获取消息很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)消息队列中间件:消息队列的组件,例如:Kafk......
  • C#操作kafka
    1.手动设置TopicPartition=>offset//手动设置TopicPartition=>offsetforeach(TopicPartitionpartitioninconsumer.Assignment){if(partition.Partition.Value==13){TopicPartitionOffsetoffset=newTopicPartitionOffset(partition,27060);......
  • Kafka2.4安装与配置
    一、安装zookeeper集群1、安装jdk1.82、下载apache-zookeeper-3.5.7-bin.tar.gz并解压第1台机器:mkdir/usr/local/zookeeper/datamv/usr/local/zookeeper/conf/zoo_sample.cfg/usr/local/zookeeper/conf/zoo.cfgvim/usr/local/zookeeper/conf/zoo.cfgdataDir=/usr/l......
  • JDK导致ActiveMQ、Kafka连接zookeeper失败:Session 0x0 for server 10.1.21.244/<unres
      最近在部署一套ActiveMQ集群时,使用zookeeper来实现,zookeeper启动了,在启动ActiveMQ时,抛出异常:    WARN|Session0x0forserver10.1.21.244/<unresolved>:2181,unexpectederror,closingsocketconnectionandattemptingreconnectjava.lang.IllegalArgu......
  • C# 入门 Kafka
    从C#入门Kafka  目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Kafka脚本工具主题管理使用C#创建分区分区与复制生产者消费者修改配置3,Kafka.NET基础生产者批量生......
  • kafka 不支持读写分离的原因
    前段时间在看kafka相关内容,发现kafka“所有的”读写流量都在主partition上,从partition只负责备份数据。那么为什么kafka从partition不跟其他中间件一样承接读流量?读写分离的初衷读写分离的初衷我觉得是利用读流量&写流量不同的特性做针对性的优化,而这两种流量......
  • 记录一下linux-kafka命令
    使用工具:puTTY下载地址:DownloadPuTTY-afreeSSHandtelnetclientforWindowsloginas:rootroot@*******'spassword:Lastlogin:FriApr2814:54:262023from10.10.16.80[root@kafka272c41~]#cd..[root@kafka272c41/]#ls-a....autorelabelbinboot......
  • Kafka命令行常用命令说明(一)
    基于0.8.0版本。 ##查看topic分布情况kafka-list-topic.shbin/kafka-list-topic.sh--zookeeper192.168.197.170:2181,192.168.197.171:2181(列出所有topic的分区情况)bin/kafka-list-topic.sh--zookeeper192.168.197.170:2181,192.168.197.171:2181--topictest(查......
  • @KafkaListener属性简介
    @KafkaListener从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。例如:@KafkaListener(id="consumer-id",......
  • 【kafka】-分区-消费端负载均衡
    一.为什么kafka要做分区?因为当一台机器有可能扛不住(类比:就像redis集群中的redis-cluster一样,一个master抗不住写,那么就多个master去抗写),把一个队列的单一master变成多个master,即一台机器扛不住qps,那么我就用多台机器扛qps,把一个队列的流量均匀分散在多台机器上不就可以了么。 ......