首页 > 其他分享 >RabbitMQ -- 延迟队列(死信队列中的消息TTL过期)

RabbitMQ -- 延迟队列(死信队列中的消息TTL过期)

时间:2023-11-22 18:03:13浏览次数:38  
标签:exchange 队列 springframework -- 死信 org import message

用来存放需要在指定时间被处理的元素队列,队列中的元素希望在指定时间被取出和处理

使用场景:

  1. 订单在十分钟内未支付自动取消
  2. 新创建的店铺,如果在十天之内没有上传过商品,则自定发送消息提醒
  3. 用户注册成功后,如果三天内没有登录则进行短信提醒
  4. 用户发起退款后,如果三天内没有得到处理则通知相关运营人员
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参与会议


1) 整合Springboot

新建模块 springboot-rabbitmq

引入依赖

repositories {
    maven { url = uri("https://maven.aliyun.com/repository/public/") }
    mavenLocal()
    mavenCentral()
    maven { url = uri("https://repo.spring.io/milestone") }
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    
    // fastjson
    implementation("com.alibaba:fastjson:1.2.80")
    // spring-boot-starter-amqp
    implementation("org.springframework.boot:spring-boot-starter-amqp:2.6.6")
    // spring-rabbit-test
    testImplementation("org.springframework.amqp:spring-rabbit-test:2.4.3")
}

配置 application.properties

spring.rabbitmq.host=166.166.166.92
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
2) 队列TTL

创建两个队列QA、QB ,两者对立TTL分别为10s、40s, 然后再创建一个交换机X和死信交换机,类型为direct,创建死信队列QD :  配置、生产消息、消费消息

配置文件类代码

import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

/**
 * TTL队列 配置文件类代码
 */
@Configuration
class TtlQueueConfig {

    companion object {
        const val X_EXCHANGE = "X" // 普通交换机名称
        const val Y_DEAD_LETTER_CHANGE = "Y" // 死信交换机名称
        const val QUEUE_A = "QA" // 普通队列名称
        const val QUEUE_B = "QB" // 普通队列名称
        const val DEAD_LETTER_QUEUE = "QD" // 死信队列名称
    }

    // 声明X交换机
    @Bean("xExchange")
    fun xExchange() = DirectExchange(X_EXCHANGE)

    // 声明死信交换机
    @Bean("yExchange")
    fun yExchange() = DirectExchange(Y_DEAD_LETTER_CHANGE)

    // 声明队列 QA -- TTL 10s
    @Bean("queueA")
    fun queueA(): Queue {
        val arguments = mapOf(
            // 设置死信交换机
            "x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
            // 设置死信 RoutingKey
            "x-dead-letter-routing-key" to "YD",
            // 过期时间
            "x-message-ttl" to 10 * 1000
        )
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build()
    }

    // 声明队列 QB -- TTL 40s
    @Bean("queueB")
    fun queueB(): Queue {
        val arguments = mapOf(
            // 设置死信交换机
            "x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
            // 设置死信 RoutingKey
            "x-dead-letter-routing-key" to "YD",
            // 过期时间
            "x-message-ttl" to 40 * 1000
        )
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build()
    }

    // 声明死信队列 QD
    @Bean("queueD")
    fun queueD(): Queue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build()

    // 绑定
    @Bean
    fun queueABindingX(@Qualifier("queueA") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
        BindingBuilder.bind(queue).to(exchange).with("XA")
    // 绑定
    @Bean
    fun queueBBindingX(@Qualifier("queueB") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
        BindingBuilder.bind(queue).to(exchange).with("XB")

    // 绑定
    @Bean
    fun queueDBindingY(@Qualifier("queueD") queue: Queue, @Qualifier("yExchange") exchange: DirectExchange): Binding =
        BindingBuilder.bind(queue).to(exchange).with("YD")
}

生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import java.util.Date

@RestController
@RequestMapping("ttl")
class SendMsgController {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    // 开始发消息
    @GetMapping("/sendMsg/{message}")
    fun sendMsg(@PathVariable message: String){
        println("当前时间:${Date()}, 发送一条消息给两个TTL队列:$message")
        
        rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s的队列:$message")
        rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s的队列:$message")
    }
}

消费者

import com.rabbitmq.client.Channel
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.util.Date


@Component
class DeadLetterQueueConsumer {
    // 接收消息
    @RabbitListener(queues = ["QD"])
    fun receiveD(message: Message, channel: Channel){
        val msg = String(message.body)
        println("当前时间:${Date()},收到死信队列的消息:$msg")
    }
}
3)延时队列优化

在生产者发消息时指定延时时间 TTL

注意:在消息属性上设置TTL的方式,消息肯能并不会按时“死亡”,第一条消息延时很长,第二条很短,第二条不会优先执行,只已第一个消息为准(需要使用RabbitMQ插件)

创建QC队列,不设置过期时间

配置文件类:

const val QUEUE_C = "QC"
// 声明队列 QC
@Bean("queueC")
fun queueC(): Queue {
    val arguments = mapOf(
        // 设置死信交换机
        "x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
        // 设置死信 RoutingKey
        "x-dead-letter-routing-key" to "YD",
    )
    return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build()
}
// 绑定
@Bean
fun queueCBindingY(@Qualifier("queueC") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
    BindingBuilder.bind(queue).to(exchange).with("XC")

生产者:

// 开始发消息 消息 + TTL
@GetMapping("sendExpirationMsg/{message}/{ttl}")
fun sendExpirationMsg(@PathVariable message: String, @PathVariable ttl: String) {
    println("当前时间:${Date()}, 发送一条时长${ttl}毫秒TTL消息给队列QC:$message")
    rabbitTemplate.convertAndSend(
        "X", "XC", message
    ) {
        it.messageProperties.expiration = ttl
        return@convertAndSend it
    }
}
4)RabbitMQ插件实现延迟队列 -- 交换机延迟

rabbitmq_deleyed_message_exchange 插件

下载:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

安装:复制到rabbitmq 插件文件夹

$: sudo cp exchange.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins/
$: sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
$: sudo systemctl restart rabbitmq-server

配置类

import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.CustomExchange
import org.springframework.amqp.core.Queue
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class DelayedQueueConfig {
    companion object{
        const val DELAYED_EXCHANGE_NAME = "delayed.exchange"// 交换机
        const val DELAYED_QUEUE_NAME = "delayed.queue"      // 队列
        const val DELAYED_ROUTING_KEY = "delayed.routingkey"// RoutingKey
    }

    // 声明交换机
    @Bean
    fun delayedExchange(): CustomExchange{
        val arguments = mapOf( "x-delayed-type" to "direct" )
        /**
         * 1. 交换机名称
         * 2. 交换机类型
         * 3. 是否需要持久化
         * 4. 是否需要自动删除
         * 5. 其他参数
         */
        return CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments)
    }

    // 声明队列
    @Bean
    fun delayedQueue(): Queue {
        return Queue(DELAYED_QUEUE_NAME)
    }

    // 绑定
    @Bean
    fun delayedQueueBindingDelayedExchange(
        @Qualifier("delayedQueue") queue: Queue,
        @Qualifier("delayedExchange") exchange: CustomExchange
    ): Binding{
        return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs()
    }
}

生产者

// 开始发消息 基于插件的 消息及延迟时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
fun sendMsg(@PathVariable message: String, @PathVariable delayTime: Int) {
    println("当前时间:${Date()}, 发送一条时长${delayTime}毫秒消息给延迟队列DelayedQueue:$message")
    rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message) {
        it.messageProperties.delay = delayTime
        return@convertAndSend it
    }
}

消费者

@Component
class DelayQueueConsumer {
    // 监听消息
    @RabbitListener(queues = [DelayedQueueConfig.DELAYED_QUEUE_NAME])
    fun receiveDelayQueue(message: Message){
        val msg = String(message.body)
        println("当前时间:${Date()},收到延迟队列的消息:$msg")
    }
}


标签:exchange,队列,springframework,--,死信,org,import,message
From: https://blog.51cto.com/soldatyxd/8519113

相关文章

  • 智能监控如何最大化保障生产工人权益,助力电焊车间智能化?
    电焊车间加装监控可以加强对电焊车间的生产过程监控,保障员工的生产工作安全,提高工作效率,降低生产成本。但是传统的监控只能单一的去“看”,并不能最大化发挥视频监控的作用,而智能视频监控就不一样。它可以有效提高安全监控效率,最大化地保障安全生产,那么,具体方案包括哪些内容呢?1、视......
  • Android 的PAI 简介
    PAI简介在Google的Android操作系统中,PAI(PreinstalledAppsInfrastructure)预安装程序基础设施是指在设备出厂时预先安装在系统中的一组应用程序。这些应用程序通常是由设备制造商或运营商选择的,并且它们在设备启动时就已经存在,用户可以在使用设备时直接访问这些应用。预安装介绍......
  • Spring Cloud +UniApp +MySql框架开发的智慧工地云平台源码
    智慧工地是指通过信息化技术、物联网、人工智能技术等手段,对建筑工地进行数字化、智能化、网络化升级,实现对施工全过程的实时监控、数据分析、智能管理和优化调控。智慧工地的建设可以提高工地的安全性、效率性和质量,降低施工成本,是建筑行业数字化转型升级的重要抓手。主要围绕“人......
  • 朋友圈如何设置定时发送?
    人日常中不可缺少的一件事,同时也是企业用来触达客户的重要渠道,下面一起来了解下微信朋友圈怎么定时发送呢?① 一键多号发布朋友圈② 重发、转发、二次编辑③ 一页知“圈”一个页面查看所有好友的朋友圈,点赞、评论样样不落,提高好友互动性,拉进和客户之间的距离,还能实现一键转发功能......
  • 探秘互联网医院系统的技术内幕:代码解析与创新
    随着科技的飞速发展,互联网医院系统正日益改变着传统医疗服务的面貌。这些系统的背后,隐藏着精密而创新的技术。本文将深入研究互联网医院系统的技术内幕,透过代码解析,揭示这些系统如何实现医疗服务数字化的伟大使命。1.实时通信模块:WebSocket的魔力互联网医院系统中,实时通信是在线问......
  • 无人机遥控器方案定制_MTK平台无人设备手持遥控终端PCB板开发
    随着科技的不断发展和无人机技术的逐步成熟,无人机越来越受到人们的关注。作为一种高新技术,无人机的应用范围不断拓展,包括农业、环境监测、城市规划、运输物流等领域。同时,无人机的飞行控制技术也得到了不断的优化和提升。早期,无人机的飞行控制大多以机械方式为主,控制方式相对较为简......
  • 软件系统测试有哪些类型和方法?
    在软件开发过程中,系统测试是确保软件质量和稳定性的重要环节。不同类型的软件系统测试覆盖了不同的测试需求,而不同的测试方法则能够有效地提高测试效果。一、常见的软件系统测试类型:1、功能测试:验证软件是否按照需求规格说明书中定义的功能完成。2、性能测试:评估和......
  • 作为一个Android初级开发工程师,该如何进阶?
    前言现今Android行业初级人才已逐渐饱和化,但中高级人才却依旧很稀缺,身边HR朋友经常遇到的情况是:100份简历里只有2、3个比较合适的候选人,大部分的人都是不合格的!有97%的Android技术人都会面临这些困境(或许也是你的困惑):缺乏技术广度和深度:如果你长期在小型软件公司或外包工......
  • Java下跌,Kotlin闯进前15,后生可畏
    近年来,Android开发由Java转Kotlin似乎成为了一种潮流。谷歌甚至曾公开表示:“Android的开发将越来越以Kotlin为先。”当前,作为移动开发中Java的劲敌,Kotlin在Tiobe流行指数中表现强劲。根据TIOBE11月发布的编程语言排行榜,Kotlin以1.15%的占比位列第15,较之10月上升3位。而在今......
  • 添加索引 yii获取sql
    //添加索引sqlALTERTABLE`work_map`ADDINDEXidx_wmp_region_id(`wmp_region_id`)仓库工作单方案准备列表,展示角色所配置城市的工作单信息短信消息模版调整gitremoteupdateorigin--pruneyii2获取当前sql$query->createCommand()->getRawSql();......