首页 > 其他分享 >17-RabbitMQ高级特性-TTL队列/消息

17-RabbitMQ高级特性-TTL队列/消息

时间:2022-10-04 23:34:29浏览次数:46  
标签:String 17 ttl RabbitMQ TTL import RabbitMQHelper channel

TTL队列/消息

TTL: Time To Live, 生存时间

  • RabbitMQ支持消息的过期时间, 在消息发送时可以指定
  • RabbitMQ支持队列的过期时间, 从消息进入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会被自动清除

TTL队列代码实现

消费者

package com.dance.redis.mq.rabbit.ttl;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Receiver4TTLExchange {

    public static void main(String[] args) throws Exception {
        // TTL队列
        Channel channel = RabbitMQHelper.getChannel();
        // 声明正常的 exchange queue 路由规则
        String queueName = "test_ttl_queue";
        String exchangeName = "test_ttl_exchange";
        String exchangeType = "topic";
        String routingKey = "ttl.*";
        RabbitMQHelper.exchangeDeclare(channel, exchangeName, RabbitMQHelper.EXCHANGE_TYPE_TOPIC);

        Map<String, Object> arguments = new HashMap<>();
        // 指定队列的消息过期时间
        arguments.put("x-message-ttl", 6000);
        // 添加队列扩展参数
        RabbitMQHelper.queueDeclare(channel, queueName, true, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(queueName, false, consumer);
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.ttl;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class Sender4TTLExchange {


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_ttl_exchange";
        String routingKey = "ttl.test";
        Map<String, Object> headers = new HashMap<>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers).build();
        String msg = "Hello World RabbitMQ 4 TTL Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
    }

}

TTL队列测试

启动消费者

启动生产者

查看消费者

消费成功

此时, 停止消费者, 查看控制台

从队列上的标记features, 也可以看到这是一个TTL队列

队列中是没有消息的,不要启动消费者, 直接启动生产者发送一条消息

查看控制台

可以看到有一条消息, 等待6秒再次查看

消息已经被删除

TTL消息代码实现

消费者

package com.dance.redis.mq.rabbit.ttl.message;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Receiver4TTLExchange {

    public static void main(String[] args) throws Exception {
        // TTL队列
        Channel channel = RabbitMQHelper.getChannel();
        // 声明正常的 exchange queue 路由规则
        String queueName = "test_ttl_queue_message";
        String exchangeName = "test_ttl_exchange_message";
        String routingKey = "ttl.message.*";
        RabbitMQHelper.exchangeDeclare(channel, exchangeName, RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        RabbitMQHelper.queueDeclare(channel, queueName, true, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(queueName, false, consumer);
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.ttl.message;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class Sender4TTLExchange {

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_ttl_exchange_message";
        String routingKey = "ttl.message.test";
        Map<String, Object> headers = new HashMap<>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                // TTL 消息时间 10秒
                .expiration("10000")
                .headers(headers).build();
        String msg = "Hello World RabbitMQ 4 TTL Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
    }

}

TTL消息测试

启动消费者

启动生产者

查看消费者

消费成功, 此时停止消费者, 查看控制台

队列上并没有TTL标记, 所以这不是TTL队列, 这个时候不要启动消费者, 直接启动生产者, 发送一条消息

可以看到有了一条消息, 等待10秒

消息没有了, 消息已被删除

标签:String,17,ttl,RabbitMQ,TTL,import,RabbitMQHelper,channel
From: https://www.cnblogs.com/flower-dance/p/16754822.html

相关文章

  • 18-RabbitMQ高级特性-死信队列
    死信队列死信队列:DLX,Dead-Letter-Exchange利用DLX,当消息在一个队列中变成死信(deadmessage)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLXDL......
  • 07-RabbitMQ核心API-Direct Exchange
    DirectExchange简介所有发送到directexchange的消息被转发到Routekey中指定的Queue注意:Direct模式可以使用RabbitMQ自带的Exchange(defaultexchange),所以不需......
  • 08-RabbitMQ核心API-Topic Exchange
    TopicExchange简介所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个T......
  • 09-RabbitMQ核心API-Fanout Exchange
    FanoutExchange简介不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的......
  • 10-RabbitMQ核心API-其他[Binding, Queue, Message, Virtual host]
    Binding绑定关系Exchange和Exchange,Queue之间的连接关系Binding中可以包含RouteKey或者参数Queue消息队列,实际存储消息数据Durability:是否持久化,Durable......
  • 11-RabbitMQ高级特性-消息如何保证100%的投递成功
    消息如何保证100%的投递成功什么是生产端的可靠性投递保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息进行补偿机制BAT/TMD......
  • 12-RabbitMQ高级特性-在海量订单产生的业务高峰期, 如何避免消息的重复消费问题
    幂等性概念详解幂等性是什么可以借鉴数据库的乐观锁机制比如执行一条更新库存的SQLupdatet_repssetcount=count-1,version=version+1whereversion......
  • 13-RabbitMQ高级特性-Confirm确认消息
    Confirm确认消息理解Confirm消息确认机制消息的确认,是指投递消息后,如果Broker收到消息,则会给我们生产者一个应答生产者进行接收应答用来确定这条消息是否正常的......
  • 04-基于CentOS7安装RabbitMQ3.10.7
    RabbitMQ安装与入门安装与启动我实在是找不到这么老的版本了,直接用最新版本的,按照道理来说,新版本是兼容老版本的官网地址https://www.rabbitmq.com/Erlang安......
  • 05-RabbitMQ控制台入门及其Java简单操作
    MQ控制台简单操作建立Exchange新建Exchange成功新建Queue新建Queue成功建立Exchange与Queue的关系建立关系成功路由键:就是指发送到Exchange的消息,通......