首页 > 其他分享 >分布式事务(二):基于可靠消息的分布式事务

分布式事务(二):基于可靠消息的分布式事务

时间:2023-01-30 18:33:05浏览次数:58  
标签:事务 log 可靠消息 getMessageId 消息 messageId transactionMessageService orders 分布式

项目使用技术

springboot、dubbo、zookeeper、定时任务、消息中间件MQ

一、项目结构

maven父子工程:

父工程:consis

子工程:api-service、order、product、message

api-service:该项目主要是提供接口调用的,还包含实体类、枚举等一些通用内容

order:该项目是专门处理订单相关操作的系统

product:该项目是专门处理产品相关操作的系统

message:该项目是提供消息服务的系统,好包括定时任务

它们的依赖关系如下图:

enter image description here

根据上一篇的原理分别介绍下各个系统的实现

二、order订单系统

核心代码:

@Override
@Transactional
public void add(Orders order) {
    String messageBody = JSONObject.toJSONString( order );
    //添加消息到数据库
    String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME );
    log.info(">>> 预发送消息,消息编号:{}", messageId);
    boolean flag = false;
    boolean success = false;
    try{

        Orders orders = orderDao.saveAndFlush( order );
        //int i = 1/0 ;
        log.info(">>> 插入订单,订单编号:{}", orders.getId());
        flag = true;
    }catch (Exception e){
        transactionMessageService.delete( messageId );
        log.info(">>> 业务执行异常删除消息,消息编号:{}", messageId, e);
        throw new RuntimeException( ">>> 创建订单失败" );
    }finally {
        if(flag){
            try {
                transactionMessageService.confirmAndSend( messageId );
                success = true;
                log.info(">>> 确认并且发送消息到实时消息中间件,消息编号:{}", messageId);

            }catch (Exception e){
                log.error(">>> 消息确认异常,消息编号:{}", messageId, e);
                if(!success){
                    transactionMessageService.delete( messageId );
                    throw new RuntimeException( ">>> 确认消息异常,创建订单失败" );
                }
            }
        }
    }
}

  • 插入订单表之前,首先创建预发送消息,保存到事务消息表中,此时消息状态为:未发送
  • 插入订单,如果插入订单失败则将事务消息表中预发送消息删除
  • 插入订单成功后,修改消息表预发送消息状态为发送中,并发送消息至mq
  • 如果发送消息失败,则订单回滚并删除事务消息表消息

三、message消息系统

核心代码一:

@Override
public void sendMessageToMessageQueue(String queueName,final String messageBody) {

    jmsTemplate.convertAndSend( queueName,messageBody );

    log.info(">>> 发送消息到mq 队列:{},消息内容:{}", queueName, messageBody);
}

  • 主要是activemq生产者讲消息发送至MQ消息中间件

核心代码二:

/**
 * 定时重发消息(每分钟)
 */
@Scheduled(cron = "0 */1 * * * ?")
public void    handler(){
    //查询transaction_message表中已发送但未被删除的消息
    List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING );
    if(list!=null && list.size() > 0){
        for (TransactionMessage message:list){
            try {
                transactionMessageService.retry( message.getMessageId() );
            } catch (Exception e) {
                log.warn(">>> 消息不存在,可能已经被消费,消息编号:{}", message.getMessageId());
            }
        }
    }
}

/**
 * 定时通知工作人员(每隔5分钟)
 */
@Scheduled(cron = "0 */5 * * * ?")
public void    advance(){
    List<Long> messages = transactionMessageService.queryDeadList();
    log.warn(">>> 共有:{}条消息需要人工处理", messages.size());
    String ids = JSONObject.toJSONString( messages );
    //发邮件或者是发送短信通知工作人员处理

}

  • 定时重发消息
  • 定时将死亡的消息通知给工作人员,进行人工补偿操作

四、product产品系统

核心代码:

@Transactional
@JmsListener( destination = Constant.ORDER_QUEUE_NAME)
public void    receiveQueue(String msg){
    boolean flag = false;
    Orders orders = JSONObject.parseObject( msg, Orders.class );
    log.info(">>> 接收到mq消息队列,消息编号:{} ,消息内容:{}", orders.getMessageId(), msg);

    TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() );
    try {
        //保证幂等性
        if(transactionMessage!=null){
            List<OrderDetail> list = orders.getList();
            for(OrderDetail detail : list){
                Product product = productService.findById( detail.getId() );
                Long skuNum = product.getProductSku() - detail.getNum();
                if(skuNum >= 0){
                    product.setProductSku( skuNum );
                    productService.update( product );
                }else {
                    throw new Exception( ">>> 库存不足,修改库存失败!" );
                }

            }
            //int i = 1 /0 ;
            flag = true;
        }

    }catch (Exception e){
        e.printStackTrace();
        throw new RuntimeException( e );
    }finally {
        if(flag){
            transactionMessageService.delete( orders.getMessageId() );
            DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() );
            if(dbLog!=null){
                dbLog.setState( "1" );//已处理成功
                dbLogService.update( dbLog );
            }
            log.info(">>> 业务执行成功删除消息! messageId:{}", orders.getMessageId());
        }
    }

}

  • 从mq消息中间件中监听并消费消息,将json消息转为订单对象
  • 根据消息编号查询该消息是否已被消费,保证幂等性
  • 如果消息未被消费(即存在此消息),则产品表扣减库存;如果已经消费(不存在此消息),则不做处理
  • 产品表扣减库存成功,则删除此消息,如果待处理消息日志表中有此消息,则更改状态为1,表示已处理;扣减失败,则不做处理

该项目源码已上传至github和码云,链接如下,希望喜欢的朋友都能给个star支持一下!谢谢~

github链接: https://github.com/wanglinyong/consis

码云链接: https://gitee.com/wanglinyong/consis

原文出处:

分布式事务(二):基于可靠消息的分布式事务

标签:事务,log,可靠消息,getMessageId,消息,messageId,transactionMessageService,orders,分布式
From: https://blog.51cto.com/u_14014612/6027595

相关文章

  • 多个线程下处理事务
     springboot项目都是声明式事务,在多个线程事务处理时,需要我们使用手动事务管理器@ResourceprivatePlatformTransactionManagerplatformTransactionManager;......
  • Zabbix分布式监控
    一、Zabbix简介1、zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。zabbix能监视各种网络参数,保证服务器系统的安全运营;并提......
  • docker+Jmeter实现分布式压测
    1、先编写一个基础dockerFile命名:jmbase#UseJava8slimJREFROMopenjdk:8-jre-slimMAINTAINERQJP#JMeterversionARGJMETER_VERSION=5.3#Installfewut......
  • 14.6 SQL Server事务日志备份
    SQLServer事务日志备份目录SQLServer事务日志备份简介使用T-SQL创建事务日志备份示例从事务日志备份还原数据库总结简介当数据库的恢复模式为FULL(完整)或BULK_LOGGED(大......
  • 基于XXL-JOB实现分布式任务调度的实现
    背景笔者以前在电商公司,我们需要在8月18号做大促活动,我们会提前一天给所有的用户推送活动信息,且需要根据用户画像生成不同的推送内容。当时我们总共有80万用户左右。经测试,......
  • 分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式
    本地消息表模式本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消......
  • 分布式协议与算法-Raft算法
    本文总结自:极客时间韩健老师的分布式协议与算法实战课程。大家都知道,Raft算法属于Multi-Paxos算法,它是在Multi-Paxos思想的基础上,做了一些简化和限制。关于Paxos算法,博主......
  • 学习笔记——redis事务、乐观锁、悲观锁
    2023-01-29一、redis事务与乐观锁相关命令1、redis事务(1)redis事务的含义redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序执行。事务在执行过程中,不......
  • 技术汇总:第六章:分布式自增长ID
    packageutil;importjava.lang.management.ManagementFactory;importjava.net.InetAddress;importjava.net.NetworkInterface;/**名称:IdWorker.java描述:分布......
  • Seata源码结构及事务模式介绍
    1.Seata是什么Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式......