同步模式
from rocketmq.client import PushConsumer, ConsumeStatus
import time
def callback(msg):
    """Print a received message and report it as successfully consumed.

    Args:
        msg: The message handed to this callback by the consumer. Its ``id``,
            ``body`` and the property named ``'property'`` are printed.

    Returns:
        ``ConsumeStatus.CONSUME_SUCCESS``, unconditionally.
    """
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS
def start_consume_message():
    """Subscribe to ``TopicTest`` and consume messages until interrupted.

    Creates a ``PushConsumer`` in group ``consumer_group``, points it at the
    name server on ``127.0.0.1:9876``, registers ``callback`` for
    ``TopicTest`` and starts it. The function then blocks forever; it only
    leaves the loop via an exception (for example ``KeyboardInterrupt``), at
    which point the consumer is shut down before the exception propagates.
    """
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('127.0.0.1:9876')
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    try:
        # start() returns immediately, so sleep in a loop to keep the
        # process alive while messages are being delivered to callback.
        while True:
            time.sleep(3600)
    finally:
        # Release the consumer's resources even when the loop is interrupted.
        consumer.shutdown()
# Run the synchronous consumer only when executed as a script.
if __name__ == '__main__':
    start_consume_message()
异步模式
import asyncio
from rocketmq.client import AsyncPushConsumer
async def callback(msg):
    """Print a received message and report it as successfully consumed.

    Args:
        msg: The message handed to this coroutine by the consumer. Its
            ``id``, ``body`` and the property named ``'property'`` are
            printed.

    Returns:
        ``True``, unconditionally, to indicate consume success.
    """
    print(msg.id, msg.body, msg.get_property('property'))
    return True  # Return True to indicate consume success
async def start_consume_message():
    """Subscribe to ``TopicTest`` and consume messages indefinitely.

    Creates an ``AsyncPushConsumer`` in group ``consumer_group``, points it
    at the name server on ``127.0.0.1:9876``, registers ``callback`` for
    ``TopicTest``, awaits its ``start()`` and then sleeps in a loop forever.

    NOTE(review): confirm that the installed ``rocketmq`` package actually
    exports ``AsyncPushConsumer`` with an awaitable ``start()``; this cannot
    be verified from this file alone.
    """
    consumer = AsyncPushConsumer('consumer_group')
    consumer.set_name_server_address('127.0.0.1:9876')
    consumer.subscribe('TopicTest', callback)
    print('Start consume message')
    await consumer.start()

    # Keep the event loop running indefinitely
    while True:
        await asyncio.sleep(3600)  # Sleep for 1 hour (3600 seconds)
# Run the asynchronous consumer on a fresh event loop when executed as a script.
if __name__ == '__main__':
    asyncio.run(start_consume_message())
标签:__,异步,consume,Python,start,msg,message,consumer,rocketMq
From: https://www.cnblogs.com/guanchaoguo/p/18334100