
[Flink/Kafka] Checkpoint settings for committing offsets when Flink consumes Kafka messages [Repost]

Date: 2023-11-06 18:23:21

When Flink consumes data from Kafka, there are three ways the consumer group offset can be committed:

1. Checkpointing enabled: offsets are committed after each checkpoint completes.
2. Checkpointing enabled, commit on checkpoint disabled: consumer group offsets are not committed at all.
3. Checkpointing disabled: offset commits rely on the Kafka client's automatic periodic commit.
The interesting question is, of course, how the consumer group offset gets committed when checkpointing is enabled.
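
Before diving into the details, here is a quick preview of all three configurations as a minimal Java sketch. It uses the same legacy FlinkKafkaConsumer API as the rest of this article; the broker address and group id are made-up placeholders (the Scala job below gets them from Common.getProp), and sections 1 to 3 walk through each case in detail:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitModesPreview {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("group.id", "kafka_offset_demo");       // placeholder group id

    FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("kafka_offset", new SimpleStringSchema(), props);

    // 1) Checkpointing enabled (default commit behaviour): offsets are committed to Kafka
    //    each time a checkpoint completes.
    env.enableCheckpointing(60_000);

    // 2) Checkpointing enabled but commit-on-checkpoint disabled: offsets are never committed.
    // source.setCommitOffsetsOnCheckpoints(false);

    // 3) No checkpointing at all: remove enableCheckpointing(...) above and rely on the
    //    Kafka client's periodic auto commit instead.
    // props.setProperty("enable.auto.commit", "true");
    // props.setProperty("auto.commit.interval.ms", "15000");

    env.addSource(source).print();
    env.execute("offset-commit-modes-preview");
  }
}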

A simple Flink program: read data from one Kafka topic and write it to another topic.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// enable checkpoint
val stateBackend = new FsStateBackend("file:///out/checkpoint")
env.setStateBackend(stateBackend)
env.enableCheckpointing(1 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

val prop = Common.getProp
//        prop.setProperty("enable.auto.commit", "true")
//        prop.setProperty("auto.commit.interval.ms", "15000")
val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
//        kafkaSource.setCommitOffsetsOnCheckpoints(false)

val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)
//        kafkaProducer.setWriteTimestampToKafka(true)

env.addSource(kafkaSource)
  .setParallelism(1)
  .map(node => {
    node.toString + ",flinkx"
  })
  .addSink(kafkaProducer)

// execute job
env.execute("Kafka-To-Kafka-Job")

1 Checkpointing enabled & commit on checkpoint enabled (the default)

When checkpointing is enabled, the default commit mode for the consumer group offset is ON_CHECKPOINTS.

The offsetCommitMode that controls how FlinkKafkaConsumer commits consumer offsets is determined when FlinkKafkaConsumerBase.open() runs:

@Override
public void open(Configuration configuration) throws Exception {
  // determine the offset commit mode
  this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
      getIsAutoCommitEnabled(),
      enableCommitOnCheckpoints,  // defaults to true
      ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

The corresponding OffsetCommitModes.fromConfiguration code; it maps the three scenarios from the beginning of this article to ON_CHECKPOINTS, DISABLED, and KAFKA_PERIODIC (or DISABLED when auto commit is off) respectively:

public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

  if (enableCheckpointing) {
    // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
    return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
  } else {
    // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
    return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
  }
}

Each time a checkpoint completes, Flink invokes the notifyCheckpointComplete method of every operator in turn; for the Kafka source this ends up in FlinkKafkaConsumerBase.notifyCheckpointComplete.

Note: FlinkKafkaConsumerBase is the parent class of FlinkKafkaConsumer.

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
  ....

  if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
    // only one commit operation must be in progress
    ...

    try {
      // look up the position of this checkpoint id in the list of pending offsets to commit
      final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
      if (posInMap == -1) {
        LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
          getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
        return;
      }
      // fetch the pending offsets at that position and remove them from the pending list
      @SuppressWarnings("unchecked")
      Map<KafkaTopicPartition, Long> offsets =
        (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
      
      ....

      // commit the offsets via KafkaFetcher.commitInternalOffsetsToKafka
      fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
    
    ....

This ends up calling AbstractFetcher.commitInternalOffsetsToKafka:

public final void commitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  // Ignore sentinels. They might appear here if snapshot has started before actual offsets values
  // replaced sentinels
  doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
}

protected abstract void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception;

AbstractFetcher.doCommitInternalOffsetsToKafka is implemented by KafkaFetcher.doCommitInternalOffsetsToKafka.

It turns the Map<KafkaTopicPartition, Long> offsets into the Map<TopicPartition, OffsetAndMetadata> offsetsToCommit that is handed to the Kafka client.

Note: offset + 1 is the position of the next record to consume, which is what Kafka expects a committed offset to mean.

@Override
protected void doCommitInternalOffsetsToKafka(
  Map<KafkaTopicPartition, Long> offsets,
  @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  @SuppressWarnings("unchecked")
  List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();

  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

  for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
    Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
    if (lastProcessedOffset != null) {
      checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

      // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
      // This does not affect Flink's checkpoints/saved state.
      long offsetToCommit = lastProcessedOffset + 1;

      offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
      partition.setCommittedOffset(offsetToCommit);
    }
  }

  // record the work to be committed by the main consumer thread and make sure the consumer notices that
  consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
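
As an aside, the same "+ 1" convention is what the plain kafka-clients API expects when committing offsets manually. A minimal sketch outside of Flink (broker address and group id are placeholders, and "processing" is just a println):

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitPlusOneSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
    props.setProperty("group.id", "kafka_offset_demo");         // placeholder group id
    props.setProperty("enable.auto.commit", "false");           // commit manually below
    props.setProperty("key.deserializer", StringDeserializer.class.getName());
    props.setProperty("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("kafka_offset"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (TopicPartition partition : records.partitions()) {
          List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
          partitionRecords.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
          long lastProcessedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
          // committed offset = last processed offset + 1, i.e. the next record to read
          consumer.commitSync(Collections.singletonMap(
              partition, new OffsetAndMetadata(lastProcessedOffset + 1)));
        }
      }
    }
  }
}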

Next, KafkaConsumerThread.setOffsetsToCommit is called: it places the offsets to be committed into the consumer thread's nextOffsetsToCommit field, where they wait to be committed on the next iteration of the consume loop.

void setOffsetsToCommit(
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
      @Nonnull KafkaCommitCallback commitCallback) {

    // put the offsets to commit into nextOffsetsToCommit for the Kafka consumer thread to pick up
    // a non-null previous value means the last batch was never actually committed
    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
      log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
          "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
          "This does not compromise Flink's checkpoint integrity.");
    }

    // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
    handover.wakeupProducer();

    synchronized (consumerReassignmentLock) {
      if (consumer != null) {
        consumer.wakeup();
      } else {
        // the consumer is currently isolated for partition reassignment;
        // set this flag so that the wakeup state is restored once the reassignment is complete
        hasBufferedWakeup = true;
      }
    }
  }
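
Stripped of the Kafka specifics, the exchange between notifyCheckpointComplete and the consumer thread is just a single-slot mailbox built on AtomicReference.getAndSet. A self-contained sketch of only that pattern (all names here are made up for illustration):

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class SingleSlotHandoff {
  // one pending commit at a time; a newer value overwrites an older one that was never picked up
  private final AtomicReference<Map<Integer, Long>> nextOffsetsToCommit = new AtomicReference<>();

  /** The "checkpoint thread" side: publish the offsets of a completed checkpoint. */
  void publish(Map<Integer, Long> offsets) {
    if (nextOffsetsToCommit.getAndSet(offsets) != null) {
      // same situation as Flink's warning: the previous batch was never committed and is skipped
      System.out.println("previous offsets not picked up yet; skipping them");
    }
  }

  /** The "consumer thread" side: called once per poll-loop iteration. */
  void maybeCommit() {
    Map<Integer, Long> toCommit = nextOffsetsToCommit.getAndSet(null); // take the slot and clear it
    if (toCommit != null) {
      System.out.println("committing " + toCommit);
    }
  }

  public static void main(String[] args) {
    SingleSlotHandoff handoff = new SingleSlotHandoff();
    handoff.publish(Map.of(0, 43L));
    handoff.publish(Map.of(0, 57L)); // overwrites the unclaimed 43
    handoff.maybeCommit();           // prints: committing {0=57}
    handoff.maybeCommit();           // nothing pending, does nothing
  }
}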

We then arrive at the Kafka consumer thread, in KafkaConsumerThread.run: this is where the Kafka data is actually polled, and where the consumer group offsets are committed.

@Override
  public void run() {
    ...

      this.consumer = getConsumer(kafkaProperties);

    ....
      // loop that polls data from Kafka
      // main fetch loop 
      while (running) {
        // this is where the offsets actually get committed
        // check if there is something to commit
        if (!commitInProgress) {

          // nextOffsetsToCommit is the slot the other thread filled in setOffsetsToCommit

          // get and reset the work-to-be committed, so we don't repeatedly commit the same
          final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
              nextOffsetsToCommit.getAndSet(null);

          // if there is a pending commitOffsetsAndCallback, commit the offsets to Kafka asynchronously
          if (commitOffsetsAndCallback != null) {
            log.debug("Sending async offset commit request to Kafka broker");

            // also record that a commit is already in progress
            // the order here matters! first set the flag, then send the commit command.
            commitInProgress = true;
            consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
          }
        }

       ... 
        // get the next batch of records, unless we did not manage to hand the old batch over
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          }
          catch (WakeupException we) {
            continue;
          }
        }

        ...
      }

At this point we can see how Flink's offsets end up committed to Kafka.
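
For comparison, the same pattern of firing an asynchronous commit from inside the poll loop, guarded by an in-flight flag, can be reproduced with the plain kafka-clients API. A rough, hedged sketch (placeholder broker, group and topic; a real consumer would also need error handling, a rebalance listener and a shutdown path):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AsyncCommitLoopSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("group.id", "kafka_offset_demo");       // placeholder group id
    props.setProperty("enable.auto.commit", "false");         // we commit from the loop ourselves
    props.setProperty("key.deserializer", StringDeserializer.class.getName());
    props.setProperty("value.deserializer", StringDeserializer.class.getName());

    // single-slot "mailbox", playing the role of nextOffsetsToCommit
    AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit = new AtomicReference<>();
    boolean[] commitInProgress = {false}; // the callback runs on the polling thread, so a flag is enough

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("kafka_offset"));

      while (true) {
        // fire a pending commit asynchronously, but only if no commit is already in flight
        if (!commitInProgress[0]) {
          Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
          if (toCommit != null) {
            commitInProgress[0] = true;
            consumer.commitAsync(toCommit, (offsets, exception) -> commitInProgress[0] = false);
          }
        }

        // poll and "process" the batch, then remember the next offset to commit per partition
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
          batchOffsets.put(new TopicPartition(record.topic(), record.partition()),
              new OffsetAndMetadata(record.offset() + 1));
        }
        if (!batchOffsets.isEmpty()) {
          nextOffsetsToCommit.set(batchOffsets); // here this plays the checkpoint-thread role
        }
      }
    }
  }
}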

2 Checkpointing enabled & commit on checkpoint disabled

The previous section covered the default offset commit behaviour once checkpointing is enabled; now let's look at disabling the commit on checkpoint:

  • First, disable commitOnCheckpoints:
val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), Common.getProp)
kafkaSource.setCommitOffsetsOnCheckpoints(false)

The corresponding method:

public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
  // enableCommitOnCheckpoints defaults to true
  this.enableCommitOnCheckpoints = commitOnCheckpoints;
  return this;
}

Warning: if checkpointing is enabled but CommitOffsetsOnCheckpoints is disabled, the consumer group offsets are never committed back to Kafka; in other words, the consumer group's offsets will not change at all.

As shown below (output of kafka-consumer-groups.sh --describe), CURRENT-OFFSET does not move:

TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
kafka_offset  0          4172            4691            519  -            -     -
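
If you prefer to check the committed offsets from code rather than the console output above, the Kafka AdminClient can list them; a small sketch (broker address and group id are placeholders):

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ShowCommittedOffsets {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

    try (AdminClient admin = AdminClient.create(props)) {
      // committed offsets for the consumer group, keyed by topic-partition
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("kafka_offset_demo") // placeholder group id
               .partitionsToOffsetAndMetadata()
               .get();
      committed.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }
  }
}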

Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

3 Checkpointing not enabled

  • With checkpointing disabled, the Flink Kafka Consumer relies on the periodic automatic offset commit of the Kafka client it uses internally.
  • To enable or disable that commit, simply set enable.auto.commit and/or auto.commit.interval.ms to appropriate values in the Properties you pass in:
prop.setProperty("enable.auto.commit", "true")
prop.setProperty("auto.commit.interval.ms", "15000")

X References

From: https://www.cnblogs.com/johnnyzen/p/17813374.html
