业务原因,需要在一个项目中与多方MQ进行业务通信;
步骤一,复制一份RocketMQProperties配置文件,避免与原来的冲突
package com.heit.road.web.config; import org.apache.rocketmq.common.topic.TopicValidator; import java.util.HashMap; import java.util.Map; public class MultipleRocketMQProperties { /** * The name server for rocketMQ, formats: `host:port;host:port`. */ private String nameServer; /** * Enum type for accessChannel, values: LOCAL, CLOUD */ private String accessChannel; private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer; /** * Configure enable listener or not. * In some particular cases, if you don't want the the listener is enabled when container startup, * the configuration pattern is like this : * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false> * <p> * the listener is enabled by default. */ private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer = new org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer(); public String getNameServer() { return nameServer; } public void setNameServer(String nameServer) { this.nameServer = nameServer; } public String getAccessChannel() { return accessChannel; } public void setAccessChannel(String accessChannel) { this.accessChannel = accessChannel; } public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer getProducer() { return producer; } public void setProducer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer) { this.producer = producer; } public static class Producer { /** * Group name of producer. */ private String group; /** * Millis of send message timeout. */ private int sendMessageTimeout = 3000; /** * Compress message body threshold, namely, message body larger than 4k will be compressed on default. */ private int compressMessageBodyThreshold = 1024 * 4; /** * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendFailed = 2; /** * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p> * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendAsyncFailed = 2; /** * Indicate whether to retry another broker on sending failure internally. */ private boolean retryNextServer = false; /** * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; /** * The property of "access-key". */ private String accessKey; /** * The property of "secret-key". */ private String secretKey; /** * Switch flag instance for message trace. */ private boolean enableMsgTrace = true; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC; public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public int getSendMessageTimeout() { return sendMessageTimeout; } public void setSendMessageTimeout(int sendMessageTimeout) { this.sendMessageTimeout = sendMessageTimeout; } public int getCompressMessageBodyThreshold() { return compressMessageBodyThreshold; } public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) { this.compressMessageBodyThreshold = compressMessageBodyThreshold; } public int getRetryTimesWhenSendFailed() { return retryTimesWhenSendFailed; } public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; } public int getRetryTimesWhenSendAsyncFailed() { return retryTimesWhenSendAsyncFailed; } public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) { this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; } public boolean isRetryNextServer() { return retryNextServer; } public void setRetryNextServer(boolean retryNextServer) { this.retryNextServer = retryNextServer; } public int getMaxMessageSize() { return maxMessageSize; } public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public boolean isEnableMsgTrace() { return enableMsgTrace; } public void setEnableMsgTrace(boolean enableMsgTrace) { this.enableMsgTrace = enableMsgTrace; } public String getCustomizedTraceTopic() { return customizedTraceTopic; } public void setCustomizedTraceTopic(String customizedTraceTopic) { this.customizedTraceTopic = customizedTraceTopic; } } public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer getConsumer() { return consumer; } public void setConsumer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer) { this.consumer = consumer; } public static final class Consumer { /** * listener configuration container * the pattern is like this: * group1.topic1 = false * group2.topic2 = true * group3.topic3 = false */ private Map<String, Map<String, Boolean>> listeners = new HashMap<>(); public Map<String, Map<String, Boolean>> getListeners() { return listeners; } public void setListeners(Map<String, Map<String, Boolean>> listeners) { this.listeners = listeners; } } }
步骤二,复制一份@RocketMQMessageListener,并新增数据源参数soruce
,这里不采用原来的nameServer参数,可能是版本原因,这个参数目前并不支持多数据源
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.heit.road.web.config; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType; import java.lang.annotation.*; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MultipleRocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = ""; String ACCESS_KEY_PLACEHOLDER = ""; String SECRET_KEY_PLACEHOLDER = ""; String TRACE_TOPIC_PLACEHOLDER = ""; String ACCESS_CHANNEL_PLACEHOLDER = ""; /** * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * <p> * <p> * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion. */ String soruce(); String consumerGroup(); /** * Topic name. */ String topic(); /** * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */ String selectorExpression() default "*"; /** * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * Max consumer thread number. */ int consumeThreadMax() default 64; /** * Maximum amount of time in minutes a message may block the consuming thread. */ long consumeTimeout() default 15L; /** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */ String secretKey() default SECRET_KEY_PLACEHOLDER; /** * Switch flag instance for message trace. */ boolean enableMsgTrace() default true; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; /** * The property of "name-server". */ String nameServer() default NAME_SERVER_PLACEHOLDER; /** * The property of "access-channel". */ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; }
步骤三,复制一份RocketMQListenerContainer,替换参数RocketMQMessageListener
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.heit.road.web.config; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.utils.MessageUtil; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.support.RocketMQListenerContainer; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.SmartLifecycle; import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.MimeTypeUtils; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.Charset; import java.util.List; import java.util.Objects; @SuppressWarnings("WeakerAccess") public class MultipleRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQListenerContainer.class); private ApplicationContext applicationContext; /** * The name of the DefaultRocketMQListenerContainer instance */ private String name; private long suspendCurrentQueueTimeMillis = 1000; /** * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br> * >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private MessageConverter messageConverter; private RocketMQListener rocketMQListener; private RocketMQReplyListener rocketMQReplyListener; private MultipleRocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Type messageType; private MethodParameter methodParameter; private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; } public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } public int getDelayLevelWhenNextConsume() { return delayLevelWhenNextConsume; } public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; } public String getNameServer() { return nameServer; } public void setNameServer(String nameServer) { this.nameServer = nameServer; } public AccessChannel getAccessChannel() { return accessChannel; } public void setAccessChannel(AccessChannel accessChannel) { this.accessChannel = accessChannel; } public String getConsumerGroup() { return consumerGroup; } public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getConsumeThreadMax() { return consumeThreadMax; } public String getCharset() { return charset; } public void setCharset(String charset) { this.charset = charset; } public MessageConverter getMessageConverter() { return messageConverter; } public MultipleRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; return this; } public RocketMQListener getRocketMQListener() { return rocketMQListener; } public void setRocketMQListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } public RocketMQReplyListener getRocketMQReplyListener() { return rocketMQReplyListener; } public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) { this.rocketMQReplyListener = rocketMQReplyListener; } public MultipleRocketMQMessageListener getRocketMQMessageListener() { return rocketMQMessageListener; } public void setRocketMQMessageListener(MultipleRocketMQMessageListener anno) { this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode(); this.consumeThreadMax = anno.consumeThreadMax(); this.messageModel = anno.messageModel(); this.selectorType = anno.selectorType(); this.selectorExpression = anno.selectorExpression(); this.consumeTimeout = anno.consumeTimeout(); } public ConsumeMode getConsumeMode() { return consumeMode; } public SelectorType getSelectorType() { return selectorType; } public void setSelectorExpression(String selectorExpression) { this.selectorExpression = selectorExpression; } public String getSelectorExpression() { return selectorExpression; } public MessageModel getMessageModel() { return messageModel; } public DefaultMQPushConsumer getConsumer() { return consumer; } public void setConsumer(DefaultMQPushConsumer consumer) { this.consumer = consumer; } @Override public void destroy() { this.setRunning(false); if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { consumer.shutdown(); } setRunning(false); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); this.methodParameter = getMethodParameter(); log.debug("RocketMQ messageType: {}", messageType); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + messageModel + '}'; } public void setName(String name) { this.name = name; } public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @SuppressWarnings("unchecked") @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } } private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { if (rocketMQListener != null) { rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt)); Message<?> message = MessageBuilder.withPayload(replyContent).build(); org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message)); consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) { log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus()); } else { log.info("Consumer replies message success."); } } @Override public void onException(Throwable e) { log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage()); } }); } } private byte[] convertToBytes(Message<?> message) { Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders()); Object payloadObj = messageWithSerializedPayload.getPayload(); byte[] payloads; try { if (null == payloadObj) { throw new RuntimeException("the message cannot be empty"); } if (payloadObj instanceof String) { payloads = ((String) payloadObj).getBytes(Charset.forName(charset)); } else if (payloadObj instanceof byte[]) { payloads = (byte[]) messageWithSerializedPayload.getPayload(); } else { String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass()); if (null == jsonObj) { throw new RuntimeException(String.format( "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]", this.messageConverter.getClass(), payloadObj.getClass(), payloadObj)); } payloads = jsonObj.getBytes(Charset.forName(charset)); } } catch (Exception e) { throw new RuntimeException("convert to bytes failed.", e); } return payloads; } private Message<?> doConvert(Object payload, MessageHeaders headers) { Message<?> message = this.messageConverter instanceof SmartMessageConverter ? ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) : this.messageConverter.toMessage(payload, headers); if (message == null) { String payloadType = payload.getClass().getName(); Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null; throw new MessageConversionException("Unable to convert payload with type='" + payloadType + "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]"); } MessageBuilder<?> builder = MessageBuilder.fromMessage(message); builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN); return builder.build(); } @SuppressWarnings("unchecked") private Object doConvertMessage(MessageExt messageExt) { if (Objects.equals(messageType, MessageExt.class)) { return messageExt; } else { String str = new String(messageExt.getBody(), Charset.forName(charset)); if (Objects.equals(messageType, String.class)) { return str; } else { // If msgType not string, use objectMapper change it. try { if (messageType instanceof Class) { //if the messageType has not Generic Parameter return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType); } else { //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint". //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter. return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter); } } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); throw new RuntimeException("cannot convert message to " + messageType, e); } } } } private MethodParameter getMethodParameter() { Class<?> targetClass; if (rocketMQListener != null) { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); } else { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); } Type messageType = this.getMessageType(); Class clazz = null; if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) { clazz = (Class) ((ParameterizedType) messageType).getRawType(); } else if (messageType instanceof Class) { clazz = (Class) messageType; } else { throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); } try { final Method method = targetClass.getMethod("onMessage", clazz); return new MethodParameter(method, 0); } catch (NoSuchMethodException e) { e.printStackTrace(); throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); } } private Type getMessageType() { Class<?> targetClass; if (rocketMQListener != null) { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); } else { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); } Type matchedGenericInterface = null; while (Objects.nonNull(targetClass)) { Type[] interfaces = targetClass.getGenericInterfaces(); if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) { matchedGenericInterface = type; break; } } } targetClass = targetClass.getSuperclass(); } if (Objects.isNull(matchedGenericInterface)) { return Object.class; } Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return actualTypeArguments[0]; } return Object.class; } private void initRocketMQPushConsumer() throws MQClientException { if (rocketMQListener == null && rocketMQReplyListener == null) { throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required"); } Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); } } }
步骤四,配置文件添加参数,与原rocketmq参数区分开
multiplerocketmq: xdt: name-server: xdtIP:xdtPort producer: group: xdt_groups road: name-server: roadIP:roadPort producer: group: road_groups
步骤五,通过配置文件加载多数据源
package com.heit.road.web.config; import cn.hutool.core.date.DateUtil; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.support.BeanDefinitionValidationException; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.env.StandardEnvironment; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Configuration public class MultipleRocketMQConfig implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQConfig.class); private ConfigurableApplicationContext applicationContext; private AtomicLong counter = new AtomicLong(0); private StandardEnvironment environment; private RocketMQMessageConverter rocketMQMessageConverter; public MultipleRocketMQConfig(RocketMQMessageConverter rocketMQMessageConverter, StandardEnvironment environment) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; } @Bean("road") @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"}) @ConfigurationProperties(prefix = "multiplerocketmq.road") public MultipleRocketMQProperties road() { return new MultipleRocketMQProperties(); } @Bean(value = "roadmq", destroyMethod = "destroy") @ConditionalOnMissingBean(name = "roadmq") @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server", "producer.group"}) public RocketMQTemplate roadMQProducer(@Qualifier("road") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) { return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter); } @Bean("xdt") @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"}) @ConfigurationProperties(prefix = "multiplerocketmq.xdt") public MultipleRocketMQProperties xdt() { return new MultipleRocketMQProperties(); } @Bean(value = "xdtmq", destroyMethod = "destroy") @ConditionalOnMissingBean(name = "xdtmq") @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server", "producer.group"}) public RocketMQTemplate xdtMQProducer(@Qualifier("xdt") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) { return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter); } private RocketMQTemplate createRocketMQTemplate(MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String nameServer = rocketMQProperties.getNameServer(); String groupName = producerConfig.getGroup(); Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); String ak = rocketMQProperties.getProducer().getAccessKey(); String sk = rocketMQProperties.getProducer().getSecretKey(); boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace(); String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic(); DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic); producer.setNamesrvAddr(nameServer); if (!StringUtils.isEmpty(accessChannel)) { producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); } producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); producer.setInstanceName(producer.getProducerGroup() + DateUtil.now()); RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setProducer(producer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); return rocketMQTemplate; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(MultipleRocketMQMessageListener.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerContainer); } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } MultipleRocketMQMessageListener annotation = clazz.getAnnotation(MultipleRocketMQMessageListener.class); String topic = annotation.topic(); String consumerGroup = annotation.consumerGroup(); String soruce = annotation.soruce(); MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class); boolean listenerEnabled = (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s_%s", topic, consumerGroup, counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, MultipleRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); MultipleRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, MultipleRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.getConsumer().setInstanceName(containerBeanName); container.getConsumer().setNamesrvAddr(rocketMQProperties.getNameServer()); container.start(); } catch (Exception e) { throw new RuntimeException(e); } } } private MultipleRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, MultipleRocketMQMessageListener annotation) { MultipleRocketMQListenerContainer container = new MultipleRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String soruce = annotation.soruce(); if (StringUtils.isEmpty(soruce)) { throw new RuntimeException(name + " 未指定数据源"); } MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class); container.setAccessChannel(AccessChannel.CLOUD); container.setTopic(annotation.topic()); container.setNameServer(rocketMQProperties.getNameServer()); String tags = annotation.selectorExpression(); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(annotation.consumerGroup()); if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); } container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); return container; } private void validate(MultipleRocketMQMessageListener annotation) { if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException( "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } } }
步骤六,准备多数据源listener
package com.heit.road.web.listener; import com.heit.road.web.config.MultipleRocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"}) @MultipleRocketMQMessageListener(soruce = "road", consumerGroup = "RoadRocketMqlistener6", topic = "road_test") public class RoadRocketMqlistener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("RoadRocketMqlistener 收到信息==》 " + message); } }
package com.heit.road.web.listener; import com.heit.road.web.config.MultipleRocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"}) @MultipleRocketMQMessageListener(soruce = "xdt", consumerGroup = "XdtRocketMqlistener6", topic = "road_test") public class XdtRocketMqlistener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("XdtRocketMqlistener 收到信息==》 " + message); } }
步骤七,编写测试方法,多数据源发送数据
// @Qualifier("xdtmq") @Lazy @Resource private RocketMQTemplate xdtmq; // @Qualifier("roadmq") @Lazy @Resource private RocketMQTemplate roadmq; @ApiOperation("测试用表-MQ信息测试") @PostMapping("sendTest") public BaseBack<?> sendTest() { xdtmq.asyncSend("road_test", "xdtmq123", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("尝试xdt================================================"); System.out.println("信息发送成功"); System.out.println("sendResult = " + sendResult); } @Override public void onException(Throwable e) { System.out.println("尝试xdt================================================"); System.out.println("信息发送失败"); e.printStackTrace(); } }); roadmq.asyncSend("road_test", "roadmq123", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("尝试road================================================"); System.out.println("信息发送成功"); System.out.println("sendResult = " + sendResult); } @Override public void onException(Throwable e) { System.out.println("尝试road================================================"); System.out.println("信息发送失败"); e.printStackTrace(); } }); return null; }
结果如下:
标签:return,springboot,数据源,rokectMQ,private,org,import,public,String From: https://www.cnblogs.com/tangzeqi/p/18675030