首页 > 其他分享 >消息队列 RabbitMQ

消息队列 RabbitMQ

时间:2023-09-12 15:38:37浏览次数:44  
标签:交换器 false channel 队列 RabbitMQ 消息 路由

RabbitMQ 内部结构

  1. 发布者:生产者,消息的发送方。
  2. 连接:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:交换器(路由器),负责消息的路由到相应队列。类型:direct、fanout、topic
  5. Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
  6. Queue:队列,消息的缓冲存储区。
  7. Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
  8. Broker:消息队列的服务器实体。
  9. 消费者:消费者,消息的接收方。

一、安装

1.安装Erlang运行环境    坑:安装完要记得重启
2.安装RabbitMQ

二、消息模型

大致可以分为3类:

  • 基本消费模型(1)
  • Work消息模型(2)
  • 订阅模型(3,4,5)

第6个属于RPC暂时不管。


三、基本消费模型 -邮局

python-one.png
1、生产者 Sending

sending.png

using RabbitMQ.Client;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using(var connection = factory.CreateConnection())
{
    //创建通道
    using(var channel = connection.CreateModel())
    {
        //声明一个队列
        channel.QueueDeclare("hello1",false,false,false,null);

        //如果未声明交换机,则队列会自动绑定到默认direct类型的交换器,并以队列的形式作为路由器

        Console.WriteLine("\nRabbitMQ 连接成功,请输入消息,输入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            //构建byte消息数据包
            var sendBytes =Encoding.UTF8.GetBytes(input);

            //发布消息
            channel.BasicPublish("", "hello1", null, sendBytes);
            //exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 Rabbitmo默认的交换器中
            //routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
            // basicProperties: 消息的本国性集,其包14个属性成员,分别有contentlype contentEnoding, headers (Map & lt; String, Object & gt; ) 、deliveryMode、 priority、correlationId、replyTo、 expiration, messageId, timestamp、 type、 userId, appId、 clusterId.
            //byte[] body: 消体 (payload) ,真正需要发送的消息
            // landatory: 没为true时,如果exchange根据自身类型和消息routeke无一符合条的quele,会调用basic.return方法将消息返还给生产者:设为false时,出现上述情形broker会直接将消息扔掉。

        } while (input.Trim().ToLower()!="exit");

    }
}


2、消费者 Receiving
receiving.png
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection =factory.CreateConnection())
{
    //创建通道
    using (var channel=connection.CreateModel())
    {
        //事件基本消费者
        EventingBasicConsumer consumer=new EventingBasicConsumer(channel);

        //接收到消息事件  +=附加到事件上
        consumer.Received += (ch, ea) =>
        {
            var message=Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"收到消息:{message}");

            //确认该消息已被消费
            channel.BasicAck(ea.DeliveryTag, false);
        };

        //启动消费者,设置为手动应答消息
        channel.BasicConsume("hello1", false, consumer);
        Console.WriteLine("消费者1已启动");
        Console.ReadLine();
    }
}


四、Wock消费模型-工作队列 (循环调度)

python-two.png此消息模型的好处在于如果你积压了任务,我们只需要添加更多的工作者就可以了。默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环(循环调度)
1、再增加一个消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection = factory.CreateConnection())
{
    //创建通道
    using (var channel = connection.CreateModel())
    {
        //事件基本消费者
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

        //接收到消息事件  +=附加到事件上
        consumer.Received += (ch, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"收到消息:{message}");

            //确认该消息已被消费
            channel.BasicAck(ea.DeliveryTag, false);
        };

        //启动消费者,设置为手动应答消息
        channel.BasicConsume("hello1", false, consumer);
        Console.WriteLine("消费者2已启动");
        Console.ReadLine();
    }
}


会出现各种情况:(1)工作者完成一项任务可能需要几秒钟。如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。
(2)如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失)RabbitMQ将会做什么处理。
(3)当RabbitMQ遇到异常情况退出或崩溃时,我们的任务会怎么办,只能丢失吗?在重新启动RabbitMQ的时候能重新找回来吗?
(4)当我们有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。这种情况要怎么处理呢。
RabbitMQ都有解决方案
(1、2)消息确认
(3)消息持久性
(4)公平调度

五、订阅模型(三类)


exchanges.png
RabbitMQ 中消息传递模型的核心思想是生产者向交换器发送消息,从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。而交换器,一方面它接收来自生产者的消息,另一方面它将它们推送到队列中,交换必须确切地知道如何处理它收到的消息。

RabbitMQ 给我们提供了3种常用的交换器类型:
direct:定向,把消息交给符合指定routing key 的队列
topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
fanout:广播,将消息交给所有绑定到交换机的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!


1、Direct 定向

(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)direct-exchange.png使用路由键橙色发布到交换的消息 将被路由到队列Q1。带有黑色 或绿色路由键的消息将发送到Q2。所有其他消息将被丢弃。代码:
using RabbitMQ.Client;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection = factory.CreateConnection())
{
    //创建通道
    using (var channel = connection.CreateModel())
    {
        var exchangeName = "direct_exchange";

        #region QueueDeclare方法详解
        //queue:队列的名称。
        //durable:设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
        //exclusive:设置是否排他。为true则设置队列为排他的。
        //autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
        //arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、 x - max - length, x - max - length - bytes, x - dead - letter -exchange, x - dead - letter - routing - key、x - max - priority等。

        //声明一个队列hello1
        channel.QueueDeclare("hello1", false, false, false, null);
        //声明一个队列hello2
        channel.QueueDeclare("hello2", false, false, false, null);
        #endregion 

        #region ExchangeDeclare方法详解
        //ExchangeDeclare方法详解
        //exchange:交换器的名称
        //type:交换器的类型,常见的如fanout、direct、topic.
        //durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
        //autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑才会删除。
        //internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
        //argument:其他一些结构化参数
        #endregion

        //创建一个非持久化的,非自动删除的,绑定类型为direct的交换器
        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

        //将队列与交换机绑定
        channel.QueueBind("hello1", exchangeName, "orange");//使用路由键(orange)将hello1队列和交换器绑定
        channel.QueueBind("hello2", exchangeName, "green");//使用路由键(green)将hello2队列和交换器绑定
        channel.QueueBind("hello2", exchangeName, "black");//使用路由键(black)将hello2队列和交换器绑定

        Console.WriteLine("\nRabbitMQ 连接成功,请输入消息,输入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            //构建byte消息数据包
            var sendBytes = Encoding.UTF8.GetBytes(input);

            //发布消息
            channel.BasicPublish(exchangeName, "orange", null, sendBytes);

            #region BasicPublish方法详解
            //exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 Rabbitmo默认的交换器中
            //routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
            // basicProperties: 消息的本国性集,其包14个属性成员,分别有contentlype contentEnoding, headers (Map & lt; String, Object & gt; ) 、deliveryMode、 priority、correlationId、replyTo、 expiration, messageId, timestamp、 type、 userId, appId、 clusterId.
            //byte[] body: 消体 (payload) ,真正需要发送的消息
            // landatory: 没为true时,如果exchange根据自身类型和消息routeke无一符合条的quele,会调用basic.return方法将消息返还给生产者:设为false时,出现上述情形broker会直接将消息扔掉。
            #endregion

        } while (input.Trim().ToLower() != "exit");

    }
}

我们启动两个消费者"hello1"与"hello2" 然后同时启动生产者
(1)我们先使用路由键orange发布消息
            //发布消息
            channel.BasicPublish(exchangeName, "orange", null, sendBytes);
由下图 看出 只向队列hello1 发送
(2)我们再使用路由键green发布消息
        //发布消息
      channel.BasicPublish(exchangeName, "green", null, sendBytes);
由下图 看出 只向队列hello2 发送
(3)我们再使用路由键black发布消息
            //发布消息
            channel.BasicPublish(exchangeName, "black", null, sendBytes);
由下图 看出 也只向队列hello2 发送

2、Fanout 广播

(消息广播,将消息分发到exchange上绑定的所有队列上)image 47.pngfanout的路由机制(广播)如下图,即发送到 fanout 类型exchange的消息都会分发到所有绑定该exchange的队列上去。
由下图可以看出 消费者都会拿到数据

3、Topic 通配符

(模式匹配的路由规则:支持通配符)
python-five.png

Topic Exchange使用起来非常灵活,它可以通过使用通配符(*与#)来进行模糊匹配(跟我们Api中的模糊查询类似),所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上
看一下匹配规则:

  • "*" 匹配一个单词
  • "#" 匹配一个或多个单词

举个简单的例子:
"mamba.*" 只可以匹配到"mamba.rabbit" 这个格式的routingkey
"mamba.#" 则可以匹配到"mamba.male.rabbit" 这个格式的routingkey


自己理解 
生产者:1、创建连接工厂2、创建连接3、创建通道4、创建交换器5、将队列与交换机绑定6、发布消息
消费者:1、创建连接工厂2、创建连接3、创建通道4、事件基本消费者5、接收到消费事件6、确定消息已被消费7、启动消费者,设置为手动应答消息







标签:交换器,false,channel,队列,RabbitMQ,消息,路由
From: https://www.cnblogs.com/buzheng11/p/17696310.html

相关文章

  • RabbitMQ - Exception (504) Reason: "channel id space exhausted"
    使用go的第三方包:github.com/rabbitmq/amqp091-go出现报错:getmqchannelerror{"error":"Exception(504)Reason:channelidspaceexhausted"}ctx:=context.Background()results,err:=global.Redis.LRange(ctx,abListName,0,-1).Result()......
  • 如何在Linux上设置高可用的消息队列
    消息队列是现代分布式系统中常用的一种通信方式,它可以在多个进程或者多台服务器之间传递数据,实现解耦和异步通信的目的。在Linux系统上,我们可以通过一些开源的消息队列软件来搭建高可用的消息队列系统。本文将以RabbitMQ为例,介绍如何在Linux上搭建和配置高可用的消息队列。步骤一:安......
  • rabbitmq详细实例
    1.概述RabbitMQ是由LShift提供的一个AdvancedMessageQueuingProtocol(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成,因此也是继承了这些优点。FROM《维基百科——RabbitMQ》Rabbit科技有限公司开发了RabbitMQ,并提供对其的支持。起初,Rabbit......
  • 【设计模式】观察者模式Observer:消息的订阅-发布
    (目录)观察者模式是一种非常流行的设计模式,也常被叫作订阅-发布模式。观察者模式在现代的软件开发中应用非常广泛,比如,商品系统、物流系统、监控系统、运营数据分析系统等。常说的基于事件驱动的架构,其实也是观察者模式的一种最佳实践。当观察某一个对象时,对象传递出的每一个行......
  • 【设计模式】备忘录模式Memento - 在聊天会话中记录历史消息
    (目录)相较于其他设计模式,备忘录模式不算太常用,但好在这个模式理解、掌握起来并不难,代码实现也比较简单,应用场景更是比较明确和有限。一般应用于编辑器或会话上下文中防丢失、撤销、恢复等场景中。模式原理分析备忘录模式的原始定义是:捕获并外部化对象的内部状态,以便以后可......
  • C++ 优先队列 priority_queue
    既然是队列那么先要包含头文件#include<queue>,他和queue不同的就在于我们可以自定义其中数据的优先级,让优先级高的排在队列前面,优先出队优先队列具有队列的所有特性,包括基本操作,只是在这基础上添加了内部的一个排序,它本质是一个堆实现的和队列基本操作相同:top访问队头......
  • 队列 queue
         双端队列deque1.双端队列知识需知由于队列是一种先进先出(FIFO)的数据结构,因此无法直接从队列的底部删除元素。如果希望从队列的底部删除元素,可以考虑使用双端队列(deque)。双端队列(deque)是一种允许在两端插入和删除元素的数据结构。使用push_back()和push_fron......
  • 仲裁队列
          ......
  • 关于Android开发中推送消息通知推送消息到前台的问题
    为什么推送点击发送按钮之后没有消息进行发送?(以简单音乐播放器为例)1.前景创建了一个Activity:ForegroundServiceActivity.java//代码如下:packagecom.app.custom.demo01;importandroidx.appcompat.app.AppCompatActivity;importandroid.content.Intent;importandr......
  • TTL机制实现延迟消息
              ......