首页 > 编程语言 >Python脚本消费多个Kafka topic

Python脚本消费多个Kafka topic

时间:2024-11-20 23:20:52浏览次数:1  
标签:消费 Python Kafka topic offset message consumer

在Python中消费多个Kafka topic,可以使用kafka-python库,这是一个流行的Kafka客户端库。以下是一个详细的代码示例,展示如何创建一个Kafka消费者,并同时消费多个Kafka topic。

1.环境准备

(1)安装Kafka和Zookeeper:确保Kafka和Zookeeper已经安装并运行。

(2)安装kafka-python库:通过pip安装kafka-python库。

bash复制代码

pip install kafka-python

2.示例代码

以下是一个完整的Python脚本,展示了如何创建一个Kafka消费者并消费多个topic。

from kafka import KafkaConsumer
import json
import logging
 
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
 
# Kafka配置
bootstrap_servers = 'localhost:9092'  # 替换为你的Kafka服务器地址
group_id = 'multi-topic-consumer-group'
topics = ['topic1', 'topic2', 'topic3']  # 替换为你要消费的topic
 
# 消费者配置
consumer_config = {
    'bootstrap_servers': bootstrap_servers,
    'group_id': group_id,
    'auto_offset_reset': 'earliest',  # 从最早的offset开始消费
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 5000,
    'value_deserializer': lambda x: json.loads(x.decode('utf-8'))  # 假设消息是JSON格式
}
 
# 创建Kafka消费者
consumer = KafkaConsumer(**consumer_config)
 
# 订阅多个topic
consumer.subscribe(topics)
 
try:
    # 无限循环,持续消费消息
    while True:
        for message in consumer:
            topic = message.topic
            partition = message.partition
            offset = message.offset
            key = message.key
            value = message.value
 
            # 打印消费到的消息
            logger.info(f"Consumed message from topic: {topic}, partition: {partition}, offset: {offset}, key: {key}, value: {value}")
 
            # 你可以在这里添加处理消息的逻辑
            # process_message(topic, partition, offset, key, value)
 
except KeyboardInterrupt:
    # 捕获Ctrl+C,优雅关闭消费者
    logger.info("Caught KeyboardInterrupt, closing consumer.")
    consumer.close()
 
except Exception as e:
    # 捕获其他异常,记录日志并关闭消费者
    logger.error(f"An error occurred: {e}", exc_info=True)
    consumer.close()

3.代码解释

(1)日志配置:使用Python的logging模块配置日志,方便调试和记录消费过程中的信息。

(2)Kafka配置:设置Kafka服务器的地址、消费者组ID和要消费的topic列表。

(3)消费者配置:配置消费者参数,包括自动重置offset、自动提交offset的时间间隔和消息反序列化方式(这里假设消息是JSON格式)。

(4)创建消费者:使用配置创建Kafka消费者实例。

(5)订阅topic:通过consumer.subscribe方法订阅多个topic。

(6)消费消息:在无限循环中消费消息,并打印消息的详细信息(topic、partition、offset、key和value)。

(7)异常处理:捕获KeyboardInterrupt(Ctrl+C)以优雅地关闭消费者,并捕获其他异常并记录日志。

4.运行脚本

确保Kafka和Zookeeper正在运行,并且你已经在Kafka中创建了相应的topic(topic1topic2topic3)。然后运行脚本:

bash复制代码

python kafka_multi_topic_consumer.py

这个脚本将开始消费指定的topic,并在控制台上打印出每条消息的详细信息。你可以根据需要修改脚本中的处理逻辑,比如将消息存储到数据库或发送到其他服务。

5.参考价值和实际意义

这个示例代码展示了如何在Python中使用kafka-python库消费多个Kafka topic,适用于需要处理来自不同topic的数据流的场景。例如,在实时数据处理系统中,不同的topic可能代表不同类型的数据流,通过消费多个topic,可以实现数据的整合和处理。此外,该示例还展示了基本的异常处理和日志记录,有助于在生产环境中进行调试和监控。

标签:消费,Python,Kafka,topic,offset,message,consumer
From: https://www.cnblogs.com/TS86/p/18559602

相关文章

  • RocketMQ多个消费组消费同一个topic,其中有一个组正常消费,其余均异常堆积
    @Service@RocketMQMessageListener(consumerGroup="${rocketmq.consumer.group}",topic="${rocketmq.consumer.topic}")publicclassMsgListenerimplementsRocketMQListener<MessageExt>,RocketMQPushConsumerLifecycleListener{priv......
  • Centos编译Python3.10解决openssl异常
    问题描述在Linux中进行Python应用部署时,安装Python3.10后,在pip安装依赖出现SSLError异常。(venv)[root@server100flask-app]#pipinstallflaskWARNING:pipisconfiguredwithlocationsthatrequireTLS/SSL,howeverthesslmoduleinPythonisnotavailable.Looki......
  • python | 结合__dict__理解__getattr__函数的使用
    理解__getattr__函数什么是__getattr__工作流程什么是__dict__三者:__dict__、__dir__()、dir(obj)`obj.__dict__`示例代码`obj.__dir__()`示例代码`dir(obj)`示例代码区别什么是__getattr__看一段代码classMyClass():class_attribute="Iamaclas......
  • 人工智能——Python 基础
    学习人工智能(AI)需要一定的Python基础,因为Python是AI领域最广泛使用的编程语言之一。以下是Python基础知识的总结以及如何应用这些知识进入AI领域的学习:1.Python基础语法1.1打印输出打印是Python的基础功能,用于输出内容到屏幕:print("Hello,AIWorld!")1......
  • python: Serialize and Deserialize complex JSON using jsonpickle
     #encoding:utf-8#版权所有2024©涂聚文有限公司#许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎#SerializeandDeserializecomplexJSONinPython#描述:pipinstalljsonpicklehttps://github.com/jsonpickle/jsonpickle#Author:geovindu,......
  • 自学习python之函数1
    函数:python的乐高积木,灵活即强大函数的调用函数的参数可以任意多个函数的返回值(return)形参(parameter):函数定义过程中的'n'是叫形参,只是一个形式,表示占据一个参数位置实参(argument):传递进来的'6'叫做实参,因为它是具体的参数值关键字参数默认参数收集参......
  • 如何运行python脚本
    平时刷misc时会遇到需要用脚本的题目,之前都是直接跳过不做的(刚刚接触时看着那些脚本就烦),前几天才做了到需要用脚本的题,网上misc的WP都直接给的脚本,第一次做根本不知道怎么开始,最后还是问AI做出来的,原本以为自己能记住怎么弄的了,结果今天又遇到了一题需要运行脚本的,发现自己忘了......
  • python+requests
    python+request一、介绍request库(1)requests是用python语言编写的简单易用的http库,用来做接口测试的库;(2)接口测试自动化库有哪些?requests、urllib、urllib2、urllib3、httplib等(最受欢迎的是requests)(3)安装request库方式一:dos下pip:命令:pipinstallrequests方法二:pycharm......
  • Python常见Error
    https://stackoverflow.com/questions/48399498/git-executable-not-found-with-gitpython-bad-git-executableImportError:cannotimportname'iterative'frompartiallyinitializedmodule'scipy.sparse.linalg.isolve'(mostlikelyduetoaci......
  • Python小白学习教程从入门到入坑------习题课3(基础巩固)
    目录一、选择题二、实战题2.1实战一:从键盘获取一个4位整数,分别输出个位、十位、百位、千位上的数字2.2实战二:根据父母身高预测儿子的身高一、选择题1、以下哪项不是Python语言的保留字符(C)A.FalseB.andC.trueD.if【解析】:true不是Python的保留字,正确的布......