首页 > 编程语言 >python调用RabbitMQ

python调用RabbitMQ

时间:2023-08-04 11:44:18浏览次数:36  
标签:__ 调用 pika python RabbitMQ queue credentials channel

本文不涉及较难的操作,仅仅提供 常用的生产消息和消费消息的方式。
-- 好像也没啥花里胡哨的操作

1、准备

想要python调用rabbitMQ需要安装pika,所有需要提前安装好pika

# 全局
pip install pika;
# 如果用的anaconda的上面那个装不上可以试试这个?
conda install pika;

2、代码

2.1、生产者

生产者采用direct模式

import pika

USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
WRITE_QUEUE='demo_write.queue' # 队列
WRITE_EXCHANGE='demo.exchange' # 交换机
ROUTING_KEY='demo' # routing-key

if __name__ == '__main__':
	# 创建一个凭证
	credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
	# 创建一个连接
	connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
	# 建立连接并获取一个通道,
	# 此处采用阻塞连接(这个方式最简单了,但是对于生产者没啥区别)
	channel = pika.BlockingConnection(connection).channel()
	# 创建交换机和队列,如果没有就会自动创建
	# 如果已经创建的与当前定义的不一样会**报错**
	# 此处durable表示是是否持久化
	channel.exchange_declare(exchange=WRITE_EXCHANGE,durable=True,exchange_type=ExchangeType.direct)
	channel.queue_declare(queue=WRITE_QUEUE,durable=True)
	# 绑定
	# 如果队列或交换机不存在**报错**
channel.queue_bind(queue=WRITE_QUEUE,exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY)
	message='{"data":"这里是我的消息"}'
	# 进行生产
	writeConnection.basic_publish(exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY,body=message)

2.2、消费者

消费者采用basic模式

import pika

USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
READ_QUEUE='demo_read.queue' # 读取任务的队列名称(各个算法需要匹配对应的)

# 此时需要准备一个回调函数,参数不过多解释
def call_back(ch, method, properties, body):
    # 获取一条消息(如果直接获取会是乱码)
    message = str(body.decode('utf-8'))
    # 处理逻辑
    # 阿巴阿巴

    # ack确认(确定接收成功后调用,不然消息会一直存在)
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == '__main__':
  # 这些已经在上面解释过了
  credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
  connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
  # 此处采用阻塞连接
  # 这个方式最简单了,当程序启动后会进行阻塞,当有消息来的时候就会进行消费,消费完成后在尝试获取下一个
  channel = pika.BlockingConnection(connection).channel()
  channel.queue_declare(queue=READ_QUEUE,durable=True)
  # 消费设置
  # 预读取数量
  readConnection.basic_qos(prefetch_count=1)
  # on_message_callback:回调函数名称
  # auto_ack:是否自动ack
  channel.basic_consume(queue=READ_QUEUE,on_message_callback=call_back,auto_ack=False)
  # 开始消费
  channel.start_consuming()

Other、实际使用可能出现的问题

A、json格式注意!

python与某个J开头的编程语言(java)通过mq交互时,json格式有规定

java这边采用Jackson2JsonMessageConverter模式接收消息

对于py
1、json的格式必须'{"a":"a"}'而不是"{'a':'a'}"
2、如果json格式为\'{\"a\":\"a\"}\'接收json需要调用两次json_loads()才会变为dict类型,仅调用一次只是str类型

B、104异常?

度娘:
消费某条消息太久写回通道以为程序废了就自动关闭了,然后尝试写回就挂了。
解决方案:创建连接的时候加上heartbeat=0让他等多久都不要关闭

  connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials,heartbeat=0)

感觉不太行,我采用的是消费超时机制和者重启机制
因为重启机制简单所以就没用消费超时机制
1、消费超时机制:消费时通过指定的算法逻辑让消费时间过长时(<断开连接时间)自动停止任务并丢到人工队列去
2、重启机制:就是字面意思重启,有时候程序可能是调度,资源不足等就会导致调用突发性的很慢很慢,此时重启程序可以改善问题。

C、挂起太久不干活了?

当我使用java开启线程去启动mq程序时,执行一段时间后会突然不干活了。原因定位到就是B(上面那个)问题。这个错误报错竟然不会停止任务(不知道是不是我在python端也捕获了异常的原因)
解决方案:java端进行循环启动,程序停止后自动重启。python端采用定时关闭功能。

注意:重启方法必须在python使用os.exit(0),正常通过手动停止mq消息监听关不掉,java端直接结束线程有几率关不掉。

D、开启多个任务实际干活速度没有提上来?

抛开实际的运行瓶颈,资源数量,有可能是没有设置预读取数量。默认预读取数量时全部,这样会导致一个mq任务启动后直接读取全部消息而其他任务无消息可读。

# 就是这句话
readConnection.basic_qos(prefetch_count=1)
# 加在这段代码上面就可以了
# channel.basic_consume()
# channel.start_consuming()

标签:__,调用,pika,python,RabbitMQ,queue,credentials,channel
From: https://www.cnblogs.com/musiro/p/17605355.html

相关文章

  • python 操作oracle
    表DDL--"C##TAPDATA_TEST".ALAM3definitionCREATETABLE"C##TAPDATA_TEST"."ALAM3"( "UID1"VARCHAR2(100)NOTNULLENABLE, "UID2"VARCHAR2(100)NOTNULLENABLE, "COLUMN1"VARCHAR2(100),......
  • Qt 在线程中invokeMethod采用QueuedConnection模式,调用带指针参数槽,实际不会调用
    widgetObject有操函数Test:voidTest(int*v);在线程中调用Test,会被忽略,实际不会调用。QMetaObject::invokeMethod(widgetObject,"Test",Qt::QueuedConnection,Q_ARG(int*,&v));下面是网上找的理由: 在同一个线程中当信号和槽都在同一个线程中时,值传递参数和引用传递参数有......
  • 希尔排序的Python实现,并逐行解释代码
    当然,我可以为您提供希尔排序的Python实现,并逐行解释代码。以下是一个示例:defshell_sort(arr):n=len(arr)gap=n//2#初始化间隔whilegap>0:foriinrange(gap,n):temp=arr[i]j=i#对间隔为gap......
  • Python爬虫遇到重定向问题解决办法汇总
    在进行Python爬虫任务时,遇到重定向问题是常见的问题之一。重定向是指在发送请求时,服务器会返回一个新的URL,将请求重新定向到该URL。为了帮助您解决这个问题,本文将提供一些实用的解决办法,并给出相关的代码示例,希望能对您的爬虫任务有所帮助。了解重定向问题重定向问题通常是由于网......
  • python教程 入门学习笔记 第6天 数据类型转换 字符串转换成数值 数值之间互转 其它类
    4、数据类型转换1)字符串转换成数值:int()-----------将值转换成整数float()-----------将值转换成小数str()-----------将值转换成字符串bool()-----------将值转换成布尔值例如:int()将值转换成整数s1="188"#字符串ns1=int(s1)#转换成整型数值print(ns1+8)#打印数......
  • 配置pytorch环境时出现的问题 Failed to load image Python extension
    安装了torch1.12.0+torchvision0.13.0+torchaudio0.12.0版本后,condainstallpytorch==1.12.0torchvision==0.13.0torchaudio==0.12.0cudatoolkit=11.3-cpytorch按照《动手学深度学习》输入 fromd2limporttorchasd2l命令,跳出警告UserWarning:Failed......
  • 深入探讨API调用性能优化与错误处理
    随着互联网技术的不断发展,API(应用程序接口)已经成为软件系统中重要的组成部分。而优化API调用的性能以及处理错误和异常情况则是保障系统稳定性和可靠性的关键。本文将从以下几个方面来探讨如何进行性能优化和错误处理。一、优化API调用的性能1.使用合适的数据传输格式选择合适的数......
  • 100道Python练习题
    100道Python练习题,希望对你的学习有所帮助!编写一个程序,输入两个数并计算它们的和。编写一个程序,输入一个字符串,并倒序输出该字符串。编写一个程序,判断一个数是否为质数。编写一个程序,计算并输出斐波那契数列的前n项(n由用户输入)。编写一个程序,判断一个字符串是否为回文串。编写一个......
  • 深入探讨API调用性能优化与错误处理
    ​随着互联网技术的不断发展,API(应用程序接口)已经成为软件系统中重要的组成部分。而优化API调用的性能以及处理错误和异常情况则是保障系统稳定性和可靠性的关键。本文将从以下几个方面来探讨如何进行性能优化和错误处理。一、优化API调用的性能1.使用合适的数据传输格式选择......
  • python实现单例的几种方式
    单例模式单例模式(SingletonPattern)是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在。当你希望在整个系统中,某个类只能出现一个实例时,单例对象就能派上用场。比如,某个服务器程序的配置信息存放在一个文件中,客户端通过一个AppConfig的类来读取配置文......