首页 > 其他分享 >implement a parallel batch processing in X++ of Dynamics 365 F&O

implement a parallel batch processing in X++ of Dynamics 365 F&O

时间:2023-11-17 10:57:05浏览次数:35  
标签:task ++ custTrans processing batch queryRun CustTrans public

One of the powerful features of Dynamics 365 Finance and Operations is a Batch framework. In this post, I explain how you can convert your existing batch job to multi-threaded to increase its performance.

Initial example

Let's consider the following operation - a user dialog that processes customer transactions and performs an action at the end.

 

The logic is quite simple - loop thought all specified customer transactions and call process() function. In our case, it will sleep for the specified number of milliseconds. After the processing of the transactions, it runs a final function - in our case, it is just an Infolog message.

public void run()
{
    //1. data preparation
    info(strFmt("%1 - Start operation", AifUtil::applyUserPreferredTimeZoneOffset(DateTimeUtil::utcNow())));

    //2. Query Processing
    this.runQueryProcessing();

    //3.final task
    info(strfmt("%2 - %1 record(s) processed", SysQuery::countTotal(queryRun),
 AifUtil::applyUserPreferredTimeZoneOffset(DateTimeUtil::utcNow())));
}
public void runQueryProcessing()
{
    while (queryRun.next())
    {
        CustTrans   custTrans = queryRun.get(tablenum(CustTrans));
        this.processRecord(custTrans);
    }
}
public void processRecord(CustTrans  _custTrans)
{
    //do some job using _custTrans and transDate
    sleep(taskSleepTimeMs);
}

Standard USMF demo company has 1700 customer transactions, so if we run this job with no filters in a user interface or in a batch and specify 200ms to process one transaction, it will take 340 seconds for the whole job.

 

https://usnconeboxax1aos.cloud.onebox.dynamics.com/?mi=SysClassRunner&cls=DEVTutorialBatchSingleThread&cmp=USMF

Solution design principles

It is quite obvious that we can optimize this by running the code in parallel threads. Possible options of how this works are described in AX perf blog - Batch Parallelism in AX, but every approach from that article has its pros and cons.

What should be considered while designing a simplified solution for this:

  • A change should be simple and require minimum modifications to the original class. We don't want to create new classes or new tables to support parallel execution.
  • We can't run a single batch thread per one transaction - it will create a lot of overhead for a batch framework, so the solution should allow specifying maximum batch threads.
  • Execution flow in batch mode or in user interface should be exactly the same, better to avoid operations that can be run only in a batch.
  • We should support the final task, and it should be executed only once after transactions processing.

In most cases, we(as developers) should know how to split the load. In the example above we can just split selected customer transactions by equal intervals, but the split function can be more complex(for example we may want to avoid running parallel tasks for the same customer to prevent blocking)

The main idea is to introduce a new class parameter batchIdentifier - identification for the split interval and then run our logic only for this interval.

I created a base class DEVTutorialBatchMultipleThreadBase to incorporate this logic. If we have batchIdentifier specified - that means it is a child task that needs to perform a calculation for this batchIdentifier. If the class is executed with an empty batch identifier - that means it is the main task that should split the load and create all tasks for each split interval and the final task at the end.

class DEVTutorialBatchMultipleThreadBase extends RunBaseBatch
{
    public void run()
    {
        container               batchIdentifierCon;
        int                     i;
        ;
        if (batchIdentifier) //child task
        {
            if (batchIdentifier == this.finalTaskIdentifier())
            {
                this.runFinalTask();
            }
            else
            {
                this.runThreadTask();
            }
        }
        else
        {
            this.runStartTask();

            batchIdentifier = this.finalTaskIdentifier();
            this.processThreadItem(true); //create the final task, we need a dependency, so create it in the beggining.

            batchIdentifierCon = this.getBatchIdentifiersRangeCon();

            for (i = 1; i <= conLen(batchIdentifierCon); i++)
            {
                batchIdentifier = conPeek(batchIdentifierCon, i);
                this.processThreadItem(false);
            }

            if (finalTask)
            {
                if (this.isInBatch())
                {
                    batchHeader.save();
                }
                else
                {
                    finalTask.run();
                }
            }
        }
    }

Method processThreadItem creates the same instance of our class and calls a pack function. If the process is executed in a batch mode it creates a new runtime batch task. Without batch mode, it just runs this task.

Multiple threads batch example

Let's change our class to multithread.

we need to implement 3 function - runStartTask(), runThreadTask(), runFinalTask() to execute our tasks and getBatchIdentifiersRangeCon() to create a list of intervals - in our case it will be ranges FromRecId..ToRecId for the selected transactions. Method getBatchIdentifiersRangeCon() is the most complex and new in this example, all others are just a copy of original methods.

public class DEVTutorialBatchMultipleThread extends DEVTutorialBatchMultipleThreadBase
{
public void runStartTask()
{
    //1. data preparation
    info(strFmt("%1 - Start operation", AifUtil::applyUserPreferredTimeZoneOffset(DateTimeUtil::utcNow())));
}
public void runThreadTask()
{
    //2. Query Processing
    QueryBuildDataSource  qBDS = queryRun.query().dataSourceTable(tablenum(CustTrans));
    qBDS.addRange(fieldnum(CustTrans, RecId)).value(batchIdentifier);
    while (queryRun.next())
    {
        CustTrans   custTrans = queryRun.get(tablenum(CustTrans));
        this.processRecord(custTrans);
    }
}
public void runFinalTask()
{
    //3.final task
    info(strfmt("%2 - %1 record(s) processed", SysQuery::countTotal(queryRun),
                AifUtil::applyUserPreferredTimeZoneOffset(DateTimeUtil::utcNow())));
}
public container  getBatchIdentifiersRangeCon()
{
    container  res;
    QueryRun   queryRunLocal = new QueryRun(queryRun.query());

    QueryBuildDataSource   qBDS = queryRunLocal.query().dataSourceTable(tablenum(CustTrans));
    int                    totalRecords, curRecord, recordsPerBatch;
    RecId                  fromRecId, toRecId;

    qBDS.sortClear();
    qBDS.addSortField(fieldnum(CustTrans, RecId));
    !!!qBDS.addRange(fieldnum(CustTrans, RecId)).value(batchIdentifier);

    totalRecords = SysQuery::countTotal(queryRunLocal);
    recordsPerBatch = maxTaskCount > 0 ? totalRecords div maxTaskCount : totalRecords;       if (! recordsPerBatch)
    {
        recordsPerBatch = 1;
    }
    while (queryRunLocal.next())
    {
        CustTrans   custTrans = queryRunLocal.get(tablenum(CustTrans));
        if (! fromRecId) fromRecId = custTrans.RecId;

        curRecord++;
        toRecId = custTrans.RecId;
        if ((curRecord mod recordsPerBatch) == 0)
        {
            res += SysQuery::range(fromRecId, toRecId);
            fromRecId = 0;
            curRecord = 0;
        }
    }
    if (curRecord && fromRecId && toRecId) res += SysQuery::range(fromRecId, toRecId);
    return res;
}

In a user interface we added a new field "Number of batch tasks" to specify how many tasks to create.

 

if we run our function in a batch mode, we will see the following result

 

In this case, we got the total execution time of 30 sec, but values less than a minute are more related to the batch processing.

Summary

I described how easily you could implement multithreading in RunBase framework and convert the existing single-threaded task to the multi-threaded one. One note for this - use this approach only after you have performed all possible optimizations for the original code, running non-optimal code in multiple threads can create some problems. Also, the described solution is compatible with Ax2009 and AX2012 so that you can use exactly the same approach.

 

标签:task,++,custTrans,processing,batch,queryRun,CustTrans,public
From: https://www.cnblogs.com/lingdanglfw/p/17838148.html

相关文章

  • C++笔记
    inline内联函数:内存膨胀,空间换时间,节省调用函数,给被调函数形参赋值以及自动回收内存的时间使用原则:内联函数内不要有循环,使用重复率较高,代码比较简单的函数使用内联函数引用(别名,解析引用符)int&dd=numdd与num共享同一段内存,定义引用必须赋初始值,引用的作用可以缩短名称......
  • 【C++】【图像处理】形态学处理(腐蚀、膨胀)算法解析(以.raw格式的图像为基础进行图像处
    1voiderosion(BYTE*image,intw,inth,BYTE*outImg)2{3intrept;4//腐蚀5memcpy(outImg,image,sizeof(BYTE)*w*h);//将读取的图像赋值给outImg,方便进行腐蚀操作67inti,j,m,n;8BYTEflag;9for(rept=0;rept......
  • C++ map容器
    由于我昨天做题遇到了map,但是自己对map的了解少之甚少,于是就去学了一下map我将map的总结发到了c*dn,请移步c++map容器简介......
  • C++调用Python3实战,和PyImport_ImportModule返回NULL问题解决
    LinuxC++调用Python3入门准备以下面的目录结构演示如何在LinuxC/C++调用python3。|--hello.py|--main.cpp|--CMakeLists.txt hello.py:python的脚本,里面有2个函数main.cpp:c++函数CMakeLists.txt:Cmake文件,生成makefilepython脚本示例python脚本hello.py内容如下,......
  • 基于pybind11实现C++程序中调用Python脚本增加C++程序扩展性
     文章目录前言一、pybind11与Python环境配置二、C++环境配置三、C++调用Python交互代码四、C++调用PythonDemo完整源码 前言Windows平台,在实际C++项目开发中,结合pybind11库,让python成为C++的脚本语言,可以大大提高C++程序的可扩展性,大大提高开发效率,特别......
  • C++通过pybind11调用Python 实现transpose
    在某些场合需要在C++实现类似numpy的numpy.transpose(a,axes)功能,但是很多库如NumCpp都没有提供这样的方法,只有二维矩阵的转置,没法进行多维矩阵任意维度的转换。比较简单的想法就是利用numpy现有的功能,在c++代码里面通过调用python来调用Numpy的transpose。直接调用Python提......
  • C++调用python踩坑记录
     目录0、参考文档及博客1、环境配置步骤2、C++调用python的方法代码框架:(同样来源于上面这篇博客,可用于测试环境配置成功与否)报错处理函数(1)处理方法一:PyErr_Print(2)处理方法二:PyErr_Fetch2.5、终极解决方案3、踩坑记录(1)python第三方库调用出错(2)python模块环......
  • C#调用C++动态库接口函数和回调函数方法 后续
    声明回调委托,C#的委托可以实现C#调用C++的回调,操作函数以后的回调//定义委托,CallingConvention.StdCall可以,CallingConvention.Cdecl不行,参考https://www.it1352.com/1792610.html//[UnmanagedFunctionPointer(CallingConvention.Cdecl)]//不需要要添加该句话,具体参考//htt......
  • C#调用C++动态库接口函数和回调函数方法
    这篇文章主要介绍了C#调用C++动态库接口函数和回调函数方法,通过C++端编写接口展开内容,文章介绍详细具有一定的参考价值,需要的小伙伴可以参考一下需求: 当前C已经写好了一个动态库,完成了产品开发需求,C#需要调用C编写的动态库DLL接口,开发出完整的软件,DLL动态库里包含了普通接口函......
  • An error has occurred during report processing
    ex.Message:Anerrorhasoccurredduringreportprocessing.ex.Source:Microsoft.ReportViewer.WebFormsex.StackTrace:  at Microsoft.Reporting.WebForms.LocalReport.InternalRender(Stringformat,BooleanallowInternalRenderers,StringdeviceInfo,CreateAn......