步骤:
- 获取当前 topic 的分区列表;
- 利用 offsets_for_times() + 时间戳查找给定分区的偏移量,例如:找到开始时间对应的偏移量;
- 循环每个分区,设置(seek)偏移量;
- 根据 end_offset 或结束时间退出。
import json
from kafka import KafkaConsumer, TopicPartition
# Connection settings -- fill these in before running the script.
TOPIC = ""
BROKER = ""
GROUP_ID = ""

# All KafkaConsumer keyword arguments gathered in one mapping.
_CONSUMER_SETTINGS = dict(
    bootstrap_servers=BROKER,
    group_id=GROUP_ID,
    api_version=(1, 1, 0),
    max_poll_records=100,  # maximum number of records consumed per poll
    connections_max_idle_ms=900000,
    auto_offset_reset='earliest',
    consumer_timeout_ms=1000,  # stop message iteration after this long with no new records
    # Periodically commit the offsets of all consumed messages (commits happen
    # while iterating the consumer).
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,
    sasl_mechanism="",
    sasl_plain_username="",
    sasl_plain_password="",
)

consumer = KafkaConsumer(**_CONSUMER_SETTINGS)
def partition_offset_ranges(start_lookup, end_lookup, high_watermarks):
    """Turn timestamp lookups into a per-partition ``[start, stop)`` offset range.

    Args:
        start_lookup: ``{TopicPartition: OffsetAndTimestamp | None}`` as returned
            by ``consumer.offsets_for_times`` for the window's start time.
        end_lookup: the same kind of mapping for the window's end time.
        high_watermarks: ``{TopicPartition: int}`` as returned by
            ``consumer.end_offsets``; used as the stop offset for a partition
            whose end-time lookup is ``None``.

    Returns:
        ``{TopicPartition: (start_offset, stop_offset)}``. Partitions with
        nothing to read in the window are omitted.
    """
    ranges = {}
    for tp, start in start_lookup.items():
        if start is None:
            # offsets_for_times yields None when no message has a timestamp
            # >= start_time (or the partition is empty): nothing to read.
            continue
        end = end_lookup.get(tp)
        # ``end`` is the earliest message at/after end_time, so its offset is
        # the exclusive upper bound. Without one, read up to the log end.
        stop = end.offset if end is not None else high_watermarks[tp]
        if start.offset < stop:
            ranges[tp] = (start.offset, stop)
    return ranges


def _consume_range(tp, start_offset, stop_offset, handler):
    """Read partition ``tp`` over offsets ``[start_offset, stop_offset)``."""
    # Assign only this partition so the iterator cannot interleave records
    # from other partitions.
    consumer.assign([tp])
    consumer.seek(tp, start_offset)
    for msg in consumer:
        if msg.offset >= stop_offset:
            break
        value = json.loads(msg.value.decode('utf-8'))
        if handler is not None:
            handler(msg, value)
        if msg.offset + 1 >= stop_offset:
            # That was the last message in the window; stop now rather than
            # waiting consumer_timeout_ms for a record we would discard.
            break


def main(start_time=1666097095932, end_time=1666098095932, handler=None):
    """Consume every partition of ``TOPIC`` between two timestamps.

    Looks up the partitions of ``TOPIC`` and converts ``start_time`` and
    ``end_time`` into offsets with ``offsets_for_times``. It then reads each
    partition separately, from its start offset up to (not including) its end
    offset. The module-level ``consumer`` is closed exactly once at the end,
    also when an error is raised.

    Args:
        start_time: window start, epoch milliseconds (inclusive).
        end_time: window end, epoch milliseconds (exclusive).
        handler: optional ``handler(msg, value)`` callback, called for every
            message in the window with its JSON-decoded value.

    Raises:
        ValueError: if no partitions are known for ``TOPIC``.
    """
    try:
        partition_ids = consumer.partitions_for_topic(TOPIC)
        if not partition_ids:
            # partitions_for_topic returns None for an unknown topic.
            raise ValueError("no partitions found for topic %r" % (TOPIC,))
        partitions = [TopicPartition(topic=TOPIC, partition=pid)
                      for pid in sorted(partition_ids)]
        start_lookup = consumer.offsets_for_times({tp: start_time for tp in partitions})
        end_lookup = consumer.offsets_for_times({tp: end_time for tp in partitions})
        high_watermarks = consumer.end_offsets(partitions)
        ranges = partition_offset_ranges(start_lookup, end_lookup, high_watermarks)
        for tp in partitions:
            if tp not in ranges:
                continue
            start_offset, stop_offset = ranges[tp]
            _consume_range(tp, start_offset, stop_offset, handler)
    finally:
        # Close once, after all partitions are done (or on error).
        consumer.close()
# Script entry point: run the time-window consumer only when executed
# directly, not when this module is imported.
if __name__ == '__main__':
    raise SystemExit(main())
标签:消费,end,offsets,partition,offset,kafka,时间,time,consumer From: https://blog.51cto.com/u_13942374/5768231