首页 > 其他分享 >十、事务消息

十、事务消息

时间:2023-07-15 21:11:28浏览次数:32  
标签:事务 生产者 回滚 回查 消息 public

事务消息介绍及流程图

RocketMQ在4.3.0版中已经支持分布式事务消息,是通过二阶段提交事务回查来保证本地事务和发送消息的一致性。事务消息交互流程如下图所示。

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

 

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

RocketMQ4.x版本的事务消息

RocketMQ4.x版本是通过TransactionMQProducer类的sendMessageInTransaction方法发送事务消息,并设置TransactionListener。

public interface TransactionListener {

    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

executeLocalTransaction 是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:

LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息

LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。

LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。checkLocalTransaction回查本地事务状态决定是提交还是回滚。

checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

 

修改生产者MqProducer,增加OldVersionTrsactionMqProducer

@Slf4j
@Component
public class OldVersionTrsactionMqProducer implements InitializingBean, DisposableBean {

    private TransactionMQProducer transactionMQProducer;

    @Value("${rocketmq.namesrv}")
    private String namesrv;




    public SendResult sendTransactionMsg(Message msg, TransactionListener listener) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        transactionMQProducer.setTransactionListener(listener);
        return   transactionMQProducer.sendMessageInTransaction(msg, null);
    }

    @Override
    public void destroy() throws Exception {
        if (transactionMQProducer != null) {
            transactionMQProducer.shutdown();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        transactionMQProducer = new TransactionMQProducer("my-transaction-producer");
        transactionMQProducer.setNamesrvAddr(namesrv);
        transactionMQProducer.start();

    }
}

增加TransactionController:

@Slf4j
@RestController
public class TransactionController {

    @Autowired
    private OldVersionTrsactionMqProducer producer;

    @RequestMapping("/sendTranMsg")
    public List<SendResult> sendTranMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        List<SendResult> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message msg =
                    new Message("MyTopic", "*", String.valueOf(i),
                            ("Hello RocketMQ,Transaction message  " + i).getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.sendTransactionMsg(msg, new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                    log.info("executeLocalTransaction。。。执行本地事务");
                    String keys = message.getKeys();
                    int i1 = Integer.parseInt(keys);
                    if (i1 > 5) {
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    } else {
                        return LocalTransactionState.COMMIT_MESSAGE;
                    }

                }

                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                    log.info("checkLocalTransaction。。。执行事务回查");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            list.add(sendResult);

            System.out.printf("%s%n", sendResult);
        }

        return list;
    }
}

executeLocalTransaction方法中判断消息的key是否大于5,若是则回滚,否则提交。

 

接下来修改消费者Mq-Consumer启用OldVersionConsumer类来消费。重启生产者,消费者后,调用http://localhost:8001/sendTranMsg发送事务消息。查看消费者控制台:
 

 
发现只有6条数据。

RocketMQ5.0的事务消息

事务回查是在构造生产者时,用于检查确认异常半事务的中间状态。:

 ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
    producer = clientServiceProvider.newProducerBuilder()
            .setClientConfiguration(new ClientConfigurationBuilder().setEndpoints(proxy).build())
            .setTransactionChecker(new TransactionChecker() {
                @Override
                public TransactionResolution check(MessageView messageView) {
                    log.info("check。。。。执行事务检查");
                    return TransactionResolution.COMMIT;
                }
            }).build();

TransactionChecker用于事务回查。TransactionChecker的check方法返回值TransactionResolution含义与LocalTransactionState相同。之后调用生产者开启事务:

 Transaction transaction = null;
  try {
      transaction = producer.beginTransaction();
  } catch (ClientException e) {
      e.printStackTrace();
      continue;
  }

调用Transaction 进行提交或回滚。

首先新建存储事务消息类型的topic:

mqadmin.cmd updatetopic -n localhost:9876 -t TranTopic -c DefaultCluster -a +message.type=TRANSACTION

-n localhost:9876指定namesrv地址,-t TranTopic指定topic名字是TranTopic,-c DefaultCluster指定集群是DefaultCluster,-a +message.type=TRANSACTION指定topic存储事务消息。

 

修改生产者MqProducer增加TranController:

@Slf4j
@RestController
public class TranController implements InitializingBean, DisposableBean {

    private Producer producer;


    @Value("${rocketmq.proxy}")
    private String proxy;


    @Override
    public void destroy() throws Exception {
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
        producer = clientServiceProvider.newProducerBuilder()
                .setClientConfiguration(new ClientConfigurationBuilder().setEndpoints(proxy).build())
                .setTransactionChecker(new TransactionChecker() {
                    @Override
                    public TransactionResolution check(MessageView messageView) {
                        log.info("check。。。。执行事务检查");
                        return TransactionResolution.COMMIT;
                    }
                }).build();
    }

    @RequestMapping("/sengV5TranMsg")
    public List<SendReceipt> sengV5TranMsg() throws ClientException {
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        List<SendReceipt> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Transaction transaction = null;
            try {
                transaction = producer.beginTransaction();
            } catch (ClientException e) {
                e.printStackTrace();
                continue;
            }

            messageBuilder.setTopic("TranTopic")
                    .setBody(("Transaction Message " + i).getBytes(StandardCharsets.UTF_8));
            SendReceipt send = producer.send(messageBuilder.build(), transaction);
            list.add(send);

            // 执行本地事务
            log.info("执行本地事务");


            if (i > 7) {
                // 回滚
                transaction.rollback();
            } else {
                // 提交
                transaction.commit();
            }
        }

        return list;
    }

}

修改消费者订阅TranTopic topic,之后重启生产者和消费者。访问http://localhost:8001/sengV5TranMsg发送事务消息。查看消费者控制台:
 

 

RocketMQ事务消息原理

事务消息相对普通消息最大的特点就是半事务消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是半事务消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。

那如何实现消息回查?Broker会启动一个消息回查的定时任务,定时从事务消息queue中读取所有待反查的消息。针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求。然后根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。最后,提交或者回滚事务,将半消息标记为已处理状态【将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)】。 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中; 如果是回滚事务,则什么都不做。

rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

使用限制

  • 消息类型一致性(RocketMQ5.0)

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

  • 消费事务性

Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  • 中间状态可见性

Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  • 事务超时机制

Apache RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

标签:事务,生产者,回滚,回查,消息,public
From: https://www.cnblogs.com/shigongp/p/17556058.html

相关文章

  • .NET个人博客-使用Back进行消息推送
    使用Back推送消息到你的iPhone前言我的好友看了我的博客,给我提了个需求,让我搞个网站通知,我开始以为就是评论回复然后发送邮件通知。不过他告诉我网站通知是,当有人评论或者留言后,会通知到我这边来,消息是实时通知的,他说用的是Back,不需要发邮件,然后发了个GitHub链接给我,我觉得还不......
  • 关于 SAP ABAP 事务码 SM30 里的 Restrict Data Range
    SAPABAP事务码SM30里的RestrictDataRange区域的Enterconditions和Variant这两个选项有什么作用?SAPABAP中的SM30事务码用于维护表的条目。在使用SM30事务时,RestrictDataRange区域允许用户定义一些限制条件,可以帮助缩小查询或更改的数据范围。这对于大型表......
  • SQL注入问题、视图、触发器、事务、存储过程、函数、流程控制、索引、测试索引
    SQL注入问题连接MySQL服务器conn=pymysql.connect(host=‘127.0.0.1’port=3306user=‘root’password='1234'......
  • WPF 实现 Message 消息提醒控件
    WPF实现Message消息提醒控件控件:Message作者:WPFDevelopersOrg-驚鏵原文链接:https://github.com/WPFDevelopersOrg/WPFDevelopers框架使用.NET4至.NET6;VisualStudio2022;接着上一篇1)新增MessageListBoxItem.cs代码如下:新增了名为MessageTy......
  • Vue3 webSocket收到消息改变响应式全局对象从而实时改变界面
    需求在main.js中创建一个 响应式全局对象。通过WebSocket收到消息改变这个全局对象时,子组件应同步响应。效果:这几个标签框绑定的全局对象json main.js定义 响应式全局对象//全局对象constglobalData=reactive({extTelMonitorData:[{title:......
  • mysql8 索引、视图、事务、存储过程、触发器
    一、视图1、2、 二、触发器1、 三、事务(重要)1、  四、存储过程1、  五、函数1、  六、流程控制1、  七、索引(重要)1、 ......
  • python之数据库:SQL注入问题,视图,触发器,事务,存储过程,函数,流程控制,索引,慢查询
    SQL注入问题(了解现象)importpymysql#连接MySQL服务端conn=pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123',database='db8_3',charset='utf8',autocommit=True#......
  • rabbitMQ消息可靠性
    rabbitMQ消息可靠性 rabbitMQ交换机不负责消息持久化消息存储到队列中才可以手动开启持久化 生产者端:如何保证消息一定可以送达Exchange消费者端:自动ACK手动ACK RabbitMQ保证消息可靠性:1.保证消息一定可以送达Exchangeconfirm机制......
  • java代码向stream消息队列发送消息失败
    如何实现Java代码向Stream消息队列发送消息失败作为一名经验丰富的开发者,您可以教会刚入行的小白如何实现Java代码向Stream消息队列发送消息失败。本文将按照以下流程展示步骤,并提供相应的代码和注释。流程图以下是实现该功能的整体流程图:步骤动作1.创建Stream连接......
  • java回滚已提交的事务
    Java回滚已提交的事务在Java中,事务是一组数据库操作的逻辑单元,它要么全部成功执行,要么全部失败回滚。通常情况下,事务会被提交,也就是将数据库的更改持久化到磁盘上。然而,有时候我们可能需要撤销已提交的事务,这就是事务回滚。事务回滚的概念事务回滚是指将已提交的事务的所有更改......