首页 > 其他分享 >springboot~kafka中延时消息的实现

springboot~kafka中延时消息的实现

时间:2023-08-22 18:11:12浏览次数:43  
标签:springboot 队列 kafka record 死信 延时 public String

应用场景

  • 用户下单5分钟后,给他发短信
  • 用户下单30分钟后,如果用户不付款就自动取消订单

kafka无死信队列

kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信队列实现的。

kafka有生产者拦截器

通过对生产者拦截器实现一个TTL的检查,然后再通过类似netty里的延时队列组件来实现消息的延时发送,发到咱们的死信队列里

  • ProducerInterceptorTTL源码
public class ProducerInterceptorTTL implements ProducerInterceptor<Integer, String>, ApplicationContextAware {

	// 消息延时,单位秒
	public static String TTL = "ttl";

	// 死信队列,延时后发送到的队列,我们称为死信队列
	public static String DEAD_TOPIC = "dead_topic";

	// 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取bean
	private static ApplicationContext applicationContext;

	// 时间轮,用于延时发送消息
	private static LindTimeWheel timeWheel = new LindTimeWheel(1000, 8);

	@Override
	public ProducerRecord onSend(ProducerRecord<Integer, String> record) {
		final String topic = record.topic();
		final Integer partition = record.partition();
		final Integer key = record.key();
		final String value = record.value();
		final Long timestamp = record.timestamp();
		final Headers headers = record.headers();
		long ttl = -1;
		String deadTopic = null;
		for (Header header : headers) {
			if (header.key().equals(TTL)) {
				ttl = toLong(header.value());
			}
			if (header.key().equals(DEAD_TOPIC)) {
				deadTopic = new String(header.value());
			}
		}
		// 消息超时判定
		if (deadTopic != null && ttl > 0) {
			// 可以放在死信队列中
			String finalDeadTopic = deadTopic;
			long finalTtl = ttl * 1000;
			timeWheel.addTask(() -> {
				System.out.println("消息超时了," + finalTtl + "需要发到topic:" + record.key());
				KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
				kafkaTemplate.send(finalDeadTopic, record.value());

			}, finalTtl);
		}
		// 拦截器拦下来之后改变原来的消息内容
		ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp,
				key, value, headers);
		// 传递新的消息
		return newRecord;

	}

	@Override
	public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

	}

	@Override
	public void close() {

	}

	@Override
	public void configure(Map<String, ?> map) {

	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

}
  • 注册拦截器
spring:
  kafka:
    producer:
      properties:
        interceptor.classes: com.ruoyi.lawyer.delay.ProducerInterceptorTTL
  • 延时消息在某个时间段之后会送出

标签:springboot,队列,kafka,record,死信,延时,public,String
From: https://www.cnblogs.com/lori/p/17649344.html

相关文章

  • SpringBoot集成Swagger报错
    pom.xml<!--swaggerui--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io......
  • springboot 单例并发问题
    Controller默认是单例的,不要使用非静态的成员变量,否则会发生数据逻辑混乱。正因为单例所以不是线程安全的。@RestController@RequestMapping(value="/concurrency")publicclasscontroller{privateStringname;@GetMapping("/test1")publicStringtest......
  • kafka相关命令
    删除topic:/bin/kafka-topics--delete--topictest--bootstrap-serverlocalhost:9092 新增topic:/bin/kafka-topics--create--topictest--bootstrap-serverlocalhost:9092查询组:/bin/kafka-consumer-groups.sh--bootstrap-server127.0.0.1:9092--describe--group......
  • 中小学教育综合管理平台源码,vue2+Java+springboot框架开发
    智慧校园电子班牌软件是出于校园考勤管理以及班级校园信息展示为目的的显示系统软件,电子班牌系统主要用于中小学教育的综合管理平台,融合了多媒体技术、语音技术、人脸识别、信息发布、后台管理等多种技术。智慧班牌通过以云平台为基础,结合互联网、物联网系统进行校园管理,实现学校数......
  • SpringBoot实现统一异常处理
    大家在使用SpringBoot开发项目的时候肯定都需要处理异常吧,没有处理异常那么异常信息直接显示给用户这是非常不雅观的,同时还可能造成用户误会,那么今天我们就来简单的写一下如何在SpringBoot项目中实现统一的异常处理。1.自定义异常类我们先定义一个自定义业务异常类,这个异常类继......
  • 基于springboot课程答疑系统
    随着信息互联网信息的飞速发展,无纸化作业变成了一种趋势,针对这个问题开发一个专门适应师生交流形式的网站。本文介绍了课程答疑系统的开发全过程。通过分析企业对于课程答疑系统的需求,创建了一个计算机管理课程答疑系统的方案。文章介绍了课程答疑系统的系统分析部分,包括可行性分析......
  • 基于springboot师生共评的作业管理系统设计与实现
    随着信息互联网信息的飞速发展,无纸化作业变成了一种趋势,针对这个问题开发一个专门适应师生作业交流形式的网站。本文介绍了师生共评的作业管理系统的开发全过程。通过分析企业对于师生共评的作业管理系统的需求,创建了一个计算机管理师生共评的作业管理系统的方案。文章介绍了师生共......
  • SpringBoot内嵌Tomcat连接池分析
    目录1Tomcat连接池1.1简介1.2架构图1.2.1JDK线程池架构图1.2.2Tomcat线程架构1.3核心参数1.3.1AcceptCount1.3.2MaxConnections1.3.3MinSpareThread/MaxThread1.3.4MaxKeepAliveRequests1.3.5ConnectionTimeout1.3.6KeepAliveTimeout1.4核心内部线程1.4.1Acceptor1.......
  • SpringBoot 测试实践 - 2:单元测试与集成测试
    单元测试vs.集成测试只编写单测,无法测试方法之间的集成情况,而且某些需求可能会修改多个方法,这可能会影响方法对应的单测,涉及到大量的相关单测的修改,这样的维护成本很高可以把重心放在完善集成测试上,专注从外部判断程序是否符合预期。对于一些非常重要的方法,增加单元测试可以减......
  • Kafka入门到精通技术文章
    Kafka入门到精通技术文章以下是一些从入门到精通Kafka的技术文章推荐:1.Kafka入门教程-这是一篇适合初学者的Kafka入门教程,介绍了Kafka的基本概念和架构,以及如何使用Kafka进行消息传输和处理。2.Kafka架构详解-这篇文章深入介绍了Kafka的架构,包括Kafka的主题(topics)、分区(p......