首页 > 其他分享 >kafka

kafka

时间:2023-02-14 16:22:04浏览次数:41  
标签:消费 分区 partition kafka topic 线程 consumer

1、名词介绍

broker :一台kafka服务器就是一个broker

Partition:是实际存储消息的物理单元,一个Topic内部可以包含多个partition。Topic内部的partition是从0开始,顺序编号,消息在Partition内部用offset编号,每条新消息offset增加1。使用Topic名称 + partition num + offset可以唯一确定一条消息。Partiton有如下性质(下面所说的consumer行为都是同group的情形下)

1) 每个Partition都有独立的物理文件存储,Topic增加Partition是能够达到扩容的目的。

2) parition内部消息的发送和消费保证顺序性,partition之间消息消费不保证顺序

3) 一个partition只能被一个consumer消费,一个consumer可以消费多个partition

4) 如果consumer没有变动,在整个期间消费的partition是固定的。如果consumer新增或者下线,如果partion数量改变,会触发partition重新分配,即rebalance。rebalance 期间consumer不能正常消费

5) 如果consumer数量大于partition数量,则会出现冷备consumer,因为没有partition分配给这个consumer。这时consumer会打日志提示;并不影响正常消费

6) Java consumer里还有 --consumer-thread概念,consumer实例数量 * 每个实例的--consumer-thread数量是总的消费线程数,这个数应当小于等于partition数量,否则会有冷备的消费线程;同时这个数量应能整除partition数量,否则各个消费线程的任务量会不均匀。默认consumerThread都是1

7) 一个topic在集群中可以有多个partition,那么发送消息怎么确定应该发送到哪个分区上?有两种基本的策略,一是采用Key Hash算法,如果消息指定key,那么会根据消息的key进行hash,然后对partition分区数量取模,决定落在哪个分区上,所以,对于相同key的消息来说,总是会发送到同一个分区上,也是我们常说的消息分区有序性;一是采用Round Robin算法,即轮询方式,没有指定的时候经常采用这种方式

2、工作线程和消费线程

kafka consumer有worker threads和--consumer-threads,默认情况下工作线程和consumer线程数量都是1
--consumer-threads 是指Kafka IO线程的个数,主要用于从broker拉取message
--worker-threads是指Kafka实际执行消费逻辑的线程,一般消费能力不足时扩容这个参数即可。特殊情况: 当topic吞吐特别大,且消费逻辑是不堵塞行为时,可以考虑使用--worker-threads 0,这样消费逻辑就会在Kafka IO线程中直接执行,减少了转发到工作线程池的开销 这个时候,消费扩容就只能依赖提高--consumer-threads数来进行了 再次强调:如果没有特殊理由,我们一般不使用--consumer-threads参数
--worker-threads 4096
--consumer-threads 5,一个consumer thread会消费一个partition
注意consumer的main函数中不能有\n和换行符等
分别指定两者的数量

3、Consumer Group

1)是Kafka组织消费时的概念。对于一个Topic的消费,由Kakfa的consumer负责记录消费到了partition的哪个offset。

2)Group内部的consumer是按照一定策略去分配一个Topic下的partition。不同的Group侧消费相同的topic时独立互相不影响,也就是说,对于同一个topic,可以通过启动不同的group来消费多份全量数据。

3)由于Kafka实现机制的原因,不同topic的consumer,使用了相同的group,在发生rebanlance时,会互相影响。因此不推荐group复用:即对于不同的topic,不要使用相同的group消费。

4)

4、如果多个consumer消费同一个partition会有什么问题?

由于消费者自己可以控制读取消息的offset,就有可能C1和C2读到相同的消息并消费。则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复。

5、partition怎么确定应该分给哪个consumer

1)range策略,是默认的策略。range分配策略针对的是topic,将一个topic中分区按数字顺序排行序,消费者按消费者名称的字典序排好序。例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
为什么不是两个消费者各三个分区呢?partition分配原则:对于一个topic,用partition总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区。topic t0有3个partition,3 / 2除不尽,所以C0会多分配一个partition。对于topic t1,也是相同道理。

如果只有一个topic,对c0和c1影响不大,如果有多个 topic,那么针对每个topic,消费者c0都将多消费1个分区,topic越多,c0 消费的分区会比其他消费者多。这就是 Range 范围分区的一个很明显的弊端了

2)round robin(轮询)

这个会根据所有的主题进行轮询分配,不会出现Range那种主题越多可能导致分区分配不均衡的问题。P0->A,P1->B,P1->A。。。以此类推。轮询分两种情况:

a:所有consumer消费的topic相同,则RoundRobin策略的分区分配会是均匀。例如三个消费者c0,c1,c2都消费相同的topic(t0,t1),每个topic都有3个分区(p0,p1,p2),那么所有的分区标识为t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,最终分区分配的结果如下:

c0消费 t0p0 、t1p0 分区
C1消费 t0p1 、t1p1 分区
C2消费 t0p2 、t1p2 分区

b:consumer消费的topic不同,例如,有3个消费者C0、C1和C2,有3个topic:t0,t1,t2,t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2),c0消费t0,c1消费t0和t1,c2消费t0和t1和t2,最终分区分配结果如下:

C0消费 t0p0分区
C1消费 t1p0分区
C2消费 t1p1、t2p0、t2p1、t2p2分区

从如上实例,可以看到RoundRobin策略也并不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者 C1。所以,如果想要使用RoundRobin 轮询分区策略,必须满足如下两个条件:

①每个consumer消费的topic,必须是相同的
②每个topic的消费者实例都是相同的

 

标签:消费,分区,partition,kafka,topic,线程,consumer
From: https://www.cnblogs.com/MarkLeeBYR/p/17119581.html

相关文章

  • 【Spring-boot-route(十四)整合Kafka】
    kafka简介kafka是用Scala和Java语言开发的,高吞吐量的分布式消息中间件。高吞吐量使它在大数据领域具有天然的优势,被广泛用来记录日志。kafka架构分析注1:图中的红色箭头表示......
  • 决战圣地玛丽乔亚Day10--kafka学习
    概念上kafka和RocketMQ的结构很类似。除了Broker、Producer、Consumer、Topic。多了一个分区的概念Partition。对于NameServer的概念,kafka用的是zookeeper来保存信息。包......
  • 通过 KoP 将 Kafka 应用迁移到 Pulsar
    通过KoP将Kafka应用迁移到Pulsar版权声明:原文出自https://github.com/streamnative/kop,由Redisant进行整理和翻译目录通过KoP将Kafka应用迁移到Pulsar什......
  • Kafka原理-分区leader选举
    0.说明kafka源码版本为1.0 1.分区状态kafka源码定义了4种状态NewPartition:表示正在创建新的分区,是一个中间状态,只是在Controller的内存中存了状态信息OnlinePart......
  • kafka 常见命令以及增加topic的分区数
    基础命令1.创建topickafka-topics.sh--bootstrap-server${kafkaAddress}--create--topic${topicName}--partitions${partipartions}--replication-factor${rep......
  • kafka删除topic清空数据
    一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。如果一定要删除可以删除topic在重建topic了No.1:如果需要被删除topic此时正在被程序produce和consum......
  • kafka笔记
    1、概念:kafka是一个用scala语言编写的,分布式、支持分区(partition)、多副本(replica),基于zookeeper协调分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各种需......
  • go连接kafka
    Part1前言本文主要介绍如何通过go语言连接kafka。这里采用的是sarama库。​​https://github.com/Shopify/sarama​​Part2库的安装goget-ugithub.com/Shopify/saramago......
  • kafka如何开启kerberos认证
    参考:     https://www.cnblogs.com/wuyongyin/p/15624452.html kerberos基本原理       https://www.cnblogs.com/wuyongyin/p/15634397.html kerb......
  • 05-KafkaConsumer
    1.工作流程1.1消费者组概述ConsumerGroup(CG):由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupId相同。消费者与消费组这种模型可以让整体的消费......