首页 > 系统相关 >在同一个Java进程中连接多个RocketMQ集群

在同一个Java进程中连接多个RocketMQ集群

时间:2023-07-30 15:37:41浏览次数:62  
标签:Java MQClientInstance clientId instance 集群 append sb RocketMQ

 RocketMQ使用场景:

  1. 作为消费者:用户应用 --> MQ集群A --> 权益应用 消息内容:客户开户/销户相关消息
  2. 作为生产者:权益应用 --> MQ集群B --> 信贷应用 消息内容:卡券事件消息
问题现象: 一个Java进程要连接多个RocketMQ集群时,作为消费者功能无法正常使用,作为生产者功能可以正常使用 原因:RocketMQ Client 有一个核心类 MQClientManager, 在我们需要使用 MQ Client 实例的时候,实际上都是通过它的 getAndCreateMQClientInstance 方法进行创建的;
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
  String clientId = clientConfig.buildMQClientId();
  MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
  if (null == instance) {
    instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
    if (prev != null) {
      instance = prev;
      log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    } else {
      log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    }
  }
  return instance;
}

 

我们可以看到它利用客户的配置信息生成一个固定的 clientId,以此去缓存 factoryTable 中查找,不存在才会创建全新一个实例。 那么,可以理解一个 clientID 仅能存在一个连接实例了,可这个 clientId 是怎么产生的呢?继续跟踪看看这段代码
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
    sb.append(this.unitName);
  }
  return sb.toString();
}

  

代码层面上对 clientId 进行了约定,格式为 “ClientIp@InstanceName” 格式,当 unitName 不为空的时候还会在后面加上 “@unitName”。 解决方案:

方法1:设定不同的 instanceName:

instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); 从系统属性中读取出来的,也就是一般在 JVM 启动时设定的。当然此处可以改变,但是需要评估修改之后的影响。这是为什么多少 RocketMQ Client 都只能连接一个服务器的原因。

方法2:可以将RocketMQ Client升级到4.9.0,解决这个问题的:

public void changeInstanceNameToPID() {
  if (this.instanceName.equals("DEFAULT")) {
    this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
  }
}

  

方法3:可以设定不同的 unitName

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); producer1.setNamesrvAddr("192.168.2.230:9876"); producer1.setUnitName("producer1"); producer1.start();   DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1"); producer2.setNamesrvAddr("192.168.2.231:9876"); producer2.setUnitName("producer2"); producer2.start();

方法4:使用一个工具,将 MQClientInstance实例 放到工具创建的隔离环境中, 从而实现多个实例完全隔离的效果。

参考:来自平行世界的救赎_学习的锅,跳舞的桌的技术博客_51CTO博客 综上:前两种方法的逻辑是通过修改 clientId 实现多个实例,而方法 4 的逻辑则是 “既然你的缓存已经有这个 key,我就换个缓存”。

代码地址

github: https://github.com/vancoo/multi-mq-demo gitee: https://gitee.com/vancoo/multi-mq-demo

标签:Java,MQClientInstance,clientId,instance,集群,append,sb,RocketMQ
From: https://www.cnblogs.com/brithToSpring/p/17591494.html

相关文章

  • MAC中Java实现多版本JDK并存并随时切换的解决方案
    编辑SHELL配置文件,新版macOS默认使用的是zsh,打开终端,编辑zsh的配置文件.zshrcvi.zshrc在配置文件的末尾添加下面的内容:#>>>Java多版本共存exportJAVA_11_HOME=`/usr/libexec/java_home-v11`exportJAVA_17_HOME=`/usr/libexec/java_home-v17`#defaultJavaversi......
  • java使用线程池实现接口自动化中的并发测试
    importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;publicvoidtest()throwsExecutionException,InterruptedException{ExecutorServ......
  • Java之Stream流的收集
    Java之Stream流的收集收集Stream流的含义:就是把Stream流操作后的结果数据转到集合或者数组中去。Stream流只是方便操作集合/数组的手段。集合/数组才是开发中的目的。Stream流的收集方法名称说明Rcollect(Collectorcollector)开始收集Stream流,指定收集器Col......
  • java多线程
    1、什么是JUC官方文档+源码​ 面试高频问java.utiljava.util.concurrentjava.util.concurrent.atomicjava.util.concurrent.locks​ java,util工具包、包、分类业务:普通的线程代码ThreadRunnable没有返回值2、线程和进程线程和进程如果不能用一句话说出来的......
  • Java之Stream流综合案例
    Java之Stream流综合案例需求:某个公司的开发部门,分为开发一部和二部,现在需要进行年中数据结算。分析:员工信息至少包含了(名称、性别、工资、奖金、处罚记录)开发一部有4个员工,开发二部有5个员工。分别筛选出2个部门的最高工资的员工信息,封装成优秀员工对象。分别统......
  • Kafka集群安装/使用
    一.介绍Kafka是Apache旗下的一款分布式流媒体平台,Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目。它主要用于处理消费者规模网站中的所有动作流数据。动作指(网页浏......
  • Java中过滤出ListA和ListB中字段相同的集合
    Java中过滤出ListA和ListB中age字段相同的User集合在Java中,List是一种常见的集合类型,它可以用来存储一组有序的数据。而对于List中存储的对象类型,我们可以使用泛型来进行限定。假设我们现在有两个List集合,分别为ListA和ListB,它们都存储了一些User对象。现在我们需要从这两个集合......
  • JavaScript、ECMA、CommonJs、NodeJS、TypeScript的关系
    返回JavaScript发布时间:1995发布公司:Netscape(网景)它是一种高级的解释型编程语言,简称JS它最初的设计目标是改善网页的用户体验。......
  • 二十、RocketMQ5.x消费重试
    消费重试指的是,消费者在消费某条消息失败后,ApacheRocketMQ服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。ApacheRocketMQ的消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策......
  • 设计模式-迭代器模式在Java中使用示例
    场景为开发一套销售管理系统,在对该系统进行分析和设计时,发现经常需要对系统中的商品数据、客户数据等进行遍历,为了复用这些遍历代码,开发人员设计了一个抽象的数据集合类AbstractObjectList,而将存储商品和客户等数据的类作为其子类AbstractObjectList类的子类ProductList和Custo......