部署Kafka
#官方文档
http://kafka.apache.org/quickstart
1、环境准备
#在三个节点提前部署jdk和zookeeper
[root@node1 ~]#java -version
openjdk version "1.8.0_342"
[root@node1 ~]#zkServer.sh version
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Apache ZooKeeper, version 3.7.1 2022-05-07 06:45 UTC
#确保三个节点的zookeeper启动
[root@node1 ~]#systemctl start zookeeper
[root@node1 ~]#systemctl status zookeeper.service
2、下载kafka解压
官网:http://kafka.apache.org/downloads
清华源:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
#所有节点
[root@node1 ~]#tar xf kafka_2.13-3.0.0.tgz -C /usr/local
[root@node1 ~]#cd /usr/local
[root@node1 local]#ln -sv kafka_2.13-3.0.0 kafka
[root@node1 local]#echo 'PATH=/bin/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node1 local]#. /etc/profile.d/kafka.sh
3、修改配置文件
#第一个节点:
[root@node1 local]#vim /usr/local/kafka/config/server.properties
broker.id=1 # 每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的meta.properties文件
listeners=PLAINTEXT://10.0.0.101:9092 # 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data # kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=3 # 设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
log.retention.hours=168 # 设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 # 指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 # 设置连接zookeeper的超时时间,单位为ms
[root@node1 local]#mkdir /usr/local/kafka/data #数据目录
[root@node1 local]#scp /usr/local/kafka/config/server.properties 10.0.0.102:/usr/local/kafka/config/server.properties
[root@node1 local]#scp /usr/local/kafka/config/server.properties 10.0.0.103:/usr/local/kafka/config/server.properties
=============================================================================
#@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ 其他配置说明 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
num.network.threads=3 # 处理网络请求的线程数量,默认为3个
num.io.threads=8 # 执行磁盘IO操作的线程数量,默认为8个
socket.send.buffer.bytes=102400 # socket服务发送数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400 # socket服务接受数据的缓冲区大小,默认100KB
socket.request.max.bytes=104857600 # socket服务所能接受的一个请求的最大大小,默认为100M
default.replication.factor=3 # 设置副本数量为3,当Leader的Replication故障,会进行故障自动转移。
num.recovery.threads.per.data.dir=1 # 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
log.flush.interval.messages=10000 # 消息刷新到磁盘中的消息条数阈值
log.flush.interval.ms=1000 # 消息刷新到磁盘中的最大时间间隔,1s
log.retention.bytes=1073741824 # 日志保留大小,超出大小会自动删除,默认为1G
log.segment.bytes=1073741824 # 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.retention.check.interval.ms=300000 # 每隔多长时间检测数据是否达到删除条件,300s
#第二个节点:
[root@node2 local]#vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.0.0.102:9092
#第三个节点:
[root@node3 local]#vim /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://10.0.0.103:9092
4、启动服务
[root@node1 local]#kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#@注:vim /usr/local/kafka/bin/kafka-server-start.sh 里的" -Xmx1G-Xms1G" 可以调整内存
# 确保服务启动状态
[root@node1 local]#ss -ntlp | grep 9092
LISTEN 0 50 [::ffff:10.0.0.101]:9092 *:* users:(("java",pid=46161,fd=142))
# 打开zooinspector可以看到三个id
Broker 依赖于 Zookeeper,每个Broker 的id 和 Topic、Partition这些元数据信息都会写入Zookeeper 的 ZNode 节点中
consumer 依赖于Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的offset保存到 Zookeeper 中,下次消费在当前offset往后继续消费.kafka0.9 之前Consumer 的offset 存储在 Zookeeper 中,kafka0.9 以后offset存储在本地
Partition 依赖于 Zookeeper,Partition 完成Replication 备份后,选举出一个Leader,这个是依托于 Zookeeper 的选举机制实现的
5、使用service文件启动
[root@node1 local]#vim /lib/systemd/system/kafka.service标签:教程,部署,local,server,kafka,usr,node1,Kafka,root From: https://blog.51cto.com/dayu/5817819
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
PIDFile=/usr/local/kafka/kafka.pid
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/bin/kill -TERM ${MAINPID}
Restart=always
RestartSec=20
[Install]
WantedBy=multi-user.target
[root@node1 local]#systemctl daemon-reload
[root@node1 local]#systemctl start kafka.service
[root@node1 local]#systemctl status kafka