首页 > 其他分享 >关于Kafka Topic分区和Replication分配的策略

关于Kafka Topic分区和Replication分配的策略

时间:2024-07-14 20:30:17浏览次数:8  
标签:副本 currentPartitionId int 分区 Kafka Topic Replication brokerList

文章目录

1. Topic多分区

在这里插入图片描述

如图,是一个多分区Topic在Kafka集群中可能得分配情况。

P0-RL代表分区0,Leader副本。

这个Topic是3分区2副本的配置。分区尽量均匀分在不同的Broker上,分区的Follower副本尽量不和Leader在一个Broker上。

2. 理想的策略

假设有3个Topic在含有3个Broker的Kafka集群上。

Topic1有1个分区,2个副本。

Topic2有2个分区,2个副本。

Topic3有3个分区,2个副本。

可能如下图所示。不同颜色表示不同Topic。
请添加图片描述

似乎不是特别理想,我们再调整一下,如下图
在这里插入图片描述

不仅每个Broker的副本数一样了,更关键的是,并且每个Broker的主Leader副本也一样的。这样更适合负载均衡。

3. 实际的策略

我们使用Kafka tool,来以此创建上述3个Topic。
请添加图片描述
在这里插入图片描述
在这里插入图片描述

首先看test1
在这里插入图片描述
在这里插入图片描述

然后看test2
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

然后是test3
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

按照上面的信息,画出来的分配结果如下图
在这里插入图片描述

似乎并不和我们想的一样。

查看源码,Breadcrumbskafka/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java中一段代码

private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(int nPartitions,
                                                                                  int replicationFactor,
                                                                                  List<Integer> brokerList,
                                                                                  int fixedStartIndex,
                                                                                  int startPartitionId) {
        Map<Integer, List<Integer>> ret = new HashMap<>();
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
        int currentPartitionId = Math.max(0, startPartitionId);
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
        for (int i = 0; i < nPartitions; i++) {
            if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0))
                nextReplicaShift += 1;
            int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
            List<Integer> replicaBuffer = new ArrayList<>();
            replicaBuffer.add(brokerList.get(firstReplicaIndex));
            for (int j = 0; j < replicationFactor - 1; j++)
                replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
            ret.put(currentPartitionId, replicaBuffer);
            currentPartitionId += 1;
        }
        return ret;
    }

例子(来自尚硅谷)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4. 如何自定义策略

public class AdminTopicTest {
    public static void main(String[] args) {
        //定义kafka集群配置
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");

        //创建Admin管理员对象
        Admin admin = Admin.create(config);

        //定义Topic属性
        HashMap<Integer, List<Integer>> map = new HashMap<>();
        // 分区0,Leader副本在3上,第二个副本在1上。
        map.put(0, Arrays.asList(3, 1));
        map.put(1, Arrays.asList(2, 3));
        map.put(2, Arrays.asList(1, 2));
        NewTopic test4 = new NewTopic("test2", map);


        //创建Topic
        CreateTopicsResult result = admin.createTopics(
                Arrays.asList(
                        test4
                )
        );

        admin.close();
    }
}

不过在手动分配时,确实需要了解每个broker的负载情况,以便做出更优的分配策略。你可以使用Kafka的AdminClient类来获取集群的状态信息

标签:副本,currentPartitionId,int,分区,Kafka,Topic,Replication,brokerList
From: https://blog.csdn.net/m0_51390969/article/details/140422185

相关文章

  • kafka
    在中小企业中,以下几种消息中间件相对更为常用:RabbitMQ:它具有丰富的功能和易用性。许多中小企业选择它是因为其提供了多种消息路由模式,如直连、主题、扇出等,能够满足不同的业务需求。例如,在一个小型的电商系统中,RabbitMQ可以用于处理订单处理、用户注册通知等消息的异步传递。......
  • 消息队列Kafka简单使用(可以直接上手)
    1.消息中间件简介消息中间件(MessageMiddleware)是一种在分布式系统中用于解耦不同服务或组件的软件,它通过异步消息传递的方式来实现服务之间的通信。消息中间件允许系统组件之间通过发送和接收消息进行交互,而无需知道彼此的具体实现细节,从而提高了系统的可扩展性、灵活性和......
  • kafka实战
    目录分布式安装监听器分布式安装//kafka依赖zookeeper,要先启动zookeepermkdiretc//etc用于存放配置文件//将zookeeper配置文件复制到etccpconfig/zookeeper.propertiesetc//复制3份,伪分布式cpconfig/server.propertiesetc/server-0.propertiescpconfig/serv......
  • Kafka基础知识
    目录Kafka简介消息Kafka简介中间件,如Kafka,用来存储消息的软件(组件)程序员可以在消息队列中存取消息很多时候消息队列是临时存储(设定期限:消息在MQ中保存10天)消息存在topic主题,类似数据库中的表,但是是半结构化的一般存同一类型的消息,但有些情况下也可以存不同类......
  • librdkafka 常见问题FAQ
    1,++librdkafka一个消费者实例分配了所有分区不提交位点,在发生rebalance后,会重新从头消费数据吗在发生重新平衡(rebalance)后,消费者会从哪里开始消费数据取决于消费者组的配置,特别是auto.offset.reset配置项。关键配置项:auto.offset.resetauto.offset.reset决定了在没有有......
  • .NET/C#、Netcore、数据库、Redis 、RabbitMQ&kafka、Docker ⾯试题汇总系列目录
    .NET/C#⾯试题汇总系列.NET/C#⾯试题汇总系列:ASP.NET常见面试题001.NET/C#⾯试题汇总系列:ASP.NET常见面试题002.NET/C#⾯试题汇总系列:ASP.NET常见面试题003.NET/C#⾯试题汇总系列:基础语法.NET/C#⾯试题汇总系列:ASP.NETMVC.NET/C#⾯试题汇总系列:多线程.NET/C#⾯......
  • ROS源码学习分享_TopicManager::start()
        在上一章节中,我们讲解了NodeHandle节点创建后的一些背后行为。其最重要的行为是启动了全部的管理节点。在本章中,我们将看一看TopicManager节点在启动之后发生了什么。(以下内容,属于个人观看源码后得出的理解,可能有错,仅用于自我复习,请批判的看待)    TopicM......
  • centos7下kafka集群安装部署
    centos7下kafka集群安装部署 应用摘要: Apachekafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,是消息中间件的一种,用于构建实时数据管道和流应用程序,很火!Kafka官网:http://kaf......
  • kafka分层存储解读
    分层存储的目标是根据数据的特性和组织的策略,将数据放在最合适的存储介质上,从而优化存储资源的使用,平衡性能和成本。怎么进行分层存储:可以根据分析使用模式、访问频率和其他因素的策略和算法,自动在这些层之间放置和移动数据。这确保了最关键和频繁访问的数据驻留在高性能存储中......
  • Linux 下 kafka 集群部署
    本文将以三台服务器为例,介绍在linux系统下kafka的部署方式。1.zookeeper下载下载地址:ApacheKafka选择需要的介质下载,这里以 kafka_2.11-1.1.1.tgz为例2.环境准备  部署kafka需要先部署JDK以及zookeeper ,JDK部署可以参考Linux下JDK安装-CSDN博客 zookeeper......