首页 > 其他分享 >RocketMQ消息客户端生产与消费的基本实现

RocketMQ消息客户端生产与消费的基本实现

时间:2024-01-18 14:12:33浏览次数:27  
标签:RocketMQ 消费 return rocketMQTemplate RocketMQTemplate 消息 rocketmq 客户端

支撑环境

JDK: java version "1.8.0_391"
应用框架: org.springframework.boot:2.7.17
RocketMQ客户端SDK: rocketmq-spring-boot-starter:2.2.3

生产者

消息提供者

添加依赖

implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'

添加配置

application.yml文件中添加Rocket相关配置

rocketmq:
  #NameServer服务的ip端口
  name-server: 127.0.0.1:9876
  #生产者分组
  producer:
    group-name: tradeTxProducer

向Srping容器注入相关Bean

向Spring容器注入生产者和RocketMQTemplate的Bean对象;RocketMQTemplate用于管理生产者,提供与RocketMQ服务端的连接方法,早期版本的rocketmq-spring-boot-starter不需要自己提供RocketMQTemplate的Bean。

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;
    @Value("${rocketmq.producer.group-name}")
    private String producerGroupName;

    /**
     * 普通消息生产者
     * @return
     */
    @Bean
    public DefaultMQProducer mqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr(nameServer);
        return producer;
    }

    /**
     * RocketMQ普通消息连接组件
     * @param mqProducer
     * @return
     */
    @Bean
    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setProducer(mqProducer);
        return rocketMQTemplate;
    }
}

业务代码发送消息

在业务代码中添加RocketMQTemplate,通过RocketMQTemplate发送消息

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/msg/general")
    public R<String> sendGeneralMsg(){
        R<String> r = R.ok("普通消息推送成功");

        String reqNo = TransactionIdGenerator.generateReqTransactionId();

        // 创建 RocketMQ 的 短信Message 实例
        Message<String> smsMessage = MessageBuilder
                .withPayload("您购买的产品已预约成功,请及时打款完成交易。")
                .setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
                .build();

        //发送短信通知消息
        SendResult smsSendResult = rocketMQTemplate.syncSend("smsTopic:tradeBuyTag", smsMessage);
        if (smsSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
            logger.info("产品预约成功客户提醒消息已经发送");
            return r;
        }
        return R.failure("普通消息发送失败");
    }

消费者

RocketMQ中顺序消息需要使用同步线程的消费者消费消息,其它类型的消息的消费者不做特别要求

添加依赖

implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'

添加配置

application.yml文件中添加Rocket相关配置

rocketmq:
  #NameServer服务的ip端口
  name-server: 127.0.0.1:9876

添加消息消费者

使用@RocketMQMessageListener注解,在注解中指定订阅的topic和消费者分组;
实现RocketMQListener接口,指定RocketMQListener的泛型是MessageExt时,onMessage方法会接收到消息的所有信息,其它类型默认接收的是消息的body部分。

@Component
@RocketMQMessageListener(
    topic = "smsTopic",
    consumerGroup = "smsConsumerGroup"
)
public class SMSTopicListener implements RocketMQListener<MessageExt> {

    private static final Logger logger = LoggerFactory.getLogger(SMSTopicListener.class);

    @Override
    public void onMessage(MessageExt msgExt) {
        // 执行本地事务逻辑,返回事务状态
        try {
            String bodyStr = new String(msgExt.getBody());
            logger.info("购买交易抢占基金份额消息消费开始:{}", bodyStr);
            //TODO  消费过程
            logger.info("购买交易抢占基金份额消息消费结束");
        } catch (Exception e) {
            logger.error("交易消息消费失败!", e);
            throw e;
        }
    }
}

标签:RocketMQ,消费,return,rocketMQTemplate,RocketMQTemplate,消息,rocketmq,客户端
From: https://www.cnblogs.com/zly1015/p/17972188

相关文章

  • RabbitMQ学习四 java客户端连接RabbitMQ
    RabbitMQ的工作模式,可以参考官网:https://www.rabbitmq.com/getstarted.html一、简单方式以下两种都是生产者直接发消息给队列,不通过交换机。且发送给队列的消息只能被消费一次。比如一个队列被C1和C2消费,在队列中的消息只会被一个消费者消费。生产者代码逻辑图代码如下:p......
  • RocketMQ 中冷热分离的随机索引模块详解
    作者:苏昌盛消息系统中随机索引的特点RocketMQ广泛使用于各类业务场景中,在实际生产场景中,用户通常会选择消息ID或者特定的业务Key(例如学号,订单号)来查询和定位特定的一批消息,进而定位分布式系统中的复杂问题。传统方案下,消息索引的存储是基于数据库系统或者基于本地文件系统实现......
  • RocketMQ 中冷热分离的随机索引模块详解
    作者:苏昌盛消息系统中随机索引的特点RocketMQ广泛使用于各类业务场景中,在实际生产场景中,用户通常会选择消息ID或者特定的业务Key(例如学号,订单号)来查询和定位特定的一批消息,进而定位分布式系统中的复杂问题。传统方案下,消息索引的存储是基于数据库系统或者基于本地文件系统实......
  • 记录eletron客户端win7打包及安装使用问题
    win7nodeV14环境配置不能使用msi包安装nodeV14.x,需要下载zip包,手动解压安装文件。下载,nodeV14.15.3下载地址下载完成后解压,并配置环境变量系统变量新增 NODE_PATH 为 C:\nodepath-xx\node_modules系统变量新增 NODE_SKIP_PLATFORM_CHECK 为 1系统变量 path 追加 ;C:\node......
  • RocketMQ服务端部署
    系统环境Java:openjdkversion"1.8.0_392"操作系统:腾讯云Ubuntu20.04镜像RocketMQ:rocketmq-5.1.4下载RocketMQ官网下载入口,见“下载安装”部分GitHub仓库下载Ubuntu镜像下载:#DownloadreleasefromtheApachemirror$wgethttps://dist.apache.org/repos/dist/re......
  • 当“低价高质”成行业共识,零食品牌还能靠什么拿捏消费者?
    文|螳螂观察作者|图霖年关将至,一年一度的“年货内卷赛”已一触即发。尤其是,2024年是疫情过后的首个春节,热闹必不可少,大众走亲访友的年礼更必不可少。而在这个赛场里,具备购买力但又尤其厌倦千篇一律传统年货形式的年轻人,是决定品牌能否抢占年货节制高点的关键。今年的情况尚待定......
  • 可靠消息服务事务(RocketMQ的分布式事务解决方案)
    系统环境Java:openjdkversion“1.8.0_382”rocketmq-all-5.1.4整体机制使用rocketmq的事务消息,分两个阶段保证分布式事务的最终一致性;一阶段:消息生产者(分布式事务发起方)发送半消息(消费者不接收半消息),之后完成本地事务的执行,根据执行结果选择将半消息投递给消费者或撤回半......
  • MFC---多线程(qq群聊的服务端和客户端)
    服务端//多线程+socket编程的一个联合使用//用互斥体进行线程同步socket编程临界区全局变量#include<WinSock2.h>#include<iostream>#include<windows.h>#include<process.h>#pragmacomment(lib,"ws2_32.lib")#defineMAX_CLNT256#defineMAX_BUF_S......
  • zabbix server获取的数据与客户端服务器实际数据不一致
    收到zabbix监控报警,提示某台服务器内存不足了: 我登录该服务器,发现内存明显还是充足的: 我在zabbixserver服务器上执行命令,以获取客户端的数据:/usr/local/zabbix/bin/zabbix_get-s客户端IP-p10050-k"vm.memory.size[available]"得到的结果为:3061157888这明显不......
  • RocketMQ——快速入门
    RocketMQ架构设计消息队列实现了消息投放和消息消费间的解耦,实现了异步处理消息的功能。RocketMQ作为消息中间件,在其存储消息的结构上实现了消息均衡投放、消息容灾、高可用(Dledger主从切换)、自动故障转移特点。先引入以下几个概念:Broker:实际存储消息的节点,接收来自生产者......