首页 > 其他分享 >不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息

不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息

时间:2022-11-19 14:02:27浏览次数:58  
标签:case MQClientInstance DefaultMQProducer topic cluster 应用服务 new NameServer Rocket

前言

目前有两套RocketMQ集群,集群A包含​​topic​​名称为​​cluster_A_topic​​,集群B包含​​topic​​名称为​​cluster_B_topic​​,在应用服务​​OrderApp​​上通过​​RocketMQ Client​​创建两个​​DefaultMQProducer​​实例发送消息给集群A和集群B,架构图如下:

不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息_缓存

根据上述架构图,我们给出的示例代码如下:

// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
// 设置nameServer地址
producer1.setNamesrvAddr("192.168.2.230:9876");
try {
producer1.start();
// 发送消息
SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result1.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_A_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_A_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_A_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_A_topic 副本不可用!");
}
} catch (Exception e) {
e.printStackTrace();
}
// 创建第二个DefaultMQProducer
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
// 设置nameServer地址
producer2.setNamesrvAddr("192.168.2.231:9876");
try {
producer2.start();
// 发送消息
SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result2.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_B_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_B_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_B_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_B_topic 副本不可用!");
}
return "ok";
} catch (Exception e) {
e.printStackTrace();
} finally {
producer1.shutdown();
producer2.shutdown();
}

结果竟然报错了,报错内容时​​cluster_B_topic​​不存在:

不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息_缓存_02

经过不断的测试,发现只有放在最前面启动的​​DefaultMQProducer​​会生效,后面启动的​​DefaultMQProducer​​发送消息就报错说对应的​​topic​​不存在,而且报错的​​broker​​竟然是前面启动的​​DefaultMQProducer​​对应的​​broker​​。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

问题定位

首先说明一下,当前使用的​​RocketMQ Client​​版本是​​4.8.0​​。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸

标签:case,MQClientInstance,DefaultMQProducer,topic,cluster,应用服务,new,NameServer,Rocket
From: https://blog.51cto.com/u_15773567/5870396

相关文章

  • 【Azure 应用服务】Azure Web App 服务默认支持一些 Weak TLS Ciphers Suite,是否有办
    问题描述当AzureWebApp进行安全扫描后,发现依旧支持很多弱TLS加密套件(WeakTLSCiphersSuite),那么是否有办法来关闭这些弱的加密套件呢?在WindowsIIS环境中,可以通过......
  • RocketMQ 重试机制详解及最佳实践
    作者:斜阳引言本文主要介绍在使用RocketMQ时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在RocketMQ中合理使用重试机制,帮助构建弹性,高可用......
  • 解析 RocketMQ 多样消费功能-消息过滤
    作者:徒钟什么是消息过滤在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。......
  • 初识RocketMQ基础概念(一)
    20年入职ryx公司后,刚好接触到一个线上问题,遂开了一个研究课题,一场针对ApacheRocketMQ的无烟战争正式打开。一开始刚接触这玩意,只是通过百度搜索简单了解下,后来发现很多文......
  • RocketMQ 5.0 API 与 SDK 的演进
    作者:艾阳坤RocketMQ5.0SDK采用了全新的API,使用gRPC作为通信层的实现,并在可观测性上做了很大幅度的提升。全新统一的API此处的API并不单单只是接口上的定义,同......
  • 让数据流动起来,RocketMQ Connect 技术架构解析
    作者:周波WhyRocketMQConnect在业务系统,或者大数据系统中不同数据源之间的数据同步是十分常见的,传统的点对点的数据同步工具,在面临越来越多的数据源点对点的数据同步会......
  • rocketMq springboot2 发送广播消息
    广播消息:一个点发送,所有有监听订阅的程序都能收到消息。应用场景:一个配置更新了,其他点都需要知道配置更新需加载。 mq创建主要是创建组时与队列有点区别mqadminup......
  • rocketMq springboot2 发送有序消息
    有序消息:所有信息往mq中,在broker.conf配置文件中指定产生队列数量。如果是普通队列时,所有消息,会分发到默认队列的各个队列中。是无序的。有序则是,所有消息发送,都指定一个......
  • rocketMq springboot2接入配置
    rocketmq的接入配置。 引入jar包<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependen......
  • rocketMq 安装与配置
    下载rocketmq-all-4.7.1-bin-release.zip上传到服务器 /app/rocketmq,并且解压#mkdir-p/app/rocketmq#tar-zxvfrocketmq-all-4.7.1-bin-release.zip[root@lo......