常见的延时任务方案
1、最轻量级(基于内存的线程池实现):一般用于延时较短、实时性要求较高、允许少量消息丢失的场景
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Slf4j
@Component
public class MessageSender {
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void sendMessageWithDelay(String message, long delayInSeconds) {
scheduler.schedule(() -> {
try {
log.info("数据消息补偿,message={},delayTime={}",message,delayInSeconds);
//todo 做业务操作
}catch (Exception e){
log.error("数据消息补偿,brand={},orderNo={}",message,delayInSeconds,e);
}
}, delayInSeconds, TimeUnit.SECONDS);
}
}
2、轻量级(基于Redisson的实现)推荐:一般用于兼顾实时性、消息可靠性和轻量级的综合场景
引入依赖
<!-- Redisson Spring Boot starter; dependency for the Redisson-based delayed-message example. -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.9.1</version>
</dependency>
消息发布器
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Publishes delayed messages through a Redisson delayed queue that feeds the
 * {@code delay-channel-queue} blocking deque.
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class RedissionDelayMessageSender {
    /** Name of the blocking deque that the delayed queue is attached to. */
    private static final String QUEUE_NAME = "delay-channel-queue";

    private final RedissonClient redissonClient;

    /**
     * Increments the {@code retryNum} entry of {@code param} and offers the serialized
     * object to the delayed queue.
     *
     * <p>{@code param} is modified in place: {@code retryNum} is stored as a string and
     * {@code msg} is replaced by its string form.
     *
     * @param param     message body; {@code retryNum} is treated as "0" when absent
     * @param delayTime delay before delivery, in seconds
     */
    public void publishDelayMsg(JSONObject param, long delayTime) {
        RBlockingDeque<Object> targetDeque = redissonClient.getBlockingDeque(QUEUE_NAME);
        RDelayedQueue<Object> delayQueue = redissonClient.getDelayedQueue(targetDeque);

        Object storedRetry = param.getOrDefault("retryNum", "0");
        int retryNum = Integer.parseInt(String.valueOf(storedRetry)) + 1;
        log.info("当前回调重试次数={}", retryNum);

        param.put("retryNum", String.valueOf(retryNum));
        String msgText = param.getString("msg");
        param.put("msg", msgText);

        String serialized = param.toJSONString();
        delayQueue.offer(serialized, delayTime, TimeUnit.SECONDS);
        log.info("延时消息发送成功,delayTime={},data={}", delayTime, param);
    }
}
消息处理器
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
 * Consumes messages from the {@code delay-channel-queue} blocking deque on a worker
 * thread taken from the injected {@link ThreadPoolTaskExecutor}.
 *
 * @Author: lmy
 * @Version: 1.0
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissionDelayMessageHandler {
    private final RedissonClient redissonClient;
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /** Submits {@link #handle()} to the task executor once this bean has been constructed. */
    @PostConstruct
    public void startListener() {
        threadPoolTaskExecutor.execute(this::handle);
    }

    /**
     * Polls the deque in a loop and processes each non-blank message.
     *
     * <p>The loop ends when the polling thread is interrupted. Any other exception is
     * logged and polling continues.
     */
    public void handle() {
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque("delay-channel-queue");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String data = blockingDeque.poll(2, TimeUnit.SECONDS);
                if (StringUtils.isBlank(data)) {
                    // Poll timed out (or returned an empty payload): nothing to process or log.
                    continue;
                }
                log.info("监听到延迟消息:{}", data);
                JSONObject jsonObject = JSONObject.parseObject(data);
                int retryNum = Integer.parseInt(String.valueOf(jsonObject.getOrDefault("retryNum", "0")));
                if (retryNum < 3) {
                    jsonObject.put("retryNum", retryNum);
                    //todo perform the business operation
                } else {
                    log.warn("重试已达最大次数3,退出重试,人工介入,param={}", data);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop, so the worker thread can be shut down.
                Thread.currentThread().interrupt();
                log.info("延迟消息监听线程被中断,停止监听");
                return;
            } catch (Exception e) {
                log.error("监听延迟发布消息失败", e);
            }
        }
    }
}
3、消息可靠性高(基于MQ技术实现的延时消息)推荐 一般用于消息可靠性较高,不允许丢失
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * RabbitMQ example: listens on {@code queue-name}, republishes the {@code data} array of
 * each message as a delayed message, then acknowledges the original delivery.
 *
 * @Author:
 */
@Slf4j
@Component
public class RabbitMqDelayMessageSender {
    /** Exchange that delayed messages are published to. */
    private static final String DELAY_EXCHANGE = "delay-exchange";
    /** Routing key used when publishing delayed messages. */
    private static final String DELAY_ROUTING_KEY = "delay-key";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles one message from {@code queue-name}.
     *
     * <p>NOTE(review): if anything before the ack throws, no ack or nack is sent by this
     * method; confirm the container's acknowledge mode and the desired failure handling.
     *
     * @param entity  message body; its {@code database}, {@code table} and {@code data} entries are read
     * @param channel channel used to acknowledge the delivery
     * @param message raw AMQP message, source of the delivery tag
     * @throws IOException if the acknowledgement fails
     */
    @RabbitHandler
    @RabbitListener(queues = "queue-name")
    public void listener(JSONObject entity, Channel channel, Message message) throws IOException {
        String databaseName = entity.getString("database");
        String tableName = entity.getString("table");
        JSONArray data = entity.getJSONArray("data");
        //todo perform the business operation

        String payload = data.toJSONString();
        sendDelayMessage(payload, 10000L);

        // Manual acknowledgement of this single delivery (multiple = false).
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Publishes {@code msg} to the delay exchange with an {@code x-delay} header.
     *
     * <p>NOTE(review): the header is presumably interpreted as milliseconds by a
     * delayed-message exchange; confirm how {@code delay-exchange} is declared.
     *
     * @param msg       message body
     * @param delayTime value written to the {@code x-delay} header
     */
    public void sendDelayMessage(String msg, Long delayTime) {
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_ROUTING_KEY, msg, outbound -> {
            outbound.getMessageProperties().setHeader("x-delay", delayTime);
            return outbound;
        });
        log.info("延迟消息发送成功,data={},delayTime={},", msg, delayTime);
    }
}
4、消息可靠性较高(基于定时任务轮询实现的数据库持久化)
这个比较简单,可自行实现:将任务及其计划执行时间持久化到数据库,由定时任务周期性扫描已到期且未处理的记录并执行,执行完成后更新记录状态。