一、源码地址下载
RocketMQ官网下载地址:https://github.com/apache/rocketmq/tags。
当前搭建的是4.8.0版本的rocketmq,下载zip压缩包至本地,并解压。
当解压后的RocketMQ源码导入IDEA。
二、源码环境搭建
1、启动NameServer
1、NameServer启动源码入口
启动namesrv命令如下:
nohup ./mqnamesrv >../logs/namesrv.log &
查看mqnamesrv文件详情,发现启动namesrv是通过org.apache.rocketmq.namesrv.NamesrvStartup调起的,NamesrvStartup为namesrv启动源码分析入口。
2、执行NamesrvStartup,启动NameServer
执行NamesrvStartup,启动namesrv报错,原因是未配置RocketMQ的环境变量。
2.1、RocketMQ环境变量配置
2.1.1、新建conf文件夹
在当前RocketMQ源码工程中创建conf文件夹,用于存放distributor子模块相关配置文件。
2.1.2、复制配置文件
将distribution模块中的logback_namesrv.xml、logback_broker.xml、broker.conf配置文件复制到新创建的conf文件夹下。
2.1.3、新建store文件夹
在conf文件夹下新建store文件夹,用于RocketMQ对消息的持久化存储。
2.1.4、修改配置文件详情
调整conf/broker.conf,broker.conf详情如下
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH ## 消息持久化存储根路径 storePathRootDir=D:/Source/mq_source/rocketmq/store ## 消息持久化存储路径 storePathCommitLog=D:/Source/mq_source/rocketmq/store/commitlog namesrvAddr=127.0.0.1:9876 brokerIP1=127.0.0.1 autoCreateTopicEnable=true
根据实际需要,调整日志输出位置,logback_namesrv.xml详情如下:
logback_broker.xml详情如下:
2.1.5、配置环境变量
配置NamesrvStartup启动时的环境变量如下:
6、指定配置文件
指定启动的配置文件,创建的conf配置文件夹下的broker.conf配置。
2.2、启动namesrv
出现如下提示,表示已经成功启动了namesrv。
在日志输出位置查看namesrv.log启动日志详情:
2、启动Broker
1、broker启动源码入口
启动broker命令如下:
nohup ./mqbroker -n ip:port -c ../conf/broker.conf >../logs/broker.log &
查看mqbroker文件详情,发现启动broker是通过org.apache.rocketmq.broker.BrokerStartup调起的,BrokerStartup为broker启动源码分析入口。
2、执行BrokerStartup,启动broker
2.1、broker子模块out文件夹依赖问题
启动broker,发现如下报错;
查看broker子模块,发现out文件夹未显示:
源码文件中的out文件夹是存在的。
在IDEA中可以设置一些忽略文件,这些文件不被显示,查看IDEA的配置详情: Editor -> File Types,发现out文件夹设置忽略了,移除out文件设置。
可以看到out文件夹正常显示,报错问题已解决。
2.2、配置环境变量
2.3、启动broker
查看启动日志,发现未找到指定的文件夹。
在RocketMQ中,默认获取根路径的配置信息,一般情况下,在默认的根路径没有指定的配置文件。
System.getProperty("user.home")
通过IDEA启动参数设置,调整user.home属性的值,调整内容如下:
在D:\Source\mq_source\rocketmq创建store文件夹下,创建commitlog文件夹,重启启动rocketmq。
再次查看启动日志
IDAE控制台出现如下提示,表示已经成功启动了broker。
3、Producer/Consumer消息发送测试
启动producer发现,broker没有自动创建Topic,配置文件中的autoCreateTopicEnable=true设置未生效。
配置broker启动时加载的配置文件,详情如下:
重启Broker,再次通过producer发送消息,发送详情如下:
consumer消费结果:
3.1、Producer示例代码
生产者代码详情
1 import org.apache.rocketmq.client.producer.DefaultMQProducer; 2 import org.apache.rocketmq.client.producer.SendResult; 3 import org.apache.rocketmq.common.message.Message; 4 import org.apache.rocketmq.remoting.common.RemotingHelper; 5 6 /** 7 * 同步发送 8 */ 9 public class SyncProducer { 10 public static void main(String[] args) throws Exception{ 11 // 实例化消息生产者Producer 12 DefaultMQProducer producer = new DefaultMQProducer("group_test"); 13 14 try{ 15 // 设置NameServer的地址 16 producer.setNamesrvAddr("127.0.0.1:9876"); 17 // 启动Producer实例 18 producer.start(); 19 20 for (int i = 0; i < 2; i++) { 21 // 创建消息,并指定Topic,Tag和消息体 22 Message msg = new Message("TopicTest", 23 "TagA", 24 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 25 ); 26 // 发送消息到一个Broker 27 SendResult sendResult = producer.send(msg); 28 System.out.printf("%s%n", sendResult); 29 } 30 }finally { 31 //如果不再发送消息,关闭Producer实例。 32 producer.shutdown(); 33 } 34 } 35 }
3.2、Consumer示例代码
消费者代码详情:
1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 5 import org.apache.rocketmq.common.consumer.ConsumeFromWhere; 6 import org.apache.rocketmq.common.message.MessageExt; 7 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; 8 import java.nio.charset.StandardCharsets; 9 import java.util.List; 10 11 /** 12 * 集群模式消费 13 */ 14 public class ClusterComuser { 15 public static void main(String[] args) throws Exception { 16 // 实例化消费者,指定组名: group_test 17 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test"); 18 // 指定Namesrv地址信息. 19 consumer.setNamesrvAddr("127.0.0.1:9876"); 20 // 订阅Topic 21 consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC 22 // 如果非第一次启动,那么按照上次消费的位置继续消费 23 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 24 // 默认消费模式 - 集群模式消费 25 consumer.setMessageModel(MessageModel.CLUSTERING); 26 27 // 注册回调函数,处理消息 28 consumer.registerMessageListener(new MessageListenerConcurrently() { 29 @Override 30 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 31 ConsumeConcurrentlyContext context) { 32 try { 33 for(MessageExt msg : msgs) { 34 Thread.sleep(500); 35 System.out.println("收到消息:" + " topic :" + msg.getTopic() + " ,tags : " + msg.getTags() + " ,msg : " + new String(msg.getBody(), StandardCharsets.UTF_8)); 36 } 37 } catch (Exception e) { 38 e.printStackTrace(); 39 return ConsumeConcurrentlyStatus.RECONSUME_LATER; 40 41 } 42 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 43 } 44 }); 45 // 启动消息者 46 consumer.start(); 47 System.out.printf("Consumer Started.%n"); 48 } 49 }标签:文件夹,启动,broker,源码,RocketMQ,apache,rocketmq,搭建 From: https://www.cnblogs.com/RunningSnails/p/17381191.html