首页 > 其他分享 >最大努力通知【分布式事务解决方案】

最大努力通知【分布式事务解决方案】

时间:2024-08-02 18:28:15浏览次数:16  
标签:事务 accountPay 解决方案 充值 MQ 消息 通知 接收 分布式

优质博文:IT-BLOG-CN

一、概述

最大努力通知也是一种解决分布式事务的方案,下面是一个充值的例子:

交互流程:
【1】账户系统调用充值系统接口;
【2】充值系统完成支付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进行重复通知;
【3】账户系统接收到充值结果通知修改充值状态[实时性与数据一致性可以延迟];
【4】账户系统未接收到通知会主动调用充值系统的接口查询充值结果[当用户查询状态时,状态为失败时,回查充值系统]

通过上边的例子我们总结最大努力通知方案的目标: 发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括:
【1】有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
【2】消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同:
【1】解决方案思想不同: 可靠消息一致性发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知给接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
【2】两者的业务应用场景不同: 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
【3】技术解决方向不同: 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)

二、解决方案

方案一: 本方案是利用 MQ的 ack机制由 MQ向接收通知方发送通知,流程如下:

【1】发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。 注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果;
【2】接收通知方监听 MQ;
【3】接收通知方接收消息,业务处理完成回应ack;
【4】接收通知方若没有回应ack 则 MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用 RocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限;
【5】接收通知方可通过消息校对接口来校对消息的一致性[幂等性];

方案二: 本方案也是利用 MQ的 ack机制,与方案一不同的是应用程序向接收通知方发送通知,如下图:

交互流程如下:
【1】发起通知方将通知发给MQ。 使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ;
【2】通知程序监听 MQ,接收 MQ的消息。 方案一中接收通知方直接监听 MQ,方案二中由通知程序监听 MQ。通知程序若没有回应 ack则 MQ会重复通知。
【3】通知程序通过互联网接口协议(如Http、WebService)调用接收通知方案接口,完成通知。 通知程序调用接收通知方案接口成功就表示通知成功,即消费 MQ消息成功,MQ将不再向通知程序投递通知消息;
【4】接收通知方可通过消息校对接口来校对消息的一致性。

方案一和方案二的不同点:
1、方案一中接收通知方与 MQ对接,即接收通知方监听 MQ,此方案主要应用与内部应用之间的通知。
2、方案二中由通知程序与 MQ对接,通知程序监听MQ,收到 MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

三、RocketMQ实现最大努力通知型事务

业务说明: 本实例通过 RocketMQ中间件实现最大努力通知分布式事务,模拟充值过程。 本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是 bank1数据库,其中有张三账户。充值系统的数据库使用 bank1_pay数据库,记录了账户的充值记录。 业务流程如下图:

交互流程如下:
【1】用户请求充值系统进行充值;
【2】充值系统完成充值将充值结果发给MQ;
【3】账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额;
【4】账户系统也可以主动查询充值系统的充值结果查询接口,增加金额;

导入 Maven依赖:

<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq‐spring‐boot‐starter</artifactId> 
    <version>2.0.2</version> 
</dependency>

配置 RocketMQ: 在 application-local.propertis中配置 RocketMQ nameServer地址及生产组:

rocketmq.producer.group = producer_bank2 
rocketmq.name‐server = 127.0.0.1:9876

【核心代码】: 支付系统服务端实现,发送通知消息服务。实现如下功能:
【1】充值接口;
【2】充值完成要通知;
【3】充值结果查询接口;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
 
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {
 
    @Autowired
    AccountPayDao accountPayDao;
 
    @Autowired
    RocketMQTemplate rocketMQTemplate;
 
    //插入充值记录
    @Transactional
    @Override
    public AccountPay insertAccountPay(AccountPay accountPay) {
        int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
        if(success>0){
            //发送通知,使用普通消息发送通知
            accountPay.setResult("success");
            rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
            return accountPay;
        }
        return null;
    }
 
    //查询充值记录,接收通知方调用此方法来查询充值结果
    @Override
    public AccountPay getAccountPay(String txNo) {
        AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
        return accountPay;
    }
}

核心代码: 被通知服务 Service端代码实现如下:

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
 
    @Autowired
    AccountInfoDao accountInfoDao;
 
    @Autowired
    PayClient payClient;
 
    //更新账户金额
    @Override
    @Transactional
    public void updateAccountBalance(AccountChangeEvent accountChange) {
        //幂等校验
        if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){
            return ;
        }
        int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
        //插入事务记录,用于幂等控制
        accountInfoDao.addTx(accountChange.getTxNo());
    }
 
    //远程调用查询充值结果,这里的事务ID指的是消息的唯一ID,如果是事务生成的ID,消息接收端在没有收到消息的时候是不可能知道的。
    @Override
    public AccountPay queryPayResult(String tx_no) {
 
        //远程调用
        AccountPay payresult = payClient.payresult(tx_no);
        if("success".equals(payresult.getResult())){
            //更新账户金额
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号
            accountChangeEvent.setAmount(payresult.getPayAmount());//金额
            accountChangeEvent.setTxNo(payresult.getId());//充值事务号
            updateAccountBalance(accountChangeEvent);
        }
        return payresult;
    }
}

核心代码: 被通知服务监听类代码如下:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
 
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {
 
    @Autowired
    AccountInfoService accountInfoService;
 
    //接收消息
    @Override
    public void onMessage(AccountPay accountPay) {
        log.info("接收到消息:{}", JSON.toJSONString(accountPay));
        if("success".equals(accountPay.getResult())){
            //更新账户金额
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(accountPay.getAccountNo());
            accountChangeEvent.setAmount(accountPay.getPayAmount());
            accountChangeEvent.setTxNo(accountPay.getId());
            accountInfoService.updateAccountBalance(accountChangeEvent);
        }
        log.info("处理消息完成:{}", JSON.toJSONString(accountPay));
    }
}

四、小结

最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务; 最大努力通知方案需要实现如下功能:
【1】消息重复通知机制;
【2】消息校对机制;

标签:事务,accountPay,解决方案,充值,MQ,消息,通知,接收,分布式
From: https://blog.csdn.net/zhengzhaoyang122/article/details/140859324

相关文章

  • 淘客返利系统中的分布式事务处理与保障一致性的方案
    淘客返利系统中的分布式事务处理与保障一致性的方案大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何在淘客返利系统中处理分布式事务,并确保数据的一致性。分布式事务处理是微服务架构中的一个关键问题,它涉及到多个服务和数据库的......
  • 从传统监控到智能化升级:EasyCVR视频汇聚平台的一站式解决方案
    随着科技的飞速发展和社会的不断进步,视频监控已经成为现代社会治安防控、企业管理等场景安全管理中不可或缺的一部分。而在视频监控领域,EasyCVR视频汇聚平台凭借其强大的多协议接入能力,在复杂多变的网络环境中展现出了卓越的性能和广泛的应用前景。一、EasyCVR视频汇聚平台的多协......
  • 从传统监控到智能化升级:EasyCVR视频汇聚平台的一站式解决方案
    随着科技的飞速发展和社会的不断进步,视频监控已经成为现代社会治安防控、企业管理等场景安全管理中不可或缺的一部分。而在视频监控领域,EasyCVR视频汇聚平台凭借其强大的多协议接入能力,在复杂多变的网络环境中展现出了卓越的性能和广泛的应用前景。一、EasyCVR视频汇聚平台的多协......
  • [分布式]并发访问
    分布式系统设计中的并发访问解决方案|得物技术引言随着互联网信息技术的飞速发展,数据量不断增大,业务逻辑也日趋复杂,对系统的高并发访问、海量数据处理的场景也越来越多。如何用较低成本实现系统的高可用、易伸缩、可扩展等目标就显得越发重要。为了解决这一系列问题,系统架构也......
  • 探索在线教育平台开发:需求分析与云朵课堂解决方案
    一、深入剖析在线教育平台开发需求分析核心功能模块化讲师端:聚焦于教学创新与互动体验。支持高清视频直播、音频直播及图文教程发布;实时互动功能包括文字聊天、语音/视频连麦,以及教学白板与外设接入,让课堂生动有趣。此外,讲师还需具备课程管理、作业布置与批改、随堂测试设计、......
  • FlexibleBI工业智能质检系统:提升质量管理与生产效率的智能解决方案
    在现代制造业中,质量管理是至关重要的一环。我们的工业智能质检系统专注于通过人工智能赋能的预测分析,为客户提供全方位的质量控制和尺寸分析工具。该系统类似于市面上的高端质检软件,但我们强调的是完全自主可控和国产化的三坐标测量机(CMM)尺寸公差质量管理工具。本文将为您详细......
  • 常见的内存泄漏及其解决方案
    内存泄漏是Java开发中一个常见且令人头疼的问题,即使在使用垃圾回收机制的Java中,也无法完全避免内存泄漏的出现。当对象不再需要时却仍然占据着内存,导致内存使用量不断增加,最终可能导致OutOfMemoryError。本文将深入探讨Java中常见的内存泄漏及其解决方案,附带详细的代码示例,帮......
  • maven 常见问题及解决方案
    1.resolutionwillnotbereattempteduntiltheupdateintervalofnexus强制更新mvncleaninstall-U2.Couldnotfindartifact如果可以通过其他途径获取到相关的jar包,可以把jar包安装到本地仓库:示例:demo.jar包上传后,项目中设置的依赖为<dependency><gr......
  • Instrospect 推出全球首个 GDDR7 显存测试系统测试解决方案
    固态技术协会JEDEC于3月6日正式发布JESD239GDDR7显存标准,JESD239GDDR7提供的带宽是GDDR6的两倍,每台设备最高可达192GB/s。JESD239GDDR7是第一个使用脉幅调制(PulseAmplitudeModulation,PAM)接口进行高频操作的JEDEC标准DRAM。其PAM3接口提高了高频操......
  • 【问题解决方案】npm install报错问题:npm ERR! - 多种解决方案,总有一种可以解决
    @[toc]1.问题重述安装package.json里面的包,使用npminstall但是报错2.解决方案方案1.确认根目录正确确认自己的目录是根目录(也就是处于./package.json可以找到的位置)例如--根目录----package.json----其他文件----其他文件方案2.确认文件名正确确认自己的pack......