首页 > 其他分享 >Kafka学习01:默认分区策略解析

Kafka学习01:默认分区策略解析

时间:2023-09-27 21:25:53浏览次数:45  
标签:01 分区 partition 默认 Kafka topic key indexCache newPart

 Kafka学习01:默认分区策略解析

 

Kafka版本:2.5.1

 DefaultPartitioner 类

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it ;如果方法声明了分区,则直接使用
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key;  没有没有申明分区,但是传入了key,则根据key的Hash值,进行分区选择
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full. 如果没有声明分区,也没有传入key,使用粘性分区策略
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
      // key不存在时,使用粘性分区策略
            return stickyPartitionCache.partition(topic, cluster);
        } 
      //获取分区信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
      //获取分区数量
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
     //根据可以的Hash值取余分区数量,确认分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

 

StickyPartitionCache类public class StickyPartitionCache {

  //ConcurrentMap用于缓存,key=topic,value=分区Id
    private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() { this.indexCache = new ConcurrentHashMap<>(); } public int partition(String topic, Cluster cluster) {
    //在缓存中查找topic对应的分区Id Integer part = indexCache.get(topic); if (part == null) {
    //如果分区号不存在,则寻找下一个 return nextPartition(topic, cluster, -1); } return part; } public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    //获取全部分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    //获取原分区Id Integer oldPart = indexCache.get(topic);
    //默认新分区Id = 原分区Id Integer newPart = oldPart;     //判断原分区是否存在(是否已经设置过),如果没有则执行if逻辑 if (oldPart == null || oldPart == prevPartition) {
        //获取可用分区列表 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
          //如果可用分区不存在,则随机选一个分区 Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) {
          //如果可用分区只有一个,则选择该分区 newPart = availablePartitions.get(0).partition(); } else {
          //如果可用分区有多个,则循环随机选择一个可用分区,直到和原分区不一样 while (newPart == null || newPart.equals(oldPart)) { Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = availablePartitions.get(random % availablePartitions.size()).partition(); } } // Only change the sticky partition if it is null or prevPartition matches the current sticky partition. if (oldPart == null) {
          //如果原分区不存在,则对topic这是新的分区 indexCache.putIfAbsent(topic, newPart); } else {
          //如果原分区存在,则用新分区对topic替换原分区 indexCache.replace(topic, prevPartition, newPart); }
        //查询返回分区 return indexCache.get(topic); } return indexCache.get(topic); } }

 

END

 

 

 

 

 

标签:01,分区,partition,默认,Kafka,topic,key,indexCache,newPart
From: https://www.cnblogs.com/wobuchifanqie/p/17734118.html

相关文章

  • JOI Open 2018
    バブルソート2/BubbleSort2可以发现,答案即为\(\max\limits_{i=1}^n\sum\limits_{j=1}^{i-1}[a_j>a_i]\)。因为是\(\max\),所以可能成为答案只有后缀最小值,可以把式子改写成\(\max\limits_{i=1}^n\left(i-\sum\limits_{j=1}^{n}[a_j\lea_i]\right)\)。这个就可以直接使......
  • JOISC 2019
    試験/Examination直接三维偏序。#include<iostream>#include<cstdio>#include<cstring>#include<numeric>#include<algorithm>usingnamespacestd;constintN=100005;intn,Q;intv[N*6],cnt;structQuery{inta,b,c;intv,i......
  • KEYENCE Programming Contest 2019
    A-Beginning排序以后判断一下是否为\(1,4,7,9\)即可。#include<iostream>#include<cstdio>#include<algorithm>usingnamespacestd;constintN=10;inta[N];intmain(){ for(inti=1;i<=4;i++) scanf("%d",&a[i]); sort(a+1,a+4+1......
  • NIKKEI Programming Contest 2019
    A-Subscribers最小值为\(\min(A,B)\),最大值为\(\max(A+B-n,0)\)。#include<iostream>#include<cstdio>usingnamespacestd;intn,A,B;intmain(){ scanf("%d%d%d",&n,&A,&B); printf("%d%d",min(A,B),max(A+B-n,0......
  • 2023-2024-1 20231401 《计算机基础与程序设计》第一周学习总结
    作业信息该作业属于2023-2024-1计算机基础与程序设计https://edu.cnblogs.com/campus/besti/2023-2024-1-CFAP作业要求在https://www.cnblogs.com/rocedu/p/9577842.html#WEEK01作业目标:加入云班课,参考学习本周学习资源注册博客园账号,加入2022-2023-1-计算机基础与程序设计......
  • 「SDOI2011」 黑白棋
    绷不住了,洛谷上的dp没一个表述清楚了,一怒之下写一篇题解。注意本题解只讲dp部分。首先转化不合法的充要条件就是:设相邻两个棋子中间间隔数量为\(b\),那么对于任意非负整数\(i\)都有\((d+1)|\sum(b\&2^i)\)。其中\(\&\)是按位与运算。所以我们要计数一个有序的并且包含......
  • ExaWizards 2019
    A-RegularTriangle判断三个数是否相等。#include<iostream>#include<cstdio>usingnamespacestd;intA,B,C;intmain(){ scanf("%d%d%d",&A,&B,&C); if(A==B&&B==C)printf("Yes"); elseprintf("No"); ......
  • Japanese Student Championship 2019 Qualification
    A-TakahashiCalendar枚举\(m\),再枚举\(d_1\),判断一下是否合法即可。#include<iostream>#include<cstdio>usingnamespacestd;intm,d;intmain(){ scanf("%d%d",&m,&d); intans=0; for(inti=1;i<=m;i++) { for(intd10=2;d10<......
  • NIKKEI Programming Contest 2019-2
    A-SumofTwoIntegers分奇偶讨论一下就好了,答案为\(\lfloor\frac{n-1}\{2\}\rfloor\)。#include<iostream>#include<cstdio>usingnamespacestd;intn;intmain(){ scanf("%d",&n); printf("%d",(n-1)/2); return0;}B-......
  • Tenka1 Programmer Contest 2019
    C-Stones枚举分界点爆算即可。#include<iostream>#include<cstdio>usingnamespacestd;constintN=200005;intn;chars[N];intsum[N][2];intmain(){ scanf("%d",&n); scanf("%s",s+1); sum[0][0]=sum[0][1]=0; for(inti=1;i......