首页 > 其他分享 >Kafka 分区器

Kafka 分区器

时间:2023-02-24 23:56:40浏览次数:43  
标签:序列化 分区 partition Kafka key null public

分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器(下一章会详细介绍)一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。

Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Partitioner 接口,这个接口中定义了2个方法,具体如下所示。

public int partition(String topic, Object key, byte[] keyBytes, 
                     Object value, byte[] valueBytes, Cluster cluster);
public void close();

其中 partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close() 方法在关闭分区器的时候用来回收一些资源。

Partitioner 接口还有一个父接口 org.apache.kafka.common.Configurable,这个接口中只有一个方法:

void configure(Map<String, ?> configs);

Configurable 接口中的 configure() 方法主要用来获取配置信息及初始化数据。

在默认分区器 DefaultPartitioner 的实现中,close() 是空方法,而在 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区。

注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key 为 null 并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。

除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同 DefaultPartitioner 一样实现 Partitioner 接口即可。默认的分区器在 key 为 null 时不会选择非可用的分区,我们可以通过自定义的分区器 DemoPartitioner 来打破这一限制。

//代码清单4-4 自定义分区器实现
public class DemoPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes) {
            return counter.getAndIncrement() % numPartitions;
        }else
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}

实现自定义的 DemoPartitioner 类之后,需要通过配置参数 partitioner.class 来显式指定这个分区器。示例如下:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        DemoPartitioner.class.getName());

这个自定义分区器的实现比较简单,读者也可以根据自身业务的需求来灵活实现分配分区的计算方式,比如一般大型电商都有多个仓库,可以将仓库的名称或 ID 作为 key 来灵活地记录商品信息。

标签:序列化,分区,partition,Kafka,key,null,public
From: https://www.cnblogs.com/fxh0707/p/17153562.html

相关文章

  • Linux系列教程(十九)——Linux文件系统管理之手工分区
    上篇博客我们首先介绍了硬盘为什么要分区,以及Linux系统的几种分区类型,然后介绍了Linux系统几个常用的文件系统命令,最后讲解了挂载命令,并通过实例演示了如何挂载光盘和U......
  • SpringBoot30 - 整合Kafka
    SpringBoot整合Kafka安装​ windows版安装包下载地址:https://kafka.apache.org/downloads​ 下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下......
  • 一文读懂Kafka Connect核心概念
    概览KafkaConnect是一种用于在ApacheKafka和其他系统之间可扩展且可靠地流式传输数据的工具。它使快速定义将大量数据移入和移出Kafka的连接器变得简单。KafkaC......
  • centos7-分区2T以上大硬盘
    centos7-分区2T以上大硬盘1.centos7-分区2T以上大硬盘由于使用fdisk进行分区默认在2T内,大于2T后fdisk就无法进行大硬盘进行分区,需要对大于2TB进行分区,使用parted进行......
  • Kafka 序列化
    序列化生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成......
  • Android 分区和内存监控
    Android分区和内存监控Andorid之所以是分区,是因为各自有对应的功能和用途的考量,可以进行单独读写和格式化。Android设备包含两类分区:一类是启动分区,对启动过程至关重......
  • Kafka 上手实战
    参数配置bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以......
  • threejs shader特效,分区辉光
    分区辉光有两种实现方式:1.单个图层两次渲染,先用带bloom的composer渲染一次,再正常渲染一次:https://github.com/mrdoob/three.js/blob/master/examples/webgl_postprocessin......
  • 想完全弄懂kafka?看这篇就够了
    有人说世界上有三个伟大的发明:火,轮子,以及Kafka。发展到现在,ApacheKafka无疑是很成功的,Confluent公司曾表示世界五百强中有三分之一的企业在使用Kafka。今天便和大家分......
  • kafka常用操作
    如果把一个项目/微服务当成一个消费组,那么一个topic可能在多个消费组【一个topic被多个项目订阅】,一个消费组可能有多个topic。【一个项目订阅了多个topic】。一个消费组内......