一、Kafka简介
1.1、定义
- 旧定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列。 - 新定义
Kafka 是一个开源的分布式事件流平台,用于数据管道、流分析、数据集成和关键任务的应用。
1.2、使用场景
主要用于大数据实时处理领域。
- 缓冲: 有助于控制和优化数据流经过系统的速度。
- 消峰: 在访问量剧增的情况下,保证应用不会因突发的超负荷请求而崩溃。
- 解耦: 保证程序的可扩展性。
- 异步通信:
1.3、使用模式
- 点对点: 消费者主动拉取消息,消息收到后清除消息。
- 发布/订阅模式: 可以有多个topic主题;消息被消费者消费后不会删除;每个消费者相互独立,都可以消费到数据。
1.4、基础架构及概念
最简单的流程:生产者 -> 消息主题 -> 消费者。
考虑到一台主机可能存不下 海量的消息 数据,引入分区,将topic进行 分区;
考虑到分区数据的高可用,引入副本,可为每个分区创建副本;
考虑到一个消费者消费信息的瓶颈,引入 消费者组。
- Producer: 消息生产者,向 Kafka broker 发送消息。
- Consumer: 消息消费者,从 Kafka broker 消费消息。
- Consumer Group(CG):消费者组,由多个Consumer组成。
消费者组与消费者组之间对消息的消费互不影响;
消费者组内的消费者之间,不能消费同一个Topic的同一分区数据,即一个Topic的一个分区只能由同一消费者组内的一个消费者消费。 - Broker: 一台Kafka服务器。一个broker可容纳多个topic。一个集群由多个broker组成。
- Topic: 消息主题,Kafka将一组消息抽象归纳为一个主题。主题就是对消息的分类,生产者将消息发送到特定的主题,消费者订阅主题或是主题的某些分区进行消费。
- Parition: 一个topic可分为多个Parition,每个partition是一个有序队列。
- Replica: 分区副本。每个Parition可设置若干副本。 每个分区的所有副本有且只有一个Leader,其他为Follower。
- Leader: 分区Leader副本,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- Follower:分区Leader副本,实时从Leader中同步数据,当Leader故障时,某个follower将成为新的Leader。
- Message:消息。消息是Kafka通信的基本单位,由固定长度的消息头和可变长度的消息体构成。在java重新实现的客户端中,Message被称之为Record。
二、Kafka安装
2.1、准备环境
- Linux操作系统
- Java运行环境(1.8或以上)
- zookeeper 集群环境,可参照 Zookeeper集群部署 。
- 服务器列表:
2.2、准备安装介质
分别登录server1、server2、server3执行,操作、配置相同:
##更新或安装wget命令
yum -y install wget
##创建安装目录
mkdir -p /usr/local/services/kafka
##获取安装包kafka_2.12-3.0.0.tgz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz
##解压缩kafka_2.12-3.0.0.tgz
tar -zxvf kafka_2.12-3.0.0.tgz
2.3、修改环境配置
- 分别登录server1、server2、server3添加主机名,操作、配置相同:
vi /etc/hosts
##添加如下内容
168.5.7.75 server1
168.5.7.76 server2
168.5.7.77 server3
:wq
- 分别登录server1、server2、server3添加kafka环境变量,操作、配置相同:
vi ~/.bash_profile
##添加如下内容
export KAFKA_HOME=/usr/local/services/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
:wq
source /etc/profile
说明:再任一路径下输入 kafka 按 Tab 键后会补全 Kafka 相关脚本.sh,即Kafka 环境变量配置成功。 因 Kafka 脚本运行时会加载 /config 路径下的相关配置文件,故当不在 Kafka 安装目录 bin 下执行相关脚本时, 需要指定配置文件绝对路径。
2.4、修改Kafka配置
登录server1执行操作:
cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties
## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=1
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka
:wq
登录server2执行操作:
cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties
## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=2
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka
:wq
登录server3执行操作:
cd $KAFKA_HOME/config
cp server.properties server.properties.$(date +%Y%m%d)
vi server.properties
## 修改文件内配置如下
## broker 的全局唯一编号,不能重复,只能是数字
broker.id=3
## kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/data/kafka-logs
port=9093
## 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=server1:2181,server2:2181,server3:2181/kafka
:wq
2.5、Kafka服务启停
## 默认 zookeeper 集群启动成功,查看命令:./zkServer.sh status
## 启动命令
sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
## 停止命令
sh ${KAFKA_HOME}/bin/kafka-server-stop.sh
2.6、Kafka集群启停脚本
说明:本脚本基于SSH服务器免密登录,如集群未配置SSH,参照:《SSH安装配置》 。
- 启动脚本:start-kafka-cluster.sh
#!/bin/bash
brokers="server1 server2 server3"
KAFKA_HOME="/usr/local/services/kafka/kafka_2.11-2.3.0"
KAFKA_NAME="kafka_2.11-2.3.0"
echo "INFO : Begin to start kafka cluster ..."
for broker in $brokers
do
echo "INFO : Starting ${KAFKA_NAME} on ${broker} ..."
ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties"
if [[ $? -eq 0 ]]; then
echo "INFO:[${broker}] Start successfully"
fi
done
echo "INFO:Kafka cluster starts successfully !"
为脚本添加执行权限:
chmod a+x start-kafka-cluster.sh
- 停止脚本:stop-kafka-cluster.sh
#!/bin/bash
brokers="server1 server2 server3"
KAFKA_HOME="/usr/local/services/kafka/kafka_2.11-2.3.0"
KAFKA_NAME="kafka_2.11-2.3.0"
echo "INFO : Begin to stop kafka cluster ..."
for broker in $brokers
do
echo "INFO : Shut down ${KAFKA_NAME} on ${broker} ..."
ssh ${broker} "source /etc/profile;bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"
if [[ $? -ne 0 ]]; then
echo "INFO : Shut down ${KAFKA_NAME} on ${broker} is down"
fi
done
echo "INFO : kafka cluster shut down completed!"
为脚本添加执行权限:
chmod a+x stop-kafka-cluster.sh
三、Kafka-Shell API
3.1、主题相关
${KAFKA_HOME}/bin/kafka-topics.sh
## 参数详解
## --bootstrap-server <io:port> 指定连接 kafka-broker 节点
## --topic <topic_name> 指定操作的 主题
## --create 指定对主题的操作:新增
## --delete 指定对主题的操作:删除
## --alter 指定对主题的操作:编辑
## --list 指定对主题的操作:查看列表
## --describe 指定对主题的操作:查看详情
## --partitions <num> 设置主题分区数。
## -- replication-factor <num> 设置分区副本数。
## --config <key-value> 更改系统默认设置
## 案例:查看集群中所有 topic
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --list
## 案例:创建一个名为 hello 的主题,其分区数为:1,副本数为:3。
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --create --partitions 1 --replication-factor 3 --topic hello
## 案例:查看主题 hello 详情
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --describe --topic hello
## 案例:修改主题 hello 分区数(注意:通过alter命令只能增加分区数,不能减少)
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --alter --topic hello --partitions 3
## 案例:删除主题 hello
kafka-topics.sh --bootstrap-server 168.5.7.75:9092 --delete --topic hello
3.2、生产者相关
${KAFKA_HOME}/bin/kafka-console-producer.sh
## 参数详解
## --bootstrap-server <io:port> 指定连接 kafka-broker 节点
## --topic <topic_name> 指定操作的 主题
## 案例:生产者连接 broker 生产消息
kafka-console-producer.sh --bootstrap-server 168.5.7.75:9092 --topic hello
## 进入命令行后发送消息即可
>
3.3、消费者相关
${KAFKA_HOME}/bin/kafka-console-consumer.sh
## 参数详解
## --bootstrap-server <io:port> 指定连接 kafka-broker 节点
## --topic <topic_name> 指定操作的 主题
## –from-beginning 从头开始消费。
## –group <group_id> 指定消费者组名称。
## 案例:消费者连接 broker 消费消息,默认只消费连接之后生产的新消息
kafka-console-consumer.sh --bootstrap-server 168.5.7.75:9092 --topic hello
## 案例:消费者连接 broker 消费消息,指定从主题中的第一条消息开始消费
kafka-console-consumer.sh --bootstrap-server 168.5.7.75:9092 --from-beginning --topic hello
四、Kafka-Java API
TODO:参考gitee项目地址。