connection
connection = BlockingConnection(ConnectionParameters(host='xxx', port=xxx,
credentials=PlainCredentials(username='xxx', password='xxx')))
channel
- channel = connection.channel()
- 生产和消费动作基本上是通过channel进行操作
- channel.exchange_declare
- channel.queue_declare
- channel.basic_publish
- channel.queue_bind
- channel.basic_qos
- channel.basic_sonsume
- channel.start_consuming
- channel.basic_ack
- channel.basic_rejec
- channel.basic_nack
publisher
- channel.basic_publish(exchange='xxx', routing_key='xxx')
- '': 没有指定exchange时,走的是默认的exchange,会绑定所有的queue,routing_key为queue的名称
- channel.exchange_declare(exhange='xxx', exchange_type='xxx')
exchange
- exchange_type
- fanout
- 发布的消息都会被传到exchange绑定的每一个queue上,忽略routing_key
- direct
- exhange根据消息的routing_key,把消息传到绑定了对应routing_key的queue中
- 不同queue可以绑定同一个routing_key, 这样可以实现fanout效果
- topic
- queue绑定的routing_key可以包含多个词,词之间用.间隔
- 可以使用#和,#表示0个或多个词,表示正好1个词
- exchange会把消息的routing_key与各queue中的routing_key进行匹配,只把消息传递到匹配上的queue
- fanout
queue
- result = channel.queue_declare(queue_name, exclusive=True, arguments={xxx})
- 如果queue的名称是空字符串'',则会自动生成一个名称,可以通过result.method.queue获取
- exclusive: connection关闭后,queue会自动删除
- arguments:
- 'x-single-active_consumer': True 单活客户端,即只有一个活跃的客户端可以消费queue中的消息
- channel.queue_bind(exchange='xxx', queue='xxx', routing_key='xxx')
- 如果routing_key为None,则绑定的routing_key就是queue的名称
- channel.start_consuming 启动消费
consumer
- channel.basic_qos(prefetch_count=1)
- prefetch_count: 指定queue中最多有多少个没有ack的消息,如果达到了这个数量,就不在投递消息到consumer
- channel.basic_consume(queue='queue_name', on_message_callback=callback_func, auto_ack=False)
- queue: 指定消费的queue
- on_message_callback: 指定消费消息的函数