首页 > 其他分享 >rabbitmq

rabbitmq

时间:2024-03-29 17:44:58浏览次数:35  
标签:pika exchange rabbitmq queue connection credentials channel

消息安全之durable持久化

# queue 持久化
# 消息 持久化

send

import pika

#  1 拿到链接
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True) # queue持久化
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World 2222',
                      properties=pika.BasicProperties(
                          delivery_mode=pika.DeliveryMode.Persistent  # 消息也持久化,即便断电---》它也不会丢失
                      )
                      )
print("Sent 'Hello World!'")
# 关闭
connection.close()

recieve

import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='hello12', durable=True)

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")
    # 消费呢,还没消费完--》程序崩了--->之前在消息队列中得消息---》没了

    # 真正消费完再确认
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 消费者监听:hello 这个queue,只要queue中有消息,就会触发on_message_callback对应的函数的执行
# auto_ack 自动确认,为了保证数据不丢失,只要拿到消息---》通知服务端,消息拿到了删除
channel.basic_consume(queue='hello2', on_message_callback=callback, auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 卡在这

闲置消费

# 默认情况下,如果多个消费者,消费消息,是依次按顺序消费,即便消费者很耗时,也是依次消费

# 改成:闲置消费
    -谁空闲,谁优先消费
    
    
# 在消费消息之前加
channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='hello3', on_message_callback=callback, auto_ack=False)

发布订阅

生产者

import pika

#  1 拿到链接
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
# 生产者 负责管理(绑定) 某个交换机---m1
channel.exchange_declare(exchange='m1',exchange_type='fanout')

channel.basic_publish(exchange='m1',
                      routing_key='',
                      body='Hello World 111',
                      properties=pika.BasicProperties(
                          delivery_mode=pika.DeliveryMode.Persistent  # 消息也持久化,即便断电---》它也不会丢失
                      )
                      )
print("Sent 'Hello World!'")
# 关闭
connection.close()

消费者

import pika
import time
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m1',exchange_type='fanout')


result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

发布订阅高级之Routing(按关键字匹配)

# 应用场景
    -很多很多系统,产生大量日志--》日志自己推到消息队列中
        error 时间 哪一行  错误信息
        info 时间 哪一行  错误信息
        info 时间 哪一行  错误信息
    -通过direct和routing_key
        -启动多个消费者
            -一个消费只只关注error级别的日志
            -后来又加新功能,加一个新消费者--》监听error和warn
    -日志分级通知

 

发送者

import pika

#  1 拿到链接
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
# 生产者 负责管理(绑定) 某个交换机---m1
channel.exchange_declare(exchange='m2',exchange_type='direct')

channel.basic_publish(exchange='m2',
                      routing_key='error', # routing_key 写个字符串
                      body='Hello World 999',
                      properties=pika.BasicProperties(
                          delivery_mode=pika.DeliveryMode.Persistent  # 消息也持久化,即便断电---》它也不会丢失
                      )
                      )
print("Sent 'Hello World!'")
# 关闭
connection.close()

订阅者

import pika
import time
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m2',exchange_type='direct')


result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='info')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅2

import pika
import time
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m2',exchange_type='direct')


result = channel.queue_declare(queue='',exclusive=True) # 不命名,会随机生成一个名字
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='info')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='error')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

发布订阅高级之Topic(按关键字模糊匹配)

只能加一个单词

# 可以加任意单词字符

 — — — —— — — — — — — — — — — — —   — —— — —  — — —— — — — — — — — — 

 

发送者

import pika

#  1 拿到链接
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
# 生产者 负责管理(绑定) 某个交换机---m1
channel.exchange_declare(exchange='m3',exchange_type='topic')

channel.basic_publish(exchange='m3',
                      routing_key='lqz.nb.xx',   # routing_key 写个字符串
                      body='Hello World 888',
                      properties=pika.BasicProperties(
                          delivery_mode=pika.DeliveryMode.Persistent  # 消息也持久化,即便断电---》它也不会丢失
                      )
                      )
print("Sent 'Hello World!'")
# 关闭
connection.close()

接受者

import pika
import time

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)  # 不命名,会随机生成一个名字
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3', queue=queue_name, routing_key='lqz.#')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

接受者2

import pika
import time

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)  # 不命名,会随机生成一个名字
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3', queue=queue_name, routing_key='lqz.*')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

rabbitmq的5种消息模模型

# 1 简单模式(基本消息模型)
简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

# 2 工作模式(work消息模型)
工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

# 3 发布/订阅模式(订阅模型-fanout)  fanout
发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。

#4  路由模式 (订阅模型-Direct:)direct
路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。

# 5 通配符模式 (订阅模型-Topic)topic
通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。

RPC介绍

互联软件架构的变迁

# 单体架构
    -所有功能集成到一个项目中---》大单体
    -单体架构---》集群化部署
# 集群机构
    -单体应用--》部署在多台机器上---》组成机器
    -nginx 转发
    
# 分布式架构
    -电商: 支付   物流   订单---》三个项目
    分布式服务顾名思义服务是分散部署在不同的机器上的,一个服务可能负责几个功能,是一种面向SOA架构的,服务之间也是通过rpc来交互或者是webservice来交互的
    
# SOA(分布式架构)



# 微服务架构 (分布式架构)

 

 

 

https://www.cnblogs.com/liuqingzheng/p/16271923.html
    
# RPC (Remote Procedure Call)是指远程过程调用,也就是说两台服务器 A,B 一个应用部署在 A 服务器上,想要调用 B 服务器上应用提供的函数或方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据

# 用来做服务间通信的
	-方式一: 使用 restful调用 同步调用
    -方式二:借助于消息队列 异步通信(微服务案例就是这种方案)
    -方式三:rpc通信:远程过程调用
    
    
# 为什么要用 RPC?
就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用


# 主流rpc框架
	grpc:跨语言---》python调用go服务   https://zhuanlan.zhihu.com/p/425725192
    			   go调用python服务
    dubbo:java用的多

  

功能HessianMontanrpcxgRPCThriftDubboDubboxSpring Cloud
开发语言 跨语言 Java Go 跨语言 跨语言 Java Java Java
分布式(服务治理) × × ×
多序列化框架支持 hessian √(支持Hessian2、Json,可扩展) × 只支持protobuf) ×(thrift格式)
多种注册中心 × × ×
管理中心 × × ×
跨编程语言 ×(支持php client和C server) × × × ×
支持REST × × × × × ×
关注度
上手难度
运维成本
开源机构 Caucho Weibo Apache Google Apache Alibaba Dangdang Apache

python实现rpc

 

# SimpleXMLRPCServer 自带的
# ZeroRPC-第三方

 

内置的

from xmlrpc.server import SimpleXMLRPCServer


# 通信使用xml格式
class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()

    def add(self, a, b):
        print('来了')
        return a + b


# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()

 

import time
from xmlrpc.client import ServerProxy


# SimpleXMLRPCServer
def xmlrpc_client():
    print('xmlrpc client')
    c = ServerProxy('http://localhost:4242')

    res = c.add(3, 4)
    print(res)


if __name__ == '__main__':
    xmlrpc_client()

zeroRpc

import zerorpc


class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)

    def add(self, a, b):
        print(a, b)
        return a + b + 10


# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()

 

import zerorpc
import time
# zerorpc
def zerorpc_client():
    print('zerorpc client')
    c = zerorpc.Client()
    c.connect('tcp://127.0.0.1:4243')
    res=c.add(2,3)
    print(res)


if __name__ == '__main__':
    zerorpc_client()

rabbitmq实现rpc

跨语言

 

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101', credentials=credentials))
        self.channel = self.connection.channel()

        # 随机生成一个消息队列(用于接收结果)
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 客户端 给 服务端 发送一个任务:  任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # 服务端接收任务的队列名称
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # 用于接收结果的队列
                                       correlation_id=self.corr_id,  # 任务ID
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return self.response


fibonacci_rpc = FibonacciRpcClient()

response = fibonacci_rpc.call(9)
print('返回结果:', response)

 

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))
channel = connection.channel()

# 声明一个队列rpc_queue
channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

 

微服务项目

# flask 项目 主站接口
    #表结构
        - Product  商品(图片)表
                id = db.Column(db.Integer, primary_key=True, autoincrement=False)
                title = db.Column(db.String(200))
                image = db.Column(db.String(200))
        - UserLike 用户收藏表
                id = db.Column(db.Integer, primary_key=True)
                user_id = db.Column(db.Integer)
                product_id = db.Column(db.Integer)
    
       #接口
        -查询所有商品接口
        -收藏接口
    
    #运行,创建库,迁移表,邮件启动(注意修改 rabbitmq地址)
 


#django项目
    # 表 
    class Product(models.Model):
        title = models.CharField(max_length=200)
        image = models.CharField(max_length=200)
        likes = models.PositiveIntegerField(default=0)
    class User(models.Model):
        pass
    
    # 接口
        -商品 的增删查改5个接口
        -随机获取一个用户id接口
        
        
        
 2,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,0
3,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,1
4,图片1,http://img.crcz.com/allimg/202003/19/1584589085800735.jpg,0



# 测试:
    1 访问增加商品接口

 

标签:pika,exchange,rabbitmq,queue,connection,credentials,channel
From: https://www.cnblogs.com/wzh366/p/18104313

相关文章

  • RabbitMQ的部分模式
    1发布订阅模式发送者packageorg.example;importcom.alibaba.fastjson.JSON;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava......
  • 深入了解RabbitMQ:构建高效的消息队列系统(三)
    本系列文章简介:        本系列文章将深入了解RabbitMQ的工作原理、特性和最佳实践。我们将介绍如何安装和配置RabbitMQ服务器,以及如何开发生产者和消费者应用程序。我们还将探讨如何处理消息的持久化、消息路由和消息过滤。除此之外,我们将研究如何使用RabbitMQ进行负......
  • 深入了解RabbitMQ:构建高效的消息队列系统(二)
    本系列文章简介:        本系列文章将深入了解RabbitMQ的工作原理、特性和最佳实践。我们将介绍如何安装和配置RabbitMQ服务器,以及如何开发生产者和消费者应用程序。我们还将探讨如何处理消息的持久化、消息路由和消息过滤。除此之外,我们将研究如何使用RabbitMQ进行负......
  • RabbitMQ3.x之四_RabbitMQ角色说明及创建用户与授权
    RabbitMQ3.x之四_角色说明及创建用户与授权文章目录RabbitMQ3.x之四_角色说明及创建用户与授权1.访问和授权1.Tags说明2.命令行示例2.管理界面新建用户及访问授权1.管理界面新建用户2.管理界面中的授权说明3.guest用户不能远程登录提示3.创建用户1.基本命令2......
  • 2、RabbitMQ、Erlang
    1、RabbitMQ是一个实现了高级消息队列协议(AMQP)的开源消息代理软件,也被称为面向消息的中间件。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移则是构建在开放电信平台框架上的。RabbitMQ具有以下特点:可扩展性:RabbitMQ可以通过添加更多的节点和队列来实现水平扩展,从而提高......
  • rabbitmq安装使用问题汇总
    附上别的同学的安装教程:win10安装rabbitMQ详细步骤_rabbitmq安装-CSDN博客安装rabbitmq之前是需要先安装erlang环境的,否则的话是无法安装的,还有一点就是安装的版本要对应上,否则也是会存在问题的*重点:rabbitmq安装的时候要选择管理员权限安装,否则可能出现其他问题,后面还是要重......
  • RabbitMQ工厂虚拟机集群可靠性测试报告
    高可用集群架构节点域名操作系统RabbitMQ版本Erlang版本rabbitmq1.mfg.tp-link.comCentos7.93.8.2823.3-2rabbitmq2.mfg.tp-link.comCentos7.93.8.2823.3-2rabbitmq3.mfg.tp-link.comCentos7.93.8.2823.3-2目前Centos7.9通过直接RPM包部署安装的版......
  • RabbitMQ3.x之一_WindowServer2019中安装RabbitMQ详细教程
    RabbitMQ3.x之一_WindowServer2019中安装RabbitMQ详细教程文章目录RabbitMQ3.x之一_WindowServer2019中安装RabbitMQ详细教程1.安装环境说明1.WindowServer20192.ErLang与RabbitMQ对应版本2安装Erlang1.安装Erlang2.ErLnag环境变量配置3.查看是否安装成功3.安......
  • org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.Authen
    出现问题org.springframework.amqp.AmqpAuthenticationException:com.rabbitmq.client.AuthenticationFailureException:ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAIN.Fordetailsseethebrokerlogfile.解决问题创建一个超级用户//添......
  • rabbitmq
    composerrequirephp-amqplib/php-amqplib:2.6.1在界面上设置好交换机、队列、绑定代码上只需要插入和读取 设置交换机 设置队列 队列绑定交换机  ......