一、什么是kafka?
Apache kafka is a distributed streaming platform,即官方定义 kafka 是一个分布式流式计算平台。而在大部分企业开发人员中,都是把 kafka 当成消息系统使用,即它是一个分布式消息队列。Kafka也是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域;支持分区的、多副本,基于zookeeper协调的分布式消息系统。
二、kafka基础架构和核心概念
在 Kafka 中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。
生产者(Producer):消息生产者,就是向 kafka broker 发消息的客户端,生产者程序通常持续不断地向一个或多个主题发送消息。
消费者(Consumer):消息消费者,向 kafka broker 取消息的客户端,消费者就是订阅这些主题消息的客户端应用程序。
和生产者类似,消费者也能够同时订阅多个主题的消息。我们把生产者和消费者统称为客户端(Clients)。你可以同时运行多个生产者和消费者实例,这些实例会不断地向 Kafka 集群中的多个主题生产和消费消息。
消费者组Consumer Group (CG):由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
主题(topic) :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
分区(Partition):为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
副本(Replica):副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对 象都是 leader。
follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower
三、kafka 架构
四、安装zookeeper
1、解压zookeeper-3.4.5,目录为D:\zookeeper\zookeeper-3.4.5
2、 将“zoo_sample.cfg”重命名为“zoo.cfg”
3、 打开“zoo.cfg”找到并编辑dataDir= d:\\zooker\\log
4、 添加系统变量:ZOOKEEPER_HOME= D:\zookeeper\zookeeper-3.4.5
5、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
6、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
7、 打开新的cmd,输入“zkServer“,运行Zookeeper
8、 命令行提示如下:说明本地Zookeeper启动成功
在window服务中配置自动启动的界面
打开cmd窗口:输入以下命令打开clent端,查看zookeeper是否可以正常使用
D:\zookeeper\zookeeper-3.4.5\bin> ./zkCli.cmd
五、windows下安装kafka
1、 下载安装包
http://kafka.apache.org/downloads
注意要下载二进制版本
2、 解压并进入Kafka目录,目录为:D:\kafka_3.3.1
3、 进入config目录找到文件server.properties并打开
4、 找到并编辑log.dirs= D://kafka_3.3.1//kafka-logs
5、 找到并编辑zookeeper.connect=localhost:2181
6、 Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
7、打开cmd命令窗口,cd D:\kafka_3.3.1,启动kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
启动成功后如下界面:
若启动失败,先删除kafka中配置的日志文件,再次启动
Kafka启动报错AccessDeniedException
报错信息:
ERROR Error while renaming dir for test-0 in log dir D:\kafka_3.3.1\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: D:\kafka_3.3.1\kafka-logs\test-0 -> D:\kafka_3.3.1\kafka-logs\test-0.19a32e99d23e4dd484a27c1f94b8fc8e-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
出现的原因是,刚才不小心手动删掉了logs目录,导致实际logs和zookeeper中的状态不一致。
解决办法:删掉zookeeper的data文件,重启就好了
六、使用kafka
1、创建test主题
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1
2、查看主题
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
3、删除主题test
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic test
4、查看某个topic的详细信息
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test
5、查看分组
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --list
6、查看某个分组详细信息
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group javagroup
七、CentOS下安装kafka
1、下载kafka
https://kafka.apache.org/downloads
2、解压kafa
mkdir /usr/local/kafka
tar -zxvf /opt/software/kafka_2.12-3.5.0.tgz -C /usr/local/kafka/
3、创建日志文件
mkdir /usr/local/kafka/kafka_2.12-3.5.0/kafka-logs
4、修改server.properties配置文件
log.dirs=/usr/local/kafka/kafka_2.12-3.5.0/ kafka-logs //数据的存储位置,程序启动后会自行生成
zookeeper.connect=localhost:3181 //注意更改自己的ip地址
5、启动kafka
cd /usr/local/kafka/kafka_2.12-3.5.0
./bin/kafka-server-start.sh config/server.properties
6、创建topic:test1
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test2
7、查看topic(主题)列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
默认在zookeeper下会创建/config/topics,在其目录下创建了topic: test2
8、启动控制台生产者,并输入一条消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2
9、打开另一个端口,启动控制台消费者,显示了生产者输入的消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning
10、停止kafka
./bin/kafka-server-stop.sh
在生产者控制台输入hello
,消费者控制台,就可以消费到生产者的消息,输出 hello
kafka 的各种操作命令
八、kafka集群部署
前提条件,其中一台服务器已经安装好jdk、zookeeper
1、分发kafka安装目录到其他两台机器
scp -r /usr/local/kafka/ 192.168.154.128:/usr/local/
scp -r /usr/local/kafka/ 192.168.154.130:/usr/local/
2、只需修改server.properties配置文件的
broker.id=2 (192.168.154.130)
broker.id=3 (192.168.154.127)
3、配置环境变量
vim /etc/profile
在最下面添加如下
#set zookeeper environment
export ZK_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin
export PATH=$PATH:$ZK_HOME/bin
4、更新环境变量
source /etc/profile
5、启动
cd /usr/local/kafka/kafka_2.12-3.5.0
./bin/kafka-server-start.sh config/server.properties
若启动失败,则加上- daemon参数启动(对默认的配置进行覆盖)
./bin/kafka-server-start.sh -daemon config/server.properties
6、编写shell脚本,一键启动三台机器(没有成功),以后请教其他人
在任何一台机器的/usr/local/kafka/kafka_2.12-3.5.0/bin目录下,创建脚本文件kf.sh
cd /usr/local/kafka/kafka_2.12-3.5.0
vim ./bin/kf.sh
粘贴如下脚本:
#!/bin/bash
case $1 in
"start"){
for i in 192.168.154.128 192.168.154.129 192.168.154.129
do
echo --------------------- $i kafka 启动 -----------------
ssh $i "/usr/local/kafka/kafka_2.12-3.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-3.5.0/config/server.properties"
done
}
;;
"stop"){
for i in 192.168.154.128 192.168.154.129 192.168.154.129
do
echo ---------------------- $i kafka 停止 -----------------
ssh $i "/usr/local/kafka/kafka_2.12-3.5.0/bin/kafka-server-stop.sh"
done
}
;;
esac
# 保存退出后,修改执行权限
chmod 777 ./bin/kf.sh
启动kafka
./bin/kf.sh start
关闭kafka
./bin/kf.sh stop
其他分享:
https://kafka.apache.org/
https://cloud.tencent.com/developer/article/2005281