首页 > 其他分享 >快速入门一篇搞定RocketMq-实现微服务实战落地

快速入门一篇搞定RocketMq-实现微服务实战落地

时间:2024-05-05 19:11:31浏览次数:36  
标签:搞定 入门 启动 sh RocketMq apache rocketmq 消息

1、RocketMq介绍

RocketMQ起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在2016年开源分布式消息中间件,并逐渐成为Apache顶级项目。现在是Apache的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。

官网地址:https://rocketmq.apache.org

快速开始文档:https://rocketmq.apache.org/docs/

Github地址:https://github.com/apache/rocketmq

2、RocketMq架构说明

RocketMQ的架构主要由Producer(消息生产者)、Consumer(消息消费者)、Broker(消息中转角色)和Name Server(网络路由角色)四个核心组件组成。Name Server负责维护Broker集群和Topic信息的路由中心,而Broker负责存储和传输消息。RocketMQ采用类似于Kafka的发布订阅模型,支持消息的顺序传输和事务性传输,同时可以配置不同的消息过滤规则和重试策略。

3、下载

查看微服务对应版本信息,下载相关版本。查看连接:https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明#2021x-分支

根据自己使用的Spring Cloud Alibaba Version选择对应的版本进行下载即可。这里下载4.4.0版本,下载地址:https://rocketmq.apache.org/download 下载成功后,为一个压缩包文件。把文件上传linux

使用命令解压zip文件并重命名文件夹命令:

unzip rocketmq-all-4.4.0-bin-release.zip -d rocketmq-4.4.0

解压成功后,如图:

4、启动

进入rocketmq-4.4.0目录,查看目录结构。

  • benchmark:性能测试相关的资源,如果想要了解RocketMQ的基准测试,可以考虑使用该压测工具。这个工具可以模拟生产者和消费者来测试RocketMQ集群的性能。
  • bin:里面是一些可执行文件,管理rocketmq服务
  • conf:里面就是一些配置文件,包括broker配置文件和logback配置文件
  • lib:所依赖的第三方jar包
4.1、启动Name Server命令
nohup sh bin/mqnamesrv -n 192.168.42.130:9876 > /dev/null 2>&1 &   # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败

启动成功后,默认启动日志在root目录下。可以查看启动日志信息:

tail -f ~/logs/rocketmqlogs/namesrv.log

输出下面信息启动成功:

也可以通过端口9876查看是否启动成功

ps -ef|grep 9876

4.2 启动Broker命令
nohup sh bin/mqbroker -n 192.168.42.130:9876 -c conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 & # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败

启动日志和启动Name Server日志在一个文件夹里面。查看启动日志信息:

tail -n 50 ~/logs/rocketmqlogs/broker.log

可以通过jps 查看启动信息如果能看到 NamesrvStartup 和 BrokerStartup 的话就表明单机版的 RocketMQ 启动成功了

4.3 Rocketmq服务关闭

关闭 MQ使用 bin 目录下的mqshutdown关闭服务

sh bin/mqshutdown namesrv #关闭namesrv服务

sh bin/mqshutdown broker #关闭broker服务 
4.4 启动脚本命令参数修改

在启动的过程中,如果服务器内存不足或者满足不了启动脚本里面的默认内存配置,启动的时候会启动报错。这是因为 apache-rocketmq/bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内存太大,而系统实际内存却太小导致启动失败。解决办法就是修改runbroker.sh 和 runserver.sh里的内存配置,调小一些即可。

首先先备份一份runbroker.sh 和 runserver.sh文件,以防万一改错了。

cp runserver.sh runserver.sh.init

cp runbroker.sh runbroker.sh.init

修改:runserver.sh脚本文件,找到配置JVM参数的内容,把JVM配置参数调小:

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"

修改:runbroker.sh脚本文件

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m"

5、测试消息

通过上面的步骤,RocketMQ就启动成功了。接下来我们可以在服务器上面通过提供的测试脚本进行消息测试,验证RocketMq是否可以正常使用。

生产者发送消息:

export NAMESRV_ADDR=127.0.0.1:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

通过输出内容,我们可以查看到消息发送成功了。下面运行监听脚本。测试消费者接受消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

成功拿到消息,可以说明RocketMq服务启动成功了。

6、监控程序rocketmq-console

6.1、配置rocketmq-console

rocketmq-externals是RocketMq的扩展插件项目。GitHub地址: https://github.com/apache/rocketmq-externals 之前rocketmq-console也在rocketmq-externals项目中。如今在GitHub apache/rocketmq-externals 项目下已经找不到 rocketmq-console模块了,官方已经从 apache/rocketmq-externals 独立出来并更名为 rocketmq-dashboard。 我们可以查看RocketMq官网配置仪表板说明 :RocketMQ 仪表板 |MQ (apache.org)

https://rocketmq.apache.org/docs/deploymentOperations/04Dashboard/

根据提示可以下载到源码内容

Github下载地址:https://github.com/apache/rocketmq-dashboard

如果是 5.0 版本的直接拉取最新的代码

 git clone https://github.com/apache/rocketmq-dashboard.git  

releases标签中的rocketmq-dashboard-1.0.0版本试用于5.0版本以下的。

https://github.com/apache/rocketmq-dashboard/releases/tag/rocketmq-dashboard-1.0.0

下载成功后,使用IDEA打开修改配置,改一下namesrvAddr配置项即可,如果没有指定默认就是localhost:9876,如果namesrvAddr是集群环境,每个节点使用;隔开。本地测试运行,运行成功后打包发布的linux系统。

mvn clean package -Dmaven.test.skip=true #跳过测试
6.2 启动rocketmq-console

指定NameServer的地址和启动端口(8830)以及输出日志。由于内部不够,设置JVM参数启动,如果使用的linux系统内存足够可以忽略jvm参数。启动命令如下:

nohup java -jar -Xmx256M -Xms256M -XX:MaxMetaspaceSize=128M -XX:MetaspaceSize=128M rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &

不指定JVM参数:

nohup java -jar  rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &

执行成功后,查看启动日志:

 tail -f ~/logs/consolelogs/rocketmq-console.log 

启动成功。开放8830端口进行公网访问。

监控成功。可以在集群导航中查看当前节点部署节点。

也可以看到上面测试的数据输出:

7、微服务连接RockerMq

安全组需要开放10909、10911端口和9876端口,其中10909是VIP通道,10911是非VIP通道,9876是对外连接提供端口。不然连接发送会报错发送超时 sendDefaultImpl call timeout; nested exception is org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

maven引入依赖

        <!--RocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

配置中心加入RocketMq配置

rocketmq: # rocketMQ配置
  # name server地址
  name-server: 192.168.42.130:9876
  consumer:
    pull-batch-size: 10
    group: blog_message
  producer:
    group: blog_message
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false

编写RocketEnhanceConfig文件,解决不支持Java时间类型配置

@Configuration
public class RocketEnhanceConfig {

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}
7.1 编写消息生产者:
@Slf4j
@Service
public class RocketStorage implements IDataStorage {
  
		@Autowired(required = true)
    private RocketMQTemplate rocketMQTemplate;

		@Value("${rocketMq.topic:blog_notify_sow}")  
    private String topic;

    @Override
    public void store(String value, Integer type, Long timestamp) {
        String message = String.format("%s,%s,%s",value,type,timestamp);
        rocketMQTemplate.convertAndSend(topic,message); //发送数据
        log.info("RocketMQ|data sent,value: {}, type:{}, timestamp: {}", value, type, timestamp);
    }

    @Override
    public String getType() {
        return "RocketMQ";
    }
}

编写接口:IDataStorage

/**
 * 数据发送到Mq里...
 */
public interface IDataStorage {

    /**
     * persistence data
     *
     * @param value 接收内容
     * @param type  数据类型
     * @param timestamp 当前时间(时间戳)
     */
    void store(String value,Integer type,Long timestamp);

    String getType();

}

在Controller中调用接口发送数据。

@RestController
@RequestMapping("/dataStorage")
public class DataStorageController {

    @Autowired
    private IDataStorage dataStorage;

    @GetMapping
    public Response sendDataStorage(String value){
        dataStorage.store(value,type,System.currentTimeMillis());
        return Response.success();
    }

}
7.2 编写消息消费者

编写一个RocketMq消息监听类实现消息监听 RocketDataConsumer :

@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "blog_message",topic = "blog_notify_sow")
public class RocketDataConsumer implements RocketMQListener {

    @PostConstruct
    public void post() {
        log.warn("***** RocketMq Data Consumer Activated");
    }

    @Autowired
    @Qualifier("dataPersist")
    private IDataPersist dataPersist;


    @Override
    public void onMessage(Object o) {
        log.info("RocketMq 接收到的信息 . . . . . .:{}",o);
        dataPersist.put(o.toString(),1,System.currentTimeMillis());
    }
}
7.3 测试消息发送和接收

启动项目,通过postman调用接口:

调用接口后,发现接口调用成功了。下面我们查看控制台消息消费者是否接收到消息。

通过上面输出的消息可以看到消息接收成功了。

标签:搞定,入门,启动,sh,RocketMq,apache,rocketmq,消息
From: https://www.cnblogs.com/sowler/p/18173752

相关文章

  • 移位操作搞定两数之商
    五一漫长的假期,外面的世界是人山人海,反而在家刷题算得上一个好的休闲方式。刚好我开始写这道题:Giventwointegers `dividend` and `divisor`,dividetwointegers **without** usingmultiplication,division,andmodoperator.Theintegerdivisionshouldtruncate......
  • 入门推荐系统业务的书单
    最近出差在外,为解决吃饭问题,花了不少时间在美团上翻阅当地的美食,从侧面接触了推荐系统。作为职业程序员,恰逢五一假期,因此花了一些时间查阅了一些资料,期望可以进一步了解推荐系统,拓展知识面。公司内部从事推荐系统领域的同事推荐了如下书籍:从零开始构建企业级推荐系统推荐系统......
  • PHP-入门指南(全)
    PHP入门指南(全)原文:zh.annas-archive.org/md5/d36bde355b2574844946c8150420db7b译者:飞龙协议:CCBY-NC-SA4.0前言开发网站是当今的优先事项,以便您的业务在互联网上有所存在。设计和开发是任何网站的基础步骤。PHP通常用于网站和Web应用程序开发。PHP是一种通用的服务......
  • Metasploit-即时入门(全)
    Metasploit即时入门(全)原文:annas-archive.org/md5/FDEA350254319975F23617766073DAB6译者:飞龙协议:CCBY-NC-SA4.0第一章. 快速入门Metasploit欢迎阅读《快速入门Metasploit》。本书特别为您提供了设置Metasploit所需的所有信息。您将学习Metasploit的基础知识,开始......
  • 嵌入式入门
    最近买了两块开发板,一块是精英STM32F103开发板,用来学习stm32,另一块是阿尔法Linux开发板,用来学习uboot,linux内核以及驱动、操作系统移植这些,感觉挺有意思的。能学的,想学的东西挺多的,主要是想自己设计一些小东西,从电路原理图的设计,再到芯片元器件选型,绘制PCB,还有各种外设驱动,接口......
  • Vue入门到关门之组件
    一、组件1、什么是组件在Vue.js中,组件是构建用户界面的可重用和独立的模块。每个Vue组件都封装了自己的模板、逻辑和样式,使得代码可以更加模块化、可维护性更高。通过组件化,你可以将界面拆分成独立的、可复用的部分,每个部分都有自己的功能和样式,这样可以更容易地管理复杂的界面,......
  • Vue入门到关门之第三方框架elementui
    1、什么是ElementUI?ElementUI是一个基于Vue.js的组件库,它提供了丰富的UI组件和一套完整的解决方案,用于快速构建现代化的Web应用程序。ElementUI的目标是帮助开发者快速构建出美观、易用的界面,并提供了丰富的组件,包括但不限于按钮、表单、表格、对话框、菜单、导航、布......
  • Vue入门到关门之Vue项目工程化
    一、创建Vue项目1、安装node环境官网下载,无脑下一步,注意别放c盘就行Node.js—RunJavaScriptEverywhere(nodejs.org)需要两个命令npm---->pipnode--->python装完检查一下,helloworld检测,退出crtl+c2、搭建vue项目环境装cnpm这个包,下载东西会快很多,装模块......
  • JEMSPath 入门
    JMESPath是一种查询语言,专门用于处理JSON对象。JMESPath规则和基本语法包括:字段访问:使用点.来访问JSON对象中的字段。例如,obj1.obj2.key。索引列表:通过索引来访问列表中的元素。索引是基于零的,例如arr1[0].arr2[0].key。管道表达式(|):用于将一个表达式......
  • 1. SpringBoot 入门
    1.SpringBoot简介SpringBoot是由Pivotal团队提供的全新框架,可以帮助我们开发基于Spring的、独立的、生产级的应用程序。​其中SpringBoot的官网是:SpringBootReferenceDocumentationSpringBoot的主要目标是:为所有Spring开发提供更快的入门体验开箱即用,提供了自动配......