首页 > 其他分享 >RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战

时间:2024-01-04 12:35:40浏览次数:31  
标签:String producer SpringCloud Alibaba 消息 延时 new public RocketMQ

欢迎关注公众号:【11来了】 发送 “资料” 可领取深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景、中间件系列笔记和编程高频电子书!

作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!

消息追踪

设置消息追踪需要修改 broker 启动的配置文件,添加一行配置:traceTopicEnable=true 即可,操作如下:

# 进入到 rocketmq 的安装目录中
# 先复制一份配置文件
cp broker.conf custom.conf
# 在自定义配置文件中添加一行配置
vi custom.conf
## 添加配置
traceTopicEnable=true
# 杀死原来的 broker 进程,再重新启动即可
# 先查看原来 broker 进程 id
jps 
# 杀死 broker
kill -9 [进程id]
# 重新启动 broker,并指定配置文件
nohup sh bin/mqbroker -c conf/custom.conf ‐n localhost:9876 & autoCreateTopicEnable=true


在发送消息的时候,指定消息的 keys 就可以在 DashBoard 中观看到消息的追踪记录了


public class GlobalProducer {

    public static void main(String[] args) throws Exception {
        // true 即设置允许消息追踪
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 12; i++) {
            Message msg = new Message(
                    "Global-Orderly-Topic",
                    "Global_Orderly_Tag",
                    ("( " + i + " )message from GlobalProducer").getBytes());
            // 设置消息的 keys
            msg.setKeys("Global_Orderly_Tag");
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }
}

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_System


之后就可以在 DashBoard 中查看消息的追踪记录了:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_System_02

点击进去,查看消息追踪详细信息如下:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_System_03




延时消息实战

上边的案例使用了 SpringCloudStream 的 API 进行消息的收发,这里使用原生 API 进行消息收发实战,通过设置消息的延时时间,可以让消息等待指定时间之后再发送

5.x 之前,只能设置固定时间的延时消息

5.x 之后,可以自定义任意时间的延时消息

由于这里引入的 SpringCloudAlibaba 整合的 RocketMQ 是 4.9.4 版本的,因此只能设置固定时间的延时消息


延时时间有以下几种,通过 Leven 进行定位,如果 delayTimeLevel = 2,就是第二个延时时间 5s

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消费者代码如下:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("custom-delay-topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
    }
}


生产者代码如下:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3、把我们的生产者直接启动起来
        producer.start();

        // 4、创建消息、并发送消息
        for (int i = 0; i < 3; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-delay-topic",
                    "delayTag",
                    "CUSTOM_DELAY",
                    ("("+i+")Hello Message From Delay Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            // 设置定时的逻辑
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            message.setDelayTimeLevel(2);
            // 利用生产者对象,将消息直接发送出去
            producer.send(message);
        }
        System.out.println("Send Finished.");
    }
}



标签:String,producer,SpringCloud,Alibaba,消息,延时,new,public,RocketMQ
From: https://blog.51cto.com/u_16186397/9098774

相关文章

  • 使用springcloud 实现 蓝绿发布、灰度发布(金丝雀发布)
    介绍工作中经常要涉及到功能发布,这个时候也经常是业务系统最有可能遇到问题的时候,需要要尽量减少发布引起的风险。比如在系统负载比较小的时候使用。还有蓝绿发布、灰度发布等等,今天介绍一下这几种常见的发布,并使用springcloud实现。1.传统发布方式一个系统最初的时候,使用量小,用户......
  • SpringBoot2 整合 SpringCloud Feign 实例
    文章目录1.简介2.工程实例2.1注册中心springcloud-study-eureka-server2.1.1依赖pom.xml2.1.2配置文件application.properties2.1.3启动类EurekaServerApplication.java2.2服务提供者springcloud-study-hello-service2.2.1依赖pom.xml2.2.2配置文件application.ym......
  • SpringBoot2 整合 SpringCloud 的 Hystrix断路器 实例
    文章目录1.概述2.短路器3.SpringFeign使用Hystrix断路器3.1工程实例3.2修改Fegin模块3.3测试运行参考文献1.概述微服务架构中服务之间互相调用,单个服务通常会集群部署,由于网络等原因,服务不能保证100%可用,如果单个服务出现问题会出现请求的堆积,Servlet容器的线程资源......
  • RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战
    欢迎关注公众号:【11来了】发送“资料”可以下载Redis、JVM系列文章PDF版本!作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!SpringCloudAlibaba集成RocketMQ最佳实践SpringBoot相对于SSM来说已经很大程度上简化了开发,但是使用SpringBo......
  • springcloud动力节点-01Eureka
    SpringCloudEureka1.SpringCloudEureka简介注册发现中心Eureka来源于古希腊词汇,意为“发现了”。在软件领域,Eureka是Netflix在线影片公司开源的一个服务注册与发现的组件,和其他Netflix公司的服务组件(例如负载均衡、熔断器、网关等)一起,被SpringCloud社区整合......
  • springcloud动力节点-05Sleuth
    SpringCloudSleuth1.什么是链路追踪官网:https://spring.io/projects/spring-cloud-sleuth链路追踪就是:追踪微服务的调用路径2.链路追踪的由来在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个请求都会开成一......
  • springcloud动力节点-04Hystrix
    SpringCloudHystrix1.前言1.1什么是服务雪崩   服务雪崩的本质:线程没有及时回收。不管是调用成功还是失败,只要线程可以及时回收,就可以解决服务雪崩1.2服务雪崩怎么解决1.2.1修改调用的超时时长(不推荐)将服务间的调用超时时长改小,这样就可以让线程及时回收,保证服......
  • springcloud动力节点-03OpenFeign
    SpringCloudOpenFeign 1.说在前面上一节我们讲到Ribbon做了负载均衡,用Eureka-Client来做服务发现,通过RestTemplate来完成服务调用,但是这都不是我们的终极方案,终极方案是使用OpenFeign2.OpenFeign简介https://docs.spring.io/spring-cloud-open......
  • Spring Cloud动力节点-07Alibaba简介、注册、配置中心
    1.项目简介SpringCloudAlibaba致力于提供微服务开发的一站式解决方案。此项目包含开发分布式应用微服务的必需组件,方便开发者通过SpringCloud编程模型轻松使用这些组件来开发分布式应用服务。依托SpringCloudAlibaba,您只需要添加一些注解和少量配置,就可以将SpringClo......
  • springcloud动力节点-06Admin监控 Or Gateway网关
    SpringCloudAdmin 监控端点新建工程:admin-serverpom中springcloud版本号和版本控制要添加<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan......