首页 > 其他分享 >flink消费kafka数据如何保证kafka的分区与flink的消费线程一一对应?

flink消费kafka数据如何保证kafka的分区与flink的消费线程一一对应?

时间:2022-09-06 14:57:12浏览次数:86  
标签:消费 分区 flink 并行度 线程 kafka

1.flink程序总的消费线程是如何找见消费的对应kafka分区的?
核心代码如下:
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex =
((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the
    // start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

根据topic名称的哈希值乘以31,再与0x7FFFFFFF按位进行与操作后,再与程序设置的并行度(如果算子单独设置并行度,则此并行度为source端并行度)进行取余操作,得到起始下标值。
起始下标拿到后,与程序消费的kafka当前分区编号求和后,再与程序并行度(也就是子任务的运行个数。如果算子单独设置并行度,则此并行度为source端并行度)进行取余操作,得到的就是程序中处理数据的具体线程编号。
这样就得到了,kafka分区编号和程序线程编号之间的映射关系。
2.kafka分区数和flink消费线程数的匹配
通过上面的分析我们知道了,flink消费线程是如何找到消费到的kafka某个分区中的数据的。但是在这里消费线程数和kafka分区数存在3种情况:消费线程数=kafka分区数,消费线程数>kafka分区数,消费线程数<kafka分区数。我们一一进行讨论。
消费线程数=kafka分区数
在这种情况下,通过第一部分我们知道,kafka分区和消费线程是一一对应的,每个消费线程只会去消费特定的kafka分区中的数据。kafka分区数:flink程序消费线程数=1:1,这样消费效率较高。但这样也可能会存在:kafka中的某个分区数据量较大,有数据倾斜的可能性,在消费线程中就会出现消费速率低下,甚至出现反压的情况。想要控制这种情况的出现,可以在将数据放入kafka中按照主键或者某个字段进行分区操作,这样尽可能的保证kafka中的数据不会存在倾斜的情况。
消费线程数>kafka分区数
这种情况就会出现,某几个线程会消费kafka分区中的数据,但是剩下的线程找不见分区中的数据,线程就会空转,浪费资源。这样也会导致一个问题,checkpoint做不了,可能会导致ck超时失败。ck失败,程序无法保存状态,程序就会失败宕机。这种情况应该禁止出现。
消费线程数<kafka分区数
这种情况下,某个线程可能会消费多个kafka分区中的数据。数据量较大时,出现数据倾斜的可能性会比较大,也可能因为消费速率导致出现反压进而影响程序的正常运行。这种情况也尽量避免。
综上所述,完美的情况是:kafka分区数:flink程序消费线程数=1:1,但可能受限于环境、网络等情况,也可以设置为kafka分区数:flink程序消费线程数=2:1。

标签:消费,分区,flink,并行度,线程,kafka
From: https://www.cnblogs.com/jia-tong/p/16661777.html

相关文章

  • 跨线程(线程池)日志链路追踪
    不跨线程的业务日志链路我们可以通过日志框架如Logback 使用MDC对象放入一个traceId,再在日志配置文件配置即可但如跨线程的话traceid无法传递(MDC底层使用的TreadLoca......
  • Kafka Connector Source/ Kafka Connector Sink连接器的开发使用
    一,Kafka连接器介绍Kafka连接器通常用来构建数据管道,一般有两种使用场景:开始和结束的端点:例如,将Kafka中的数据导出到HBase数据库,或者把Oracle数据库中的数据导入......
  • Flink 双流联结——窗口联结(Window Join)
    对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理。例如用传感器监控火情时,我们需要将大量温度传感......
  • 面试~线程池-三大方法、七个参数、四种拒绝策略、实际应用
    池化技术程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术线程池、连接池、内存池、对象池///.....创建、销毁。十分浪费资源池化技术:事先准备好一些资源,有......
  • linux线程同步简单示例
    #include<stdio.h>#include<pthread.h>#include<stdlib.h>//intpthread_create(pthread_t*thread,constpthread_attr_t*attr,void*(*start_routine)(void*),v......
  • Kafka Docker安装
    安装Zookeeperdockerrun--namezookeeper-d-p2181:2181-v/etc/localtime:/etc/localtimelibrary/zookeeper安装Kafkadockerrun-d--namekafka-p9092:9092......
  • Java---线程入门
    前置知识什么是进程,什么又是线程?咱不是讲系统,简单说下,知道个大概就好了。进程:一个可执行文件执行的过程。线程:操作系统能够进行运算调度的最小单位。它被包含在进程之......
  • Flink-状态管理
     流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发......
  • JAVA线程
    01、基本概念:程序、进程、线程程序(program):为完成特定任务、用某种语言编写的一组指令的集合。即指一段静态的代码,静态对象。进程(process):程序的一次执行过程,或是正在运行......
  • 线程
    1、什么是线程?进程?两者区别?线程:是操作系统能够进⾏运算调度的最⼩单位,由进程创建的,是进程的一个实体,线程也可以创建线程;进程:正在运行的一个程序,一个进程可以拥有多个线......