首页 > 其他分享 >Elsa V3学习之Flowchart详解(下)

Elsa V3学习之Flowchart详解(下)

时间:2024-08-19 22:48:12浏览次数:9  
标签:Elsa children flowchartContext V3 context activity var Flowchart 节点

接上文,我们介绍了Flowchart的部分逻辑,下来来讲解flowchart剩下的逻辑。

OnChildCompletedAsync

看下OnChildCompletedAsync的代码。


    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
    {
        var logger = context.GetRequiredService<ILogger<Flowchart>>();
        var flowchartContext = context.TargetContext;
        var completedActivityContext = context.ChildContext;
        var completedActivity = completedActivityContext.Activity;
        var result = context.Result;

        // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
        var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
        var outcomeNames = result is Outcomes outcomes
            ? outcomes.Names
            : [null!, "Done"];

        // Only query the outbound connections if the completed activity wasn't already completed.
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
        var children = outboundConnections.Select(x => x.Target.Activity).ToList();
        var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());

        scope.RegisterActivityExecution(completedActivity);

        // If the complete activity is a terminal node, complete the flowchart immediately.
        if (completedActivity is ITerminalNode)
        {
            await flowchartContext.CompleteActivityAsync();
        }
        else if (scheduleChildren)
        {
            if (children.Any())
            {
                // Schedule each child, but only if all of its left inbound activities have already executed.
                foreach (var activity in children)
                {
                    var existingActivity = scope.ContainsActivity(activity);
                    scope.AddActivity(activity);

                    var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

                    // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                    if (!inboundActivities.Contains(completedActivity))
                    {
                        await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        continue;
                    }

                    // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join. 
                    if (activity is not IJoinNode)
                    {
                        var executionCount = scope.GetExecutionCount(activity);
                        var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);

                        if (haveInboundActivitiesExecuted) 
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                    }
                    else
                    {
                        // Select an existing activity execution context for this activity, if any.
                        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                            x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                        var scheduleWorkOptions = new ScheduleWorkOptions
                        {
                            CompletionCallback = OnChildCompletedAsync,
                            ExistingActivityExecutionContext = joinContext,
                            PreventDuplicateScheduling = true
                        };

                        if (joinContext != null)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                        else if (!existingActivity)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                        else
                        {
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                            continue;
                        }

                        await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                    }
                }
            }

            if (!children.Any())
            {
                await CompleteIfNoPendingWorkAsync(flowchartContext);
            }
        }

        flowchartContext.SetProperty(ScopeProperty, scope);
    }

从第11行开始,这里先做了下判断,判断执行结束的节点状态是否是已完成。如果不是已完成,则不会再往下执行,节点执行结束并不表示节点执行完成,这个节点可能处于异常,或者暂停状态。

第17-18行的代码表示,获取已完成的节点的后续所有节点。如果这个节点的出口连接了多个节点,那么这里的children会有多个节点。

第19行和21行则是获取当前工作流的上下文的一个Scope状态。并将已执行过的节点做记录。

再接下来有个判断,判断执行完成的节点是否是ITerminalNode类型,如果是则直接完成整个工作流。继承ITerminalNode的节点都将是流程的终结节点。

然后才是判断scheduleChildren的结果,如果不是已完成的状态,则保存一下scope状态,不继续执行后续流程。
如果节点已完成,则继续判断children是否有值,如果没有,说明后续没有连线的节点,那么就会继续判断节点是否处于挂起状态或者异常状态,如果都没有,则结束工作流程。

private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
{
    var hasPendingWork = HasPendingWork(context);

    if (!hasPendingWork)
    {
        var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);

        if (!hasFaultedActivities)
        {
            await context.CompleteActivityAsync();
        }
    }
}

如果children有值。那么将遍历children,并执行该Activity。
执行Children Activity时,首先在scope记录该节点。
然后判断已完成的活动在不在左侧入口的路线中,如果不在,则执行Activity。

var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

// If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
if (!inboundActivities.Contains(completedActivity))
{
    await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
    continue;
}

如果在,那么再继续判断当前Activity是否属于IJoinNode类型,如果是IJoinNode类型,那么需要等待其左侧所有连接的节点执行结束后再继续执行。
如果不是,那么继续判断其左侧节点的执行次数是否大于当前节点,满足条件则继续执行。

这里可以发现,每一步的ScheduleActivityAsync都会继续把OnChildCompletedAsync传递下去,使用递归的方式执行我们的工作流,知道工作流程结束。

到这我们就基本理清楚我们的flowchart的执行逻辑了。

Signal

然后我们回过头看flowchart的构造函数。

public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
{
    OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
    OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
    OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
}

可以看到有几个信号接收的订阅。这里的Signal用于在工作流执行的过程中接收到的外部信号,并对应作出处理,这里最简单的是CancelSignal,当flowchart的执行过程中,如果收到这个信号,那么将立即完成执行工作流。

    private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
    {
        await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
    }

详细的Signal的执行逻辑,我们将在后续文章中继续介绍。flowchart介绍先到此结束

结语

通过两篇文章,我们基本理清楚了我们编排后的工作流的运行逻辑。希望对小伙伴们有所帮助。

标签:Elsa,children,flowchartContext,V3,context,activity,var,Flowchart,节点
From: https://www.cnblogs.com/fanshaoO/p/18368273

相关文章

  • Elsa V3学习之Flowchart详解(上)
    前面我们通过界面学习了Elsa的一些基本使用,若是有实操的小伙伴们,应该可以发现,我们工作流定义中的root,既我们的工作流画布其实也是一个activity,就是Flowchart。那么本文将来解读以下flowchart的执行逻辑。Flowchart源码为了方便大家,这里先直接把flowchart的源码贴出。usingSyst......
  • 详尽 | Deeplabv3+结构理解
    https://arxiv.org/pdf/1802.02611.pdfhttps://link.springer.com/chapter/10.1007/978-3-319-10578-9_23目录Deeplabv3+Encoder部分Decoder部分补充摘要SPP 空间金字塔池化层模块Dilated/AtrousConv空洞卷积Deeplabv3+deeplab-v3+是语义分割网络,组合采用空洞......
  • Elsa V3学习之调起其他流程
    在Elsa中,还能通过DispatchWorkflow节点来执行其他已发布的流程。DispatchWorkflowDispatchWorkflow可以选择任一以及发布的工作流程,这里我们选择最初的HelloWord的流程Workflow1。通过HTTPEndpoint节点触发。触发链接为https://localhost:5001/api/workflows/Dispatch请求......
  • Elsa V3学习之工作流调度
    Elsa支持工作流的定时调度功能。包括Cron表达式执行,Delay延迟执行,Timer固定时间间隔重复执行。本文来介绍一下这几个节点的使用。Cron把Cron节点拖到画布,并配置cron表达式0/1****?,表示每秒执行一次,WriteLine打印当前时间。点击发布工作流则立即生效。可以看到控制台每秒......
  • Elsa V3学习之脚本
    在前面的文章中,可以看到我们经常使用JS脚本来获取变量的值。在Elsa中是支持多种脚本的,最常用的基本是JS脚本和C#脚本。本文来介绍以下这两个脚本使用。Javascript在ELSA中的javascript是通过Jint这个包来实现的。通过JS映射到C#内部的方法中。可以在代码中先预定义我们的Functi......
  • Elsa V3学习之分支节点
    接下来我们来介绍下Elsa的一些内置节点的使用。本节介绍分支节点。Descision这个节点其实就是If,只不过是用flow编排的模式。我们来创建一个简单的分支流程,通过HTTP节点请求的参数,判断是否满足表达式,分别输出True,False。添加一个变量,将HTTPEndpoint的OUTPUT的QueryStringData......
  • Elsa V3学习之循环节点
    上篇我们学习了分支节点,这篇文章我们来学习循环节点。Forfor节点跟我们代码中的for循环是一样的效果,有三个参数。Start,End,Step。分别表示起始数字,终点数字,以及步长,即每次循环加几的意思。下面的配置相当于for(i=0,i<=10,i++)。for节点的output表示当前的循环的值,我们可以......
  • Elsa V3学习之介绍篇
    一、ELSAV3概述ELSA是一个开源的工作流引擎,旨在帮助开发者快速构建和管理复杂的工作流。ELSAV3是该框架的最新版本,带来了许多新特性和改进,使得工作流的创建和管理更加高效和灵活。1.1主要特性可视化设计器:ELSAV3提供了一个直观的可视化工作流设计器,允许用户通过拖放方......
  • Elsa V3学习之内置工作流节点
    在ELSAV3中,Activity是工作流的基本构建块,它们代表了工作流中的具体操作或任务。每个Activity都可以执行特定的功能,帮助实现复杂的业务逻辑。ELSAV3提供了一系列内置的Activity,开发者可以直接使用这些Activity,或者根据需要进行扩展和自定义。内置Activity概述在ELSA......
  • Elsa V3学习之Hello Word
    前面文章介绍了Elsa的基础节点内容,接下来我们来开始实践一下。启动项目启动源码目录src\bundles中的Elsa.ServerAndStudio.Web的项目。这个项目包含ElsaServer以及前端界面。可以让我们快速学习Elsa项目。控制台HelloWord打开Workflows下的Definitions页面,点击CREATEWORKFL......