传统的消息队列的主要应用场景包括:
缓存/ 消峰
解耦
异步通信
消息队列的 两种模式
点对点模式
发布/ 订阅模式
基础架构
1.为方便扩展,并提高吞吐量,一个topic分为多个partition
2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3.为提高可用性,为每个partition增加若干副本,类似NameNode HA
4. ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK
Producer :消息生产者,就是向 Kafka broker发消息的客户端。
Consumer :消息消费者,向 Kafka broker 取消息的客户端。
Consumer Group (CG ):消费者组,由多个 consumer组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个 组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。
Broker :一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker可以容纳多个 topic。
Topic:可以理解为一个队列,个 生产者和消费者面向的都是一个 topic。
Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,个 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
Replica :副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
Leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
Follower :每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
安装部署
配置文件的配置 server.properties
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理) zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka 启动kafka bin/kafka-server-start.sh -daemonconfig/server.properties 停止kafka bin/kafka-server-stop.sh 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息, Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
生产经验
1:生产者 如何提高吞吐量 --修改配置参数
batch.size:批次大小,默认16k
linger.ms:等待时间,修改为5-100ms
compression.type:压缩snappy
RecordAccumulator:缓冲区大小,修改为64m
2:数据可靠性
properties.put(ProducerConfig.ACKS_CONFIG, "all");
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR 队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0 很少使用;acks=1,一般用于传输普通日志 , 允许丢个别数据;acks=-1,一般用于传输和钱相关的数据 ,对可靠性要求比较高的场景 。
数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小 副本 数量大于等于2
3:数据去重 --幂等
至少一次(At Least Once)=ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
最多一次(At Most Once)= ACK级别设置为0
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2)
4:数据有序
单分区内,有序;多分区,分区与分区间无序;
Kafka 副本
Kafka 副本作用:提高数据可靠性
Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率
Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据
Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR:意为和Leader保持同步的Follower+Leader集合,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参
数设定,默认30s
OSR:表示 Follower 与 Leader副本同步时,延迟过多的副本
leader故障:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
kafka存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断 追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了 分片和 索引机制,
将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引
kafka高效的原因
1)Kafka 本身是分布式集群,可以采用分区技术,并行度高
2)读数据采用稀疏索引,可以快速定位要消费的数据
3)顺序写磁盘
4)页缓存 + 零拷贝技术
消费者
Kafka 消费方式
1:pull 拉模式 :consumer采用从broker中主动拉取数据,Kafka 采用这种方式。(不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据)
2:push 推模式 :Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者不建议超过主题分区数,否则会有部分消费者空闲,不会接收到消息
Zookeeper可视化工具
prettyZoo、ZooKeeperAssistant
分区分配策略之Range容易产生数据倾斜!
分区分配策略之RoundRobin
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。。
分区分配策略之RoundRobin
分区数可以增加,但是不能减少。
如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。
kafka监控
Kafka图形界面连接工具:
1、Offset Explorer (以前叫 Kafka Tool),官网:https://www.kafkatool.com/
2、CMAK(以前叫 Kafka Manager)基于zookeeper方式启动kafka才可以使用该web管理后台 官网:https://github.com/yahoo/CMAK
3、EFAK(以前叫 kafka-eagle) 官网:https://www.kafka-eagle.org/
官网地址:https://kafka.apache.org/36/documentation.html
标签:知识点,副本,消费者,分区,Kafka,kafka,Leader From: https://www.cnblogs.com/weixsa/p/18086069