用来存放需要在指定时间被处理的元素队列,队列中的元素希望在指定时间被取出和处理
使用场景:
- 订单在十分钟内未支付自动取消
- 新创建的店铺,如果在十天之内没有上传过商品,则自定发送消息提醒
- 用户注册成功后,如果三天内没有登录则进行短信提醒
- 用户发起退款后,如果三天内没有得到处理则通知相关运营人员
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参与会议
1) 整合Springboot
新建模块 springboot-rabbitmq
引入依赖
repositories {
maven { url = uri("https://maven.aliyun.com/repository/public/") }
mavenLocal()
mavenCentral()
maven { url = uri("https://repo.spring.io/milestone") }
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation("org.springframework.boot:spring-boot-starter-test")
// fastjson
implementation("com.alibaba:fastjson:1.2.80")
// spring-boot-starter-amqp
implementation("org.springframework.boot:spring-boot-starter-amqp:2.6.6")
// spring-rabbit-test
testImplementation("org.springframework.amqp:spring-rabbit-test:2.4.3")
}
配置 application.properties
spring.rabbitmq.host=166.166.166.92
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
2) 队列TTL
创建两个队列QA、QB ,两者对立TTL分别为10s、40s, 然后再创建一个交换机X和死信交换机,类型为direct,创建死信队列QD : 配置、生产消息、消费消息
配置文件类代码
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
/**
* TTL队列 配置文件类代码
*/
@Configuration
class TtlQueueConfig {
companion object {
const val X_EXCHANGE = "X" // 普通交换机名称
const val Y_DEAD_LETTER_CHANGE = "Y" // 死信交换机名称
const val QUEUE_A = "QA" // 普通队列名称
const val QUEUE_B = "QB" // 普通队列名称
const val DEAD_LETTER_QUEUE = "QD" // 死信队列名称
}
// 声明X交换机
@Bean("xExchange")
fun xExchange() = DirectExchange(X_EXCHANGE)
// 声明死信交换机
@Bean("yExchange")
fun yExchange() = DirectExchange(Y_DEAD_LETTER_CHANGE)
// 声明队列 QA -- TTL 10s
@Bean("queueA")
fun queueA(): Queue {
val arguments = mapOf(
// 设置死信交换机
"x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
// 设置死信 RoutingKey
"x-dead-letter-routing-key" to "YD",
// 过期时间
"x-message-ttl" to 10 * 1000
)
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build()
}
// 声明队列 QB -- TTL 40s
@Bean("queueB")
fun queueB(): Queue {
val arguments = mapOf(
// 设置死信交换机
"x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
// 设置死信 RoutingKey
"x-dead-letter-routing-key" to "YD",
// 过期时间
"x-message-ttl" to 40 * 1000
)
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build()
}
// 声明死信队列 QD
@Bean("queueD")
fun queueD(): Queue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build()
// 绑定
@Bean
fun queueABindingX(@Qualifier("queueA") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
BindingBuilder.bind(queue).to(exchange).with("XA")
// 绑定
@Bean
fun queueBBindingX(@Qualifier("queueB") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
BindingBuilder.bind(queue).to(exchange).with("XB")
// 绑定
@Bean
fun queueDBindingY(@Qualifier("queueD") queue: Queue, @Qualifier("yExchange") exchange: DirectExchange): Binding =
BindingBuilder.bind(queue).to(exchange).with("YD")
}
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import java.util.Date
@RestController
@RequestMapping("ttl")
class SendMsgController {
@Autowired
private lateinit var rabbitTemplate: RabbitTemplate
// 开始发消息
@GetMapping("/sendMsg/{message}")
fun sendMsg(@PathVariable message: String){
println("当前时间:${Date()}, 发送一条消息给两个TTL队列:$message")
rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s的队列:$message")
rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s的队列:$message")
}
}
消费者
import com.rabbitmq.client.Channel
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.util.Date
@Component
class DeadLetterQueueConsumer {
// 接收消息
@RabbitListener(queues = ["QD"])
fun receiveD(message: Message, channel: Channel){
val msg = String(message.body)
println("当前时间:${Date()},收到死信队列的消息:$msg")
}
}
3)延时队列优化
在生产者发消息时指定延时时间 TTL
注意:在消息属性上设置TTL的方式,消息肯能并不会按时“死亡”,第一条消息延时很长,第二条很短,第二条不会优先执行,只已第一个消息为准(需要使用RabbitMQ插件)
创建QC队列,不设置过期时间
配置文件类:
const val QUEUE_C = "QC"
// 声明队列 QC
@Bean("queueC")
fun queueC(): Queue {
val arguments = mapOf(
// 设置死信交换机
"x-dead-letter-exchange" to Y_DEAD_LETTER_CHANGE,
// 设置死信 RoutingKey
"x-dead-letter-routing-key" to "YD",
)
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build()
}
// 绑定
@Bean
fun queueCBindingY(@Qualifier("queueC") queue: Queue, @Qualifier("xExchange") exchange: DirectExchange): Binding =
BindingBuilder.bind(queue).to(exchange).with("XC")
生产者:
// 开始发消息 消息 + TTL
@GetMapping("sendExpirationMsg/{message}/{ttl}")
fun sendExpirationMsg(@PathVariable message: String, @PathVariable ttl: String) {
println("当前时间:${Date()}, 发送一条时长${ttl}毫秒TTL消息给队列QC:$message")
rabbitTemplate.convertAndSend(
"X", "XC", message
) {
it.messageProperties.expiration = ttl
return@convertAndSend it
}
}
4)RabbitMQ插件实现延迟队列 -- 交换机延迟
rabbitmq_deleyed_message_exchange 插件
下载:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
安装:复制到rabbitmq 插件文件夹
$: sudo cp exchange.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins/
$: sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
$: sudo systemctl restart rabbitmq-server
配置类
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.CustomExchange
import org.springframework.amqp.core.Queue
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class DelayedQueueConfig {
companion object{
const val DELAYED_EXCHANGE_NAME = "delayed.exchange"// 交换机
const val DELAYED_QUEUE_NAME = "delayed.queue" // 队列
const val DELAYED_ROUTING_KEY = "delayed.routingkey"// RoutingKey
}
// 声明交换机
@Bean
fun delayedExchange(): CustomExchange{
val arguments = mapOf( "x-delayed-type" to "direct" )
/**
* 1. 交换机名称
* 2. 交换机类型
* 3. 是否需要持久化
* 4. 是否需要自动删除
* 5. 其他参数
*/
return CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments)
}
// 声明队列
@Bean
fun delayedQueue(): Queue {
return Queue(DELAYED_QUEUE_NAME)
}
// 绑定
@Bean
fun delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") queue: Queue,
@Qualifier("delayedExchange") exchange: CustomExchange
): Binding{
return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs()
}
}
生产者
// 开始发消息 基于插件的 消息及延迟时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
fun sendMsg(@PathVariable message: String, @PathVariable delayTime: Int) {
println("当前时间:${Date()}, 发送一条时长${delayTime}毫秒消息给延迟队列DelayedQueue:$message")
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message) {
it.messageProperties.delay = delayTime
return@convertAndSend it
}
}
消费者
@Component
class DelayQueueConsumer {
// 监听消息
@RabbitListener(queues = [DelayedQueueConfig.DELAYED_QUEUE_NAME])
fun receiveDelayQueue(message: Message){
val msg = String(message.body)
println("当前时间:${Date()},收到延迟队列的消息:$msg")
}
}