首页 > 其他分享 >RocketMQ消息生产

RocketMQ消息生产

时间:2024-01-20 21:46:05浏览次数:26  
标签:String reqNo 发送 消息 生产 logger RocketMQ

本文只提供生产者和消费者部分的示例代码,其它配置部分见RocketMQ消息客户端生产与消费的基本实现

依赖框架

JDK: java version "1.8.0_391"
RocketMQ SDK: rocketmq-spring-boot-starter:2.2.3

消息分类

按消息类型

按照消息自身特性可分为:普通消息、顺序消息、事务消息、延时\定时消息。

普通消息

利用事件驱动的特性,为应用服务之间解耦,消息提供基本的数据传递。
使用示例见:RocketMQ消息客户端生产与消费的基本实现

顺序消息

消费者消费消息的顺序与生产者发送消息的顺序一致;使用这类消息需要遵循如下约束:

  1. 生产者发送所有顺序消息时提供唯一的hashKey,RocketMQ服务端根据hashKey散列匹配到一个固定的消息队列,保证所有顺序消息存储在同一个消费队列中;
  2. 生产者发送消息必须是单一生产者串行发送消,保证消息是顺序发送;
  3. 生产和消费消息时要有限重试,重试会阻碍后续消息的生产或消费;
  4. 消费者接收消息时需要指定消费模式为"ConsumeMode.ORDERLY",此模式为“顺序消费”,保证消费者用单一线程依次消费同一个队列的消息。
  5. 不同的消息组尽量分散在不同的队列中,保证消费负载均衡

生产者示例代码

// 创建 RocketMQ 的 短信Message 实例
        List<Message<String>> messageList = new ArrayList<>(1000);
        for (int i = 0; i < 1000; i++) {
            messageList.add(MessageBuilder
                    .withPayload("顺序消息" + i)
                    .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo + i)
                    .build());
        }
        // mq根据hashKey匹配固定的队列
        String hashKey = "CZ001";
        //发送短信通知消息
        SendResult smsSendResult = rocketMQTemplate.syncSendOrderly("smsOrderlyTopic:orderlyTag", messageList, hashKey);
        if (smsSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            logger.info("顺序消息已经发送");
            return r;
        }

消费者示例代码

@Component
@RocketMQMessageListener(
        topic = "smsOrderlyTopic",
        consumerGroup = "smsOrderlyConsumerGroup",
        consumeMode = ConsumeMode.ORDERLY
)
public class SMSOrderlyTopicListener implements RocketMQListener<MessageExt> {

    private static final Logger logger = LoggerFactory.getLogger(SMSTopicListener.class);

    @Override
    public void onMessage(MessageExt msgExt) {
        // 执行本地事务逻辑,返回事务状态
        try {
            String bodyStr = new String(msgExt.getBody());
            logger.info("消息消费开始:{}", bodyStr);
        } catch (Exception e) {
            logger.error("交易消息消费失败!", e);
            throw e;
        }
    }
}

事务消息

提供了分布式事务解决方案,见<<可靠消息服务事务>>

延时\定时消息

定时消息为RocketMQ 5.X版本新特性,最终展现效果与延时消息类似;延时消息指定了消息的发送时间与消费时间的时间差,定时消息指定了消息被消费的具体时间。
在RocketMQ 4.X版本中,将时间分为了18个等级,生产者通过指定时间等级来控制延时发送的时间,时间等级如图:
在RocketMQ 5.X版本中,可自定义延时时间和定时时间,不同版本时间范围不一致,文章编写当前的最新版本时间范围是:0-40天。
生产者代码示例:

//延时发送代码
String reqNo = TransactionIdGenerator.generateReqTransactionId();

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsMessage = MessageBuilder
                .withPayload("发送延时消息")
                .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
                .build();

        //发送短信通知消息
        SendResult smsSendResult = rocketMQTemplate.syncSendDelayTimeMills("smsTopic:delayTag", smsMessage, 20000L);
        if (smsSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            logger.info("发送延时消息已经发送");
            return r;
        }
//定时发送代码
String reqNo = TransactionIdGenerator.generateReqTransactionId();

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsMessage = MessageBuilder
                .withPayload("发送定时消息")
                .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
                .build();

        Date date = new Date();
        //发送短信通知消息
        SendResult smsSendResult = rocketMQTemplate.syncSendDeliverTimeMills("smsTopic:deliverTag", smsMessage, date.getTime() + 30000L);
        if (smsSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            logger.info("定时消息已经发送");
            return r;
        }

消费者示例代码与普通消费者一致。

按生产者发送类型

同步发送

生产者发送消息后同步等待消息发送的结果;示例代码见本文【消息分类】章节代码。

异步发送

生产者发送消息后,不阻塞线程,异步等待消息发送结果。
示例代码:

String reqNo = TransactionIdGenerator.generateReqTransactionId();

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsMessage = MessageBuilder
                .withPayload("发送普通消息。")
                .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
                .build();

        //发送短信通知消息
        rocketMQTemplate.asyncSend("smsTopic:asyncTag", smsMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                logger.info("异步消息已经发送");
            }

            @Override
            public void onException(Throwable e) {
                logger.error("异步消息发送失败");
            }
        });

异步发送方式不支持发送延时、定时消息;在发送顺序消息时需要控制消息的发送顺序。

单边发送

生产者发送消息后不需要接收消息的发送状态,示例代码:

String reqNo = TransactionIdGenerator.generateReqTransactionId();

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsMessage = MessageBuilder
                .withPayload("单边消息")
                .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
                .build();

        //发送短信通知消息
        //单边消息支持顺序发送
        rocketMQTemplate.sendOneWay("smsTopic:oneWayTag", smsMessage);

同步等待消息消费结果

生产者发送消息后阻塞线程,同步等待消费者完成消息消费,接收消费者消费完成后返回的结果。
生产者示例代码:

String reqNo = TransactionIdGenerator.generateReqTransactionId();
//同步等待
// 创建 RocketMQ 的 短信Message 实例
Message<String> smsMessage = MessageBuilder
        .withPayload("等待响应消息")
        .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
        .build();
//返回值为MessageExt会返回所有消息内容,其它类型只会返回body
//sendAndReceive支持发送顺序消息,消费者需要制定顺序接收
MessageExt messageExt = rocketMQTemplate.sendAndReceive("smsReplyTopic:replyTag", smsMessage, MessageExt.class, 5*1000L);
R<String> result = JSON.parseObject(new String(messageExt.getBody()), new TypeReference<R<String>>(){});
if (result.getCode().equals(R.SUCCESS)) {
    logger.info("成功接收到消费结果");
    return r;
}

这种发送方式下,消费者消费消息时必须返回消费结果,否者生产的线程会一直阻塞到等待超时时间。消费者示例代码:

@Component
@RocketMQMessageListener(
        topic = "smsReplyTopic",
        consumerGroup = "smsReplyConsumerGroup"
//        consumeMode = ConsumeMode.ORDERLY
)
public class SMSRocketMQReplyListener implements RocketMQReplyListener<MessageExt, R<String>> {

    private static final Logger logger = LoggerFactory.getLogger(SMSRocketMQReplyListener.class);

    @Override
    public R<String> onMessage(MessageExt message) {
        // 执行本地事务逻辑,返回事务状态
        try {
            String bodyStr = new String(message.getBody());
            logger.info("消息消费开始:{}", bodyStr);
            return R.ok("消息消费成功");
        } catch (Exception e) {
            logger.error("交易消息消费失败!", e);
            throw e;
        }
    }
}

异步等待消息消费结果

生产者发送消息后不阻塞线程,异步等待消费者完成消息消费,接收消费者消费完成后返回的结果。
生产者示例代码:

String reqNo = TransactionIdGenerator.generateReqTransactionId();

        //发送短信通知消息

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsSyMessage = MessageBuilder
                .withPayload("异步等待响应消息")
                .setHeader(RocketMQHeaders.KEYS, "sy_SMS_" + reqNo)
                .build();
        rocketMQTemplate.sendAndReceive("smsReplyTopic:replyTag", smsSyMessage, new RocketMQLocalRequestCallback<MessageExt>() {
            @Override
            public void onSuccess(MessageExt message) {
                try {
                    Thread.sleep(1000L*10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                logger.info("【异步等待响应消息】异步接收成功");
                logger.info("【异步等待响应消息】返回内容:{}", new String(message.getBody()));
            }

            @Override
            public void onException(Throwable e) {
                logger.info("【异步等待响应消息】异步接收失败");
            }
        }, 5*1000L);

消费者消费完成后需要返回消费结果,否则生产者接收失败。

标签:String,reqNo,发送,消息,生产,logger,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17976663

相关文章

  • 云计算-nacos入门以及生产配置举例
    生产上nacos配置使用简单举例,相关敏感信息已经去除nacos以ds的方式部署在华为云CCE的容器当中,后台微服务,sprintboot中写明nacos的依赖,dockerfile打包到镜像仓库,在CCE运行容器的时候,读取CCE中configmap获取配置项参数。好处是可以标准集中管理发布,适合变更发布运维详细请参考官方文......
  • 记录一下 ArrayBlockingQueue 消息堆积的问题
    前言由于之前这个系统的日志记录是被领导要求写表的,在不影响系统性能的前提下,日志的入库操作肯定是要改成异步进行的,当时利用ArrayBlockingQueue+线程+AOP简单的去实现了一下,但是初版代码测试下来发现了一个很严重的问题,就是日志丢失的问题,本文由此而来。初步构思代码实现逻辑实......
  • 在生产环境中使用uWSGI来运行Flask应用
    安装uwsgipipinstalluwsgi-ihttps://pypi.tuna.tsinghua.edu.cn/simple安装不上则使用以下命令:condainstall-cconda-forgeuwsgi当您成功安装uwsgi后,您可以通过以下步骤来测试uwsgi是否安装成功:创建一个Python脚本,例如app.py,其中包含以下内容:defapplication(env,start_res......
  • ABAP:C223批量创建生产版本
    采用BDC方式*&---------------------------------------------------------------------**&ReportZPPU011*&---------------------------------------------------------------------**&*&---------------------------------------------------------......
  • 5分钟教会你如何在生产环境debug代码
    前言有时出现的线上bug在测试环境死活都不能复现,靠review代码猜测bug出现的原因,然后盲改代码直接在线上测试明显不靠谱。这时我们就需要在生产环境中debug代码,快速找到bug的原因,然后将锅丢出去。生产环境的代码一般都是关闭sourcemap和经过混淆的,那么如何进行debug代码呢?我一......
  • 削峰填谷与应用间解耦:分布式消息中间件在分布式环境下并发流量控制的应用
    这是《百图解码支付系统设计与实现》专栏系列文章中的第(18)篇,也是流量控制系列的第(4)篇。点击上方关注,深入了解支付系统的方方面面。本篇重点讲清楚分布式消息中间件的特点,常见消息中间件的简单对比,在支付系统的应用场景,比如削峰填谷,系统应用间的解耦,事务消息等。内容偏入门介绍,已经......
  • 进程间通信(生产者消费者模型)
    【一】进程间通信介绍什么是进程间通信进程间通信(Inter-processCommunication,IPC)是指在不同进程之间传输数据或信号的机制。由于每个进程拥有自己独立的内存空间,所以不同进程之间无法直接访问对方的变量或数据结构。因此,操作系统提供了多种IPC机制来允许进程之间共享信息和协......
  • AI深度解析:实时分布式消息平台NSQ
    NSQ是一个由Go语言编写的高性能、可扩展且易于部署的实时消息处理平台,专为大规模系统设计。在今天的微服务架构及云计算环境中,NSQ提供了一种优雅而强大的方式来进行异步通信和解耦服务。此篇文章旨在从多个角度深入分析NSQ,助你了解其工作原理、特点以及应用场景。工作原理NSQ遵......
  • 生产调优思路
    上线前分析点分析系统压力点在哪里?压力点的每秒请求数?每个请求耗时?每个请求消耗的内存?整个系统的所有请求重复1-4。算出部署多少台机器?每个机器多少内存?压测之后分析点Eden区对象增长速率?YoungGC频率?一次YoungGC耗时?YoungGC过后多少对象存活?老年代的对象增长速率?F......
  • 魔搭+ 函数计算_ 一键部署,缩短大模型选型到生产的距离
    引言面对魔搭ModelScope社区提供的海量模型,用户希望快速进行选型并生产使用起来,但在此之前,却一定会面临算力管理难、模型部署难等一系列问题,那么能否实现快速把选定的模型部署在云端功能强大的GPU上,由云端负责服务,扩展,保护和监控模型服务,同时免于运维和管理云上算力等基础......