首页 > 编程语言 >RocketMQ源码(一):源码环境搭建

RocketMQ源码(一):源码环境搭建

时间:2023-05-08 11:47:11浏览次数:53  
标签:文件夹 启动 broker 源码 RocketMQ apache rocketmq 搭建

一、源码地址下载

  RocketMQ官网下载地址:https://github.com/apache/rocketmq/tags

 0

  当前搭建的是4.8.0版本的rocketmq,下载zip压缩包至本地,并解压。

  0

  当解压后的RocketMQ源码导入IDEA。 

  0

二、源码环境搭建

1、启动NameServer

1、NameServer启动源码入口

  启动namesrv命令如下:

nohup ./mqnamesrv >../logs/namesrv.log &

  查看mqnamesrv文件详情,发现启动namesrv是通过org.apache.rocketmq.namesrv.NamesrvStartup调起的,NamesrvStartup为namesrv启动源码分析入口。

  

2、执行NamesrvStartup,启动NameServer

  执行NamesrvStartup,启动namesrv报错,原因是未配置RocketMQ的环境变量。

 

2.1、RocketMQ环境变量配置

2.1.1、新建conf文件夹

  在当前RocketMQ源码工程中创建conf文件夹,用于存放distributor子模块相关配置文件。 

 

2.1.2、复制配置文件

  将distribution模块中的logback_namesrv.xml、logback_broker.xml、broker.conf配置文件复制到新创建的conf文件夹下。

 

2.1.3、新建store文件夹

  在conf文件夹下新建store文件夹,用于RocketMQ对消息的持久化存储。

 

2.1.4、修改配置文件详情

  调整conf/broker.conf,broker.conf详情如下

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
## 消息持久化存储根路径
storePathRootDir=D:/Source/mq_source/rocketmq/store
## 消息持久化存储路径
storePathCommitLog=D:/Source/mq_source/rocketmq/store/commitlog
namesrvAddr=127.0.0.1:9876
brokerIP1=127.0.0.1
autoCreateTopicEnable=true

   根据实际需要,调整日志输出位置,logback_namesrv.xml详情如下: 

   

   logback_broker.xml详情如下: 

  

2.1.5、配置环境变量

  配置NamesrvStartup启动时的环境变量如下:

  

6、指定配置文件

  指定启动的配置文件,创建的conf配置文件夹下的broker.conf配置。

 

2.2、启动namesrv

  出现如下提示,表示已经成功启动了namesrv。

  

  在日志输出位置查看namesrv.log启动日志详情:

  

2、启动Broker

1、broker启动源码入口

  启动broker命令如下:

nohup ./mqbroker -n ip:port -c ../conf/broker.conf  >../logs/broker.log &

  查看mqbroker文件详情,发现启动broker是通过org.apache.rocketmq.broker.BrokerStartup调起的,BrokerStartup为broker启动源码分析入口。

  

2、执行BrokerStartup,启动broker

2.1、broker子模块out文件夹依赖问题

  启动broker,发现如下报错;

 

  查看broker子模块,发现out文件夹未显示:

  0

  源码文件中的out文件夹是存在的。

  0

  在IDEA中可以设置一些忽略文件,这些文件不被显示,查看IDEA的配置详情: Editor -> File Types,发现out文件夹设置忽略了,移除out文件设置。

  0

  可以看到out文件夹正常显示,报错问题已解决。

  0

2.2、配置环境变量

  0

2.3、启动broker

  查看启动日志,发现未找到指定的文件夹。

  0

  在RocketMQ中,默认获取根路径的配置信息,一般情况下,在默认的根路径没有指定的配置文件。

System.getProperty("user.home")

  通过IDEA启动参数设置,调整user.home属性的值,调整内容如下:

  0

  在D:\Source\mq_source\rocketmq创建store文件夹下,创建commitlog文件夹,重启启动rocketmq。

再次查看启动日志

  0

  IDAE控制台出现如下提示,表示已经成功启动了broker。

  0

3、Producer/Consumer消息发送测试

  启动producer发现,broker没有自动创建Topic,配置文件中的autoCreateTopicEnable=true设置未生效。

  0

  配置broker启动时加载的配置文件,详情如下:

  0

  重启Broker,再次通过producer发送消息,发送详情如下:

  0

  consumer消费结果:

 0

3.1、Producer示例代码

  生产者代码详情

 1 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 2 import org.apache.rocketmq.client.producer.SendResult;
 3 import org.apache.rocketmq.common.message.Message;
 4 import org.apache.rocketmq.remoting.common.RemotingHelper;
 5 
 6 /**
 7  * 同步发送
 8  */
 9 public class SyncProducer {
10     public static void main(String[] args) throws Exception{
11         // 实例化消息生产者Producer
12         DefaultMQProducer producer = new DefaultMQProducer("group_test");
13 
14         try{
15             // 设置NameServer的地址
16             producer.setNamesrvAddr("127.0.0.1:9876");
17             // 启动Producer实例
18             producer.start();
19 
20             for (int i = 0; i < 2; i++) {
21                 // 创建消息,并指定Topic,Tag和消息体
22                 Message msg = new Message("TopicTest",
23                         "TagA",
24                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
25                 );
26                 // 发送消息到一个Broker
27                 SendResult sendResult = producer.send(msg);
28                 System.out.printf("%s%n", sendResult);
29             }
30         }finally {
31             //如果不再发送消息,关闭Producer实例。
32             producer.shutdown();
33         }
34     }
35 }

3.2、Consumer示例代码

  消费者代码详情:

 1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 5 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 6 import org.apache.rocketmq.common.message.MessageExt;
 7 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 8 import java.nio.charset.StandardCharsets;
 9 import java.util.List;
10 
11 /**
12  * 集群模式消费
13  */
14 public class ClusterComuser {
15     public static void main(String[] args) throws Exception {
16         // 实例化消费者,指定组名: group_test
17         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
18         // 指定Namesrv地址信息.
19         consumer.setNamesrvAddr("127.0.0.1:9876");
20         // 订阅Topic
21         consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
22         // 如果非第一次启动,那么按照上次消费的位置继续消费
23         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
24         // 默认消费模式 - 集群模式消费
25         consumer.setMessageModel(MessageModel.CLUSTERING);
26 
27         // 注册回调函数,处理消息
28         consumer.registerMessageListener(new MessageListenerConcurrently() {
29             @Override
30             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
31                                                             ConsumeConcurrentlyContext context) {
32                 try {
33                     for(MessageExt msg : msgs) {
34                         Thread.sleep(500);
35                         System.out.println("收到消息:" + " topic :" + msg.getTopic() + " ,tags : " + msg.getTags() + " ,msg : " + new String(msg.getBody(), StandardCharsets.UTF_8));
36                     }
37                 } catch (Exception e) {
38                     e.printStackTrace();
39                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
40 
41                 }
42                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43             }
44         });
45         // 启动消息者
46         consumer.start();
47         System.out.printf("Consumer Started.%n");
48     }
49 }
 

标签:文件夹,启动,broker,源码,RocketMQ,apache,rocketmq,搭建
From: https://www.cnblogs.com/RunningSnails/p/17381191.html

相关文章

  • 【python】http.server搭建局域网文件传输
    1、起因  因为测试需要向平板传输apk安装文件,插数据线比较麻烦,同一局域网起个服务方便又快捷,速度也快,linux下类似 2、官网文档  python3.11  https://docs.python.org/3/library/http.server.html  python2.7(自行了解)  https://docs.python.org/2.7/......
  • ReentrantReadWriteLock源码分析
    ReentrantLock是互斥锁,若存在读多写少同时保证线程安全的场景,ReentrantLock效率比较低,此时需要用到ReentrantReadWriteLock。一、ReentrantReadWriteLock介绍ReentrantReadWriteLock是可重入的读写锁,实现了ReadWriteLock接口,ReadWriteLock是读写锁的顶级接口,定义了readL......
  • ReentrantLock源码分析
    一、ReentrantLock介绍ReentrantLock是JDK1.5引入的,实现Lock接口的互斥锁。保证多线程的环境下,共享资源的原子性。与Synchronized的非公平锁不同,ReentrantLock的实现公平锁、非公平锁。ReentrantLock是重入锁,重入是指,同一个线程可以重复执行加锁的代码段。二、ReentrantLock......
  • java基于springboot+vue非前后端分离的学生成绩管理系统、学生信息管理系统,附源码+数
    1、项目介绍java基于springboot+vue非前后端分离的学生成绩管理系统、学生信息管理系统。本文首先介绍了学生成绩管理的技术发展背景与发展现状,然后遵循软件常规开发流程,首先针对系统选取适用的语言和开发平台,根据需求分析制定模块并设计数据库结构,再根据系统总体功能模块的设计......
  • Nacos 的单机部署搭建
    Nacos是阿里巴巴的产品,主要用来做微服务的注册中心和配置中心,界面美观,功能强大,在国内非常受欢迎。本篇博客主要介绍如何搭建单机版的Nacos,为编写后续的博客做准备。后面也会介绍Nacos集群的搭建,搭建过程都很简单。官方建议使用2.x的版本,本篇博客将以当前最新的2.2.2版......
  • Vulkan学习笔记之开发环境搭建
    一、概述最近因为工作需要开始学习Vulkan的相关知识,作为初学者,发现相对较好的学习资料莫过于vulkan-tutorial,在自己学习Vulkan的过程中,决定将自己的理解记录下来,一是为了加深记忆,二是为了分享给大家一起探讨学习,因此有了本系列文章,开发环境搭建是本系列文章的第一篇。二、开发环......
  • MyCat05——基于Docker搭建MySQL主从复制
    1安装mysql的镜像如果服务器较少,为了充分发挥现有服务器的利用率,可以使用容器化技术来安装mysql。如果服务器没有docker,需要先安装dockeryuminstall-ydockerdocker安装后,启动服务servicedockerstart下载mysql5.7版本的docker镜像dockerpullmysql:5.72启动mysql服务启动......
  • StatefulSet扩缩容源码分析
    k8sv1.15.0Informer监听cmd/kube-controller-manager/app/apps.go作为StatefulSet资源控制器,StatefulSetController通过PodInformer、StatefulSetInformer、PersistentVolumeClaimInformer、ControllerRevisionInformer来监听事件。扩缩容StatefulSetpodManagementPolicyPa......
  • 使用 NutUI 搭建「自定义业务风格」的组件库 | 京东云技术团队
    本文介绍,如何使用NutUI组件库,搭建一套为专属业务风格的业务组件库。NutUI是一款京东风格的移动端组件库。NutUI目前支持Vue和React技术栈,支持Taro多端适配。当下的实现方式一般组件库,都会给用户提供修改主题的方式。比如在NutUI组件库中,给用户提供了两种方式:修改CSS变量,Nu......
  • 搭建openldap与phpldapadmin
    现在很多ldap文档不太适合centos7下安装使用ldap,于是自己搓了一篇作为留底LDAP基础概念:在开始部署前,要了解一些LDAP知识点:条目entry区别名/唯一标识名DistinguishedName/DN属性attributeLDAP的条目(entry)是具有区别名(DistinguishedNamed/DN唯一标识名)的属性(attribute)。DN......