首先,让我们来看一下基础的消息(Message)相关术语:
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群 |
Topic | Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic |
Producer | 消息⽣产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个Consumer属于⼀个特定的Consumer Group,⼀条消息可以被多个不同的Consumer Group消费,但是⼀个Consumer Group中只能有⼀个Consumer能够消费该消息 |
Partition | 物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的 |
一、代理商Broker
在之前我们已经为大家介绍了生产者向消息队列中投递消息,消费者从消息队列中拉取数据。
在kafka消息队列中有一个非常重要的概念就是代理Broker,大家可以想象生活中的商品代理商是做什么的?进货、存货、销货。
kafka的代理Broker也承担着同样的作用:接收消息、保存消息、为消费者提供消息。
具体到kafka架构层面,我们可以认为一个Broker代理就是一个kafka的服务实例。
kafka可以启动多个服务实例,组成一个具有多个Broker代理的服务集群。
通常一个集群内的Broker越多,kafka集群的整体吞吐能力就越强。
这个也好理解,现实生活中一个产品的代理商越多,销售能力就越强,是一个道理。
因为kafka通常时进行分布式部署,所以一个物理服务器(一个操作系统)通常只部署启动一个kafka实例,所以在这种场景下Broker代理就可以理解是一台服务器。
可不可以一个服务器部署多个kafka实例?可以,通过修改端口避免端口冲突是可以实现的,但是这样不好。因为kafka之所以分布式部署是考虑到高可用,
即:一台服务器宕机,kafka集群仍然可用。如果一个服务器部署多个kafka实例,一旦该服务器宕机,那么影响面就很大了,很可能直接将kafka集群拖垮。
=====================================================================================
二、主题与主题分区
代理商Broker可以帮助上产厂家对商品进行“进销存”,但是有一个问题,商品没有进行分类。
显然茅台酒和乳制品、猪肉等不同产品的销售周期、销售频率是不一样的,为了有效的安排商品进销存需要对商品进行分类。
同理,kafka接收到的消息有些是需要快速处理数据,有些是高频但时效性要求低的数据。所以要对消息数据进行分类,每一个分类被称为一个Topic(主题)。
笔者觉得Topic这个词在这里翻译成"渠道"会更好,但是更多的人已经认可“主题”这种翻译方式了,所以先入为主。
下图中中间实线部分是一个单kafka实例的Broker,包含三个Topic。也就是该代理商代理酒类、乳制品、零食三种商品。Topic主题是一个逻辑概念,用来对消息进行分类。
上图中虚线范围代表主题Topic,管道状图形代表主题的分区partition。
代理商将商品按照主题Topic进行分类,那么还有一个问题,如果一个代理商的工作是单线程的,处理高并发的工作时就捉襟见肘了。
所以,我们为Topic主题引入分区的概念,分区是一个实实在在的物理存在的队列数据结构用于存放数据,占用系统内存以及磁盘数据存储等资源。
- 分区是一个主题Topic的分区,所以主题Topic包含一个或多个分区。一个Topic包含多少个分区取决于该主题下的商品处理的吞吐量能力需求。
- 因为Topic是一个逻辑概念,所以分区也可以被叫做"分区代理",一个代理Broker包含多个分区。就像一个省级代理可以包含多个地市级代理。
疑问点:
kafka的工作方式和其他MQ基本相同,只是在一些名词命名上有些不同。为了更好的讨论,这里对这些名词做简单解释。通过这些解释应该可以大致了解kafka MQ的工作方式。
- Producer (P):就是网kafka发消息的客户端
- Consumer (C):从kafka取消息的客户端
- Topic (T):可以理解为一个队列
- Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个 topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还 可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
- Broker (B):一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Partition(P):为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
===========================================================================================
分区副本与高可用
解决完吞吐量的问题,下面就是如何保证kafka集群高可用的问题。
上文已经介绍:一个kafka集群包含多个Broker实例,通常每个Broker实例部署在不同的服务器上独立运行。
这就是分布式架构内保证高可用的常用方法之一:服务多实例。一个服务实例挂掉,还有其他实例可以提供服务,从而保证高可用。
那么保证分布式集群高可用的第二种方法:数据多副本,一个副本的数据丢失,还有其他的数据副本可以使用。
kafka的数据存在哪?分区,所以对于kafka来说保证数据不丢的高可用方式就是分区多副本。
Broker服务多实例、分区多副本结合到一起就是上面的这一张图,解释一下这张图:
- 1个酒类主题包含4个分区,每个分区有三个副本(一主二从、同一颜色)
- 三个分区副本分布式四个Broker服务实例上
- 生产者只向主分区副本发送消息数据
- 消费者也只从主分区副本拉取消息数据
另外,上图中的一个主题分区的三个分区副本之间的主从关系不是固定不变的,是根据分区副本所在的Broker服务实例的状态选举出来的。
举例说明,分区副本A、B、C,目前状态是A是主分区副本。如果分区副本A所在的Broker挂掉了,那么分区副本A就失去了“主副本资质”,在分区副本B和C之间重新选举一个主分区副本。
所以以上图所示的集群为例,分区副本分散存在于多个Broker服务实例上,所以即使其中1个或2个服务实例挂掉了,也不会使消息服务整体不可用。
因为还剩一个分区副本,生产者和消费者只和主分区副本(Leader)进行数据通信,从分区副本(Follower)只起到数据备份的作用。
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition**,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower;
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。
=============================================================================
一个集群中有多个broker(kafka服务器)
一个broker中可以有多个主题;
一个主题有多个分区,多个分区分布在不同的服务器上;
多副本(备份);
分区副本分散存在于多个Broker服务实例上,所以即使其中1个或2个服务实例挂掉了,也不会使消息服务整体不可用。
因为还剩一个分区副本,生产者和消费者只和主分区副本(Leader)进行数据通信,从分区副本(Follower)只起到数据备份的作用。
=================================================================================
=====================================================================================
引用:https://blog.csdn.net/Aeroever/article/details/130352535
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据
1.什么是Topic
Kafka 和 ActiveMQ 一样,都是非常优秀的消息订阅/发送的中间件。在 ActiveMQ 中,我们知道它有 Queue 和 Topic 的概念,
但是在 Kafka 中,只有 Topic 这一个概念(Kafka 消费端通过 group.id 属性可以实现 ActiveMQ 中 Queue 的功能,参见图1)
在 Kafka 中,Topic 是一个存储消息的逻辑概念,可以理解为是一个消息的集合。
每条发送到 Kafka 集群的消息都会自带一个类别,表明要将消息发送到哪个 Topic 上。
在存储方面,不同的 Topic 的消息是分开存储的,每个 Topic 可以有多个生产者向他发送消息,也可以有多个消费者去消费同一个Topic中的消息(参见图2)
补充:
此处Queue涉及到一个消费者组(Consumer Group)的概念。(上图groupid=1处的说明,有点小问题,这个消费者组 groupid 参考本文 3.Consumer Group 消费者组)
2.什么是Partition
Partition,在 Kafka 中是分区的意思。分区,提高了Kafka的并发,也解决了Topic中数据的负载均衡。
即:Kafka 中每个 Topic 可以划分多个分区(每个 Topic 至少有一个分区),同一个 Topic 下的不同分区包含的消息是不同的(分区可以间接理解成数据库的分表操作)。
每个消息在被添加到分区的时候,都会被分配一个 offset (偏移量),它是消息在当前分区中的唯一编号。
Kafka 通过 offset 可以保证消息在分区中的顺序性,但是跨分区是无序的,即 Kafka 只保证在同一个分区内的消息是有序的。
如下图,我们通过命令(命令如下↓↓↓)创建一个名为 test 的 Topic,并对其进行分区,设置 3 个分区,分别是 test-0、test-1、test-2。
每一条消息发送到 broker 的时候,会根据 Partition 的分区规则计算,然后选择将该消息存储到哪一个 Partition。
如果 Partition 规则设置合理,那么所有的消息都会均匀的分布在不同的 Partition 中,这样就类似于数据库的分库分表的概念,将数据做了分片处理操作。
问题1:此时你可能会有疑惑,为什么第一个producer会将消息写入 test-0,以此类推,此处涉及到5.producer消息分发策略。请继续往后看。
创建 Topic 命令如下:
bin/kafka-topics.sh --create --zookeeper 192.168.204.201:2181,192.168.204.202:2181,192.168.204.203:2181 --replication-factor 1 --partitions 3 --topic test
备注:bin/kafka-topics.sh --create ---->kafka自带命令 --create表示创建 topic
--zookeeper xxx.xxx.xxx.xxx:2181 ---->zookeeper 集群地址
--replication-factor 1 ---->备份数(1个备份)
--partitions 3 ---->kafka分区数(表示分了3个分区)
--topic test ---->要创建的 topic 的名称
3.Consumer Group 消费者组
消费者组,由多个 consumer 组成。 消费者组内每个消费者负 责消费不同分区的数据,一个分区只能由一个组内的某一个消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者 。
比如一个 topic,有2个分区 partition0、partition1。有一个消费者组,组内有2个消费者 customer0、customer1。
消费者组中的customer0 和 customer1 只能 【各自】 消费该topic中某个分区的数据,比如customer0消费partition0,customer1消费partition1。
如果没有消费者组的概念,该topic有2个分区 partition0、partition1,只有一个消费者 customer0,那么 partition0 和 partition1 两个分区的数据都需要customer0 来消费。 消费者组的好处,可以提高消费能力!!!
4.Topic 和 Partition 的存储
本实例,是以192.168.204.201、192.168.204.202、192.168.204.203三台服务器搭建成的Kafka集群,来做介绍的
如下图,表示名称为 test 的 topic已经创建完成。那么 Partition 是如何存储的呢??
Partition 是以文件的形式存储在文件系统中,如上创建了一个名为 test 的topic,我们定义其有 3 个 partition,既然 partition 是以文件的形式存储,那么这 3 个 partition 在哪里存储着呢?
我们可以在 kafka 的数据目录(/tmp/kafka-log)下找到,此目录可自行配置。在 /tmp/kafka-log 目录下,我们会看到有 3 个目录:test-0、test-1、test-2。命名规则是 topic_name-partition_id。所在目录如下图所示:
问题2:此时你可能会有疑惑,为什么 3个分区会随机分配到3台服务器,此时会涉及到多个分区在集群中的分配策略。那么多个分区如何在集群中做到合理的分配?
答:(1)将所有 N 个Broker 和 i 个 Partition 排序(本例中 N = 3,i = 3)
(2)将第 i 个 Partition 分配到 ( i % n)个 Broker 上。(这样 test-1 就分配到第一台了,以此类推)
5.producer消息分发策略
消息是 Kafka 中最基本的数据单元。在 Kafka 中,一条消息由 key 和 value 两部分组成,key 和 value 值都可以为空。
这里的 key 有什么用呢?当我们在发送一条消息时,我们可以指定这个 key ,那么 producer 则会根据 key 和 partition 机制,来判断当前这条消息应该发送并存储到哪个 partition 中。(此时问题1便得到了解决)
如果 Kafka 中的 key 为 null 该怎么办?默认情况下,Kafka 采用的是 hash 取模的分区算法。如果 key 为 null 的话,则会随机的分配一个分区。这个随机是在这个参数 "metadata.max.age.ms"的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区,这个值默认情况下是 10 分钟更新一次。
此外,Kafka 也为我们提供了自定义消息分发策略的入口,我们可以根据自身业务的情况,来自定义消息分发策略。那么如何来实现我们自己的分区策略呢?我们只需要定义一个类,实现 Partitioner 接口,重写它的 partition 方法即可。然后在配置 kafka 的时候,设置使用我们自定义的消息分发策略即可。如何自定义消息分发策略,请参照 4.1 自定义消息分发策略Demo
5.1 自定义消息分发策略Demo
** * 1.自定义分区策略 */ public class MyPartition implements Partitioner { Random random = new Random(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取分区列表 List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int partitionNum = 0; if(key == null){ partitionNum = random.nextInt(partitionInfos.size());//随机分区 } else { partitionNum = Math.abs((key.hashCode())/partitionInfos.size()); } System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum); return partitionNum; } public void close() { } public void configure(Map<String, ?> map) { } }
/** * SpringBoot 下,添加如下partitioner.class 属性,指定使用自定义MyPartition类即可 */ spring: kafka: properties: partitioner.class: com.report.kafka.partition.MyPartition /** * Spring使用 xml 或 注解形式,配置如下属性即可 */ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.report.kafka.partition.MyPartition");
6.消费者如何消费指定分区消息
此时,名称为 test 的 topic 有 3 个分区,分别为0、1、2,如果我们想消费分区0中的消息,该如何消费呢?使用Java操作kafka 有 spring-kafka.jar 和 kafka-clients.jar 两种方式。如下对这两种方式分别作了介绍,便可以完成对指定分区消息的消费。
/** * 1.使用 spring-kafka.jar包中的 KafkaTemplate 类型 * 使用 @KafkaListener 注解方式 * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "test",partitions = {"1"})}) /** * 2.使用kafka-clients.jar包中的 KafkaConsumer 类型 * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 */ TopicPartition topicPartition = new TopicPartition("test" , 1); KafkaConsumer consumer = new KafkaConsumer(props); consumer.assign(Arrays.asList(topicPartition));
标签:Topic,副本,java,分区,partition,broker,kafka,topic,消息 From: https://www.cnblogs.com/xiaobaibailongma/p/17810377.html