首页 > 其他分享 >《RabbitMQ实战指南》笔记

《RabbitMQ实战指南》笔记

时间:2023-01-29 11:12:25浏览次数:137  
标签:指南 交换器 队列 笔记 路由 消息 RabbitMQ channel

1. RabbitMQ简介

1.1 什么是消息中间件

消息队列中间件(Message Queue Middleware,简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

消息队列中间件一般有两种传递模式:

  • 点对点(P2P,Point-to-Point)模式:点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。
  • 发布/订阅(Pub/Sub)模式:发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

1.2 消息中间件的作用

  • 解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。
  • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
  • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
  • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
  • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
  • 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
  • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

1.3 RabbitMQ的特点

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,
    RabbitMQ 已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个
    交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  • 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息
    中间件协议。
  • 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
  • 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集
    群中的节点等。
  • 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自
    己的插件。

1.4 RabbitMQ的安装及简单使用

使用yum安装
进入到网站https://packagecloud.io/rabbitmq

1.4.1 安装 Erlang

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
sudo yum install erlang

1.4.2 安装RabbitMQ

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

sudo yum install rabbitmq-server

1.4.3 运行RabbitMQ

systemctl start rabbitmq-server

1.4.4 配置环境变量

1) 找到rabbitmq安装位置

[root@localhost bin]# whereis rabbitmq
rabbitmq: /usr/lib/rabbitmq /etc/rabbitmq

2) 在/etc/profile中添加执行脚本路径,内容如下

export PATH=$PATH:/usr/lib/rabbitmq/bin
export RABBITMQ_HOME=/usr/lib/rabbitmq

3) 使环境变量生效

source /etc/profile

1.4.5 创建用户

默认情况下,访问 RabbitMQ 服务的用户名和密码都是“guest”,这个账户有限制,默认只能通过本地网络(如 localhost)访问,远程网络访问受限.

1) 添加新用户,用户名为“root”,密码为“root123”:

[root@localhost bin]# rabbitmqctl add_user root root
Adding user "root" ...

2) 为 root 用户设置所有权限:

[root@localhost bin]# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/" ...

3) 设置 root 用户为管理员角色:

[root@localhost bin]# rabbitmqctl set_user_tags root administrator
Setting tags for user "root" to [administrator] ...

1.4.6 生产和消费消息

引入java客户端

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

生产消息代码:

public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.79.132";
    //RabbitMQ 服务端默认端口号为 5672
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection();//创建连接
        Channel channel = connection.createChannel();//创建信道
        //创建一个 type="direct"、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        //创建一个持久化、非排他的、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //将交换器与队列通过路由键绑定,这里的ROUTING_KEY就是BindingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        //发送一条持久化的消息:hello world!
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费消息代码:

public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.79.132";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException,
            TimeoutException, InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root");
        //这里的连接方式与生产者的 demo 略有不同,注意辨别区别
        Connection connection = factory.newConnection(addresses);//创建连接
        final Channel channel = connection.createChannel();//创建信道
        channel.basicQos(64);//设置客户端最多接收未被 ack 的消息的个数
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
        //等待回调函数执行完毕之后,关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

2. RabbitMQ入门

2.1 RabbitMQ模型架构

img

2.1.1 生产者和消费者

  • Producer:生产者,就是投递消息的一方。消息一般可以包含 2 个部分:消息体和标签(Label)
    • 消息体:业务逻辑结构数据。
    • 标签:用来表述这条消息,消息的标签比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
  • 消费者,就是接收消息的一方。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,
  • Broker:消息中间件的服务节点。

下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程:
img

2.1.2 队列

Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。

  • 多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

2.1.3 交换器、路由键、绑定

交换器相当于投递包裹的邮箱,RoutingKey 相当于填写在包裹的邮箱,BindingKey 相当于包裹的目的地,当填写在包裹上的地址和实际想要投递的地址相匹配
时,那么这个包裹就会被正确投递到目的地,最后这个目的地的“主人”——队列可以保留这个包裹。如果填写的地址出错,邮递员不能正确投递到目的地,包裹可能会回退给寄件人,也有可能被丢弃。

  • Exchange:生产者将消息发送到 Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。投递包裹的邮箱
  • RoutingKey:用来指定这个消息的路由规则,而这个 Routing Key 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。包裹的邮箱
  • Binding:RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey)。包裹的目的地
    生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。

2.1.4 交换机类型

  • fanout:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
  • direct:direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey 和 RoutingKey完全匹配的队列中。
  • headers:headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
  • topic:topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
    • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、 “java.util.concurrent”、 “com.hidden.client”。
    • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串。
    • BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)。
topic类型的example:
  • 路由键为“com.rabbitmq.client”的消息会同时路由到 Queue1 和 Queue2。
  • 路由键为“com.hidden.client”的消息只会路由到 Queue2 中。
  • 路由键为“com.hidden.demo”的消息只会路由到 Queue2 中。
  • 路由键为“java.rabbitmq.demo”的消息只会路由到 Queue1 中。
  • 路由键为“java.util.concurrent”的消息将会被丢弃或者返回给生产者(需要设置
    mandatory 参数),因为它没有匹配任何路由键。
    img

2.1.5 RabbitMQ运转流程

生产者发送消息:

  1. 生产者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
  2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等。
  3. 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等。
  4. 生产者通过路由键将交换器和队列绑定起来。
  5. 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息。
  6. 相应的交换器根据接收到的路由键查找相匹配的队列。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
  9. 关闭信道。
  10. 关闭连接。

消费者接收消息的过程:

  1. 消费者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
  4. 消费者确认(ack)接收到的消息。
  5. RabbitMQ 从队列中删除相应已经被确认的消息。
  6. 关闭信道。
  7. 关闭连接。

Connection 和 Channel

Connection 就是 TCP 连接,而 Channel(AMQP 信道) 是建立在 Connection 之上的虚拟连接,每个信道都会被指派一个唯一的 ID。

我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。

2.2 AMQP协议介绍

AMQP 是应用层的协议,AMQP 说到底还是一个通信协议,AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于 HTTP 中的方法(GET、POST、PUT、DELETE 等)。

3. 客户端开发向导

本章主要介绍 RabbitMQ 客户端开发的简单使用,按照一个生命周期的维度对连接、创建、生产、消费和关闭等几个方面进行笼统的介绍

3.1 连接RabbitMQ

方式一:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME); 
factory.setPassword(PASSWORD); 
factory.setVirtualHost(virtualHost); 
factory.setHost(IP_ADDRESS); 
factory.setPort(PORT); 
Connection conn = factory.newConnection(); 

方式二:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();

注意事项

  • Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,我们应该为每一个线程开辟一个 Channel,因为导致在网络上出现错误的通信帧交错,同时也会影响发送方确认(publisher confirm)机制的运行。
  • 不推荐在生产环境的代码上使用 isOpen 方法,因为这是一个重量级锁的方法,而且通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为
    Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其已经处于关闭状态,那么程序会抛出一个
    com.rabbitmq.client.ShutdownSignalException,我们只需捕获这个异常即可,同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。如下
public void validMethod(Channel channel)
{
    try {
        ... 
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
    ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

3.2 使用交换器和队列

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器.队列被声明为持久化的、非排他的、非自动删除的。

3.2.1 exchangeDeclare 方法详解

exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

Exchange.DeclareOk exchangeDeclare(String exchange,
                                    String type, boolean durable,
                                    boolean autoDelete, boolean internal,
                                    Map<String, Object> arguments) throws IOException;
参数 解释
exchange 交换器的名称
type 交换器的类型,常见的如 fanout、direct、topic
durable 设置是否持久化
autoDelete 设置是否自动删除。autoDelete 设置为 true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
internal 设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument 其他一些结构化参数

与 exchangeDeclare 师出同门的还有exchangeDeclarePassive。定义如下:Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;它主要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭。

3.2.2 queueDeclare 方法详解

只有两个重载方法:

方法一:创建一个由 RabbitMQ 命名的(类似这种amq.gen-LhQz1gv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列)、排他的、自动删除
的、非持久化的队列。

Queue.DeclareOk queueDeclare() throws IOException;

方法二:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,boolean autoDelete, Map<String, Object> arguments) throws IOException;
参数 解释
queue 队列的名称
durable 设置是否持久化
exclusive 为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;2. “首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
autoDelete 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
arguments 设置队列的其他一些参数

3.2.3 queueBind 方法详解

Queue.BindOk queueBind(String queue, String exchange, String routingKey,
                        Map<String, Object> arguments) throws IOException;
参数 解释
queue 队列名称
exchange 交换器的名称
routingKey 用来绑定队列和交换器的路由键
argument 定义绑定的一些参数

3.2.4 exchangeBind 方法详解

我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定,后者和前者的用法如
出一辙,

3.2.5 何时创建

提前创建的好处:

  1. 如果业务本身在架构设计之初已经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。
  2. 可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失。

在业务中创建:

  1. 如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。

3.3 发送消息

这条消息的投递模式(delivery mode)设置为 2,即消息会被持久化(即存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为 1,content-type为“text/plain”。可以自己设定消息的属性:

channel.basicPublish(exchangeName, routingKey,
                    new AMQP.BasicProperties.Builder()
                    .contentType("text/plain")
                    .deliveryMode(2)
                    .priority(1)
                    .userId("hidden")
                    .build()),
                    messageBodyBytes);

3.4 消费消息

RabbitMQ 的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用 Basic.Get 进行消费。

3.4.1 推模式

在推模式中,可以通过持续订阅的方式来消费消息,其中消费者标签(consumerTag)来区分是谁消费的。

boolean autoAck = false;
channel.basicQos(64); //限制推送消息的个数
channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body)
            throws IOException
        {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            long deliveryTag = envelope.getDeliveryTag();
            // (process the message components here ...)
            channel.basicAck(deliveryTag, false);//false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
        }
});

上面代码中显式地设置 autoAck 为 false,然后在接收到消息之后进行显式 ack 操
作(channel.basicAck),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。

3.4.2 拉模式

这里讲一下拉模式的消费方式。通过 channel.basicGet 方法可以单条地获取消息,其
返回值是 GetRespone。

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

关键代码如下:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

3.4.3 推模式 vs 拉模式

Basic.Consume 将信道(Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

3.5 消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message
acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数:

  • false: 当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
  • true:RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

3.6 关闭连接

在应用程序使用完之后,需要关闭连接,释放资源:

channel.close();
conn.close();

显式地关闭 Channel 是个好习惯,但这不是必须的,在 Connection 关闭的时候,
Channel 也会自动关闭。

还可以添加监听事件 addShutdownListener,当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 shutdownCompleted:

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
    {
        ... 
    }
});

ShutdownSignalException 还提供了多个方法来分析关闭的原因:

public void shutdownCompleted(ShutdownSignalException cause)
{
    if (cause.isHardError())
    {
        Connection conn = (Connection)cause.getReference();
        if (!cause.isInitiatedByApplication())
        {
            Method reason = cause.getReason();
            ...
        }
        ...
    } else {
        Channel ch = (Channel)cause.getReference();
        ...
    }
}

标签:指南,交换器,队列,笔记,路由,消息,RabbitMQ,channel
From: https://www.cnblogs.com/theheboy/p/17066259.html

相关文章

  • shell/Linux 任务学习笔记整理2:head/tail命令
    注!!:笔记来源:(原文链接:)https://blog.csdn.net/zznnniuu/article/details/123155074      版权声明:本文为CSDN博主「zznnniuu」的原创文章原文链接:https://blog......
  • 13--linux常用操作 | 青训营笔记
    这是我参与「第五届青训营」伴学笔记创作活动的第13天Linux命令大全|菜鸟教程(runoob.com)1.ls命令ls可能是每个Linux用户在其终端中键入的第一个命令。它允许......
  • shell/Linux 任务学习笔记整理1:wc/awk/sed
    注: 笔记来源:(原文链接:)https://blog.csdn.net/qq_37085158/article/details/127170488一 wc:统计文件的字节数、单词数、行数wc命令来自于英文词组“Wordcount”的缩写,......
  • JavaScript学习笔记—DOM之初识
    document浏览器为我们提供了一个document对象,是一个全局变量代表整个网页...<body><buttonid="btn">点我一下</button><script>//获取btn对象cons......
  • 读Java8函数式编程笔记04_类库
    1. 默认方法1.1. 接口中定义的包含方法体的方法,方法名有default关键字做前缀1.1.1. 在任何接口中,无论函数接口还是非函数接口,都可以使用该方法1.2. Collection接口......
  • Java8学习笔记
    OracleJDK是基于OpenJDK源代码的商业版本,要学习Java新技术可以去OpenJDK官网学习。Lambda表达式介绍匿名内部类存在的问题newThread(newRunnable(){@Ove......
  • 10、CSS权威指南--第 6 章(p213)文本属性
    文本和字体之间有什么区别呢?简单而言,文本是内容,而文字是用于显示内容的。6.1 缩进和行内对齐块级方向指当前书写模式放置块级元素的方向。行内方向指块级元素中行内元......
  • 分块学习笔记
    分块优雅的暴力。分块的思想是通过划分和预处理来达到时间复杂度的平衡。分块后任取一区间,中间会包含整块和零散块。一般对零散块暴力处理,整块通过预处理的信息计算。......
  • JavaScript学习笔记—DOM简介
    DOM(DocumentObjectModel)文档对象模型使用JS去操作网页的一组对象DOM属于WebAPI的一部分。WebAPI中定义了非常多的对象,通过这些对象可以完成对网页的各种操作(添加删......
  • JavaSE学习笔记Day 1
     packagecom.baidu.demo;/***@authorbaozi*@version1.0*@since1.8*/publicclassDemo01{Stringname;/****@paramname*@return......