首页 > 其他分享 >并发扣减库存不使用分布式锁用幂等性怎么实现

并发扣减库存不使用分布式锁用幂等性怎么实现

时间:2023-07-05 16:35:25浏览次数:48  
标签:arr 库存 扣减 锁用 self item id 分布式

扣减库存接口

 /**
     * Notes:修改或删除库存信息  复核提交,移位完成,调拨完成 后触发
     * 原始库位扣减操作
     * User: sl
     * Date: 2023-04-11 17:31
     * @param $data
     * @param $type 1,复核完成   2调拨出库完成后扣减   3,源库位移位完成后
     * 移位不记录日志
     * @return bool
     **************二位数组*************************
     * [["id"=>"库存id","reduceTotalQty"=>"减少的库存总数量"]]
     *
     * [["id"=>"1","reduceTotalQty"=>"50"]]
     */
    public static function updateOrDelStock($operator,$type,$data=[])
    {
        \Log::channel("stockLock")->info("---------------修改库存信息------------------");
        \Log::channel("stockLock")->info(sprintf("操作人:%s",json_encode($operator,JSON_UNESCAPED_UNICODE)));
        \Log::channel("stockLock")->info(sprintf("参数:%s",json_encode($data)));
        $arr = [];
        foreach($data as $item){
            if(!isset($arr[$item["id"]])){
                $arr[$item["id"]] = 0;
            }
            $arr[$item["id"]] += $item["reduceTotalQty"];
        }
        $stockIds = array_keys($arr);
        $stockList = StockModel::getStockListByids($stockIds);
        $stockList = arrayChangeKeyByField($stockList,"id");
        try{
            self::startTransaction();
            foreach($stockList as $id=>$stockInfo){
                $reduceQty = $arr[$id] ?? 0;
                if($reduceQty <= 0){
                    continue;
                }
                if($stockInfo["total_qty"] < $reduceQty){
                    throw new InvalidRequestException(sprintf("库存id:%s,扣减库存失败,扣减数量不能大于库存总数量",$id));
                }

                if($stockInfo["useable_qty"] < $reduceQty){
                    throw new InvalidRequestException(sprintf("库存id:%s,扣减库存失败,扣减数量不能大于库存可用数量",$id));
                }

                $totalQty = $stockInfo["total_qty"] - $reduceQty;
                $useableQty = $stockInfo["useable_qty"] - $reduceQty;
                //开始扣减库存
                $update=[];
                $update["total_qty"] = $totalQty;
                $update["useable_qty"] = $useableQty;
                $update["update_uid"] = $operator["operator_id"] ?? 0;
                $update["update_name"] = $operator["operator_name"] ?? "";
                $update["update_time"] = time();
                $update["amount"] = \DB::raw("ROUND(purchase_prices*total_qty,2)");
                $update["standard_money_amount"] = \DB::raw("ROUND(standard_money_prices*total_qty,2)");
                $update["purchase_withoutamount"] = \DB::raw("ROUND(purchase_without_tax_price*total_qty,2)");
                StockModel::where("id",$id)->update($update);
//                if($totalQty ==  0  && $useableQty == 0){
//                    //进行库存汇总
//                    self::updateOrCreateStockSummary([$id]);
//                    //删除
//                    StockModel::where("id",$id)->delete();
//
//                }

                //进行库存汇总
                self::updateOrCreateStockSummary([$id]);
                //删除
                if($totalQty ==  0  && $useableQty == 0){
                    StockModel::where("id",$id)->delete();
                }

            }
            self::commitTransaction();
        }catch (\Throwable $e){
            \Log::channel("stockLock")->info(json_encode(ErrMsg::getExceptionInfo($e)));
            self::rollBackTransaction();
            throw new InvalidRequestException($e->getMessage());
        }

    }

上述红色代码是扣减库存,在并发或者重复请求,或者请求超时情况下会出现重复扣减库存的情况,

库存原来只有型号A 8个  现在要扣减8个库存,第一次扣减8个可能请求超时或者其他原因,扣减接口重试了一次,第二次请求又继续扣减库存,上述接口两个扣减都成功了;

 

修改后代码:

/**
     * Notes:修改或删除库存信息  复核提交,移位完成,调拨完成 后触发
     * 原始库位扣减操作
     * User: sl
     * Date: 2023-04-11 17:31
     * @param $data
     * @param $type 1,复核完成   2调拨出库完成后扣减   3,源库位移位完成后
     * 移位不记录日志
     * @return bool
     **************二位数组*************************
     * [["id"=>"库存id","reduceTotalQty"=>"减少的库存总数量"]]
     *
     * [["id"=>"1","reduceTotalQty"=>"50"]]
     */
    public static function updateOrDelStock($operator,$type,$data=[])
    {
        \Log::channel("stockLock")->info("---------------修改库存信息------------------");
        \Log::channel("stockLock")->info(sprintf("操作人:%s",json_encode($operator,JSON_UNESCAPED_UNICODE)));
        \Log::channel("stockLock")->info(sprintf("参数:%s",json_encode($data)));
        $arr = [];
        foreach($data as $item){
            if(!isset($arr[$item["id"]])){
                $arr[$item["id"]] = 0;
            }
            $arr[$item["id"]] += $item["reduceTotalQty"];
        }
        $stockIds = array_keys($arr);
        $stockList = StockModel::getStockListByids($stockIds);
        $stockList = arrayChangeKeyByField($stockList,"id");
        try{
            self::startTransaction();
            foreach($stockList as $id=>$stockInfo){
                $reduceQty = $arr[$id] ?? 0;
                if($reduceQty <= 0){
                    continue;
                }
                if($stockInfo["total_qty"] < $reduceQty){
                    throw new InvalidRequestException(sprintf("库存id:%s,扣减库存失败,扣减数量不能大于库存总数量",$id));
                }

                if($stockInfo["useable_qty"] < $reduceQty){
                    throw new InvalidRequestException(sprintf("库存id:%s,扣减库存失败,扣减数量不能大于库存可用数量",$id));
                }

                $totalQty = $stockInfo["total_qty"] - $reduceQty;
                $useableQty = $stockInfo["useable_qty"] - $reduceQty;
                //开始扣减库存
                $update=[];
                $update["total_qty"] = \DB::raw("total_qty-{$reduceQty}");
                $update["useable_qty"] = \DB::raw("useable_qty-{$reduceQty}");;
                $update["update_uid"] = $operator["operator_id"] ?? 0;
                $update["update_name"] = $operator["operator_name"] ?? "";
                $update["update_time"] = time();
                $update["amount"] = \DB::raw("ROUND(purchase_prices*total_qty,2)");
                $update["standard_money_amount"] = \DB::raw("ROUND(standard_money_prices*total_qty,2)");
                $update["purchase_withoutamount"] = \DB::raw("ROUND(purchase_without_tax_price*total_qty,2)");
                $bk = StockModel::where("id",$id)->where("total_qty",$stockInfo["total_qty"])->where("useable_qty",$stockInfo["useable_qty"])->update($update);
                if(!$bk){
                    throw new InvalidRequestException(sprintf("库存id:%s,扣减库存失败:可能存在相同库存数据重复扣减或者并发扣减情况",$id));
                }

                //进行库存汇总
                self::updateOrCreateStockSummary([$id]);
                //删除
                if($totalQty ==  0  && $useableQty == 0){
                    StockModel::where("id",$id)->delete();
                }

            }
            self::commitTransaction();
        }catch (\Throwable $e){
            \Log::channel("stockLock")->info(json_encode(ErrMsg::getExceptionInfo($e)));
            self::rollBackTransaction();
            throw new InvalidRequestException($e->getMessage());
        }

    }

 

标签:arr,库存,扣减,锁用,self,item,id,分布式
From: https://www.cnblogs.com/sunlong88/p/17528909.html

相关文章

  • HDFS集群搭建:完全分布式
    主要区别一就是各个角色在哪儿启动,完全分布式也就是各个角色分布在不通的节点上1、基础环境:部署配置NN:core-site.xmlDN:workers:node01SNN:hdfs-site.xmldfs.namenode.secondary.http.addressnode01:500902、角色启动时细节配置:dfs.namenode.name.dirdfs.datanode.data.......
  • 分布式锁
    zk分布式锁海豚调度通过zk来做分布式锁,保证同一个时刻只有一台Master的scheduler来执行或者只有一台worker执行任务的提交1.zk分布式锁核心流程算法2.海豚调度中分布式锁算法scheduler线程分布式锁实现流程图......
  • HDFS集群搭建:伪分布式
    HDFS集群搭建:伪分布式参考网址:hadoop官网前期准备:JAVA环境+SSH,hadoop用java开发,java移动性好,C++移植性好。问题:ssh远程登录有个弊端:通过SSH远程登录启动其JVM进程,由于SSH远程执行的时候是不会加载profile文件里面的环境变量的实操论证:在node1的profile中创建一个环境变量BI......
  • 分布式锁解决集群下的方法抢占执行
    问题描述:启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法 分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。分布式锁的解决方案:    sexnx(SETifNoteXists)命令在指定的key不......
  • 数据仓库MPP架构&分布式架构
    数据仓库MPP架构&分布式架构一、MPP架构1.1MPP架构概述MPP(MassivelyParallelProcessing)架构是一种分布式数据处理技术,能够通过将工作负载分散到多个节点上来提高数据处理性能。与传统的共享架构不同,MPP采用非共享架构(ShareNothing),将单机数据库节点组成集群,每个节点拥有独立......
  • 分布式锁
    1mysql唯一键2redislua脚本保证原子性setnxexpire 看门狗保证执行的时间大于锁的时间为了避免其他人误删除值和对应的用户一一对应保证唯一比如值为uid 3redisson 4redis多台奇数master超过一般就获取锁成功 5zk 参考 https://www.cnblogs.com/liuq......
  • 6.29 celery分布式异步任务框架
    1.celery:分步式异步任务框架 /1 异步任务/2 延迟任务/3 定时任务/4 celery架构消息中间件(broker):消息队列:可以使用redis,rabbitmq任务执行单元(worker):执行单元执行提交的任务任务执行结果存储(banckend):可以使用mysql,redis/5安装celery模块:cmd中......
  • 分布式数据库 Join 查询设计与实现浅析
    相对于单例数据库的查询操作,分布式数据查询会有很多技术难题。本文记录Mysql分库分表 和ElasticsearchJoin查询的实现思路,了解分布式场景数据处理的设计方案。文章从常用的关系型数据库MySQL的分库分表Join分析,再到非关系型ElasticSearch来分析Join实现策略。逐步......
  • 基于Redis分布式缓存
    1.安装包使用Redis分布式缓存需要安装Redis的支持包,可以通过nuget命令安装,如下:install-packageMicrosoft.Extensions.Caching.StackExchangeRedis 2.在Program.cs文件中注册builder.Services.AddStackExchangeRedisCache(option=>{option.Configuration......
  • 分布式事务的几种实现方式
    基础理论CAP理论一致性(Consistency):在分布式系统中所有的数据备份,在同一时刻都保持一致状态,如无法保证状态一致,直接返回错误;可用性(Availability):在集群中一部分节点故障,也能保证客户端访问系统并得到正确响应,允许一定时间内数据状态不一致;分区容错性(Partitiontolerance):分布式......