有序消息:所有信息往mq中,在broker.conf配置文件中指定产生队列数量。
如果是普通队列时,所有消息,会分发到默认队列的各个队列中。是无序的。
有序则是,所有消息发送,都指定一个队列进行发送,而消费的消息也是一个接着一个消费。
主要应用场景:比如商品库存扣减,银行余额处理,订单处理结果等
[root@localhost ~]# mqadmin consumerProgress -g GID-LOG RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime %RETRY%GID-LOG broker-a 0 19 19 N/A 0 2022-10-24 20:37:52 TOPIC-SYS-MESSAGE broker-a 0 4356 4356 N/A 0 2022-11-14 14:16:05 TOPIC-SYS-MESSAGE broker-a 1 4370 4370 N/A 0 2022-11-14 14:16:05 TOPIC-SYS-MESSAGE broker-a 2 4371 4371 N/A 0 2022-11-14 14:18:52 TOPIC-SYS-MESSAGE broker-a 3 4363 4363 N/A 0 2022-11-14 14:18:52 TOPIC-SYS-MESSAGE broker-a 4 4372 4372 N/A 0 2022-11-14 14:16:05 TOPIC-SYS-MESSAGE broker-a 5 4358 4358 N/A 0 2022-11-14 14:18:52 TOPIC-SYS-MESSAGE broker-a 6 4354 4354 N/A 0 2022-11-14 14:15:36 TOPIC-SYS-MESSAGE broker-a 7 4338 4338 N/A 0 2022-11-14 14:16:04
消息生产者:
主要是使用发送的方法是这个syncSendOrderly,指定accountId 发送
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.ccjr.api.utils.MqUtils; import com.ccjr.base.BaseMqRequest; import com.ccjr.commons.constants.RocketMqConstants; import lombok.extern.slf4j.Slf4j; /** * @desc:业务系统订单计算 * @author 陈惟鲜 * @date 2021年7月27日 下午2:01:02 */ @Slf4j @Component public class OrderHandleSender { @Autowired private RocketMQTemplate rocketMQTemplate; /**发送订单回报更新*/ public void sendMessage(BaseMqRequest baseMqRequest) { // 发送前调用 String jsonMsg = JSON.toJSONString(baseMqRequest); String destination = RocketMqConstants.TOPIC_A + ":" + RocketMqConstants.TAG_BIZ_ORDER_A; String accountId = baseMqRequest.getAccountId(); SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, MessageBuilder.withPayload(jsonMsg).build(), accountId); log.info("sendMessage to topic {} sendResult={}", destination, sendResult); } }
消息消费者:
指定消费模式:
consumeMode = ConsumeMode.ORDERLY
import java.util.Date; import org.apache.commons.lang.StringUtils; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; /** * @desc:2、委托交易完成后通知订单处理 * @author 陈惟鲜 * @date 2021年7月27日 下午1:47:31 */ @Slf4j @Component @RocketMQMessageListener(topic = RocketMqConstants.TOPIC_A, consumerGroup = RocketMqConstants.G_TAG_BIZ_ORDER_A, selectorExpression = RocketMqConstants.TAG_BIZ_ORDER_A, consumeMode = ConsumeMode.ORDERLY) public class OrderHandleListener implements RocketMQListener<String> { @Override public void onMessage(String messageContext) { Date startTime = new Date(); BaseMqRequest baseRequest = null; Exception ex = null; EntrustOrderResultDTO entrustOrderResultDTO = null; // 合并回报汇总 try { String msg = "订单更新账单监听"+RocketMqConstants.TAG_BIZ_ORDER_HANDLE_RESULT+" 订单更新执行:\n"+messageContext; log.info(msg); }catch(Exception e) { log.error("接收处理异常", e); ex = e; }finally { if (baseRequest != null) { LogVo logVo = logSaveHelper.initLogVoOrderHandleListener(baseRequest, ex, entrustOrderResultDTO, startTime); // 入库mongodb logMongoHelper.saveLog(logVo, MongoNameConstants.MG_LOG_BIZ); } } } }
标签:TOPIC,14,broker,发送,springboot2,2022,org,import,rocketMq From: https://www.cnblogs.com/a393060727/p/16888974.html