首页 > 其他分享 >kafka指定key进行分区遇到的问题

kafka指定key进行分区遇到的问题

时间:2024-08-30 11:03:39浏览次数:13  
标签:keyBytes int 分区 kafka topic nextValue key Utils availablePartitions

问题描述:
kafka在指定key进行分区的时候,若某一个分区异常,则发往这个分区的数据均会失败;没有指定key进行分区的便不会出现改问题。

看一下producer的源码:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
1.指定key,根据keybytes做hash,对partiton的数量取模,但是未考虑某个分区不可用的情况
2.未指定key,会获取可用的分区数量,进行取模操作,返回可用的分区

问题解决:
自定义分区器:将上述源码稍加修改即可:
`public class CustomPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(Utils.murmur2(keyBytes)) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

public void close() {
}

}`

客户端引用该分区器即可

标签:keyBytes,int,分区,kafka,topic,nextValue,key,Utils,availablePartitions
From: https://www.cnblogs.com/hamsure/p/18388312

相关文章

  • 好多kafka难题啊,看看其中的化解之道
    文末有面经共享群前面已经分享过几篇面试了,这是一篇关于更加面向项目和技术的面经详解,第一次遇见问那么多kafka的问题,看看这个粉丝是怎么回答的。先来看看职位描述:岗位职责:负责基于Go的后端服务的设计、开发和维护;参与系统架构设计,确保系统的高可用性、高性能和可扩展性;......
  • Kafka分布式集群部署实战:跨越理论,直击生产环境部署难题与解决方案,性能调优、监控与管
    本文介绍kafka的集群如何部署和安装,1-4章理论知识,第5章详解集群的部署,部署Kafka之前需要先部署好分布式的Zookeeper,不喜欢理论的可以直接看第5章,欢迎大家一起探讨技术!Zookeeper集群部署参考文章:精通Zookeeper:详解分布式集群部署全程,掌握数据一致性、选举机制与集群容错能力-......
  • vue的for循环不建议用index作为key
    我们页面总有一些相似的,我们想用循环渲染,根据对象数组结构进行渲染,这是不是很熟悉的场景。这时候我们需要有一个唯一的key绑定在循环渲染的元素上,一般情况下我们会用id,因为id是唯一的。然而有些页面要循环的数据(比如描述性的对象数组)没有id的时候,有的人会用index下标作为key,实......
  • 自动化测试:Monkey工具实践应用~
    在移动应用的自动化测试中,意外的用户操作和各种不可预见的场景往往是导致应用崩溃的主要原因。如何有效地模拟这些复杂场景,成为了测试工程师的一大挑战。而在这一过程中,Monkey工具凭借其随机化测试的独特优势,成为了许多团队的利器。那么,Monkey工具究竟是如何帮助测试工程师发现隐......
  • Mybatis-puls中select查询方法报错Can not find table primary key in Class
    1、项目参数springboot2.6.13jdk8Mybatis-Plus3.5.42、问题描述Mybatis-puls中select查询方法报错CannotfindtableprimarykeyinClass,org.apache.ibatis.binding.BindingException:Invalidboundstatement(notfound):com.example.dao.FLowerDao.selectById3、......
  • 使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)
    文章目录1、发送消息KafkaService2、生产者service-album->AlbumInfoServiceImpl2.1、新增saveAlbumInfo()2.2、更新updateAlbumInfo()2.3、删除removeAlbumInfo()3、消费者service-search->AlbumListener.java上架:新增专辑到es下架:删除专辑新增:如果是......
  • APP稳定性测试工具-monkey
    monkey是adbshell中的一个命令行工具,用于执行随机的用户事件(如触摸、点击、滑动、键盘输入等)来测试应用程序的稳定性。安装下载并安装AndroidSDK配置环境变量,将adb所在路径添加到系统环境变量中验证安装adbversion使用1.启动Android设备或模拟器查看已连接设......
  • saveBatch时 遇到Duplicate entry '1828978156126666754' for key
    问题:saveBatch时遇到Duplicateentry'1828978156126666754'forkey分析:1.检查数据库里是否有重复ID      2.检查代码中是否有id赋值     3.       以上排查都没发现问题,以下代码分析了一下,为了节省空间,我在for循环上面new了一个封装类,......