首页 > 其他分享 >RabbitMQ入门

RabbitMQ入门

时间:2023-08-17 16:45:13浏览次数:36  
标签:Console 入门 队列 RabbitMQ 交换机 消息 var channel

1 简介

​ RabbitMQ 是采用 erlang 语言实现 AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

​ RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用 RabbitMQ 凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐。

RabbitMQ的具体特点可以概括为以下几点。

  • 可靠性:RabbitMQ使用一些机制来保证可靠性。如持久化、传输确认及发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换机来路由消息。对于典型的路由功能,提供了一些内置的交换机来实现。针对更复杂的路由功能,可以将多个交换机绑定在一起,可以通过插件机制来实现自己的交换机。
  • 扩展性:多个MQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展
    集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队
    列仍然可用。
  • 多种协议:除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息
    中间件协议。
  • 多语言客户端:几乎支持所有常用语言,比如C#、Java、Python、Ruby、PHP、JavaScript等。
  • 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集
    群中的节点等。
  • 插件机制: RabbitMQ 提供了许多插件 以实现从多方面进行扩展,当然也可以编写自
    己的插件。

依赖

  • 本文基于发稿时RabbitMQ的最新版本:3.8.19。
  • RabbitMQ客户端使用:RabbitMQ.Client 6.2.2
  • RabbitMQ可视化管理插件安装:官网。首先执行rabbitmq-plugins enable rabbitmq_management命令,然后打开管理面板:http://localhost:15672/#/ 即可,默认用户名密码都是guest。

2 相关概念

2.1 消息中间件

​ 消息 (Message):是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON 等,也可以很复杂,比如内嵌对象。

​ 消息队列中间件 (Message Queue Middleware,简称为 MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传和消息排队模型,它可以在分布式环境下扩展进程间的通信。它一般有两种传递模式:点对点(P2P, Point-to-Point) 模式和发布/订阅(Pub/Sub) 模式。

2.2 消息中间件的作用

  • 解耦: 最大的作用其实是解耦。

  • 冗余存储:有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息己经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。

  • 扩展性: 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。

  • 削峰: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流 并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯。

  • 可恢复性: 当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。

  • 顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。

  • 缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。

  • 异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,允许应用把 些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

2.3 RabbitMQ中的一些概念

​ RabbitMQ的整体模型架构如下:

image-20230817154555150

  • Producer:生产者,用来生产消息。并把消息发给交换机(生产者不会把消息直接发给某个队列,很多图你可能会看到生产者直连队列,其实中间隐藏了一个默认的交换机)。生产者也就是发送消息的一方。

  • Consumer:消费者,用来消费队列里的消息。也就是接受消息的一方。

  • Exchange:交换机,有些文章会成为交换器。其实这个东西的作用更像是路由器。交换机会根据生产者发过来的消息的routingKey,把消息丢到不同的队列中。

  • Queue:队列,用来存储交换机丢过来的消息(可以理解为邮箱)。一个队列可以被多个消费者进行消费,此时队列里的消息会按照轮询的方式一个个的分配给下面的消费者(不支持队列层面的广播消费)。

image-20230817154820473

  • channel: 通道,RabbitMQ 处理的每条 AMQP 指令都是通过通道完成的。如下图所示。通道的存在其实就是为了复用TCP连接,本质上我们也可以使用TCP连接发送命令。但是当应用中有多个线程需要生产或者消费时,就需要创建多个TCP连接,而TCP连接的创建和销毁很费资源。

image-20230817154902584

  • routingKey:路由键,交换机根据这个的值来决定把消息丢到哪个队列里,没有队列可以接受的话,可能把消息返回给生产者也可能直接丢弃。

  • Broker:RabbitMQ的服务节点或服务实例。可以简单里的理解为就是一台RabbitMQ服务器。

  • Binding:绑定,消费者端就行配置,建立队列与某个交换机的关系,这样交换机收到消息之后就知道是否要投递到这个队列了。

2.4 RabbitMQ模型

可以看到官网的教程里有六种模型:

image-20230817161141561

看起来很多很唬人,但是不要怕,本质上也就以下两种,学起来也很快。

  • 点对点:前两种就是属于点对点模型,即队列里的一个消息只能被一个消费者消费。第二种是对第一种的扩充,额外增加了一个消费者而已。多个消费者就是采用轮询的机制去消费同一个队列里的消息。
  • 发布订阅:剩下的4种都是发布订阅模型,即生产者发布的一个消息可以被N个消费者消费,实现方式是通过交换机把同一个消息投递到了N个队列里。4、5、6都是对3的功能扩充,让你有更大的自由度来决定一个消息能投递到哪个队列里。

3 ※点对点模型

队列的一个消息只能被一个消费者消费,多个消费者可以通过轮询的方式消费也可以通过手动响应ack的方式竞争消费。

3.1 轮询消费(自动ack)

当开启一个消费者实例时模型如下:

image-20230817161319188

当开启两个消费者实例时模型如下:

image-20230817161335012

//消费者示例:
//channel.BasicConsume的第二个参数为true,表示当消费者收到消息后(非消息的业务逻辑处理完后),会自动发送一个ack给mq表示消息已收到。
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, args) =>
            {
                byte[] body = args.Body.ToArray();
                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", msg);
            };
            //第二个参数autoAck为true
            channel.BasicConsume("hello", true, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
        }
    }

}


//生产者示例:
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            //声明队列操作是幂等的,当队列不存在时,会进行创建。
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            Console.WriteLine("请输入要发送的消息内容:");
            string msg = null;
            while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
            {
                var body = Encoding.UTF8.GetBytes(msg);
                //body是byte类型,使用了一个名为“”的默认交换机
                channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

                Console.WriteLine("[x] Sent {0}", msg);
            }

        }
    }

    Console.WriteLine("Press [Enter] to exit");
    Console.Read();
}

当开启多个消费者之后,默认会轮询消费。

3.2 ※手动发送ACK与数据持久存储

​ 当消费者消费完成一个消息之后,手动发送一条ack命令给broker。解决consumer突然死掉之后,导致消息丢失的问题。如果mq一直没收到ack,则会将此消息重新入队列,给其他消费者进行消费。

生产者示例:

  • task_queue的durable设置为true,这样及时Broker重启,此队列也不会消失。

  • 将消息的Persistent也设置为true。即消息也持久存储,但是并不代表消息会100%不丢失,它只是告诉MQ将消息存储在硬盘上。在MQ收到消息且写入硬盘之前如果挂了,那消息就丢了。如果需要保证100%的可用,可以使用后面小节的“发布确认”功能。

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                //durable设置为了true,队列持久化
                channel.QueueDeclare(queue: "task_quene", durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                var props = channel.CreateBasicProperties();
                //消息持久化
                props.Persistent = true;
    
                Console.WriteLine("请输入要发送的消息内容:");
                string msg = null;
                while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
                {
                    var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(exchange: "", routingKey: "task_quene", basicProperties: props, body: body);
    
                    Console.WriteLine("[x] Sent {0}", msg);
                }
            }
        }
        Console.WriteLine("Press [Enter] to exit");
        Console.Read();
    }
    

消费者示例:

​ 默认情况下MQ会按照worker的顺序把队列里的消息一个个的分给worker,这种分配消息的方式有一定的弊端,假如有两个worker且队列里的消息根据耗时长短间隔排列。这样所有耗时长的消息都会被分给worker1,短的分配给worker2. 造成worker2长时间空闲。所以就可以通过设置Qos的方式来改善,channel.BasicQos(0, 1, false)表示broker一次只把1个消息发给worker,直到这个worker发出了ack,才继续把下一个消息分给他。

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("task_quene", true, false, false, null);
            //设置qos
            channel.BasicQos(0, 1, false);
            Console.WriteLine(" [*] Waiting for messages.");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received +=async (model, args) =>
            {
                byte[] body = args.Body.ToArray();
                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine($"[-] Task {msg} received");
                await Task.Delay(msg.Length * 1000);//模拟耗时任务
                Console.WriteLine(" [x] Task {0} Done", msg);

                //手动发送ack,必须在同一个channel里发送
                channel.BasicAck(args.DeliveryTag, false);
            };
            channel.BasicConsume("task_quene", false, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
        }
    }
}

3.3 消费模式

消费者消费消息有两种模式:

  • 推(push):服务端主动推送消息到channel里,然后消费者消费信道里的消息

  • 拉(pull):消费者手动从服务端拉去消息

在上面的例子中我们看到的其实就是模式,使用的是channel.BasicConsume方法。而模式需要使用channel.BasicGet方法。如:

var response=channel.BasicGet("task_quene",autoAck:false);
var body=response.Body;
channel.BasicAck(response.Envelope.DeliveryTag,false);

4 ※发布订阅模型

一个消息可以被多个消费者消费,此时就用到了交换机。

4.1 交换机(Exchange)

回顾下我们之前的例子:

  • 一个生产者用来发送消息
  • 一个队列用来缓存和存储这些消息
  • 一个消费者用来接收消息

​ RabbitMQ消息模型的设计核心思想是:生产者从来不把消息直接丢给队列,它甚至都不知道要把消息丢给哪个队列。
取而代之的是生产者只需要把消息丢给交换机(exchange)。交换机决定把消息丢给哪个队列,或者丢给哪些队列,或者丢弃这个消息。

下图就是发布订阅的模型:

image-20230817162204352

交换机分为以下几种类型:

  • direct:把消息路由到与RoutingKey完全匹配的队列中
  • topic:把消息路由到符合RoutingKey匹配规则的队列中
  • headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
  • fanout:把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中

4.2 fanout交换机

1. 声明一个临时队列和一个交换机

​ 消费者需要声明一个临时队列,这个临时队列只能是消费者声明。当消费者断开连接时,这个队列将会被删除。(此场景适用于我们的logs接收测试,因为消费者不关系之前的日志是什么)。临时队列的名称类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg格式。

消费者或者生产者也要声明一个交换机。

//创建临时队列(只能是消费者)
var quenuName = channel.QueueDeclare().QueueName;
//创建交换机(生产者或消费者)
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
2. 将交换机与队列绑定

image-20230817162340818

消费者需要将临时队列与交换机进行绑定。

channel.QueueBind(queue:quenuName, exchange:"logs",routingKey:"");
3. 最终模型与代码

image-20230817162502128

与之前例子最大的不同是,此时生产者需要把消息发送到交换机而不是某个队列上。在发送时我们就需要提供一个routingKey,但是fanout模式的交换机会忽略这个参数。

这样当我们发送消息时,与exchange关联的所有队列都可以收到这个消息。

//生产者:
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {

            channel.ExchangeDeclare("logs", ExchangeType.Fanout);


            Console.WriteLine("请输入要发送的消息内容:");
            string msg = null;
            while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
            {
                var body = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

                Console.WriteLine("[x] Sent {0}", msg);
            }

        }
    }
    Console.WriteLine("Press [Enter] to exit");
    Console.Read();
}



//消费者
//这里我们使用channel.QueueDeclare().QueueName创建一个临时队列并返回队列名称。当消费者断开连接时,这个队列将会被删除。(此场景试用与我们的logs接收,因为消费者不关心之前的日志是什么)
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("logs", ExchangeType.Fanout);

            //创建一个临时队列
            var queueName = channel.QueueDeclare().QueueName;
            //交换机与队列的绑定
            channel.QueueBind(queue:queueName, exchange:"logs",routingKey: "");

            Console.WriteLine(" [*] Waiting for messages.");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, args) =>
            {
                byte[] body = args.Body.ToArray();
                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine($"[-] Task {msg} received");

                //手动发送ack,必须在同一个channel里发送
                channel.BasicAck(args.DeliveryTag, false);
            };
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
        }
    }
}

4.3 路由(Routing)

在上面的打印log例子中,所有的消费者都能收到同一个log消息。在这一节我们将通过路由的方式来订阅消息的子集。例如一个消费者只用来接收critical级别的消息,而其他消费者接收所有消息。

在上一小节中,消费者将队列与交换机绑定时用到了channel.QueueBind方法,表示“这个队列对这个交换机发出消息很感兴趣,愿意接收这些消息”。这个绑定方法还接收一个routingKey参数,取决于不同的交换机类型,这个参数有可能会被忽略(例如我们之前用到的fanout交换机)。

4.3.1 Direct交换机

所以这里我们将以direct类型的交换机为例,如果绑定队列时设置的routingKey等于发送消息时设置的routingKey,这个队列就可以收到消息。举例如下:

image-20230817162809096

direct类型的交换机X下绑定了两个队列Q1和Q2。Q1的routingKey是orange,Q2的routingKey是black和green。所以发送时如果消息的routingKey设置为orange则Q1会接收到,如果是black或green则Q2会接收到,如果是其他的值,则会被交换机直接丢弃 。

看完之后就会发现fanout交换机其实等同于以下形式:
image-20230817162825494

(即两个队列的routingKey一摸一样。)

接下来就实现一个根据日志级别输出到不同的端的发布订阅系统,模型如下:

image-20230817162840250

一个队列用来处理error级别的日志,另外一个用来处理其他类型的日志。

//生产者示例:
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {

            channel.ExchangeDeclare("logs", ExchangeType.Direct);


            Console.WriteLine("请输入要发送的日志级别和内容(格式:级别-内容):");
            string msg = null;
            while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
            {
                var msgArray=msg.Split('-');
                var body = Encoding.UTF8.GetBytes(msgArray[1]);
                channel.BasicPublish(exchange: "logs", routingKey:msgArray[0], basicProperties: null, body: body);

                Console.WriteLine("[x] Sent {0}", msg);
            }

        }
    }
    Console.WriteLine("Press [Enter] to exit");
    Console.Read();
}


//消费者示例:
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("logs", ExchangeType.Direct);


            var queueName = channel.QueueDeclare().QueueName;
            Console.WriteLine("请输入你要处理的日志级别(格式:error-info-warn)");
            var level = Console.ReadLine();
            level.Split('-').ToList().ForEach(l =>
            {
                channel.QueueBind(queueName, "logs", l);
            });

            Console.WriteLine(" [*] Waiting for messages.");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, args) =>
            {
                byte[] body = args.Body.ToArray();
                var msg = Encoding.UTF8.GetString(body);
                Console.WriteLine($"[-] Task {msg} received");

                //手动发送ack,必须在同一个channel里发送
                channel.BasicAck(args.DeliveryTag, false);
            };
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
        }
    }
}

4.4 主题(Topics)

​ 上一节我们学习了如何使用direct交换机。虽然相对于fanout来说我们有了更多的选择来接收不同类型的消息。但是如果我们有更复杂点的需求,比如:接收app1发来的所有log,只接收app2发来的error级别的log。此时direct就显得无能为力了,就需要更高级点的topic交换机。

4.4.1 Topic交换机

这种交换机与direct大同小异,只不过是routingKey的格式有区别,加了一些通配符和分隔符。下面讲解下topic具体的格式:

  • routingKey是一个字符串,里面可以随便写,最长255字节;
  • .小数点分隔符:将routingKey分隔为若干部分,一般来说每部分都是一个单词。比如hello.world.ni.hao、hello.world。
  • 星号通配符:用来匹配单个单词,如hello..ni.hao、*.world。
  • 井号通配符:用来匹配0个或多个单词,如hello.#.hao、#.hao。

参考有以下示例:

image-20230817163100035

routingKey的格式为:..。根据上图可以看到Q1只关心颜色为orange的物种。Q2关心兔子(不管它是啥颜色的,也不管它速度到底快不快)和速度为lazy的物种。所以:

  • quick.orange.rabbit:会放到Q1和Q2.
  • lazy.orange.elephant:会放到Q1和Q2.
  • quick.orange.fox:只会放到Q1.
  • lazy.brown.fox:只会放到Q2.
  • lazy.pink.rabbit:只会放到Q2. 虽然匹配Q2上的两条routingKey,但是也只发送一次。
  • quick.brown.fox和quick.orange.male.rabbit和orange消息会直接丢弃,因为找不到任何匹配。
  • lazy.orange.male.rabbit:即使含有4个单词,但是依然能够匹配到Q2.

从上可以看出:

  • fanout其实就是routingKey为#的Topic。
  • direct其实就是routingKey不含任何#和*的Topic

4.4.2 最终实现

接下来就是实现前面所说的日志打印功能:接收app1发来的所有log,只接收app2发来的error级别的log。

只需要修改上一章节的队列的routingKeyapp1.log.*app2.log.error即可。

5 实现远程过程调用(RPC)

​ 一般的RPC我们都是通过WebApi的方式来实现。但是RabbitMQ也可以实现RPC,之前我们介绍手动ack时用了Task.Wait来模拟耗时操作。这里我们将这个操作交给远程一个计算斐波那契数列的RPC服务来模拟。

调用模型如下:

image-20230817163238884

  • Client:发布一个耗时任务到rpc_queue队列,并消费amp.gen-Xa2队列(callback队列)的消息

  • Server:消费rpc_queue,并将计算后的结果放入amp.gen-Xa2.

  • reply_to: Client告诉server,执行完耗时任务之后,应该把结果放到哪个回调队列里。这里设置为amp.gen-Xa2。

  • correlation_id:表示该消息是对另一个消息的响应,用来关联请求与响应。每一个请求都有一个唯一的id。 client消费回调队列时,可以根据这个id匹配到对应的请求。

    //RPC Client:
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace RpcClient
    {
        public class Client
        {
            private readonly IConnection connection;
            private readonly IModel channel;
            private readonly string replyQueueName= "reply_queue";
            private readonly EventingBasicConsumer consumer;
            private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
            private readonly IBasicProperties props;
    
            public Client()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
    
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                 channel.QueueDeclare(queue: replyQueueName);
                consumer = new EventingBasicConsumer(channel);
    
                props = channel.CreateBasicProperties();
                var correlationId = Guid.NewGuid().ToString();
                props.CorrelationId = correlationId;
                props.ReplyTo = replyQueueName;
    
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var response = Encoding.UTF8.GetString(body);
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        respQueue.Add(response);
                    }
                };
            }
    
            public string Call(string message)
            {
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                    exchange: "",
                    routingKey: "rpc_queue",
                    basicProperties: props,
                    body: messageBytes);
    
                channel.BasicConsume(
                    consumer: consumer,
                    queue: replyQueueName,
                    autoAck: true);
    
                return respQueue.Take();
            }
    
            public void Close()
            {
                connection.Close();
            }
        }
        class Program
        {
            static void Main(string[] args)
            {
                
                string number;
                Console.WriteLine("请输入数字:");
                while(!string.IsNullOrEmpty(number=Console.ReadLine()))
                {
                    var rpcClient = new Client();
                    Console.WriteLine($" [x] Requesting fib({number})");
                    var response = rpcClient.Call(number);
    
                    Console.WriteLine(" [.] Got '{0}'", response);
                    rpcClient.Close();
                }
               
    
                Console.WriteLine("----End-----");
                Console.ReadLine();
            }
        }
    
    }
    
    
    //RPC Server:
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    namespace RpcServer
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue", durable: false,exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
    
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, args) =>
                    {
                        string response = null;
    
                        var props = args.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;
    
                        try
                        {
                            var message = Encoding.UTF8.GetString(args.Body.ToArray());
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = fib(n).ToString();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            //将结果回填至callback队列,然后手动ack
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(args.DeliveryTag,false);
                        }
                    };
                    channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
    
                    Console.WriteLine(" [x] Awaiting RPC requests");
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
    
            private static int fib(int n)
            {
                if (n == 0 || n == 1)
                {
                    return n;
                }
    
                return fib(n - 1) + fib(n - 2);
            }
        }
    }
    

标签:Console,入门,队列,RabbitMQ,交换机,消息,var,channel
From: https://www.cnblogs.com/eaknow/p/17638070.html

相关文章

  • 构建跨平台的移动应用程序:Xamarin入门
    介绍:在移动应用开发领域,跨平台的解决方案变得越来越受欢迎。Xamarin是一种流行的跨平台移动应用开发框架,它允许开发者使用C#语言来构建同时运行在iOS和Android平台上的应用程序。本篇博客将带您入门Xamarin开发,展示如何构建跨平台的移动应用程序。步骤1:安装和设置环境在开始之前,......
  • 入门级echarts地图高亮
    入门级echarts地图高亮♥需求我们需要在各个省的地图中对指定城市进行高亮,直辖市在中国地图中高亮。实现1.首先导入echartsnpminstallechartsECharts(EnterpriseCharts)是一个由百度开发的开源图表库,它提供了丰富的交互性、可定制性和扩展性,用于创建各种类型的数据可视化......
  • VSCode使用入门
    学习导航:VSCode入门MarkDown在VSCode环境下使用......
  • 鸿蒙入门开发教程:一文带你详解工具箱元服务的开发流程
    鸿蒙入门开发教程:一文带你详解工具箱元服务的开发流程一,基本概念元服务(原名原子化服务)是一种基于HarmonyOSAPI的全新服务提供方式,以HarmonyOS万能卡片等多种呈现形态,向用户提供更轻量化的服务。具有即用即走、信息外显、服务直达的特性。万能卡片(简称卡片)是一种界面展示形式,可......
  • Sqlite3的入门操作
    Sqlite3的下载Sqlite3整活有点东西,直接看图吧。操作系统:windows10如果你是第一次用sqlite3,直接会给你干自闭。一般情况下你只会下载序号2的zip文件,然后写代码的时候,会发现头文件呢?没错,你又要回来下载序号1的zip文件。找了一份example代码,编译的时候有报错,链接失败。你......
  • 轻松入门云技能,一文解锁亚马逊云科技各类认证资讯
    云职场的00后,二十而已,仍在寻找运动服与格子衫之间的共同点;云职场的90后,三十而立,以实现WLB(work—lifebalance)作为目标;云职场的80后,四十不惑,是否真的找到了云职场的清醒法则?有人说:云职场就像围城外面的人想进去,里面的人想出去;有人说:云职场里看似人人清醒,其实,大家都有各自的迷惑? 《......
  • Python爬虫之scrapy框架入门
    特点:scrapy利用twisted的设计实现了非阻塞的异步操作。这相比于传统的阻塞式请求,极大的提高了CPU的使用率,以及爬取效率。配置简单,可以简单的通过设置一行代码实现复杂功能。可拓展,插件丰富,比如分布式scrapy+redis、爬虫可视化等插件。解析方便易用,scrapy封装了xpath等解析......
  • Redis 7的入门到精通的学习路线可以分为三个层次:入门、进阶和精通
    Redis7的入门到精通的学习路线可以分为三个层次:入门、进阶和精通学习Redis7的入门到精通的学习路线可以分为三个层次:入门、进阶和精通。下面是每个层次的学习内容和示例代码讲解。##入门阶段:1.**安装和配置Redis**:了解如何下载、安装和配置Redis的基本参数。可以使用Redis......
  • Apache ECharts_入门
         ......
  • 「学习笔记」指针的基础入门
    为啥会突然学这个呢?因为长链剖分优化DP的状态转移用到了指针数组,平时的STL使用中也经常碰到指针。So,就去学了一下,记录一下学习的笔记。我绝对不会告诉你另一个原因是因为最近做DP做累了想来写篇博文水水时间引入我们平时用scanf输入的时候,都会在变量名前加一个&,但是,字......