首页 > 其他分享 >RabbitMq

RabbitMq

时间:2023-07-07 15:36:15浏览次数:44  
标签:false 队列 dead queue RabbitMq channel 消息

1,RabbitMq 简介

是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

官网
安装

2,RabbitMq 几个术语

1. Exchange - 交换机

生产者将消息发送给交换机,交换机按照一定规则分发消息给指定队列。消息根据交换机类型和 binding 可以投递到多个队列中。

常用的交换机有四种。

  1. 直连交换机

directExchange: 根据 routeKey 匹配队列

@Bean
public DirectExchange directExchangeDemo(){
    /*
    * 直连交换机
    * 一共四个参数:String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
    *       name: 名称
    *       durable: 持久化
    *       autoDelete:自动删除
    *       arguments:参数
    *
    * */
    return new DirectExchange("directExchangeTest",true,false);
}
  1. 扇形交换机

FanoutExchange:不用匹配 routekey,所有队列都能获取扇形交换机分发的消息

@Bean
public FanoutExchange fanoutExchangeDemo(){
    /* 扇形交换机 */
    return new FanoutExchange("fanoutExchangeTest",true,false);
}
  1. 主题交换机

TopicExchange: 增强版的直连交换机,路由键 routekey 中,* 代表匹配任意一个单词,# 代表匹配任意一个或多个单侧, . 代表一个部分(www.# 可以匹配 www.aaa)

@Bean
public TopicExchange topicExchangeDemo(){
    return new TopicExchange("topicExchangeTest1",true,false);
}
  1. 头部交换机

HeadersExchange : 通过头部键值对匹配队列的交换机

@Bean
public HeadersExchange headersExchangeDemo(){
    /* 头部交换机 */
    return new HeadersExchange("headersExchangeTest",true,false);
}

2. Broker

接收和分发消息的应用,就是 mq 的服务端。

3. Virtual host

虚拟分组,类似于 nameSpace。

4. Connection

publisher/customer 和 broker 直接的连接。

5. Channel

信道,复用 Connection。

6. Exchange

交换机,message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中。

7. Queue

最终消息被送到这里等待被 customer 取走。

8. Binding

exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key, Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

3. 消息队列大致使用过程

  1. 启动一个消息队列服务器
  2. 客户端连接到消息队列服务器,打开一个 channel
  3. 客户端声明一个 exchange,并设置相关属性
  4. 客户端声明一个 queue,并设置相关属性
  5. 客户端使用 routing key,在 exchage 和 queue 中建立绑定关系
  6. 生产者投递消息到 exchange,exchange 接收到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到对应的队列中。
  7. 消费者消费队列中的消息。

4,消息应答

创建消费者:

/**
 *  消费者消费消息
 *    1,消费哪个队列
 *    2,消费成功之后是否要自动应答 "true" 代表自动应答 "false" 手动应答
 *    3,消费者未成功消费的回调
 * */
channel.basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);

确认消费:

/**
 * 参数 1,消息标记
 *      2,false 表示只应答接收到那个传递的消息
 * 用于肯定确认:RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
 * 
 * multiple 的 true 和 false 代表不同意思:
 *        true 表示批量应答 channel 上未应答的消息,false 表示只应答当前 channel 上的消息。
 * */
Channel.basicAck(long deliveryTag, boolean multiple)

拒绝消费

/**
 *    参数 1,消息标记
 *         2,是否应答 channel 上所有未应答的消息
 *         3,是否重新入列
 *    用于否定消息
 * */ 
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

拒绝消费

/**    
 *    参数 1,消息标记
 *         3,是否重新入列
 *    用于否定消息,相比 basicNack 缺少 multiple 参数,不能批量确认
 * */
Channel.basicReject(long deliveryTag, boolean requeue)
手动确认 demo
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("yanqi");
        factory.setPassword("5211314");
        factory.setVirtualHost("love");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("Custom1 等待接收消息....");

        //消费消息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody());
                System.out.println(message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        //取消消息
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
            System.out.println("消息消费被中断");
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3,消费成功
         * 4.消费者未成功消费的回调
         */
        channel.basicConsume("中华艺术宫", false, deliverCallback, cancelCallback);
    }
}

5,队列持久化

//durable:true 表示队列持久化,false 表示不持久化,重启 rabbitmq 队列就没了

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
/**
 * 生成一个队列
 * 1.队列名称
 * 2.队列里面的消息是否持久化 默认消息存储在内存中
 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
 * 5.其他参数
 */
channel.queueDeclare("中华艺术宫", false, false, false, null);

6,消息持久化

//props 中添加 MessageProperties.PERSISTENT_TEXT_PLAIN
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
/**
 * 发送一个消息
 * 1.发送到那个交换机
 * 2.路由的 key 是哪个
 * 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
 * 4.发送消息的消息体
 */
channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

7,预取值

  Channel 上未确认的缓冲区,通过 basicQos(int prefetchCount) 设置值,避免缓冲区无限制未确认大小。通过设置预取值,还可以根据不同消费者性能问题实现不公平分发。

8,发布确认

生成者将 Channel 设置成 confirm 模式,一旦消息被投递到所有匹配的队列之后, broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。

发布确认
public class PushlierConfirm {
    public static void main(String[] args) {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("yanqi");
        factory.setPassword("5211314");
        factory.setVirtualHost("love");

        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {

            //发布确认
            channel.confirmSelect();

            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare("中华艺术宫", false, false, false, null);

            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
             * 4.发送消息的消息体
             */
            int i = 0;
            while(true){
                String message = "hello world--" + (++i);
                channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                boolean b = channel.waitForConfirms();
                if(b){
                    System.out.println("消息 " + i + " 发布成功!");
                }else{
                    System.out.println("消息 " + i + " 发布失败!");
                }

                Thread.sleep(3_000);
            }

            //System.out.println("消息发送完毕");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

9,死信队列

无法被消费的消息。

来源:
1)消息 TTL 过期
2)队列达到最大长度,无法再添加数据到 mq 中
3)被拒绝的消息,并且 requeue = false

声明死信队列 demo
public static void rejectCustom() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    /**
     * 声明死信队列
     * queueDeclare(String queue,
     *              boolean durable,
     *              boolean exclusive,
     *              boolean autoDelete,
     *              Map<String, Object> arguments)
     *   queue:队列名
     *   durable:是否持久化
     *   exclusive:该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
     *   autoDelete:是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
     *   arguments:其他参数
     */
    String dead_queue = "dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 私信队列绑定交换机
     * */
    String dead_exchange = "dead_exchange";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, "dead_routing");

    //声明正常队列
    String normal_queue = "normal_queue";
    Map<String, Object> params = new HashMap<>();
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", "dead_routing");
    channel.queueDeclare(normal_queue, false, false, false, params);


    //等待接收消息
    System.out.println("等待接收消息----");
    channel.basicConsume(normal_queue, false, (consumerTag, message) -> {
        System.out.println(new String(message.getBody(), "UTF-8"));
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    }, consumerTag -> {
        System.out.println("消费失败");
    });
}

10,延时队列

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

设置超时时间
1)队列的 TTl,队列中的消息一旦过了 TTL 时间未被消费,就会丢弃(有死信队列就放到死信队列中)
2)消息的 TTL,即使消息过期,也不一定被马上丢弃

Rabbitmq 插件实现延迟队列

延时队列
//消息队列设置延时,投送消息到普通队列,ttl 时间内未被消费,投送到死信队列
public static void delay_queue() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    //死信队列
    String dead_queue = "delay_dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 死信队列绑定交换机
     * */
    String dead_exchange = "delay_dead_exchange";
    String delay_routing_key = "delay_routing";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, delay_routing_key);

    //声明带有 ttl 的队列
    String queue = "delay_queue";
    Map<String, Object> params = new HashMap<>();

    //设置队列的 ttl 时间
    params.put("x-message-ttl", 5000);
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", delay_routing_key);
    channel.queueDeclare(queue, false, false, false, params);

    channel.basicPublish("", queue, null, "延时队列数据:1".getBytes());
    channel.basicPublish("", queue, null, "延时队列数据:2".getBytes());
    channel.basicPublish("", queue, null, "延时队列数据:3".getBytes());
}

/**
 * 消息延时
 * 消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
 * 可以用 java DelayQueue
 */
public static void delay_message() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    //死信队列
    String dead_queue = "delay_dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 死信队列绑定交换机
     * */
    String dead_exchange = "delay_dead_exchange";
    String delay_routing_key = "delay_routing";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, delay_routing_key);

    //声明带有 ttl 的队列
    String queue = "delay_queue";
    Map<String, Object> params = new HashMap<>();

    //设置队列的 ttl 时间
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", delay_routing_key);
    channel.queueDeclare(queue, false, false, false, params);

    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("10000").build(), "延时队列数据:1".getBytes());
    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("3000").build(), "延时队列数据:2".getBytes());
    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("300").build(), "延时队列数据:3".getBytes());
}

标签:false,队列,dead,queue,RabbitMq,channel,消息
From: https://www.cnblogs.com/cnff/p/14781671.html

相关文章

  • 2023-07-06:RabbitMQ中的AMQP是什么?
    2023-07-06:RabbitMQ中的AMQP是什么?答案2023-07-06:AMQPAMQP(AdvancedMessageQueuingProtocol)是一个应用层协议的开放标准,旨在设计面向消息的中间件。基于AMQP协议的客户端和消息中间件可以自由地传递消息,不受客户端、中间件产品或开发语言的限制。其目标是实现一种被广泛应用......
  • Linux安装RabbitMQ详细教程
    一、环境准备1、RabbitMQ版本和Erlang版本兼容性关系https://www.rabbitmq.com/which-erlang.html2、ErLang安装教程https://blog.csdn.net/laterstage/article/details/131513793?spm=1001.2014.3001.55013、RabbitMQ的安装依赖于erlang所以先安装4、RabbitMQ下载链接weg......
  • rabbitmq在springboot中实战技巧
    一.简介rabbitmq是基于AMQP(AdvancedMessageQueuingProtocol:高级消息队列协议),采用Erlang语言编写的消息队列。二、mq能用来做什么异步处理:将非核心业务(比如日志、邮件、监控等)从主流程剥离,提升主流程的响应时效。削峰:当并发大的情况下,可以将消息暂存在消息队列中,消费者按照......
  • docker启动RabbitMQ以及常见问题解决
    docker启动MQ容器下载docker镜像dockersearchrabbitmqdockerpullrabbitmqdockerrun-d--hostnamemy-rabbit--namerabbit-p15672:15672-p5672:5672rabbitmq:latest启动容器后浏览器无法访问dockerexec-it3b124f0c9712/bin/bashrabbitmq-pluginsenab......
  • RabbitMQ03
    1.RabbitMQ死信队列1.1死信队列简介在实际开发项目是,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。死信队列:RabbitMQ中并不是直接声明......
  • rabbitmq 开启 virtual host
    由于我的rabbitmq架设在测试服务期。导致我本地测试的mq消息,经常被服务器消费掉。所以通过添加v-host,可以创建专属v-host域下的消息进行生产和消费。 一新增用户点击Admin,点击右边Users 输入UsernamePassword,并且Tags给与Admin权限 二创建virtualhost点击vi......
  • 制作有延迟插件的rabbitmq镜像
    插件Git官方地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchangeDockerfileFROMrabbitmq:3.8.2-managementADD./rabbitmq_delayed_message_exchange-3.8.0.ez/pluginsRUNchown-Rrabbitmq./plugins/rabbitmq_delayed_message_exchange-3.8.0.ezRUNrabb......
  • RabbitMQ02
    1.rabbitmq五种消息模型1.1work消息模型-工作队列模型工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时......
  • RabbitMQ消息持久化
    我们看下之前启动idea测试消息发送的时候在后台生成的一条消息,现在已经在消息队列里面还没有被消费。 现在我们重启下RabbitMQ,执行linux命令:dockerrestartmq看上图实时显示的错误信息,失去连接了,接下来刷新这个页面,可以发现这个对象没有了。 说明rabbit消息并不会持久化,不......
  • Python操作RabbitMq
    Python操作RabbitMq:pika--队列)安装使用其他读取方法安装pipinstallpika使用importpika#建立连接#user:账号 pwd:密码userx=pika.PlainCredentials(user,pwd)#hosh:rabbitmq所在的ip port:端口号parameters=pika.ConnectionParameters(host,int(port)......