首页 > 编程语言 >Java: RocketMQ事务消息的优雅使用

Java: RocketMQ事务消息的优雅使用

时间:2023-02-17 14:12:54浏览次数:45  
标签:事务 Java RocketMQLocalTransactionState Message 优雅 return message public RocketMQ

背景

在项目中,技术方案需要使用事务消息来保证最终一致性达到实现业务的目的。但在一个服务中有多个业务需要使用事务消息发送不同的消息类型到不同的Topic时,Rocket MQ的本地事务执行方法对这个Case的支持不是很好。
接下来先记录一下基础的MQ事务的使用方式,在记录一下针对上述场景中的代码实现

基础概念

RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。
引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

发送流程

  1. Producer向broker发送半消息(Half Message)

其实半消息本质上就是往 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic发送了一条消息,保存在特殊队列中

  1. Producer得到发送结果响应

    • SEND_OK
    • FLUSH_DISK_TIMEOUT
    • FLUSH_SLAVE_TIMEOUT
    • SLAVE_NOT_AVAILABLE
  2. 根据发送结果执行本地事务,如果失败的话半消息对Consumer不可见,本地事务不执行

  3. 本地事务执行完成后返回状态

    • Commit:本地事务执行成功,消息对Consumer可见,Consumer可以消费消息

      但不保证Consumer一定消费成功,但这一块已经不属于事务消息的范畴了,不做额外处理的话,利用MQ的ack机制,保证最终一致性,注意Consumer要保证幂等性

    • Rollback:本地事务回滚,消息对Consumer不可见,Consumer无法消费

    • Unknown:如果本地事务返回Unknown状态,或者MQ迟迟收不到Producer的返回结果,那MQ会发起回查回调Producer的检查本地事务方法,重新返回状态

      这个属于补偿机制,默认回查15次

基础使用

  • Maven引用
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-version}</version>
        </dependency>
  • Producer
    • Send Transaction Message
      @Component
      public class MessageProducer {
      	@Resource
      	private RocketMQTemplate rocketMQTemplate;
      
      	public void sendTxMessage() {
      		Message<BusinessMessage> message = MessageBuilder
      				.withPayload(new BusinessMessage("Msg"))
      				.build();
      
      		TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("tx_topic",message,null);
      	}
      }
      
    • Local Transaction Listener
      @RocketMQTransactionListener
      @Slf4j
      public class SalesOrderSubmitTxMessageListener implements RocketMQLocalTransactionListener {
      	@Override
      	public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
      		RocketMQLocalTransactionState result;
      
      		try {
      			final BusinessMessage event = getBusinessMessage(message);
      			if (event.val == 1)
      				result = RocketMQLocalTransactionState.UNKNOWN;
      			if (event.val == 2)
      				result = RocketMQLocalTransactionState.ROLLBACK;
      			if (event.val == 3)
      				result = RocketMQLocalTransactionState.COMMIT;
      
      		} catch (Exception ex) {
      			result = RocketMQLocalTransactionState.ROLLBACK;
      			if (!(ex instanceof BizException)) {
      				log.error("Error occurred.", ex);
      			}
      		}
      		return result;
      	}
      
      	@Override
      	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      		RocketMQLocalTransactionState result;
      
      		try {
      			final BusinessMessage event = getBusinessMessage(message);
      			if (event.val == 1)
      				result = RocketMQLocalTransactionState.UNKNOWN;
      			if (event.val == 2)
      				result = RocketMQLocalTransactionState.ROLLBACK;
      			if (event.val == 3)
      				result = RocketMQLocalTransactionState.COMMIT;
      
      		} catch (Exception ex) {
      			result = RocketMQLocalTransactionState.ROLLBACK;
      			if (!(ex instanceof BizException)) {
      				log.error("Error occurred.", ex);
      			}
      		}
      		return result;
      	}
      	/**
      	 * Get event instance from message payload.
      	 */
      	private SalesOrderSubmitMessage getBusinessMessage(Message msg) throws JsonProcessingException {
      
      		final String json = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
      		final ObjectMapper objectMapper = new ObjectMapper();
      
      		return objectMapper.readValue(json, BusinessMessage.class);
      	}
      }
      
  • Consumer
    @Component
    @RocketMQMessageListener(
    		topic = ("tx_topic"),
    		consumerGroup = ("test-tx-message-group")
    )
    @Slf4j
    public class BusinessEventMsgHandler implements RocketMQListener<BusinessMessage> {
    	@Override
    	public void onMessage(BusinessMessage event) {
    		log.info("Consumer success");
    	}
    }
    

从Producer发送事务消息到本地事务执行,再到返回Commit,Consumer可以消费该topic,整个链路完成了。

问题描述

但如果业务A需要发Message A到topic A的事务消息,业务B是需要发Message B到topic B的事务消息,那用一个RocketMQLocalTransactionListener来区分不同的业务消息及不同的topic就很勉强了,所以需要一个通用处理来识别当前执行的本地事务是属于哪个业务的,然后去执行它对应的本地事务逻辑

优雅处理

定义事务处理接口

想要确认一条事务消息属于什么业务有两点

  • 业务消息类型
  • 业务Topic

那不同的业务事务处理器就需要实现这两个接口,这样j就能找到对应的业务处理器

public interface TxMessageProcessor<T> {

    /**
     * Get Tx message type
     *
     * @return {@link Object}
     */
    default Class<T> getMessageType() {
        return (Class<T>) ((ParameterizedType) getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
    }

    /**
     * Get  topic of the corresponding transaction message
     *
     * @return Topic
     */
    String getTopic();

    /**
     * Execute local transaction.
     *
     * @param message Message object {@link Object}
     * @param context Arg of message
     * @return {@link RocketMQLocalTransactionState}
     */
    RocketMQLocalTransactionState executeLocalTransaction(T message, TxProcessContext context);

    /**
     * Check local transaction
     *
     * @param message Message object {@link Object}
     * @return {@link RocketMQLocalTransactionState}
     */
    RocketMQLocalTransactionState checkLocalTransaction(T message);

}

事务通用处理器

@RocketMQTransactionListener
@Slf4j
public class DefaultRocketMQTransactionDispatcher<T> implements RocketMQLocalTransactionListener {

    @Resource
    private List<TxMessageProcessor<T>> processors;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            TxMessageProcessor<T> processor = getMatchProcessor(processors, message);
            T txMessage = getMatchMessage(message, processor.getMessageType());

            TxProcessContext context = (TxProcessContext) o;

            log.info("Start execute transaction,message type: {}, message: {}, context: {} ", txMessage.getClass().getSimpleName(), txMessage, context);
            return processor.executeLocalTransaction(txMessage, context);

        } catch (Exception ex) {
            log.error("Error occurred in executeLocalTransaction.", ex);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        try {
            TxMessageProcessor<T> processor = getMatchProcessor(processors, message);
            T txMessage = getMatchMessage(message, processor.getMessageType());

            log.info("Check local transaction,message type: {}, message: {}", txMessage.getClass().getSimpleName(), txMessage);
            return processor.checkLocalTransaction(txMessage);
        } catch (Exception ex) {
            log.error("Error occurred in checkLocalTransaction.", ex);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    public TxMessageProcessor getMatchProcessor(List<TxMessageProcessor<T>> processors, Message message) {
        String topic = (String) message.getHeaders().get(TxMessageConst.ROCKETMQ_TOPIC);
        String messageType = (String) message.getHeaders().get(TxMessageConst.MESSAGE_TYPE);

        return processors.stream()
                .filter(processor -> processor.getMessageType().getName().equals(messageType))
                .filter(processor -> processor.getTopic().equals(topic))
                .findFirst()
                .orElseThrow();
    }

    private T getMatchMessage(Message message, Class<T> msgType) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        final String json = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        return mapper.readValue(json, msgType);
    }

}

定义通用常量

public class TxMessageConst {
    public static final String ROCKETMQ_TOPIC = "rocketmq_TOPIC";

    public static final String MESSAGE_TYPE = "message_type";

    public static Message buildTxMessage(Object message) {
        return MessageBuilder
                .withPayload(message)
                .setHeader(TxMessageConst.MESSAGE_TYPE, message.getClass().getName())
                .build();
    }
}

业务处理器

@Component
@Slf4j
public class SubmitMessageProcessor implements TxMessageProcessor<SubmitMessage> {

    @Override
    public String getTopic() {
        return "submit_topic";
    }

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(SubmitMessage message, TxProcessContext context) {
        log.info("Start execute transaction, message: {}, context: {} ", message, context);
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(SubmitMessage message) {
        log.info("Check local transaction, message: {} ", message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

生产者

    public void sendSubmitTxMessage() {

        Message msg = TxMessageConst.buildTxMessage(new SubmitMessage(99999, "I am submit test Msg", "Submit Message Description:1"));

        TxProcessContext context = new TxProcessContext(new Object());

        TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction(
                "submit_topic",
                msg,
                context);
    }

测试

image
从message的header取出业务消息类型和业务topic,拿到业务处理器,从而执行对应的业务逻辑
image

标签:事务,Java,RocketMQLocalTransactionState,Message,优雅,return,message,public,RocketMQ
From: https://www.cnblogs.com/tanhaoo/p/17127202.html

相关文章

  • java二维数组
    1.查找1)顺序查找SeqSearch.java2)二分查找【二分法,放在算法讲解】2.顺序查找有一个数列:白眉鹰王、金毛狮王、紫衫龙王、青翼蝠王猜数游戏:从键盘中任意输入一个名称,判......
  • javascript的一些基础知识
    随手记录一些javascript的一些基础知识,之前只是简单用到javascript,并没有了解其中的概念。1. JavascriptObject:InJavaScript,almost"everything"isanobject.......
  • java基础巩固-详解泛型
    java泛型(generics)为jdk5引入的新特性,泛型提供了编译时类型安全检测机制,可以在编译时检测到非法的类型。泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参......
  • JAVA 学习笔记(五)
      子类通过方法的重写机制可以隐藏继承父类的方法,把父类的状态和行为改变为子类自己的状态和行为.假如父类中有一个方法myMethod(),一旦子类重写了超类的方法myMethod......
  • JAVA 抽象类
    抽象类 在java语言继承层次结构中,位于上层的类更具有通用性,甚至更加抽象,这些类封装的方法被重写的可能更大.java用关键字  abstract 格式为: [访问控制符]a......
  • 找素数(java)
    什么是素数?质数又称素数。一个大于1的自然数,除了1和它自身外,不能被其他自然数整除的数叫做质数;否则称为合数(规定1既不是质数也不是合数)。实际案例比如我们想找出1-1000......
  • 2023-02-Java面试经历
    2022年12月中旬不幸被裁,拿N+1撤了,临近过年在家休息了一个月,元宵节之前从山西返杭的,2月7号开始投简历,谈谈最近面试的经历吧,期望能在2月底3月初左右入职吧....................
  • 1.1 Java介绍
    1.1Java介绍Java之父:詹姆斯·高斯林(JamesGosling)。Java三大平台:JavaSE:Java语言的(标准版),用于桌面应用的开发,是其他两个版本的基础。JavaME:Java语言的(小型版......
  • 下载JDK8 用Java写了hello world
     跟着b站上狂神说Java自己学的,因为感觉他的视频有一个完整清晰的体系,然后评价很好 一.下载和安装  1.下载  百度进JavaDownloads|Oracle中国直接注册下载......
  • Java 面向对象
    一、什么是面向对象面向对象编程:OOP,Object-OrientedProgramming。本质:以类的方式组织代码,以对象的形式组织(封装)数据。抽象三大特性:封装、继承、多态二、类与......