前言
目前有两套RocketMQ集群,集群A包含topic
名称为cluster_A_topic
,集群B包含topic
名称为cluster_B_topic
,在应用服务OrderApp
上通过RocketMQ Client
创建两个DefaultMQProducer
实例发送消息给集群A和集群B,架构图如下:
根据上述架构图,我们给出的示例代码如下:
// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
// 设置nameServer地址
producer1.setNamesrvAddr("192.168.2.230:9876");
try {
producer1.start();
// 发送消息
SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result1.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_A_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_A_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_A_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_A_topic 副本不可用!");
}
} catch (Exception e) {
e.printStackTrace();
}
// 创建第二个DefaultMQProducer
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
// 设置nameServer地址
producer2.setNamesrvAddr("192.168.2.231:9876");
try {
producer2.start();
// 发送消息
SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result2.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_B_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_B_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_B_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_B_topic 副本不可用!");
}
return "ok";
} catch (Exception e) {
e.printStackTrace();
} finally {
producer1.shutdown();
producer2.shutdown();
}
结果竟然报错了,报错内容时cluster_B_topic
不存在:
经过不断的测试,发现只有放在最前面启动的DefaultMQProducer
会生效,后面启动的DefaultMQProducer
发送消息就报错说对应的topic
不存在,而且报错的broker
竟然是前面启动的DefaultMQProducer
对应的broker
。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?
问题定位
首先说明一下,当前使用的RocketMQ Client
版本是4.8.0
。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸