一、生产者与消费者模式之 简单模式,原理图
二、生产者产生消息
import json import pika import datetime # 生产者 producer.py def get_message(): # 产生消息入口处 for i in range(100): # 生成10条消息 message = json.dumps({'id': "10000%s" % i, "amount": 100 * i, "name": "melon", "createtime": str(datetime.datetime.now())}) producer(message) print('i',i) def producer(message): # 获取与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个 # 报错StreamLostError: ('Transport indicated EOF',) 是因为将端口 5672 写成 15672 connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo',host='82.156.19.94', port=5672, credentials=pika.PlainCredentials('guest', 'guest'))) # 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱 channel = connection.channel() # 声明消息队列melon.msg.demo.queue ,消息将在这个队列传递,如不存在,则创建 channel.queue_declare(queue='melon.msg.demo.queue') # 向队列插入数值 routing_key的队列名为melon.msg.demo.queue,body 就是放入的消息内容,exchange指定消息在哪个队列传递, # 这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串exchange(默认的exchange) # exchange指定四种模式:direct(默认),fanout, topic, 和headers channel.basic_publish(exchange='', routing_key='melon.msg.demo.queue', body=message) # 关闭连接 connection.close() if __name__ == "__main__": get_message() # 程序执行入口
三、消费者消费消息
#!/usr/bin/env python # -*- coding:utf-8 -*- # 消费者 receive.py import time import pika # 建立与rabbitmq的连接 credentials = pika.PlainCredentials("guest","guest") connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo',host='82.156.19.94', port=5672,credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="melon.msg.demo.queue") def callback(ch,method,properties,body): print('body',body) time.sleep(100) print("消费者接收到了任务:%r"%body.decode("utf8")) # 有消息来临,立即执行callback,没有消息则夯住,等待消息 # 老百姓开始去邮箱取邮件啦,队列名字是水许传 # def basic_consume(self, # queue, # on_message_callback, # auto_ack=False, # exclusive=False, # consumer_tag=None, # arguments=None): # 这个参数的调用有所改动 # 第一个参数是队列 # 第二个是回调函数 # 第三个这是auto_ack=True 使用自动确认模式 channel.basic_consume("melon.msg.demo.queue",callback,True) # 开始消费,接收消息 channel.start_consuming()
标签:队列,pika,RBMQ,python,demo,queue,案例,melon,channel From: https://www.cnblogs.com/1314520xh/p/17093465.html