首页 > 其他分享 >kafka 按时间戳消费

kafka 按时间戳消费

时间:2022-10-18 22:06:34浏览次数:86  
标签:消费 end offsets partition offset kafka 时间 time consumer

步骤

  • 获取当前 topic 的分区列表
  • 利用 offsets_for_times() + 时间戳查找给定分区的偏移量,如:找到开始时间的偏移量
  • 循环每个分区,设置偏移量
  • 根据 end_offset 或结束时间退出
import json

from kafka import KafkaConsumer, TopicPartition

TOPIC = ""
BROKER = ""
GROUP_ID = ""

consumer = KafkaConsumer(bootstrap_servers=BROKER,
                         group_id=GROUP_ID,
                         api_version=(1, 1, 0),
                         max_poll_records=100,  # 每次最大消费数量
                         connections_max_idle_ms=900000,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=1000,
                         enable_auto_commit=True,  # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
                         auto_commit_interval_ms=5000,
                         sasl_mechanism="",
                         sasl_plain_username="",
                         sasl_plain_password="")


def main():
    start_time, end_time = 1666097095932, 1666098095932

    partition_list = consumer.partitions_for_topic(TOPIC)  # 获取 topic 分区 id 集合,当前 topic存在多少个分区
    assigned_topics = [TopicPartition(topic=TOPIC, partition=index) for index in partition_list]

    consumer.assign(assigned_topics)  # 手动将 TopicPartitions 列表分配给次消费者

    partitions = consumer.assignment()  # 获取当前分配给该消费者的 TopicPartitions

    partition_start_timestamp = {part: start_time for part in partitions}
    partition_end_timestamp = {part: end_time for part in partitions}

    # 根据时间戳查找给定分区的偏移量,这里分别查了 start_time 和 end_time {TopicPartition: OffsetAndTimestamp}
    mapping = consumer.offsets_for_times(partition_start_timestamp)
    # end_offsets = consumer.end_offsets(list(partition_end_timestamp.keys()))
    end_offsets = consumer.offsets_for_times(partition_end_timestamp)

    # 按分区消费 {"TopicPartition(topic="", partition=5): OffsetAndTimestamp(offset=26616345, timestamp=1663560125950)}
    for partition, ts in mapping.items():
        # end_offset = end_offsets.get(partition)
        consumer.seek(partition, ts[0])  # 每个分区设置偏移量
        for msg in consumer:
            value = json.loads(msg.value.decode('utf-8'))

            # 方法一:按时间戳来退出
            if msg.timestamp >= end_time:
                consumer.close()
                break

            # # 方法二:使用 end_offset 来退出
            # if msg.offset == end_offset - 1:
            #     consumer.close()
            #     break


if __name__ == '__main__':
    main()

参考:get results from kafka for a specific period of time

标签:消费,end,offsets,partition,offset,kafka,时间,time,consumer
From: https://blog.51cto.com/u_13942374/5768231

相关文章

  • 技术分享| 消息队列Kafka群集部署
    一、简介1、介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志......
  • Dinky的使用——kafka2mysql
    需求:通过在kafka的topic里面传入json串,再把数据同步到mysql中,这个也可以作为半结构化数据同步的案例一、添加依赖包将依赖包放到dinky的pulgins目录和flink的lib目录下,并......
  • Dubbo——时间轮(Time Wheel)算法应用
    定时任务Netty、Quartz、Kafka以及Linux都有定时任务功能。 JDK自带的java.util.Timer和DelayedQueue可实现简单的定时任务,底层用的是堆,存取复杂度都是O(nlog(......
  • String类型时间转Long类型时间戳
    转:String类型时间转Long类型时间戳 String转Long的两种方法1、Long.valueOf("String")返回Long包装类型2、Long.parseLong("String")返回long基本数据类型String类型......
  • 自动生成模拟数据发至kafka topic
    自动生成一下json数据脚本json数据样例{"provinceCode":"290","companyName":"test","appId":"10","appName":"apptest","eventTime":"2022-10-1709:52:","errorTy......
  • flink sql kafka数据接入clickhouse
    --参数--并行度设置set'parallelism.default'='2';--resetexecution.savepoint.path;--resetexecution.checkpoint.path;--设置队列set'yarn.application.q......
  • 技术分享| 消息队列Kafka群集部署
    一、简介1、介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日......
  • 计算程序运行时间
    使用clock函数获得程序开始和结束的时间,相减就能得到程序运行的时间。clock()是C/C++中的计时函数,而与其相关的数据类型是clock_t#include<stdio.h>#include<time.h>#incl......
  • flink sql kafka数据接入mysql
    --定义source表droptableIFEXISTSsource_applet_kafka;CREATETABLEIFNOTEXISTSsource_applet_kafka(provinceCodeString,companyNameString,appIdStri......
  • 时间同步
    时间同步命令Crontab时间同步方式:选一台机器,作为时间服务器,所有的机器与这台机器进行定时的同步,如每隔十分钟,同步一次时间。定时如何实现?通过crontab命令实现。下面讲......