kafka
I. Single-node deployment
Reference: this setup can also be created with docker-compose.
1. Download the Kafka package
[[email protected] ~]# ll
-rw-r--r-- 1 root root 103956099 Apr 10 16:09 kafka_2.13-3.2.1.tgz
2. Extract the package
[[email protected] ~]# tar xf kafka_2.13-3.2.1.tgz -C /es/softwares/
3. Create a symbolic link
[[email protected] ~]# cd /es/softwares/ && ln -svf kafka_2.13-3.2.1 kafka
‘kafka’ -> ‘kafka_2.13-3.2.1’
4. Configure environment variables
[[email protected] /es/softwares]# cat /etc/profile.d/kafka.sh
#!/bin/bash
export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[[email protected] /es/softwares]# source /etc/profile.d/kafka.sh
5. Edit the configuration file
[[email protected] ~]# egrep -nv '^$|^#' /es/softwares/kafka/config/server.properties
# change this line
24:broker.id=101
44:num.network.threads=3
47:num.io.threads=8
50:socket.send.buffer.bytes=102400
53:socket.receive.buffer.bytes=102400
56:socket.request.max.bytes=104857600
62:log.dirs=/tmp/kafka-logs
67:num.partitions=1
71:num.recovery.threads.per.data.dir=1
76:offsets.topic.replication.factor=1
77:transaction.state.log.replication.factor=1
78:transaction.state.log.min.isr=1
105:log.retention.hours=168
112:log.segment.bytes=1073741824
116:log.retention.check.interval.ms=300000
# change this line
125:zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
128:zookeeper.connection.timeout.ms=18000
138:group.initial.rebalance.delay.ms=0
6. Start the single Kafka node
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
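A quick sanity check after starting (a sketch; it assumes the default listener port 9092):
# the Kafka process should appear
jps | grep Kafka
# the broker should be listening on 9092
ss -ntl | grep 9092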
7. Verify the metadata in ZooKeeper
nohup java -jar zkWeb-v1.2.1.jar &
Note:
If the installation went wrong and you need to start over, delete Kafka's log/data directory /tmp/kafka-logs before restarting.
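A minimal cleanup sketch for that case, assuming the default log.dirs=/tmp/kafka-logs and the chroot /linux-zk-kafka321 configured above (zkCli's deleteall requires ZooKeeper 3.5+):
# stop the broker first
kafka-server-stop.sh
# remove the local Kafka data/log directory
rm -rf /tmp/kafka-logs
# optionally remove the stale metadata under the chroot in ZooKeeper
zkCli.sh -server 10.0.0.101:2181 deleteall /linux-zk-kafka321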
II. Deploying a Kafka cluster
1. Stop the single-node Kafka service
[[email protected] ~]# kafka-server-stop.sh
2. Edit the configuration file
[[email protected] ~]# egrep -v '^#|^$' /es/softwares/kafka/config/server.properties
# Numeric ID that uniquely identifies this Kafka node; any value works as long as the IDs are unique within the cluster.
broker.id=101
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Kafka data storage path.
log.dirs=/es/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper cluster address (with chroot) that Kafka connects to
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
3. Sync the Kafka package and related files to the other nodes
[[email protected] ~]# data_rsync.sh /es/softwares/kafka
===== rsyncing elk102.com: kafka =====
Command executed successfully!
===== rsyncing elk103.com: kafka =====
Command executed successfully!
[[email protected] ~]# data_rsync.sh /es/softwares/kafka_2.13-3.2.1/
===== rsyncing elk102.com: kafka_2.13-3.2.1 =====
Command executed successfully!
===== rsyncing elk103.com: kafka_2.13-3.2.1 =====
Command executed successfully!
[[email protected] ~]# data_rsync.sh /etc/profile.d/kafka.sh
===== rsyncing elk102.com: kafka.sh =====
Command executed successfully!
===== rsyncing elk103.com: kafka.sh =====
Command executed successfully!
4. Edit the configuration file on the other two nodes
[[email protected] ~]# vim /es/softwares/kafka/config/server.properties
...
broker.id=102
[[email protected] ~]# vim /es/softwares/kafka/config/server.properties
...
broker.id=103
5. Start Kafka on all nodes
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[[email protected] ~]# source /etc/profile.d/kafka.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[[email protected] ~]# source /etc/profile.d/kafka.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
6. Check the metadata in ZooKeeper
[[email protected] ~]# zkCli.sh -server 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
Connecting to 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
....
[zk: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181(CONNECTED) 1] ls /linux-zk-kafka321/brokers/ids
[101, 102, 103]
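To inspect an individual broker's registration, the corresponding znode can be read from the same zkCli session; a small sketch using broker 101 as the example:
# inside the zkCli session, read broker 101's registration data under the chroot
get /linux-zk-kafka321/brokers/ids/101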
III. Common Kafka terminology
kafka cluster (broker list):
    The Kafka cluster.
kafka server (broker):
    A single node of the Kafka cluster.
Producer:
    The role that writes data into the Kafka cluster.
Consumer:
    The role that reads data from the Kafka cluster. Each consumer belongs to one consumer group.
Consumer Group:
    A consumer group, containing one or more consumers.
Topic:
    A logical concept used to separate business data; a topic has at least one partition and one replica.
Partition:
    A partition; for now it can be thought of as a partition number.
replica:
    A replica is where data is actually stored. Replicas play one of two roles: leader or follower.
leader:
    Handles reads and writes.
follower:
    Syncs data from the leader and does not serve any clients outside the cluster. When the leader becomes unreachable, a follower takes over the leader role.
AR:
    All replicas (Assigned Replicas), including the leader and the followers.
ISR:
    The set of replicas that are in sync with the leader (In-Sync Replicas).
OSR:
    The set of replicas that have fallen out of sync with the leader (Out-of-Sync Replicas). AR = ISR + OSR.
ZooKeeper cluster:
    Before Kafka 0.9 it stored consumer-group offsets; since then offsets are kept in an internal Kafka topic.
    It also coordinates Kafka leader election, controller election, and so on.
client:
    consumer API:
        The consumer, i.e. the role that pulls data from brokers.
        Each consumer belongs to a consumer group (Consumer Group), and one group can contain multiple consumers.
    producer API:
        The producer, i.e. the role that writes data to brokers.
    admin API:
        Cluster-management APIs, covering topics, partitions, replicas, and so on.
    stream API:
        Stream-processing APIs that provide data-stream pipelines to distributed computing frameworks such as Spark, Flink, and Storm.
    connect API:
        Connector APIs for external systems, e.g. importing data from MySQL into Kafka.
FAQ:
Q1: What is the difference between a partition and a replica?
    A partition can be thought of as a partition number for now; it groups all replicas that carry that partition number and has nothing to do with disk partitions.
    A replica is where the data is actually stored.
Q2: Offsets are stored in the Kafka cluster; how does a client obtain them from any node of the cluster?
    By reading the internal consumer-offsets topic ("__consumer_offsets").
IV. Topic management
1. View topics
# list all topics
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list
# describe a specific topic
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic linux-zk
Topic: linux-zk    TopicId: Bi3swYr7SVWeseyhi8hJFA    PartitionCount: 3    ReplicationFactor: 2    Configs: segment.bytes=1073741824
Topic: linux-zk Partition: 0 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: linux-zk Partition: 1 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: linux-zk Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
# Partition: partition number   Leader: replica leader   Replicas: all assigned replicas   Isr: replicas in sync with the leader
# describe all topics
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe
Topic: linux-zk    TopicId: Bi3swYr7SVWeseyhi8hJFA    PartitionCount: 3    ReplicationFactor: 2    Configs: segment.bytes=1073741824
Topic: linux-zk Partition: 0 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: linux-zk Partition: 1 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: linux-zk Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
Topic: linux-zk-test TopicId: EU4xyfjoSL2lYgmVKPb-YQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: linux-zk-test Partition: 0 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: linux-zk-test Partition: 1 Leader: 103 Replicas: 103,102 Isr: 103,102
Topic: linux-zk-test Partition: 2 Leader: 102 Replicas: 102,101 Isr: 102,101
2. Create a topic
A topic is the logical communication unit between producers and consumers.
The data underneath is stored in one or more partitions, each of which has one or more replicas.
# create a topic named "linux-zk" with 3 partitions and a replication factor of 2
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --partitions 3 --replication-factor 2 --topic linux-zk
Created topic linux-zk.
# the replication factor cannot be larger than the number of brokers
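For example, on this 3-broker cluster a request like the sketch below would be rejected (Kafka reports an InvalidReplicationFactorException; the topic name here is just a placeholder):
kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --partitions 3 --replication-factor 4 --topic linux-zk-bad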
# create a topic named "linux-zk-test" with 3 partitions and a replication factor of 2
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --partitions 3 --replication-factor 2 --topic linux-zk-test
Created topic linux-zk-test.
3. Modify a topic
(The partition count can be increased, but never decreased!)
# increase the partition count of "linux-zk" to 5
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --alter --topic linux-zk --partitions 5
# describe linux-zk after the change
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic linux-zk
Topic: linux-zk    TopicId: Bi3swYr7SVWeseyhi8hJFA    PartitionCount: 5    ReplicationFactor: 2    Configs: segment.bytes=1073741824
Topic: linux-zk Partition: 0 Leader: 102 Replicas: 102,101 Isr: 102,101
Topic: linux-zk Partition: 1 Leader: 101 Replicas: 101,103 Isr: 101,103
Topic: linux-zk Partition: 2 Leader: 103 Replicas: 103,102 Isr: 103,102
Topic: linux-zk Partition: 3 Leader: 102 Replicas: 102,103 Isr: 102,103
Topic: linux-zk Partition: 4 Leader: 103 Replicas: 103,101 Isr: 103,101
4. Delete a topic
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --delete --topic linux-zk-test
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list
linux-zk
Note
A hands-on example of changing the replication factor and partition count in Kafka:
https://www.cnblogs.com/yinzhengjie/p/9808125.html
5. Start a producer
[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk
>1111
6. Start a consumer
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk --from-beginning
1111
# --from-beginning reads the topic from the very beginning
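The console consumer can also replay a single partition from a chosen offset; a small sketch using kafka-console-consumer.sh's --partition and --offset options:
kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk --partition 0 --offset 0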
7. Consumer group example
7.1 Create a topic
[[email protected] ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --partitions 3 --replication-factor 2 --topic linux-zk-test
Created topic linux-zk-test.
7.2 Start a producer
[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk-test
>
7.3 Start consumers that join the same consumer group
# joining via a configuration file
[[email protected] ~]# egrep -v '^$|^#' /es/softwares/kafka/config/consumer.properties
bootstrap.servers=localhost:9092
# change the line below and pick a group name
group.id=zk
# start the consumer
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer.config /es/softwares/kafka/config/consumer.properties --from-beginning
# joining via the command line
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer-property group.id=zk --from-beginning
7.4 Write data with the producer and watch the consumers
# the producer writes
[[email protected] ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.103:9092 --topic linux-zk-test
>1111
>2222
# the consumer that joined via the configuration file reads
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer.config /es/softwares/kafka/config/consumer.properties --from-beginning
1111
# the consumer that joined via the command line reads
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-test --consumer-property group.id=zk --from-beginning
2222
[[email protected] ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --group zk
GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
# group name, topic, partition, committed offset, log-end offset, lag, consumer id, host, client id
zk linux-zk-test 0 1 1 0 console-consumer-be33c37b-f6cf-4787-a528-05aa549ed407 /10.0.0.101 console-consumer
zk linux-zk-test 1 0 0 0 console-consumer-be33c37b-f6cf-4787-a528-05aa549ed407 /10.0.0.101 console-consumer
zk linux-zk-test 2 1 1 0 console-consumer-c75187c0-7533-4267-a81b-ea96e840d490 /10.0.0.102 console-consumer
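Here LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET, i.e. how many messages the group still has to catch up on. To list every known group instead of describing a single one, the same tool can be used like this:
kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --list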
8. View the data in the built-in "__consumer_offsets" topic
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning | grep zk
V. Kafka heap memory tuning
1. Edit the startup script
[[email protected] ~]# vim `which kafka-server-start.sh ` +28
[[email protected] ~]# cat /es/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# comment out this line
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xmx256m -Xms256m -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="8888"
fi
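Note that -XX:PermSize no longer exists on JDK 8 (the jmap output later in this section shows JVM 25.291, i.e. JDK 8u291), so the JVM will warn about and ignore that flag; a Metaspace-based variant of the same line is sketched below:
# same sizing, but using the JDK 8 Metaspace flag instead of the removed PermSize
export KAFKA_HEAP_OPTS="-server -Xmx256m -Xms256m -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"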
2. Sync the startup script to the rest of the cluster
[[email protected] ~]# data_rsync.sh `which kafka-server-start.sh `
===== rsyncing elk102.com: kafka-server-start.sh =====
Command executed successfully!
===== rsyncing elk103.com: kafka-server-start.sh =====
Command executed successfully!
3. Restart the Kafka cluster
Perform a rolling restart (important, important, important!)
# Kafka can take a while to stop; wait for it to shut down completely
# restart the nodes one at a time
[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# jps
1456 Elasticsearch
2294 QuorumPeerMain
4059 Jps
1455 Elasticsearch
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[[email protected] ~]# kafka-server-stop.sh
[[email protected] ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
4. Verify the heap size of the Kafka brokers
[[email protected] ~]# jmap -heap `jps | awk '/Kafka/{print $1}'`
Attaching to process ID 8591, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.291-b10
using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)
Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 268435456 (256.0MB)
VI. Kafka open-source monitoring component: kafka-eagle
1. Enable Kafka's JMX port
This step was already done above and can be skipped.
1.1 Stop Kafka on all nodes
kafka-server-stop.sh
1.2 Edit the Kafka startup script on all nodes
# this step was already done above and can be skipped
vim `which kafka-server-start.sh`
...
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # comment out this line and add the 2 lines below
export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="8888"
1.3 Start the Kafka service on all nodes
kafka-server-start.sh -daemon /es/softwares/kafka/config/server.properties
2. Enable ZooKeeper's JMX port
2.1 Edit the ZooKeeper configuration file
This was already done during deployment and can be skipped.
vim /es/softwares/zk/conf/zoo.cfg
# add the line below to enable ZooKeeper's four-letter-word monitoring commands
4lw.commands.whitelist=*
2.2 Edit the ZooKeeper startup script
[[email protected] ~]# vim /es/softwares/zk/bin/zkServer.sh +77
[[email protected] ~]# cat /es/softwares/zk/bin/zkServer.sh
...
if [ "x$JMXHOSTNAME" = "x" ]
then
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.man
agement.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
else
echo "ZooKeeper remote JMX Hostname set to $JMXHOSTNAME" >&2
ZOOMAIN="-Dcom.sun.management.jmxremote -Djava.rmi.server.hostname=$JMXHOSTNAME -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxre
mote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMa
in"
#加这一行
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
fi
2.3 Edit the ZooKeeper environment script to enable JMX
[[email protected] ~]# cat /es/softwares/zk/bin/zkEnv.sh
....
# add these lines at the end
JMXLOCALONLY=false
JMXPORT=21812
JMXSSL=false
JMXLOG4J=false
2.4 Sync the scripts
[[email protected] ~]# data_rsync.sh /es/softwares/zk/bin/zkServer.sh
===== rsyncing elk102.com: zkServer.sh =====
Command executed successfully!
===== rsyncing elk103.com: zkServer.sh =====
Command executed successfully!
[[email protected] ~]# data_rsync.sh /es/softwares/zk/bin/zkEnv.sh
===== rsyncing elk102.com: zkEnv.sh =====
Command executed successfully!
===== rsyncing elk103.com: zkEnv.sh =====
Command executed successfully!
2.5 Restart ZooKeeper
[[email protected] ~]# zkManager.sh restart
2.6 Use jconsole to verify that the JMX ports are reachable
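If no GUI is handy, a rough check from the shell is sketched below; it assumes the ports configured above (8888 for Kafka, 21812 for ZooKeeper) and that nc is installed:
# the JMX listeners should be open
ss -ntl | egrep '8888|21812'
# the four-letter-word commands should respond now that the whitelist is enabled
echo stat | nc 127.0.0.1 2181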
3. Install MariaDB
3.1 Install MariaDB
# if you already have a MariaDB server, you can use your own
[[email protected] ~]# yum -y install mariadb-server
3.2 Edit the MariaDB configuration file
[[email protected] ~]# cat /etc/my.cnf
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# the line below was added
skip-name-resolve=1
3.3 Start the service and enable it at boot
[[email protected] ~]# systemctl enable mariadb --now
3.4 Create the database
[[email protected] ~]# mysql
MariaDB [(none)]> CREATE DATABASE kafka DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
3.5 Create a user and grant privileges
MariaDB [(none)]> CREATE USER admin IDENTIFIED BY 'admin';
MariaDB [(none)]> GRANT ALL ON kafka.* TO admin;
MariaDB [(none)]> SHOW GRANTS FOR admin;
3.6 Test the connection
[[email protected] ~]# mysql -u admin -padmin -h 10.0.0.102
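A non-interactive variant of the same check (a sketch) that also confirms the kafka database is visible:
mysql -u admin -padmin -h 10.0.0.102 -e 'SHOW DATABASES;'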
4. Deploy kafka-eagle monitoring
4.1 Download the kafka-eagle package
[[email protected] ~]# ll
-rw-r--r-- 1 root root 81087419 Apr 10 16:09 kafka-eagle-bin-2.0.8.zip
4.2 Extract the package
[[email protected] ~]# unzip kafka-eagle-bin-2.0.8.zip
[[email protected] ~]# tar xf efak-web-2.0.8-bin.tar.gz -C /es/softwares/
4.3 Edit the configuration file
[[email protected] ~]# cat /es/softwares/efak-web-2.0.8/conf/system-config.properties
# the cluster alias was changed here; the properties below that start with this alias must use the same name
efak.zk.cluster.alias=linux-zk
# must match the ZooKeeper connection string (including the chroot) used by Kafka
linux-zk.zk.list=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/linux-zk-kafka321
linux-zk.efak.broker.size=20
kafka.zk.limit.size=32
efak.webui.port=8048
linux-zk.efak.offset.storage=kafka
linux-zk.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
# token that has to be entered to confirm certain operations
efak.topic.token=linuxzk
efak.driver=com.mysql.cj.jdbc.Driver
# change this to your own database connection string
efak.url=jdbc:mysql://10.0.0.102:3306/kafka?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=admin
efak.password=admin
4.4 Configure environment variables
[[email protected] ~]# cat >> /etc/profile.d/kafka.sh <<'EOF'
> export KE_HOME=/es/softwares/efak-web-2.0.8
> export PATH=$PATH:$KE_HOME/bin
> EOF
[[email protected] ~]# cat /etc/profile.d/kafka.sh
#!/bin/bash
export ZK_HOME=/es/softwares/zk
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/es/softwares/kafka
export PATH=$PATH:$KAFKA_HOME/bin
export KE_HOME=/es/softwares/efak-web-2.0.8
export PATH=$PATH:$KE_HOME/bin
[[email protected] ~]# source /etc/profile.d/kafka.sh
4.5 Adjust the heap size
[[email protected] ~]# vim `which ke.sh `
# change the line below: replace 1G with 256m
export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
4.6 Start the service
[[email protected] ~]# ke.sh start
....
....
....
[2023-04-11 21:10:53] INFO: Status Code[0]
[2023-04-11 21:10:53] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )
Version 2.0.8 -- Copyright 2016-2021
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.0.0.101:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
4.7 Test access
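Open http://10.0.0.101:8048 in a browser and log in with the account printed in the startup banner (admin / 123456). A quick reachability check from the shell could look like this sketch:
curl -s -o /dev/null -w '%{http_code}\n' http://10.0.0.101:8048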
VII. Writing data to Kafka with Filebeat
[[email protected] /es/softwares/filebeat-7.17.5-linux-x86_64]# cat config/stdin-to-kafka.yaml
filebeat.inputs:
- type: stdin

# send the output to Kafka
output.kafka:
  # Kafka broker list
  hosts:
  - 10.0.0.101:9092
  - 10.0.0.102:9092
  - 10.0.0.103:9092
  # Kafka topic to write to
  topic: "linux-zk-kafka"
# start filebeat
[[email protected] /es/softwares/filebeat-7.17.5-linux-x86_64]# filebeat -e -c config/stdin-to-kafka.yaml
# type input into filebeat
....
....
2023-04-11T21:19:12.054+0800 INFO [stdin.harvester] log/harvester.go:309 Harvester started for paths: [] {"harvester_id": "75c94ca6-3307-4bdd-bcd8-39449590e0ac"}
1111    # typed input
2023-04-11T21:19:22.182+0800 INFO [publisher_pipeline_output] pipeline/output.go:143 Connecting to kafka(10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092)
# start a Kafka consumer to read the data
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic linux-zk-kafka --consumer-property group.id=zk --from-beginning
{"@timestamp":"2023-04-11T13:19:21.165Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.5"},"input":{"type":"stdin"},"ecs":{"version":"1.12.0"},"host":{"name":"elk103.com"},"agent":{"hostname":"elk103.com","ephemeral_id":"e77e2788-be08-4880-ad6f-02a572194e83","id":"21611ec8-4839-4bb7-8161-b55a15616548","name":"elk103.com","type":"filebeat","version":"7.17.5"},"log":{"file":{"path":""},"offset":0},"message":"1111"}
VIII. Pulling data from Kafka with Logstash and parsing JSON
[[email protected] ~]# cat config/kafka-to-stdout.conf
input {
  kafka {
    # Kafka cluster address
    bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
    # topic(s) to consume
    topics => ["linux-zk-kafka"]
    # consumer group
    group_id => "linux-zk-kafka-logstash"
    # where to start consuming: "earliest" reads from the beginning, "latest" reads only new data
    auto_offset_reset => "earliest"
  }
}

filter {
  json {
    # parse the given field as JSON
    source => "message"
  }

  mutate {
    remove_field => [ "agent","log","input","host","ecs","tags" ]
  }
}

output {
  stdout {}
}
# start logstash
[[email protected] ~]# logstash -rf config/kafka-to-stdout.conf
# output
{
"@timestamp" => 2023-04-11T13:19:21.165Z,
"@version" => "1",
"message" => "1111"
}
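In a full ELK pipeline the stdout output would normally be swapped for an elasticsearch output. A minimal sketch, assuming Elasticsearch listens on the default port 9200 on these nodes and that the index name is just an example:
output {
  elasticsearch {
    # Elasticsearch cluster address (assumed; adjust to your own deployment)
    hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
    # target index; %{+YYYY.MM.dd} appends the event date
    index => "linux-zk-kafka-%{+YYYY.MM.dd}"
  }
}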