1. 消息队列
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。
有两种主要的消息传递模式:点对点模式、发布-订阅模式。Kafka 就是一种“发布-订阅”模式。
1.1 两种模式
a. 点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除。
〈消息生产者〉将消息持久化到 Queue 中,然后〈消息消费者〉从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以〈消息消费者〉不可能消费到已经被消费的消息。
Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
b. 发布/订阅模式
一对多,数据发布后推送给所有订阅者,订阅者消费数据之后不会清除消息。
〈消息发布者〉将消息持久化到 Topic 中,同时有多个〈消息订阅者〉订阅(消费) 该消息。与点对点方式不同,消费者可以订阅一个或多个 Topic,消费者可以消费该 Topic 中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
发布者发送到 Topic 的消息,只有订阅了 Topic 的订阅者才会拉到消息。
1.2 优点
- 【解耦】允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 【可恢复性】系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 【缓冲】有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 【灵活性&峰值处理能力】在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 【异步通信】很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
2. Kafka 简介
Kafka 是一个分布式、分区的、多副本的、多订阅者,基于 ZooKeeper 协调的分布式消息系统,常见可以用于 Web/Nginx 日志、访问日志、消息服务等,由 Scala 写成,主要应用于大数据实时处理领域。
Kafka 对消息保存时根据 Topic 进行分类,发送消息者称为 Producer,消息接收者称为 Consumer。此外,Kafka 集群有多个 Kafka 实例组成,每个实例称为 Broker。
无论 Kafka 集群还是 Consumer 都依赖 ZooKeeper 保存一些 meta 信息,来保证系统可用性。
【Kafka 最新定义】Kafka 是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
3. Kafka 架构
在 Zk 中记录谁是 leader,Kafka 2.8.0 以后也可以配置不采用 Zk。
v0.9 之前 offset 存储在 Zk,v0.9 及之后 offset 存储在本地。
服务端(Broker)和客户端(Producer、Consumer)之间的通信是通过 TCP 协议来完成的。
3.1 Producer
消息生产者,就是向 Kafka Broker 发消息的客户端;
3.2 Consumer
Consumer 消息消费者,向 Kafka Broker 取消息的客户端;
Consumer Group 消费者组(CG),由多个 Consumer 组成,消费者组之间互不影响。消费者组内每个消费者负责消费不同 Partition 的数据,一个 Partition 只能由一个组内消费者消费,也即同一 Topic 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给某一个 Consumer)的手段。一个 Topic 可以对应多个 Consumer Group。如果需要实现广播,只要每个 Consumer 有一个独立的 Group 就可以了。要实现单播只要所有的 Consumer 在同一个 Group 里。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。
实际上,Kafka 的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用 Storm 这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop 这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的 Consumer 属于不同的 Consumer Group 即可。
3.3 Topic
每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。
物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上但用户只需指定消息的 Topic 即可生产或消费数据,而不必关心数据存于何处。可以理解为一个队列,生产者和消费者面向的都是 Topic。
3.4 Partition
为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。
创建一个 Topic 时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,Kafka 在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被 append 到该 Partition 中,属于“顺序写磁盘",因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。
每个 Partition 是一个有序的队列,Partition 中的每条消息都会被分配一个有序的 id(offset)。Kafka 只保证一个 Partition 中的消息能顺序发给 Consumer,不保证一个 Topic 的整体(多个 Partition 间)顺序。在需要严格保证消息的消费顺序的场景下,需要将 Partition 数目设为 1。
【Producer 消息路由】
Producer 发送消息到 Broker 时,会根据〈Partition 机制〉选择将其存储到哪一个 Partition。如果〈Partition 机制〉设置合理,所有消息可以均匀分布到不同的 Partition 里,这样就实现了负载均衡。
如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 Broker 的不同 Partition 里,极大的提高了吞吐率。
可以在 $KAFKA_HOME/config/server.properties
中通过配置项 num.partitions
来指定新建 Topic 的默认 Partition 数量,也可在创建 Topic 时通过参数指定,同时也可以在 Topic 创建之后通过 Kafka 提供的工具修改。
在发送一条消息时,可以指定这条消息的 key,Producer 根据这个 key 和〈Partition 机制〉来判断应该将这条消息发送到哪个 Parition。〈Partition 机制〉可以通过指定 Producer 的 paritition. class
这一参数来指定,该 class 必须实现 kafka.producer.Partitioner
接口。
3.5 Broker
一台 Kafka 服务器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。
- 如果某 Topic 有 N 个 Partition,集群有 N 个 Broker,那么每个 Broker 存储该 Topic 的一个 Partition;
- 如果某 Topic 有 N 个 Partition,集群有 (N+M) 个 Broker,那么其中有 N 个 Broker 存储该 Topic 的一个 Partition,剩下的 M 个 Broker 不存储该 Topic 的 Partition 数据;
- 如果某 Topic 有 N 个 Partition,集群中 Broker 数目少于 N 个,那么一个 Broker 存储该 Topic 的一个或多个 Partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
3.6 Replica
副本,为保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个 Partition 都有若干个副本,一个 Leader 和若干个 Follower;
为了更好的做负载均衡,Kafka 尽量将所有的 Partition 均匀分配到整个集群上。一个典型的部署方式是一个 Topic 的 Partition 数量大于 Broker 的数量。同时为了提高 Kafka 的容错能力,也需要将同一个 Partition 的 Replica 尽量分散到不同的机器。实际上,如果所有的 Replica 都在同一个 Broker 上,那一旦该 Broker 宕机,该 Partition 的所有 Replica 都无法工作,也就达不到 HA(High Availability) 的效果。同时,如果某个 Broker 宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有 Broker 上。
Kafka 分配 Replica 的算法如下:
- 将所有 Broker(假设共 N 个 Broker)和待分配的 Partition 排序;
- 将第 i 个 Partition 分配到第
i%N
个 Broker 上; - 将第 i 个 Partition 的第 j 个 Replica 分配到第
(i+j)%N
个 Broker 上;
3.7 Leader
每个 Partition 多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
3.8 Follower
每个 Partition 多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
Leader 副本负责处理读写请求,Follower 副本只负责与 Leader 副本的消息同步。
3.9 Push & Pull
作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 Broker push 消息并由 Consumer 从 Broker Pull 消息。一些 Logging-Centric System,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 Push 模式。事实上,Push 模式和 Pull 模式各有优劣。
Push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。Push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 Pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,Pull 模式更合适。Pull 模式可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式 —— 即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
4. 安装部署
(1)安装&初始化配置信息
# 解压安装包
[root@localhost opt]# tar -zxvf software/kafka_2.12-3.0.0.tgz -C /opt/module/
# 修改解压后的文件夹名称
[root@localhost opt]# mv kafka_2.12-3.0.0 kafka
[root@localhost opt]# cd kafka
# 创建 logs 文件夹
[root@localhost kafka]# mkdir logs
# 修改配置文件
[root@localhost config]# vi server.properties
# broker 的全局唯一编号,不能重复
broker.id=0
# Kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
# Zookeeper 集群地址 (在 zk 根目录下创建 /kafka,方便管理)
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a
# zk server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify
# the root directory for all kafka znodes.
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
【注】加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的,也可以实现多个 Kafka 集群复用一套 Zk 集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于上述 zookeeper.connect
的配置值这种,如果不指定 chroot,那么默认使用 Zk 的根路径。
(2)在 /etc/profile.d/my_env.sh 中配置 Kafka 对应的环境变量,然后刷新 source /etc/profile
;
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
(3)把 /kafka、/etc/profile.d/my_env.sh 分发到 hadoop103、hadoop104,记得修改 broker.id;
(4)编写集群启动的 Kafka 脚本 kafka.sh(注意要先启动 ZooKeeper!)
#!/bin/bash
case $1 in
"start")
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- Kafka-$i start ----------
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- Kafka-$i stop ----------
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
;;
esac
(5)使用 KafkaTools 连接
5. 操作和说明
> # 1. 创建 Topic (replication-factor 是 Leader 和 Follower 的总数)
> # --topic 定义主题名称 | --replication-factor 定义副本数 | --partitions 定义分区数
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic firstTopic --replication-factor 1 --partitions 1
> # 2. 查看当前服务器中的所有 Topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
> # 3. 发送消息到 Topic
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic firstTopic
> # 4. 消费 Topic 中的消息 (--from-beginning 会把 Topic 中以往所有的数据都读取出来)
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic firstTopic
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic firstTopic
> # 5. 查看某个 Topic 的详情
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic firstTopic
> # 6. 修改 Partition 数 (注意:分区数只能增加,不能减少!)
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic firstTopic --partitions 7
> # 7. 删除 Topic (server.properties 中 delete.topic.enable=true 配置是否为逻辑删除)
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic firstTopic
> # 8. 查看 Kafka 版本信息
> find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
Broker 指的是 Kafka 的服务端,可以是一台服务器也可以是一个集群,Producer 和 Consumer 都相当于这个服务端的客户端:
- 对于 Producer,
--broker-list
参数指定了所使用的 Broker。 - 对于 Consumer,由
--bootstrap-server
参数设置。- v0.8 以前的 Kafka,消费的进度(offset)是写在 ZK 中的,所以 Consumer 需要知道 ZK 的地址,查看的方法是使用
./zookeeper-client
,然后ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id]
,这个是查看某个 group_id 的某个 Topic 的 offset。 - 后来的版本都统一由 Broker 管理了(Consumer 的信息将会存放在 Kafka 之中),所以就用
--bootstrap-server
。
- v0.8 以前的 Kafka,消费的进度(offset)是写在 ZK 中的,所以 Consumer 需要知道 ZK 的地址,查看的方法是使用
【9092 是 Broker 集群的默认端口】
【修改 Partition 数量】
【删除 Topic】
标签:01,--,overview,Partition,Broker,Kafka,Topic,消息 From: https://www.cnblogs.com/liujiaqi1101/p/17113673.html