消息队列在日常开发中比较常用的开发中间件,每家大厂一般都会具有自己的消息队列服务器。本文主要讲述Python中如何使用RocketMQ的相关SDK。希望大家在阅读本文前可以先了解一下RocketMQ的基本知识。
使用 pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple 可以下载到rocketmq所需要的包(需要注意到的是RocketMQ是基于java写的C/S架构服务,因此我们这安装的仅仅是客户端,也就是能够连接到远程的RocketMQ服务器)。
1、消费者
看了前面“基本知识”之后,我们知道消费者消费数据的方式有两种:1、主动调用从Broker服务器拉消息(pull)来使用,主动权在于程序。2、被动接收Broker服务器推送(push)的消息,主动权在Broker服务器,一般是Brocker接收到了消息就会可以传导给消费者(也就是我们应用)。使用pull模式时,需要自己编写逻辑来循环调用pull方法,并且处理可能出现的异常情况,例如网络问题或者队列暂时没有消息可供消费的情况。因此我们一般是使用push模式,被动的接受Borker的数据并这是回调函数(接受消息后对消息的处理函数)返回给Borker这条消息消费情况。
消费数据我们需要明确,我们能够接收到数据,需要有几个必要的步骤:
- 创建消费者对象,创建时需要指明 消费组名称
- 与RocketMQ的服务器建立连接。
- 连接后需要进行身份验证,可以选择账户密码验证
- 指定消费者订阅哪个Topic的消息,并且指定消息的回调函数(回调函数需要返回消费状态:CONSUME_SUCCESS,RECONSUME_LATER)
完成上述操作之后,我们就可以使用消费者进行消费了,下面给出了两种不同消费类型的消费者代码模板。
1.1、Push消费者
import time from rocketmq.client import PushConsumer, ConsumeStatus # 设置回调函数来处理消息 def call_back(msg): # 需要使用msg.body来获取内容 print(f"Received message: {msg.body.decode('utf-8')}") # 在这里编写您的消息处理逻辑 # ... # 如果消息处理成功,返回CONSUME_SUCCESS # 如果消息处理失败,返回RECONSUME_LATER return ConsumeStatus.CONSUME_SUCCESS # 初始化消费者 consumer = PushConsumer('your_consumer_group') # 使用 IP 和端口名称设置服务器地址 consumer.set_name_server_address("127.0.0.1:9887") consumer.set_session_credentials("access_key", "secret_key", 'authChannel') # 完成设置验证 # 订阅Topic和过滤信息 consumer.subscribe('topic名称', call_back, '*') # 启动消费者, 此时会启动新的现成, 消费者将一直运行, 直到主进程被停止 consumer.start() # 在实际应用中,您可能需要添加逻辑来优雅地关闭消费者 try: while True: time.sleep(1000) except KeyboardInterrupt: pass # 停止消费者 consumer.shutdown()
我们需要注意到的是consumer.start()会开启新的线程持续监听Brocker是否推送了新的消息,这个消费线程需要一直被进行,因此这里使用了while True循环保证主进程不结束(消费线程也就不会被kill,就算消费数据时出现了异常也会继续监听)。
1.2、Pull消费者
pull消费者与push消费者不同的是,不需要一直开启一个线程去监听,而是由程序作为主动方主动去获取未消费数据
from rocketmq.client import PullConsumer # 初始化消费者(需要传入消费组名称) consumer = PullConsumer('your_consumer_group') # 使用 IP 和端口名称设置服务器地址 consumer.set_name_server_address("127.0.0.1:9887") consumer.set_session_credentials("access_key", "secret_key", 'authChannel') # 完成设置验证 # 启动消费者 consumer.start() # 从topic中拉取还未被消费的数据(consumer.pull返回的是可迭代对象) for msg in consumer.pull('YOUR-TOPIC'):
# 消费数据的逻辑在这里 print(msg.id, msg.body) # 停止消费者 consumer.shutdown()
对于消费者需要注意的是:一个同名消费组只能监听一个topic,因此如果想要使用多个topic的消息,需要创建不同名的消费者。
2、生产者
对于生产者,其实与消费者类似也需要那四步必需步骤。只不过发送的信息需要使用Message对象来指定对应的topic,还需要注意的是传递的数据必须是字符串,因此我们传递对象数据时需要用json.dumps转换一下。
import json from rocketmq.client import Producer from rocketmq.client import Message # 初始化生产者 producer = Producer('your_producer_group') # 设置NameServer地址 producer.set_namesrv_addr('nameserver的address') producer.set_session_credentials("access_key", "secret_key", 'authChannel') # 完成设置验证 # 启动生产者 producer.start() # 构建消息(需要将消息发送到指定Topic) message = Message('your_topic') # 设置消息内容(注意消息只能是字符串) msg_body = {"id":"test_id","name":"test_name","message":"test_message"} message.set_body(json.dumps(msg_body).encode('utf-8')) # 可以设置其他属性,如Tags、Keys等 # message.set_tags('your_tag') # message.set_keys('your_key') try: # 发送消息(此处就会发送消息到RocketMQ服务器) ret = producer.send(message) # 打印发送结果 if ret.status == Message.SendStatus.OK: print("发送成功") else: print(f"发送失败, 消息状态: {ret.status}") except Exception as e: # 处理发送过程中可能出现的异常 print(f"Send message failed, exception: {e}") finally: # 停止生产者 producer.shutdown()
同一个生产者对象可以发送多个Message对象,也就是说同一个生产者对象可以向多个topic发送信息。需要注意的是Message对象被发送后,它通常不会被重新使用来发送其他消息,如果你想对同一个topic发送多个数据需要创建新的Message对象。
标签:set,消费者,Python,队列,需要,消息,message,consumer,RocketMQ From: https://www.cnblogs.com/CircleWang/p/18072775