首页 > 编程语言 >领域驱动模型DDD(四)——Eventuate Tram Saga源码讲解

领域驱动模型DDD(四)——Eventuate Tram Saga源码讲解

时间:2023-09-13 11:23:50浏览次数:44  
标签:调用 return Tram 步骤 Eventuate step 源码 compensating public

前言

虽然本人一直抱怨《微服务架构设计模式》中DDD模式下采用的Eventuate Tram Saga不算简单易用,但是为了更加深入了解原文作者的设计思路,还是花了点时间去阅读源码,并且为了自己日后自己返回来看的懂,就斗胆地对整个Eventuate Tram Saga从注册到执行的代码运行流程进行注释解读下,其中若是有什么错误疏漏以及需要改进的地方,希望各位在评论区指正。

源码讲解

1:Saga流程如何被记录注册

1-1 CreateOrderSaga类

  //1-1构建流程,为了便以后续讲解,我们将一个step到下一个step之间的所有方法概括成一个“步骤”
  public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {
      private SagaDefinition<CreateOrderSagaData> sagaDefinition =
              //初始化起点,调用1-2的step
              step()
              //调用1-3 的invokeLocal方法,this::reject是一个输入CreateOrderSagaData类型参数返回CommandWithDestination的方法
                .invokeLocal(this::create)
                .withCompensation(this::reject)
              //步骤一
              //调用1-2step
              .step()
              //调用1-3 的invokeParticipant方法
                .invokeParticipant(this::reserveCredit)
                .onReply(CustomerNotFound.class, this::handleCustomerNotFound)
                .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)
              //步骤二
              //调用1-6step
              .step()
                .invokeLocal(this::approve)
              .build();
  }

1-2 step初始化方法(saga流程的起点)

  //1-2创建sagaDefinition的源码:因为实现了SimpleSaga,SimpleSaga默认方法
  //此处的step对应1-1开头调用的step,1-1中后面的两个step并不调用此方法
  default StepBuilder<Data> step() {
    SimpleSagaDefinitionBuilder<Data> builder = new SimpleSagaDefinitionBuilder<>();
    return new StepBuilder<>(builder);
  }

1-3 StepBuilder类

  //1-3StepBuilder内部方法:
  public class StepBuilder<Data>{
      private final SimpleSagaDefinitionBuilder<Data> parent;
      //初始化时,这里parent被赋值为一个new SimpleSagaDefinitionBuilder;之后会用上第一次创建SimpleSagaDefinitionBuilder
      public StepBuilder(SimpleSagaDefinitionBuilder<Data> builder) {
        this.parent = builder;
      }
     //此处第一次时传入1-1中的this::create
      public LocalStepBuilder<Data> invokeLocal(Consumer<Data> localFunction) {
        return new LocalStepBuilder<>(parent, localFunction);
      }
      //此处传入1-1中的this::reserveCredit方法
      public InvokeParticipantStepBuilder<Data> invokeParticipant(Function<Data, CommandWithDestination> action) {
        //调用1-6的withAction方法
        return new InvokeParticipantStepBuilder<>(parent).withAction(Optional.empty(), action);
      }
  }

1-4 LocalStepBuilder类

  //1-4根据CreateOrderSaga 中的流程可得知,后续invokeLocal方法:
  public class LocalStepBuilder<Data>  {
     private final SimpleSagaDefinitionBuilder<Data> parent;
     private final Consumer<Data> localFunction;
     private Optional<Consumer<Data>> compensation = Optional.empty();
     //设置父节点,设置执行方法
     public LocalStepBuilder(SimpleSagaDefinitionBuilder<Data> parent, Consumer<Data> localFunction) {
        this.parent = parent;
        this.localFunction = localFunction;
     }
     //设置补偿方法,传入1-1中的this::reject
     public LocalStepBuilder<Data> withCompensation(Consumer<Data> localCompensation) {
        this.compensation = Optional.of(localCompensation);
        return this;
     }
     //再次调用step方法,此处的step方法对应的是1-1中后面的两个step方法    
     public StepBuilder<Data> step() {
        //调用1-5中的方法,将step之前接收的每个“步骤”放入到SimpleSagaDefinitionBuilder的List中
        parent.addStep(makeLocalStep());
        //创建一个新的“步骤”并承接之前的“步骤”
        return new StepBuilder<>(parent);
     }
     //创建一个“步骤”
     private LocalStep<Data> makeLocalStep() {
        return new LocalStep<>(localFunction, compensation, localExceptionSavers, rollbackExceptions);
    }
    //对用1-1中的build方法,将最后一个“步骤”放入流程列表中,调用1-5中的方法创建一个SimpleSagaDefinition对象
     public SagaDefinition<Data> build() {
        //添加最后一个“步骤”并调用1-5中build()方法
        parent.addStep(makeLocalStep());
        return parent.build();    
    }
  }

1-5 SimpleSagaDefinitionBuilder类

  //1-5
  public class SimpleSagaDefinitionBuilder<Data> {
      //存放“步骤”列表
      private List<SagaStep<Data>> sagaSteps = new LinkedList<>();
      //添加“步骤节点”到列表中
      public void addStep(SagaStep<Data> sagaStep) {
        sagaSteps.add(sagaStep);
      }
      //将所有步骤节点整合构建整个大“流程业务”
      public SagaDefinition<Data> build() {
    //将整个“步骤”节点列表传入2-3List<Step> steps内
        return new SimpleSagaDefinition<>(sagaSteps);
      }
  }

1-6 InvokeParticipantStepBuilder类

   //1-6
   public class InvokeParticipantStepBuilder<Data> implements WithCompensationBuilder<Data> {
      private Optional<ParticipantInvocation<Data>> action = Optional.empty();
    
      public InvokeParticipantStepBuilder(SimpleSagaDefinitionBuilder<Data> parent) {
        this.parent = parent;
      }
      //调用1-7的构造函数,1-1的this::reserveCredit方法传给ParticipantInvocationImpl构建出上面Optional<ParticipantInvocation<Data>> action
      InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate, Function<Data, CommandWithDestination> action) {
        this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, action));
        return this;
      }
      
      public StepBuilder<Data> step() {
         addStep();
         return new StepBuilder<>(parent);
      }
      //本例中因为只使用了withAction方法,所以compensation, actionReplyHandlers, compensationReplyHandlers都为bull
      //调用2-7的中ParticipantInvocationStep的构造方法,将action赋予participantInvocation
      private void addStep() {
         parent.addStep(new ParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers));
      }
   }

1-7 ParticipantInvocationImpl类

    //1-7 AbstractParticipantInvocation实现ParticipantInvocation接口
    public class ParticipantInvocationImpl<Data, C extends Command> extends AbstractParticipantInvocation<Data> {
      private final boolean notification;
      //commandBuilder对应的就是1-6中withAction方法action参数
      private final Function<Data, CommandWithDestination> commandBuilder;

      public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder) {
        this(invocablePredicate, commandBuilder, false);
      }

      @Override
      public boolean isSuccessfulReply(Message message) {
        return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));
      }

      //被2-7调用,执行Function<Data, CommandWithDestination> commandBuilder;并传入值为false的notification
      @Override
      public CommandWithDestinationAndType makeCommandToSend(Data data) {
        return new CommandWithDestinationAndType(commandBuilder.apply(data), notification);
      }
    }

至此所有Saga步骤注册完整的业务流程完毕。

2、Saga实例化工厂如何运作

2-0 sagaInstanceFactory创建createOrderSaga

    sagaInstanceFactory.create(createOrderSaga, data);

2-1 SagaInstanceFactory类

 //2-1Saga实例化工厂
 public class SagaInstanceFactory {
  private Logger logger = LoggerFactory.getLogger(this.getClass());

  private ConcurrentMap<Saga<?>, SagaManager<?>> sagaManagers = new ConcurrentHashMap<>();

  public <SagaData> SagaInstance create(Saga<SagaData> saga, SagaData data) {
    SagaManager<SagaData>  sagaManager = (SagaManager<SagaData>)sagaManagers.get(saga);
    if (sagaManager == null)
      throw new RuntimeException(("No SagaManager for " + saga));
    //调用2-2的create方法
    return sagaManager.create(data);
  }

  private <SagaData> SagaManager<SagaData> makeSagaManager(SagaManagerFactory sagaManagerFactory, Saga<SagaData> saga) {
    SagaManagerImpl<SagaData> sagaManager = sagaManagerFactory.make(saga);
    sagaManager.subscribeToReplyChannel();
    return sagaManager;
  }
}

2-2 SagaManagerImpl类

 //2-2
 public class SagaManagerImpl<Data> implements SagaManager<Data> {
  @Override
  public SagaInstance create(Data sagaData, Optional<String> resource) {

    SagaInstance sagaInstance = new SagaInstance(getSagaType(),null,"????",null,SagaDataSerde.serializeSagaData(sagaData), new HashSet<>());
    //使用数据库保存saga实例
    sagaInstanceRepository.save(sagaInstance);

    String sagaId = sagaInstance.getId();
    //此步骤没什么意义
    saga.onStarting(sagaId, sagaData);

    resource.ifPresent(r -> {
      if (!sagaLockManager.claimLock(getSagaType(), sagaId, r)) {
        throw new RuntimeException("Cannot claim lock for resource");
      }
    });
    //getStateDefinition()获取SagaDefinition,即1-1中。
    //调用2-4start方法,启动saga流程,构建第一个“步骤”的SagaActions
    SagaActions<Data> actions = getStateDefinition().start(sagaData);

    actions.getLocalException().ifPresent(e -> {
      throw e;
    });
    //过程操作,传入参数:saga.getSagaType()是获取sagaData(操作数据)的类名,比CreateOrderSagaData并修改成固定格式
    //sagaId:唯一sagaId
    //sagaInstance:saga实例化
    //sagaData:要操作的数据
    //actions:第一个“步骤”的SagaActions
    //调用下面的processActions方法
    processActions(saga.getSagaType(), sagaId, sagaInstance, sagaData, actions);

    return sagaInstance;
      }

    private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions<Data> actions) {
      //进行循环
      while (true) {
      //如果传入的actions存在执行报错则信息执行if内的方法
        if (actions.getLocalException().isPresent()) {

          actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder
                  .withPayload("{}")
                  .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name())
                  .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName())
                  .build());

        } else {
          // only do this if successful
          //如果成功,通过消息队列发送给接收方。
          //第一次进来时:因为1-1中构建的流程内,“步骤一”是调用本地方法,因此不需要发送消息
          //第二次进来时:由于1-1中构建的流程内,“步骤二”是调用参与者方法,因此需要发送消息给参与者
          String lastRequestId = sagaCommandProducer.sendCommands(this.getSagaType(), sagaId, actions.getCommands(), this.makeSagaReplyChannel());
          //第一次进来时:lastRequestId第一个步骤时为null
          //第二次进来时:返回一个请求Id
          sagaInstance.setLastRequestId(lastRequestId);
          //第一次进来时:更新“步骤一”sagaInstance实例状态信息:更新是否是最后节点(布尔值),更新是否需要补偿(布尔值),更新是否报错,更新更新的状态(对应2-5中executeStep方法的newState)
          //第二次进来时:更新“步骤二”sagaInstance实例状态
          updateState(sagaInstance, actions);

          sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData)));
           //执行第一个步骤时,并不是最后一个步骤节点所以不进入if中
          if (actions.isEndState()) {
            performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData);
          }
          //使用数据库更新sagaInstance实例状态
          sagaInstanceRepository.update(sagaInstance);
          //public boolean isReplyExpected() {return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local;}
          //第一次进来时:在2-5的step.makeStepOutcome过程中因为将local设置为true,所以执行第一个步骤时actions.isReplyExpected()为false
          if (actions.isReplyExpected()) {
            break;
          } else {
            //模拟成功回复本地动作或通知,调用下面的simulateSuccessfulReplyToLocalActionOrNotification方法
            //第一次进来时:传入“步骤一”的SagaActions,返回“步骤二”的SagaActions,继续循环
            //第二次进来时:传入“步骤二”的SagaActions,返回“步骤三”的SagaActions
            actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions);
          }

        }
      }
    }

     //模拟成功回复本地动作或通知
      private SagaActions<Data> simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions<Data> actions) {
        //获取1-1中的整个业务流程后,调用2-4中的handleReply方法,并设置REPLY_OUTCOME和REPLY_TYPE头为success,最后返回“步骤二”SagaActions,重新进入上面的while循环中
        return getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder
            .withPayload("{}")
            .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name())
            .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName())
            .build());
      }
  }

2-3 AbstractSimpleSagaDefinition类

   //2-3
   public abstract class AbstractSimpleSagaDefinition<Data, Step extends ISagaStep<Data>,
            ToExecute extends AbstractStepToExecute<Data, Step>,
            Provider extends AbstractSagaActionsProvider<Data,?>> {

        protected Logger logger = LoggerFactory.getLogger(this.getClass());
        //steps接受1-5中的sagaSteps
        protected List<Step> steps;

        public AbstractSimpleSagaDefinition(List<Step> steps) {
            this.steps = steps;
        }
        //被2-4start方法调用
        protected Provider firstStepToExecute(Data data) {
       //SagaExecutionState.startingState()返回SagaExecutionState(-1, false);
       //初始化时步骤节点currentlyExecuting为-1(因为0为第一个“步骤”,所以设置-1为起点),是否补偿compensating为false
       //开始执行下一个步骤节点(此处执行的是第一个“步骤”)
            return nextStepToExecute(SagaExecutionState.startingState(), data);
        }
        //被2-4的handleReply方法调用
        protected Provider sagaActionsForNextStep(String sagaType, String sagaId, Data sagaData, Message message,
                                              SagaExecutionState state, Step currentStep, boolean compensating) {
            //此处分两种情况:本地调方法用CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));【例如1-1中的“步骤一”】;远程调用参与者方法用getParticipantInvocation(compensating).get().isSuccessfulReply(message);【例如1-1中的“步骤二”】
            //由于“步骤一”是本地方法且头中包含信息为success,直接调用下方的nextStepToExecute方法
            if (currentStep.isSuccessfulReply(compensating, message)) {
                return nextStepToExecute(state, sagaData);
            } else if (compensating) {
                return handleFailedCompensatingTransaction(sagaType, sagaId, state, message);
            } else {
                return nextStepToExecute(state.startCompensating(), sagaData);
            }
        }

        protected Provider nextStepToExecute(SagaExecutionState state, Data data) {
            int skipped = 0;
            //初始化compensating为false无需补偿
            boolean compensating = state.isCompensating();
            //初始化compensating为false,所以下个要执行的步骤节点+1,如果为true说明出错需要回滚,因此direction初始化为-1
            int direction = compensating ? -1 : +1;
            //第一次进入时:direction初始化为-1,所以i初始值为0,说明从第一个步骤开始执行;i必须小于节点长度;i根据direction来判断是否需要回滚到上一个节点还是进入下一个阶段
            //第二次进入时:步骤一执行后state.getCurrentlyExecuting()变为0,所以i变为1,steps.get(i)获取“步骤二”。由于compensating依然为false,所以不进行回滚继续向下执行
            for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) {
               //获取步骤节点
                Step step = steps.get(i);
                //每个步骤节点中有正常执行的方法事务,可能有补偿事物。
                //因此使用compensating进行判断。如果需要补偿且存在补偿事务,或则不需要补偿,以上两种情况则为true
                //step.hasCompensation(data)和step.hasAction(data)返回值都是布尔值。
                //step.hasCompensation(data)用来判断是否存在补偿事务,step.hasAction(data)直接返回true
                if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) {
                    //makeStepToExecute指定执行步骤,调用SimpleSagaDefinition方法中的makeStepToExecute方法
                    //传入参数:当前跳过的skipped计数,是否需要补偿,以及“步骤”
                    //makeSagaActionsProvider调用2-4的makeStepmakeStepToExecuteToExecute方法,构建一个StepToExecute对象,
                    ToExecute stepToExecute = makeStepToExecute(skipped, compensating, step);
                    //makeSagaActionsProvider传入:执行节点,处理数据,节点初始状态,调用2-4中的makeSagaActionsProvider方法
                    return makeSagaActionsProvider(stepToExecute, data, state);
                } else
                    //如果需要补偿但没有补偿事务
                    //跳过计数+1
                    skipped++;
            }
            return makeSagaActionsProvider(makeEndStateSagaActions(state));
        }

        protected Provider handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) {
            logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message);
            return makeSagaActionsProvider(SagaActions.<Data>builder()
                    .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState()))
                    .withIsEndState(true)
                    .withIsCompensating(state.isCompensating())
                    .withIsFailed(true)
                    .build());
        }

        protected SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) {
            return SagaActions.<Data>builder()
                    .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState()))
                    .withIsEndState(true)
                    .withIsCompensating(state.isCompensating())
                    .build();
        }

    }

2-4 SimpleSagaDefinition类

    //2-4
    public class SimpleSagaDefinition<Data>{

      public SimpleSagaDefinition(List<SagaStep<Data>> steps) {
        super(steps);
      }

      @Override
      public SagaActions<Data> start(Data sagaData) {
         //执行2-3中的firstStepToExecute方法,启动第一个流程
        return toSagaActions(firstStepToExecute(sagaData));
        //构建SagaActions完毕,返回并继续2-2中的create方法内getStateDefinition().start(sagaData)后的代码
      }

      //被2-2中的processActions方法调用
      @Override
      public SagaActions<Data> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) {
         //将前一个“步骤”中的之前被JSON格式化(2-5的makeSagaActions把newState进行JSON格式化)的newState此处的currentState进行解码
        SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState);
         //state.getCurrentlyExecuting()的值为“0”,因为初始化currentlyExecuting的计数是-1,而在2-5 中执行currentState.nextState(size())已经把当前“步骤”计数+1.而,所以step.get(0)获取了整个业务流程中的“步骤一”
        SagaStep<Data> currentStep = steps.get(state.getCurrentlyExecuting());
        //获取前一个“步骤”是否需要回滚
        boolean compensating = state.isCompensating();

        currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> {
              handler.accept(d, m);
              return null;
        }));
        //sagaActionsForNextStep会根据是否需要补偿来判断是采用nextStepToExecute(2-3)方法,还是调用handleFailedCompensatingTransaction(2-3)方法。
        SagaActionsProvider<Data> sap = sagaActionsForNextStep(sagaType, sagaId, sagaData, message, state, currentStep, compensating);
        return toSagaActions(sap);
       }


      //被2-3的nextStepToExecute中调用
      @Override
        protected StepToExecute<Data> makeStepToExecute(int skipped, boolean compensating, SagaStep<Data> step) {

         return new StepToExecute<>(step, skipped, compensating);
      }   

      //被2-3的nextStepToExecute中调用
      @Override
      protected SagaActionsProvider<Data> makeSagaActionsProvider(StepToExecute<Data> stepToExecute, Data data, SagaExecutionState state) {
        //调用2-5的executeStep方法
        return new SagaActionsProvider<>(() -> stepToExecute.executeStep(data, state));
      }
    }

2-5 executeStep方法

    //2-5  StepToExecute类中的方法,被2-4中的makeSagaActionsProvider方法调用
    public SagaActions<Data> executeStep(Data data, SagaExecutionState currentState) {
     //nextState方法执行:SagaExecutionState(compensating ? currentlyExecuting - size : currentlyExecuting + size, compensating)
     //protected int size() {return 1 + skipped;}
     //需要补偿:则下一个状态为当前节点currentlyExecuting-已跳过(没有补偿事务)的节点的长度,即回到最后(有补偿事务)执行的步骤节点。
     //不需要补偿:则下一个状态为当前节点currentlyExecuting+需要跳过(没有补偿事务)的节点的长度
     //计算完后将currentlyExecuting进行更新
      SagaExecutionState newState = currentState.nextState(size());

      SagaActions.Builder<Data> builder = SagaActions.builder();
      //当前正执行是否需要补偿回滚
      boolean compensating = currentState.isCompensating();
      //调用2-6中的makeStepOutcome方法
      //第一次进来时:因为步骤一执行的是本地方法,调用的2-6中的makeStepOutcome,作用在于判断当前“步骤”是该执行补偿事务还是正常的本地事务,如果执行出现错误则返回一个带有报错信息的StepOutcome对象
      //第二次进来时:因为步骤二执行的是让参与方执行方法,调用的2-7中的makeStepOutcomemakeStepOutcome
      //执行StepOutcome的visit方法:将StepOutcome的RuntimeException类型的localOutcome属性赋值给SagaActions中的RuntimeException类型localException属性,同时将SagaActions中的local属性设置为ture;
      step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands);

      //SagaActions的makeSagaActions方法做两件事:将当前节点的数据newState格式化成JSON数据,newState信息包括:当前执行步骤的计数(第几个步骤)、是否回滚(布尔值)、是否是最后一个步骤(布尔值),是否出现错误(布尔值)。然后调用buildActions方法构建返回一个新的SagaActions
      //String state = encodeState(newState);
      //builder.buildActions(data, compensating, state, newState.isEndState());
      //public SagaActions<Data> buildActions(Data data, boolean compensating, String state, boolean endState) {
      //return withUpdatedSagaData(data)
      //       .withUpdatedState(state)
      //       .withIsEndState(endState)
      //       .withIsCompensating(compensating)
      //       .build();
      //}
      return makeSagaActions(builder, data, newState, compensating);
      //第一次进来时:构建SagaActions完毕,返回到2-4中的start方法
      //第二次进来时:构建SagagAcions完毕,返回到2-2中的simulateSuccessfulReplyToLocalActionOrNotification方法
    }

2-6 LocalStep类

    //2-6
    public class LocalStep<Data> implements SagaStep<Data> {
      private final Consumer<Data> localFunction;
      private final Optional<Consumer<Data>> compensation;
      private final List<LocalExceptionSaver<Data>> localExceptionSavers;
      private final List<Class<RuntimeException>> rollbackExceptions;

      @Override
      public StepOutcome makeStepOutcome(Data data, boolean compensating) {
        try {
          //如果需要回滚,执行回滚方法,compensation在1-1时的.withCompensation已经传入
          if (compensating) {
           //真正执行业务逻辑方法的地方
            compensation.ifPresent(localStep -> localStep.accept(data));
          } else {
          //如果不需要回滚,直接执行补偿方法,localFunction在1-1的.invokeLocal时已经传入
            localFunction.accept(data);
          }
          return makeLocalOutcome(Optional.empty());
        } catch (RuntimeException e) {
          localExceptionSavers.stream().filter(saver -> saver.shouldSave(e)).findFirst().ifPresent(saver -> saver.save(data, e));
          if (rollbackExceptions.isEmpty() || rollbackExceptions.stream().anyMatch(c -> c.isInstance(e)))
            return makeLocalOutcome(Optional.of(e));
          else
            throw e;
        }
      }

    }

2-7 ParticipantInvocationStep类

   //2-7
   public class ParticipantInvocationStep<Data> implements SagaStep<Data> {
      //participantInvocation被1-6的aaddStep方法传递action赋值
      private Optional<ParticipantInvocation<Data>> participantInvocation;
      private Optional<ParticipantInvocation<Data>> compensation;
      
      public ParticipantInvocationStep(Optional<ParticipantInvocation<Data>> participantInvocation,
                                   Optional<ParticipantInvocation<Data>> compensation,
                                   Map<String, BiConsumer<Data, Object>> actionReplyHandlers,
                                   Map<String, BiConsumer<Data, Object>> compensationReplyHandlers) {
    	this.actionReplyHandlers = actionReplyHandlers;
    	this.compensationReplyHandlers = compensationReplyHandlers;
    	this.participantInvocation = participantInvocation;
    	this.compensation = compensation;
  	 }

     //判断是否需要回滚,如果需要回滚执行compensation方法,如果不需要执行participantInvocation方法
      private Optional<ParticipantInvocation<Data>> getParticipantInvocation(boolean compensating) {
        return compensating ? compensation : participantInvocation;
      }

      @Override
      public boolean isSuccessfulReply(boolean compensating, Message message) {
        return getParticipantInvocation(compensating).get().isSuccessfulReply(message);
      }

      @Override
      public StepOutcome makeStepOutcome(Data data, boolean compensating) {
         //先调用getParticipantInvocation方法,此处假设不需要回滚所以返回participantInvocation
         //调用makeRemoteStepOutcome方法,传入List<CommandWithDestinationAndType> commandsToSend 返回RemoteStepOutcome类型的结果
        return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating)
                //调用1-7中的makeCommandToSend方法,执行消息发送方法
                .map(pi -> pi.makeCommandToSend(data))
                //将返回的CommandWithDestinationAndType包装成单元素列表 
                .map(Collections::singletonList)
                //如果上述返回为空则返回一个空列表
                .orElseGet(Collections::emptyList));
      }
    }

图解流程

有空再补....

简单案例

花了两天写了个以领域驱动为思想的Saga模式事务管理简陋框架,主要为了讲解:领域驱动模型DDD(三)——使用Saga管理事务 教学而设计的,目前只能在单体架构中使用,后续有时间会更新分布式情况下的新版本。请记住,领域驱动模型是一种思想,它不一定捆绑分布式微服务,只是领域驱动模型思想更有利于分布式情况下对微服务应用的划分。
项目框架地址:https://github.com/CG-Lin/mvn-lin

标签:调用,return,Tram,步骤,Eventuate,step,源码,compensating,public
From: https://www.cnblogs.com/jianpansangejian/p/17699095.html

相关文章

  • 企业综合信息化,人力资源管理,培训考学管理,电子采购(源码系统)
    前言:随着现代信息技术的不断发展,企业综合信息化已成为一种必然趋势。企业综合信息化是指将信息技术与企业业务流程相结合,实现企业资源的优化配置和高效利用,提升企业的竞争力和生产力。在实现企业综合信息化的过程中,人力资源管理、培训考学管理和电子采购是三个非常重要的环节。......
  • #yyds干货盘点#Koa源码浅析
    Koa源码十分精简,只有不到2k行的代码,总共由4个模块文件组成,非常适合我们来学习。我们先来看段原生Node实现Server服务器的代码:consthttp=require('http');constserver=http.createServer((req,res)=>{res.writeHead(200);res.end('helloworld');});server.list......
  • 酒店预订系统的设计与实现-计算机毕业设计源码+LW文档
    一、选题目的、意义、背景及研究现状1、选题目的1、可以有效计划好自己的行程,节约时间,以免酒店客满导致行程出现变动,所以越来越多的客人愿意提前向酒店预订客房,以便到达后能及时入住。  2、客人在预订时可以对要居住的房间提出具体要求,有利于客人及时入住理想的房型。2、选......
  • 最新修复塔罗牌占卜星座运势在线事业爱情塔罗测试源码
    源码介绍:运势测算星座塔罗牌这类源码一直都热度不减,这版非常轻量化,上传解压,填入数据库信息即可用,失效的接口也都已经一一修复。数据库信息全部都已经预设完毕,无需手动分类添加星座词等等。运营成品,搭建即用。图文文档教程手把手教你搭建。支付对接Z支付。PS:这个是最新的修复版,据说......
  • 个人收藏系统设计与实现-计算机毕业设计源码+LW文档
    1.1选题背景由于网络技术的发展十分迅猛,各类信息爆炸式增长,越来越多的人们将获取信息的方式转变为依靠网络获取,网络信息化已经深入到人们平常的生活和工作之中。于是很多人习惯通过网络获取自己感兴趣的内容来填充生活中的碎片时间,比如听歌、看视频、看电子书以及游戏攻略等,但是遇......
  • drf————源码分析
    drf————源码分析>认证源码分析权限源码分析频率类源码分析三大认证的源码分析之前读取的APIView的源码的执行流程中包装了新的request,执行了三大认证,执行视图类的方法,处理了全局异常查看源码的入口APIView的dispatch进入后在APIView的dispatch的496行上下self.......
  • mpam linux kernel源码分析
    MPAM(MemorySystemResourcePartitioningandMonitoring)是Armv8.4的feature,用于cache和内存带宽的监控和限制。截至现在,该feature在linuxkernel的实现还在推进,最新一版参见https://git.kernel.org/pub/scm/linux/kernel/git/morse/linux.git/log/?h=mpam/snapshot/v6.5-rc1。......
  • 基于自定义表编写认证类、django-jwt源码分析、权限介绍、simpleui的使用
    基于自定义表编写认证类补充:翻译函数只要做了国际化,就会显示当前国家的语言fromdjango.utils.translationimportgettext_lazyas_msg=_('Signaturehasexpired.')#_是个函数的别名,这个函数是翻译函数,只要做了国际化,它就是中文认证类fromrest_framework_jwt......
  • 《Flask Web开发:基于Python的Web应用开发实战》高清高质量PDF电子书+源码
    网盘下载:https://pan.quark.cn/s/cc9dc7402cdb......
  • 《Python数据分析基础教程:NumPy学习指南.第2版》高清高质量PDF电子书+源码
    罕见的NumPy中文入门教程,Python数据分析首选从最基础的知识讲起,手把手带你进入大数据挖掘领域囊括大量具有启发性与实用价值的实战案例下载:https://pan.quark.cn/s/730b594117c0......