本文不涉及较难的操作,仅仅提供 常用的生产消息和消费消息的方式。
-- 好像也没啥花里胡哨的操作
1、准备
想要python调用rabbitMQ需要安装pika
,所有需要提前安装好pika
包
# 全局
pip install pika;
# 如果用的anaconda的上面那个装不上可以试试这个?
conda install pika;
2、代码
2.1、生产者
生产者采用
direct
模式
import pika
USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
WRITE_QUEUE='demo_write.queue' # 队列
WRITE_EXCHANGE='demo.exchange' # 交换机
ROUTING_KEY='demo' # routing-key
if __name__ == '__main__':
# 创建一个凭证
credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
# 创建一个连接
connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
# 建立连接并获取一个通道,
# 此处采用阻塞连接(这个方式最简单了,但是对于生产者没啥区别)
channel = pika.BlockingConnection(connection).channel()
# 创建交换机和队列,如果没有就会自动创建
# 如果已经创建的与当前定义的不一样会**报错**
# 此处durable表示是是否持久化
channel.exchange_declare(exchange=WRITE_EXCHANGE,durable=True,exchange_type=ExchangeType.direct)
channel.queue_declare(queue=WRITE_QUEUE,durable=True)
# 绑定
# 如果队列或交换机不存在**报错**
channel.queue_bind(queue=WRITE_QUEUE,exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY)
message='{"data":"这里是我的消息"}'
# 进行生产
writeConnection.basic_publish(exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY,body=message)
2.2、消费者
消费者采用
basic
模式
import pika
USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
READ_QUEUE='demo_read.queue' # 读取任务的队列名称(各个算法需要匹配对应的)
# 此时需要准备一个回调函数,参数不过多解释
def call_back(ch, method, properties, body):
# 获取一条消息(如果直接获取会是乱码)
message = str(body.decode('utf-8'))
# 处理逻辑
# 阿巴阿巴
# ack确认(确定接收成功后调用,不然消息会一直存在)
ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
# 这些已经在上面解释过了
credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
# 此处采用阻塞连接
# 这个方式最简单了,当程序启动后会进行阻塞,当有消息来的时候就会进行消费,消费完成后在尝试获取下一个
channel = pika.BlockingConnection(connection).channel()
channel.queue_declare(queue=READ_QUEUE,durable=True)
# 消费设置
# 预读取数量
readConnection.basic_qos(prefetch_count=1)
# on_message_callback:回调函数名称
# auto_ack:是否自动ack
channel.basic_consume(queue=READ_QUEUE,on_message_callback=call_back,auto_ack=False)
# 开始消费
channel.start_consuming()
Other、实际使用可能出现的问题
A、json格式注意!
python与某个J开头的编程语言(java)通过mq交互时,json格式有规定
java这边采用
Jackson2JsonMessageConverter
模式接收消息
对于py
1、json的格式必须'{"a":"a"}'
而不是"{'a':'a'}"
2、如果json格式为\'{\"a\":\"a\"}\'
接收json需要调用两次json_loads()才会变为dict
类型,仅调用一次只是str
类型
B、104异常?
度娘:
消费某条消息太久写回通道以为程序废了就自动关闭了,然后尝试写回就挂了。
解决方案:创建连接的时候加上heartbeat=0
让他等多久都不要关闭
connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials,heartbeat=0)
感觉不太行,我采用的是消费超时机制和者重启机制
因为重启机制简单所以就没用消费超时机制
1、消费超时机制:消费时通过指定的算法逻辑让消费时间过长时(<断开连接时间)自动停止任务并丢到人工队列去
2、重启机制:就是字面意思重启,有时候程序可能是调度,资源不足等就会导致调用突发性的很慢很慢,此时重启程序可以改善问题。
C、挂起太久不干活了?
当我使用java开启线程去启动mq程序时,执行一段时间后会突然不干活了。原因定位到就是B(上面那个)问题。这个错误报错竟然不会停止任务(不知道是不是我在python端也捕获了异常的原因)
解决方案:java端进行循环启动,程序停止后自动重启。python端采用定时关闭功能。
注意:重启方法必须在python使用
os.exit(0)
,正常通过手动停止mq消息监听关不掉,java端直接结束线程有几率关不掉。
D、开启多个任务实际干活速度没有提上来?
抛开实际的运行瓶颈,资源数量,有可能是没有设置预读取数量。默认预读取数量时全部,这样会导致一个mq任务启动后直接读取全部消息而其他任务无消息可读。
# 就是这句话
readConnection.basic_qos(prefetch_count=1)
# 加在这段代码上面就可以了
# channel.basic_consume()
# channel.start_consuming()
标签:__,调用,pika,python,RabbitMQ,queue,credentials,channel
From: https://www.cnblogs.com/musiro/p/17605355.html