首页 > 其他分享 >深入探讨Kafka消息时间戳与事件处理机制

深入探讨Kafka消息时间戳与事件处理机制

时间:2023-09-20 18:36:39浏览次数:35  
标签:事件处理 timestamp 深入探讨 Kafka record 时间 消息 props

背景

Kafka是一个高性能、分布式的消息队列系统,被广泛应用于大数据领域。在Kafka中,每个消息都有一个时间戳,用于表示消息的产生时间。在实际应用中,我们需要对消息进行处理,并根据时间戳进行相关的业务逻辑处理。本文将深入探讨Kafka消息时间戳与事件处理机制。

Kafka消息时间戳

在Kafka中,每个消息都有一个时间戳,可以通过以下代码获取:

ConsumerRecord<String, String> record = ...;
long timestamp = record.timestamp();

Kafka消息时间戳有两种类型:

  • 创建时间(create time):表示消息被创建的时间。
    • 日志追加时间(log append time):表示消息被追加到Kafka日志的时间。 默认情况下,Kafka使用创建时间作为消息时间戳。但是,可以通过配置来将日志追加时间作为消息时间戳。
# 将日志追加时间作为消息时间戳
log.message.timestamp.type=LogAppendTime

事件处理机制

在实际应用中,我们需要对Kafka消息进行处理,并根据时间戳进行相关的业务逻辑处理。一种常见的场景是根据时间戳进行数据清洗。

以下是一个简单的示例,演示如何根据时间戳进行数据清洗:

public class DataCleaner {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                long timestamp = record.timestamp();
                String data = record.value();
                if (timestamp < System.currentTimeMillis() - 24 * 60 * 60 * 1000) {
                    // 数据清洗
                    data = data.replaceAll("\d+", "");
                }
                System.out.printf("offset = %d, key = %s, value = %s, timestamp = %d
", record.offset(), record.key(), data, timestamp);
            }
        }
    }
}

在上述示例中,我们通过KafkaConsumer消费消息,并根据时间戳进行数据清洗。如果消息的时间戳早于当前时间24小时,则将消息中的数字全部替换为空字符串。

总结

Kafka消息时间戳是一个非常重要的概念,可以帮助我们实现更加精细化的事件处理机制。在实际应用中,我们需要根据业务需求选择合适的时间戳类型,并根据时间戳进行相关的业务逻辑处理。

标签:事件处理,timestamp,深入探讨,Kafka,record,时间,消息,props
From: https://blog.51cto.com/u_16266017/7541880

相关文章

  • 进击消息中间件系列(一):Kafka 入门(基本概念与架构)【转】
    在这之前,我们相继卷完了:关系型数据库 MySQL 、NoSQL数据库 Redis 、 MongoDB 、搜索引擎 ElasticSearch 、大数据 Hadoop框架、PostgreSQL数据库这些系列的知识体系。今天开始,我们将踏上另一个学习之路:中间件!第一个要学习的中间件就是:Kafka。消息队列介绍传统消息队......
  • 浅析 kafka 的 DelayedOperation
    在kafkabroker内部,当执行一些需要等待的任务时(比如broker处理producer的消息,需要等待消息同步到其他副本),会使用到 DelayedOperationPurgatory和 DelayedOperation,大致流程如下图:顶层的类是 DelayedOperationPurgatory,它内部包含2个重要的属性:WatcherList的数组,哈......
  • embeddedkafka 方便测试的基于内存的kafka 实现
    embeddedkafka方便测试的基于内存的kafka实现,可以用来方便的进行基于kafka周边的测试目前embeddedkafka提供了stream,core,conenct以及schema-registry,对于测试场景是一个不错的选择参考资料https://github.com/embeddedkafka/embedded-kafka-schema-registryhttps://github.c......
  • Vue-js循环方式、v-model的使用、事件处理、表单控制、购物车案例
    js循环方式在es6语法中:(以后尽量少用var有很多坑)-let:定义变量-const:定义常量1.方式一:for循环,基于索引的循环<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>Title</title><scriptsrc=".......
  • Vue之js循环方式、v-model 的使用、事件处理、表单控制、购物车案例、v-model修饰符
    js循环方式<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>js循环方式</title><scriptsrc="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.4/jquery.js"></sc......
  • 一文告诉你为什么时序场景下 TDengine 数据订阅比 Kafka 好
    在TDengine3.0中,我们对流式计算、数据订阅功能都进行了再升级,帮助用户极大简化了数据架构的复杂程度,降低整体运维成本。TDengine提供的类似消息队列产品的数据订阅、消费接口,本质上是为了帮助应用实时获取写入TDengine的数据,或者以事件到达顺序处理数据,与其他消息队列相比,它......
  • 1.5万字长文:从 C# 入门 Kafka
    目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Kafka脚本工具主题管理使用C#创建分区分区与复制生产者消费者修改配置3,Kafka.NET基础生产者批量生产使用Tasks.WhenAll如何进行性能测试消费4,生产者连接BrokerK......
  • js循环方式、v-model、事件处理、表单控制、购物车案例
    js循环方式js循环for(),基于索引的循环let:es6语法,用于定义变量const:用于定义常量var以后尽量少用、for循环写法一: for循环写法二: 列表循环 循环方式二:in循环基于迭代的循环,依赖于索引取值直接console.log是索引值,只有list[i]才是要取的值 循环方式三:of循环......
  • Kafka监控&故障恢复
    监控Kafka集群Kafka集群的监控是确保其正常运行和性能优化的关键步骤。下面列出了一些常用的方法和工具来监控Kafka集群:JMX监控:Kafka提供了JMX(JavaManagementExtensions)接口,可以通过JMX来监控和管理Kafka集群。您可以使用JConsole、JavaMissionControl等工具连接到KafkaBroker......
  • 深入探讨Spring Cloud Config的分布式事件
    介绍SpringCloudConfig是一个分布式配置管理工具,它可以将应用程序的配置集中管理,并提供了RESTAPI和Web界面来访问这些配置。在分布式系统中,配置管理是非常重要的,因为它可以帮助我们快速地修改应用程序的配置,而不需要重新部署应用程序。在本文中,我们将深入探讨SpringCloudConf......