RocketMQ使用场景:
- 作为消费者:用户应用 --> MQ集群A --> 权益应用 消息内容:客户开户/销户相关消息
- 作为生产者:权益应用 --> MQ集群B --> 信贷应用 消息内容:卡券事件消息
MQClientManager
, 在我们需要使用 MQ Client 实例的时候,实际上都是通过它的 getAndCreateMQClientInstance
方法进行创建的;
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
我们可以看到它利用客户的配置信息生成一个固定的 clientId,以此去缓存 factoryTable 中查找,不存在才会创建全新一个实例。 那么,可以理解一个 clientID 仅能存在一个连接实例了,可这个 clientId 是怎么产生的呢?继续跟踪看看这段代码
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
代码层面上对 clientId 进行了约定,格式为 “ClientIp@InstanceName” 格式,当 unitName 不为空的时候还会在后面加上 “@unitName”。 解决方案:
方法1:设定不同的 instanceName:
instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); 从系统属性中读取出来的,也就是一般在 JVM 启动时设定的。当然此处可以改变,但是需要评估修改之后的影响。这是为什么多少 RocketMQ Client 都只能连接一个服务器的原因。方法2:可以将RocketMQ Client升级到4.9.0
,解决这个问题的:
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); } }