首页 > 其他分享 >SpringBoot29 - RocketMQ

SpringBoot29 - RocketMQ

时间:2023-02-24 20:25:33浏览次数:44  
标签:SpringBoot29 broker public RocketMQ 服务器 id rocketmq

SpringBoot整合RocketMQ

​ RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。

安装

​ windows版安装包下载地址:https://rocketmq.apache.org/

​ 下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件

​ RocketMQ安装后需要配置环境变量,具体如下:

  • ROCKETMQ_HOME
  • PATH
  • NAMESRV_ADDR (建议): 127.0.0.1:9876

​ 关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。

RocketMQ工作模式

​ 在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信。broker启动后会通知命名服务器自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须保障命名服务器先启动。

启动服务器

mqnamesrv		# 启动命名服务器
mqbroker		# 启动broker

​ 运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。

​ 运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。

测试服务器启动状态

​ RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。

tools org.apache.rocketmq.example.quickstart.Producer		# 生产消息
tools org.apache.rocketmq.example.quickstart.Consumer		# 消费消息
整合(异步消息)

步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

步骤②:配置RocketMQ的服务器地址

rocketmq:
  name-server: localhost:9876
  producer:
    group: group_rocketmq

​ 设置默认的生产者消费者所属组group。

步骤③:使用RocketMQTemplate操作RocketMQ

@Service
public class MessageServiceRocketmqImpl implements MessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }
            @Override
            public void onException(Throwable e) {
                System.out.println("消息发送失败!!!!!");
            }
        };
        rocketMQTemplate.asyncSend("order_id",id,callback);
    }
}

​ 使用asyncSend方法发送异步消息。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String id) {
        System.out.println("已完成短信发送业务(rocketmq),id:"+id);
    }
}

​ RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。

​ 使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。

总结

  1. springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
  2. 操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
  3. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener

标签:SpringBoot29,broker,public,RocketMQ,服务器,id,rocketmq
From: https://www.cnblogs.com/Ashen-/p/17153003.html

相关文章

  • RocketMQ - 消费者启动机制
    RocketMQ客户端中有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer和org.apache.rocketmq.client.consumer.DefaultMQPushConsumer......
  • Docker部署RocketMQ集群
    一、概述ApacheRocketMQ是阿里开源的一款高性能、高吞吐量、队列模型的消息中间件的分布式消息中间件。关于RocketMQ集群架构的详细介绍,请参考链接:https://blog.csdn.ne......
  • RocketMQ - 消费者概述
    消费流程消费者组:一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。消费者实例:一个消费者组程序部署了多个进程,每个进程都可以称为一个......
  • 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端
    承接【【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)】pullBlockIfNotFound方法通过该方法获取该Message......
  • RocketMQ - 生产者最佳实践总结
    相对消费者而言,生产者的使用更加简单,一般关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息常用消息类型消息类型优点缺点备注普通消息(并......
  • RocketMQ - 生产者消息发送流程
    RocketMQ客户端的消息发送通常分为以下3层业务层:通常指直接调用RocketMQClient发送API的业务代码。消息处理层:指RocketMQClient获取业务发送的消息对象后,一系列的参数......
  • Window部署RocketMQ
    预备环境JDK1.8、Maven、Git,具体安装可自行百度一、安装包下载从官网下载https://rocketmq.apache.org/release-notes选择合适的版本下载,我这里下载的ApacheRocketMQ......
  • Rocketmq的tag显示积压
    背景公司有一个topic,消费者160多个,全都使用了tag来区分消息,在压测的时候,发现一个问题,消费者触发了积压告警,压测的consumerA还没开始压测,平台显示consumerA的积压值在不停的......
  • RocketMQ启动
     1.启动1.1启动NAMESERVERWindows键+R进入至‘MQ文件夹\bin’下执行:startmqnamesrv.cmd2.启动BROKER从新windows+R打开cmd-->进入‘MQ文件夹\bin’下......
  • RocketMQ - 生产者启动流程
    生产者启动流程DefaultMQProducer是RocketMQ中默认的生产者实现核心属性:namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个则用分号分开。......