首页 > 其他分享 >Kafka消息过期与清理策略深入研究

Kafka消息过期与清理策略深入研究

时间:2023-09-23 10:05:28浏览次数:26  
标签:log 过期 Kafka 深入研究 消息 props CONFIG

背景

Kafka是一个高性能、高可靠、分布式的消息队列系统,被广泛应用于大数据领域。在Kafka中,消息的过期与清理是一个非常重要的问题,本文将深入探讨Kafka中的消息过期与清理策略。

Kafka消息过期

在Kafka中,消息的过期是通过消息的时间戳(timestamp)来实现的。Kafka支持两种时间戳:消息创建时间戳(create time)和消息日志追加时间戳(log append time)。其中,消息创建时间戳是消息在生产者端创建的时间,而消息日志追加时间戳是消息在Kafka Broker上被追加到日志中的时间。

Kafka中的消息过期是通过Broker端的配置参数log.retention.mslog.retention.bytes来实现的。其中,log.retention.ms表示消息在Kafka中存储的最长时间,单位为毫秒;log.retention.bytes表示Kafka中存储消息的总大小,单位为字节。当消息的时间戳超过log.retention.ms或者消息的总大小超过log.retention.bytes时,消息将被删除。

Kafka消息清理

Kafka中的消息清理是通过Kafka Broker的日志压缩(log compaction)机制来实现的。在Kafka中,每个Topic都有一个或多个Partition,每个Partition都对应一个日志文件(log file),其中包含了该Partition中所有消息的日志。Kafka Broker会定期对每个Partition的日志文件进行压缩,将相同Key的消息合并成一条消息,并保留最新的一条消息。这样可以有效地减少磁盘空间的占用,并且保证消息的可靠性。

Kafka中的日志压缩是通过Broker端的配置参数log.cleanup.policylog.cleaner.xxx来实现的。其中,log.cleanup.policy表示日志清理策略,可以是deletecompactlog.cleaner.xxx表示日志压缩的相关配置参数,包括min.cleanable.dirty.ratiomin.compaction.lag.mssegment.ms等。

Kafka消息过期与清理的实现

Kafka中的消息过期与清理是通过Kafka Broker的LogCleaner线程来实现的。LogCleaner线程会定期扫描所有的Partition,检查其中的消息是否过期,如果过期则将其删除。同时,LogCleaner线程也会对每个Partition的日志文件进行压缩,以减少磁盘空间的占用。

下面是一个简单的Kafka Producer和Consumer的示例代码:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.*;

public class KafkaExample {

    private static final String TOPIC = "test";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String... args) throws Exception {
        runProducer();
        runConsumer();
    }

    private static void runProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
            producer.send(record);
        }

        producer.close();
    }

    private static void runConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s
", record.offset(), record.key(), record.value());
            }
        }
    }
}

总结

Kafka中的消息过期与清理是一个非常重要的问题,对于Kafka的性能和可靠性都有着重要的影响。本文深入探讨了Kafka中的消息过期与清理策略,并提供了实际的代码示例,希望能够对读者有所帮助。

标签:log,过期,Kafka,深入研究,消息,props,CONFIG
From: https://blog.51cto.com/u_16209833/7575731

相关文章

  • Kafka消息消费者位移存储性能测试
    背景Kafka是一个高性能、分布式的消息队列,被广泛应用于大数据领域。在Kafka中,消费者位移存储是非常重要的一部分,它记录了消费者消费消息的位置,以便在消费者宕机或者重启后能够继续消费未消费的消息。在实际应用中,消费者位移存储的性能对于Kafka的整体性能有着重要的影响。本文将......
  • Kafka 是如何管理消费位点的
    Kafka是如何管理消费位点的?https://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng==&mid=2651220012&idx=2&sn=1d5623daaf327f0688995565901bd63d&chksm=f2a32ac7c5d4a3d1ffe6ebe3d2fbf37cf92320a08aa6f0531989c48b0a72b19f4e94e09ccd75&mpshare=1&scene=1&s......
  • Redis怎么设置过期时间
    pexpire(Stringkey,longmilliseconds):设置n毫秒后过期。expireAt(Stringkey,longunixTime):设置某个时间戳后过期(精确到秒)。pexpireAt(Stringkey,longmillisecondsTimestamp):设置某个时间戳后过期(精确到毫秒)。persist(Stringkey):移除过期时间。setkvexseconds......
  • Kafka详解、Kafka集群搭建与使用
    Kafka详解、Kafka集群搭建与使用原创 凉兮 凉兮的运维日记 2023-09-2116:10 发表于北京收录于合集#docker6个#消息队列1个一、Kafka详解1.Kafka是什么Kafka是Apache旗下的一款分布式流媒体平台,Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统......
  • Flink的Checkpoint状态和Kafka Broker上的提交位点一致
    Flink的Checkpoint状态和KafkaBroker上的提交位点一致消息队列Kafka连接器_实时计算Flink版-阿里云帮助中心https://help.aliyun.com/zh/flink/developer-reference/kafka-connector消息队列Kafka更新时间:2023-09-1910:33:27  本文为您介绍如何使用消息队列Kaf......
  • Kafka怎么保证消息不丢失和重复消费
    (1)生产者发送消息采用异步回调发送,如果发送失败,我们可以通过回调获取消息信息,可以选择记录日志或者重试,同时生产者也可以设置消息重试机制。(2)采用broker的复制机制保证消息在broker中不丢失:开启生产者消息确认机制为all,这样的话,当生产者发送消息到了分区之后,不仅仅只在leader分区......
  • kafka如何保证消费的顺序性
    一个主题有多个分区,只有在一个分区内的消息才有顺序性,我们可以在发送消息时指定对应的分区号或者发送消息时按照相同的业务设置相同的key,通过对应key的hashcode值找到对应的分区,这样就能将消息放入一个分区从而保证消费的顺序性。......
  • 讲清楚 kafka 拉取消息的过程
    kafka是一个高吞吐的消息服务中间件,当然这一切都是有原因的,今天我从kafka拉取消息这个场景剖析下broker的实现。问题:kafkaconsumer在poll的时候传递了一个timeout的参数,broker是怎么处理这个参数的?如果leaderbroker有消息,肯定是立刻返回,如果没有呢,kafka应该是等......
  • vCenter证书过期解决方法
    vCenter证书过期解决方法 目录1概述  2详细操作步骤       2.1检查关键的STS证书是否过期并修复       2.2检查除STS证书外是否还有其余证书过期       2.3续订除STS和data-encipherment以外的证书       2.4续订data-encipherment证书......
  • 深入探讨Kafka消息时间戳与事件处理机制
    背景Kafka是一个高性能、分布式的消息队列系统,被广泛应用于大数据领域。在Kafka中,每个消息都有一个时间戳,用于表示消息的产生时间。在实际应用中,我们需要对消息进行处理,并根据时间戳进行相关的业务逻辑处理。本文将深入探讨Kafka消息时间戳与事件处理机制。Kafka消息时间戳在Kaf......