在使用 Python RocketMQ 的 PushConsumer 进行消息消费时,需要进行以下步骤:
- 创建 PushConsumer
from rocketmq.client import PushConsumer, ConsumeStatus
consumer = PushConsumer("consumer_group_name")
consumer.set_name_server_address("localhost:9876")
- 注册消息处理函数
def message_listener(msg): print(msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer.subscribe("topic_name", "*", message_listener)
以上代码中,message_listener 是自定义的消息处理函数。当消费者成功消费一条消息时,需要返回 ConsumeStatus.CONSUME_SUCCESS(也可以返回其他值,如 CONSUME_LATER,代表稍后重新消费)。
- 开始消费消息
consumer.start()
以上代码将启动 PushConsumer,开始从指定的 RocketMQ 服务器上消费消息。
完整的 PushConsumer 代码示例:
from rocketmq.client import PushConsumer, ConsumeStatus consumer = PushConsumer("consumer_group_name") consumer.set_name_server_address("localhost:9876")
def message_listener(msg): print(msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer.subscribe("topic_name", "*", message_listener) consumer.start()
需要注意的是,PushConsumer 会在启动时自动向 NameServer 注册并订阅相关的 Topic 和 Tag。如果未开启自动创建 Topic 的功能,此时需要确保 Topic 已经存在。如果 Topic 不存在,可以使用 PushProducer 事先发送一条消息,让 RocketMQ 自动创建该 Topic。
标签:name,python,keep,listener,How,PushConsumer,message,consumer,ConsumeStatus From: https://www.cnblogs.com/LanTianYou/p/17251181.html