消息安全之durable持久化
# queue 持久化
# 消息 持久化
send
import pika # 1 拿到链接 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello2', durable=True) # queue持久化 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World 2222', properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent # 消息也持久化,即便断电---》它也不会丢失 ) ) print("Sent 'Hello World!'") # 关闭 connection.close()
recieve
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello12', durable=True) def callback(ch, method, properties, body): print(f" [x] Received {body}") # 消费呢,还没消费完--》程序崩了--->之前在消息队列中得消息---》没了 # 真正消费完再确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 消费者监听:hello 这个queue,只要queue中有消息,就会触发on_message_callback对应的函数的执行 # auto_ack 自动确认,为了保证数据不丢失,只要拿到消息---》通知服务端,消息拿到了删除 channel.basic_consume(queue='hello2', on_message_callback=callback, auto_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 卡在这
闲置消费
# 默认情况下,如果多个消费者,消费消息,是依次按顺序消费,即便消费者很耗时,也是依次消费 # 改成:闲置消费 -谁空闲,谁优先消费 # 在消费消息之前加 channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来 channel.basic_consume(queue='hello3', on_message_callback=callback, auto_ack=False)
发布订阅
生产者
import pika # 1 拿到链接 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() # 生产者 负责管理(绑定) 某个交换机---m1 channel.exchange_declare(exchange='m1',exchange_type='fanout') channel.basic_publish(exchange='m1', routing_key='', body='Hello World 111', properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent # 消息也持久化,即便断电---》它也不会丢失 ) ) print("Sent 'Hello World!'") # 关闭 connection.close()
消费者
import pika import time credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m1',exchange_type='fanout') result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字 queue_name = result.method.queue print(queue_name) # 让exchange和queque进行绑定. channel.queue_bind(exchange='m1',queue=queue_name) def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True) channel.start_consuming()
发布订阅高级之Routing(按关键字匹配)
# 应用场景 -很多很多系统,产生大量日志--》日志自己推到消息队列中 error 时间 哪一行 错误信息 info 时间 哪一行 错误信息 info 时间 哪一行 错误信息 -通过direct和routing_key -启动多个消费者 -一个消费只只关注error级别的日志 -后来又加新功能,加一个新消费者--》监听error和warn -日志分级通知
发送者
import pika # 1 拿到链接 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() # 生产者 负责管理(绑定) 某个交换机---m1 channel.exchange_declare(exchange='m2',exchange_type='direct') channel.basic_publish(exchange='m2', routing_key='error', # routing_key 写个字符串 body='Hello World 999', properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent # 消息也持久化,即便断电---》它也不会丢失 ) ) print("Sent 'Hello World!'") # 关闭 connection.close()
订阅者
import pika import time credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m2',exchange_type='direct') result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字 queue_name = result.method.queue print(queue_name) # 让exchange和queque进行绑定. channel.queue_bind(exchange='m2',queue=queue_name,routing_key='info') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True) channel.start_consuming()
订阅2
import pika import time credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m2',exchange_type='direct') result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字 queue_name = result.method.queue print(queue_name) # 让exchange和queque进行绑定. channel.queue_bind(exchange='m2',queue=queue_name,routing_key='info') channel.queue_bind(exchange='m2',queue=queue_name,routing_key='error') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True) channel.start_consuming()
发布订阅高级之Topic(按关键字模糊匹配)
只能加一个单词
# 可以加任意单词字符
— — — —— — — — — — — — — — — — — — —— — — — — —— — — — — — — — —
发送者
import pika # 1 拿到链接 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() # 生产者 负责管理(绑定) 某个交换机---m1 channel.exchange_declare(exchange='m3',exchange_type='topic') channel.basic_publish(exchange='m3', routing_key='lqz.nb.xx', # routing_key 写个字符串 body='Hello World 888', properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent # 消息也持久化,即便断电---》它也不会丢失 ) ) print("Sent 'Hello World!'") # 关闭 connection.close()
接受者
import pika import time credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) # 不命名,会随机生成一个名字 queue_name = result.method.queue print(queue_name) # 让exchange和queque进行绑定. channel.queue_bind(exchange='m3', queue=queue_name, routing_key='lqz.#') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
接受者2
import pika import time credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) # 不命名,会随机生成一个名字 queue_name = result.method.queue print(queue_name) # 让exchange和queque进行绑定. channel.queue_bind(exchange='m3', queue=queue_name, routing_key='lqz.*') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
rabbitmq的5种消息模模型
# 1 简单模式(基本消息模型) 简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。 # 2 工作模式(work消息模型) 工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。 # 3 发布/订阅模式(订阅模型-fanout) fanout 发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。 #4 路由模式 (订阅模型-Direct:)direct 路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。 # 5 通配符模式 (订阅模型-Topic)topic 通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。
RPC介绍
互联软件架构的变迁
# 单体架构 -所有功能集成到一个项目中---》大单体 -单体架构---》集群化部署 # 集群机构 -单体应用--》部署在多台机器上---》组成机器 -nginx 转发 # 分布式架构 -电商: 支付 物流 订单---》三个项目 分布式服务顾名思义服务是分散部署在不同的机器上的,一个服务可能负责几个功能,是一种面向SOA架构的,服务之间也是通过rpc来交互或者是webservice来交互的 # SOA(分布式架构) # 微服务架构 (分布式架构)
https://www.cnblogs.com/liuqingzheng/p/16271923.html # RPC (Remote Procedure Call)是指远程过程调用,也就是说两台服务器 A,B 一个应用部署在 A 服务器上,想要调用 B 服务器上应用提供的函数或方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据 # 用来做服务间通信的 -方式一: 使用 restful调用 同步调用 -方式二:借助于消息队列 异步通信(微服务案例就是这种方案) -方式三:rpc通信:远程过程调用 # 为什么要用 RPC? 就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用 # 主流rpc框架 grpc:跨语言---》python调用go服务 https://zhuanlan.zhihu.com/p/425725192 go调用python服务 dubbo:java用的多
功能 | Hessian | Montan | rpcx | gRPC | Thrift | Dubbo | Dubbox | Spring Cloud |
---|---|---|---|---|---|---|---|---|
开发语言 | 跨语言 | Java | Go | 跨语言 | 跨语言 | Java | Java | Java |
分布式(服务治理) | × | √ | √ | × | × | √ | √ | √ |
多序列化框架支持 | hessian | √(支持Hessian2、Json,可扩展) | √ | × 只支持protobuf) | ×(thrift格式) | √ | √ | √ |
多种注册中心 | × | √ | √ | × | × | √ | √ | √ |
管理中心 | × | √ | √ | × | × | √ | √ | √ |
跨编程语言 | √ | ×(支持php client和C server) | × | √ | √ | × | × | × |
支持REST | × | × | × | × | × | × | √ | √ |
关注度 | 低 | 中 | 低 | 中 | 中 | 中 | 高 | 中 |
上手难度 | 低 | 低 | 中 | 中 | 中 | 低 | 低 | 中 |
运维成本 | 低 | 中 | 中 | 中 | 低 | 中 | 中 | 中 |
开源机构 | Caucho | Apache | Apache | Alibaba | Dangdang | Apache |
python实现rpc
# SimpleXMLRPCServer 自带的 # ZeroRPC-第三方
内置的
from xmlrpc.server import SimpleXMLRPCServer # 通信使用xml格式 class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() def add(self, a, b): print('来了') return a + b # SimpleXMLRPCServer server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True) server.register_introspection_functions() server.register_instance(RPCServer()) server.serve_forever()
import time from xmlrpc.client import ServerProxy # SimpleXMLRPCServer def xmlrpc_client(): print('xmlrpc client') c = ServerProxy('http://localhost:4242') res = c.add(3, 4) print(res) if __name__ == '__main__': xmlrpc_client()
zeroRpc
import zerorpc class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) def add(self, a, b): print(a, b) return a + b + 10 # zerorpc s = zerorpc.Server(RPCServer()) s.bind('tcp://0.0.0.0:4243') s.run()
import zerorpc import time # zerorpc def zerorpc_client(): print('zerorpc client') c = zerorpc.Client() c.connect('tcp://127.0.0.1:4243') res=c.add(2,3) print(res) if __name__ == '__main__': zerorpc_client()
rabbitmq实现rpc
跨语言
import pika import uuid class FibonacciRpcClient(object): def __init__(self): credentials = pika.PlainCredentials("admin", "admin") self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', credentials=credentials)) self.channel = self.connection.channel() # 随机生成一个消息队列(用于接收结果) result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response) self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) # 客户端 给 服务端 发送一个任务: 任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称 self.channel.basic_publish(exchange='', routing_key='rpc_queue', # 服务端接收任务的队列名称 properties=pika.BasicProperties( reply_to=self.callback_queue, # 用于接收结果的队列 correlation_id=self.corr_id, # 任务ID ), body=str(n)) while self.response is None: self.connection.process_data_events() return self.response fibonacci_rpc = FibonacciRpcClient() response = fibonacci_rpc.call(9) print('返回结果:', response)
import pika credentials = pika.PlainCredentials("admin","admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials)) channel = connection.channel() # 声明一个队列rpc_queue channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
微服务项目
# flask 项目 主站接口 #表结构 - Product 商品(图片)表 id = db.Column(db.Integer, primary_key=True, autoincrement=False) title = db.Column(db.String(200)) image = db.Column(db.String(200)) - UserLike 用户收藏表 id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer) product_id = db.Column(db.Integer) #接口 -查询所有商品接口 -收藏接口 #运行,创建库,迁移表,邮件启动(注意修改 rabbitmq地址) #django项目 # 表 class Product(models.Model): title = models.CharField(max_length=200) image = models.CharField(max_length=200) likes = models.PositiveIntegerField(default=0) class User(models.Model): pass # 接口 -商品 的增删查改5个接口 -随机获取一个用户id接口 2,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,0 3,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,1 4,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,0 # 测试: 1 访问增加商品接口
标签:pika,exchange,rabbitmq,queue,connection,credentials,channel From: https://www.cnblogs.com/wzh366/p/18104313