首页 > 其他分享 >并行计算框架 Taskflow && CGraph

并行计算框架 Taskflow && CGraph

时间:2023-10-01 11:45:42浏览次数:36  
标签:Task taskflow 并行计算 && tf CGraph Taskflow parallel

 

 

taskflow/taskflow: A General-purpose Parallel and Heterogeneous Task Programming System (github.com)

 

ChunelFeng/CGraph: 【A simple C++ DAG framework】 一个简单好用的、无三方依赖的、跨平台的、收录于awesome-cpp的、基于流图的并行计算框架。欢迎star & fork (github.com)

 

Taskflow 

Ubuntu macOS Windows Wiki TFProf Cite

Taskflow helps you quickly write parallel and heterogeneous task programs in modern C++

Why Taskflow?

Taskflow is faster, more expressive, and easier for drop-in integration than many of existing task programming frameworks in handling complex parallel workloads.

Taskflow lets you quickly implement task decomposition strategies that incorporate both regular and irregular compute patterns, together with an efficient work-stealing scheduler to optimize your multithreaded performance.

Static TaskingDynamic Tasking

Taskflow supports conditional tasking for you to make rapid control-flow decisions across dependent tasks to implement cycles and conditions that were otherwise difficult to do with existing tools.

Conditional Tasking

Taskflow is composable. You can create large parallel graphs through composition of modular and reusable blocks that are easier to optimize at an individual scope.

Taskflow Composition

Taskflow supports heterogeneous tasking for you to accelerate a wide range of scientific computing applications by harnessing the power of CPU-GPU collaborative computing.

Concurrent CPU-GPU Tasking

Taskflow provides visualization and tooling needed for profiling Taskflow programs.

Taskflow Profiler

We are committed to support trustworthy developments for both academic and industrial research projects in parallel computing. Check out Who is Using Taskflow and what our users say:

See a quick presentation and visit the documentation to learn more about Taskflow. Technical details can be referred to our IEEE TPDS paper.

Start Your First Taskflow Program

The following program (simple.cpp) creates four tasks ABC, and D, where A runs before B and C, and D runs after B and C. When A finishes, B and C can run in parallel.

#include <taskflow/taskflow.hpp>  // Taskflow is header-only

int main(){
  
  tf::Executor executor;
  tf::Taskflow taskflow;

  auto [A, B, C, D] = taskflow.emplace(  // create four tasks
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; } 
  );                                  
                                      
  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after  B and C
                                      
  executor.run(taskflow).wait(); 

  return 0;
}
 

Taskflow is header-only and there is no wrangle with installation. To compile the program, clone the Taskflow project and tell the compiler to include the headers.

~$ git clone https://github.com/taskflow/taskflow.git  # clone it only once
~$ g++ -std=c++17 examples/simple.cpp -I. -O2 -pthread -o simple
~$ ./simple
TaskA
TaskC 
TaskB 
TaskD
 

Visualize Your First Taskflow Program

Taskflow comes with a built-in profiler, TFProf, for you to profile and visualize taskflow programs in an easy-to-use web-based interface.

# run the program with the environment variable TF_ENABLE_PROFILER enabled
~$ TF_ENABLE_PROFILER=simple.json ./simple
~$ cat simple.json
[
{"executor":"0","data":[{"worker":0,"level":0,"data":[{"span":[172,186],"name":"0_0","type":"static"},{"span":[187,189],"name":"0_1","type":"static"}]},{"worker":2,"level":0,"data":[{"span":[93,164],"name":"2_0","type":"static"},{"span":[170,179],"name":"2_1","type":"static"}]}]}
]
# paste the profiling json data to https://taskflow.github.io/tfprof/
 

In addition to execution diagram, you can dump the graph to a DOT format and visualize it using a number of free GraphViz tools.

// dump the taskflow graph to a DOT format through std::cout
taskflow.dump(std::cout); 
 

Express Task Graph Parallelism

Taskflow empowers users with both static and dynamic task graph constructions to express end-to-end parallelism in a task graph that embeds in-graph control flow.

  1. Create a Subflow Graph
  2. Integrate Control Flow to a Task Graph
  3. Offload a Task to a GPU
  4. Compose Task Graphs
  5. Launch Asynchronous Tasks
  6. Execute a Taskflow
  7. Leverage Standard Parallel Algorithms

Create a Subflow Graph

Taskflow supports dynamic tasking for you to create a subflow graph from the execution of a task to perform dynamic parallelism. The following program spawns a task dependency graph parented at task B.

tf::Task A = taskflow.emplace([](){}).name("A");  
tf::Task C = taskflow.emplace([](){}).name("C");  
tf::Task D = taskflow.emplace([](){}).name("D");  

tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { 
  tf::Task B1 = subflow.emplace([](){}).name("B1");  
  tf::Task B2 = subflow.emplace([](){}).name("B2");  
  tf::Task B3 = subflow.emplace([](){}).name("B3");  
  B3.succeed(B1, B2);  // B3 runs after B1 and B2
}).name("B");

A.precede(B, C);  // A runs before B and C
D.succeed(B, C);  // D runs after  B and C
 

Integrate Control Flow to a Task Graph

Taskflow supports conditional tasking for you to make rapid control-flow decisions across dependent tasks to implement cycles and conditions in an end-to-end task graph.

tf::Task init = taskflow.emplace([](){}).name("init");
tf::Task stop = taskflow.emplace([](){}).name("stop");

// creates a condition task that returns a random binary
tf::Task cond = taskflow.emplace(
  [](){ return std::rand() % 2; }
).name("cond");

init.precede(cond);

// creates a feedback loop {0: cond, 1: stop}
cond.precede(cond, stop);
 

Offload a Task to a GPU

Taskflow supports GPU tasking for you to accelerate a wide range of scientific computing applications by harnessing the power of CPU-GPU collaborative computing using CUDA.

__global__ void saxpy(size_t N, float alpha, float* dx, float* dy) {
  int i = blockIdx.x*blockDim.x + threadIdx.x;
  if (i < n) {
    y[i] = a*x[i] + y[i];
  }
}
tf::Task cudaflow = taskflow.emplace([&](tf::cudaFlow& cf) {

  // data copy tasks
  tf::cudaTask h2d_x = cf.copy(dx, hx.data(), N).name("h2d_x");
  tf::cudaTask h2d_y = cf.copy(dy, hy.data(), N).name("h2d_y");
  tf::cudaTask d2h_x = cf.copy(hx.data(), dx, N).name("d2h_x");
  tf::cudaTask d2h_y = cf.copy(hy.data(), dy, N).name("d2h_y");
  
  // kernel task with parameters to launch the saxpy kernel
  tf::cudaTask saxpy = cf.kernel(
    (N+255)/256, 256, 0, saxpy, N, 2.0f, dx, dy
  ).name("saxpy");

  saxpy.succeed(h2d_x, h2d_y)
       .precede(d2h_x, d2h_y);
}).name("cudaFlow");
 

Compose Task Graphs

Taskflow is composable. You can create large parallel graphs through composition of modular and reusable blocks that are easier to optimize at an individual scope.

tf::Taskflow f1, f2;

// create taskflow f1 of two tasks
tf::Task f1A = f1.emplace([]() { std::cout << "Task f1A\n"; })
                 .name("f1A");
tf::Task f1B = f1.emplace([]() { std::cout << "Task f1B\n"; })
                 .name("f1B");

// create taskflow f2 with one module task composed of f1
tf::Task f2A = f2.emplace([]() { std::cout << "Task f2A\n"; })
                 .name("f2A");
tf::Task f2B = f2.emplace([]() { std::cout << "Task f2B\n"; })
                 .name("f2B");
tf::Task f2C = f2.emplace([]() { std::cout << "Task f2C\n"; })
                 .name("f2C");

tf::Task f1_module_task = f2.composed_of(f1)
                            .name("module");

f1_module_task.succeed(f2A, f2B)
              .precede(f2C);
 

Launch Asynchronous Tasks

Taskflow supports asynchronous tasking. You can launch tasks asynchronously to dynamically explore task graph parallelism.

tf::Executor executor;

// create asynchronous tasks directly from an executor
std::future<int> future = executor.async([](){ 
  std::cout << "async task returns 1\n";
  return 1;
}); 
executor.silent_async([](){ std::cout << "async task does not return\n"; });

// create asynchronous tasks with dynamic dependencies
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
tf::AsyncTask D = executor.silent_dependent_async([](){ printf("D\n"); }, B, C);

executor.wait_for_all();
 

Execute a Taskflow

The executor provides several thread-safe methods to run a taskflow. You can run a taskflow once, multiple times, or until a stopping criteria is met. These methods are non-blocking with a tf::Future<void> return to let you query the execution status.

// runs the taskflow once
tf::Future<void> run_once = executor.run(taskflow); 

// wait on this run to finish
run_once.get();

// run the taskflow four times
executor.run_n(taskflow, 4);

// runs the taskflow five times
executor.run_until(taskflow, [counter=5](){ return --counter == 0; });

// block the executor until all submitted taskflows complete
executor.wait_for_all();
 

Leverage Standard Parallel Algorithms

Taskflow defines algorithms for you to quickly express common parallel patterns using standard C++ syntaxes, such as parallel iterations, parallel reductions, and parallel sort.

// standard parallel CPU algorithms
tf::Task task1 = taskflow.for_each( // assign each element to 100 in parallel
  first, last, [] (auto& i) { i = 100; }    
);
tf::Task task2 = taskflow.reduce(   // reduce a range of items in parallel
  first, last, init, [] (auto a, auto b) { return a + b; }
);
tf::Task task3 = taskflow.sort(     // sort a range of items in parallel
  first, last, [] (auto a, auto b) { return a < b; }
);

// standard parallel GPU algorithms
tf::cudaTask cuda1 = cudaflow.for_each( // assign each element to 100 on GPU
  dfirst, dlast, [] __device__ (auto i) { i = 100; }
);
tf::cudaTask cuda2 = cudaflow.reduce(   // reduce a range of items on GPU
  dfirst, dlast, init, [] __device__ (auto a, auto b) { return a + b; }
);
tf::cudaTask cuda3 = cudaflow.sort(     // sort a range of items on GPU
  dfirst, dlast, [] __device__ (auto a, auto b) { return a < b; }
);
 

Additionally, Taskflow provides composable graph building blocks for you to efficiently implement common parallel algorithms, such as parallel pipeline.

// create a pipeline to propagate five tokens through three serial stages
tf::Pipeline pl(num_parallel_lines,
  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
    if(pf.token() == 5) {
      pf.stop();
    }
  }},
  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
    printf("stage 2: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
  }},
  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
    printf("stage 3: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
  }}
);
taskflow.composed_of(pl)
executor.run(taskflow).wait();
 

Supported Compilers

To use Taskflow, you only need a compiler that supports C++17:

  • GNU C++ Compiler at least v8.4 with -std=c++17
  • Clang C++ Compiler at least v6.0 with -std=c++17
  • Microsoft Visual Studio at least v19.27 with /std:c++17
  • AppleClang Xode Version at least v12.0 with -std=c++17
  • Nvidia CUDA Toolkit and Compiler (nvcc) at least v11.1 with -std=c++17
  • Intel C++ Compiler at least v19.0.1 with -std=c++17
  • Intel DPC++ Clang Compiler at least v13.0.0 with -std=c++17 and SYCL20

Taskflow works on Linux, Windows, and Mac OS X.

Learn More about Taskflow

Visit our project website and documentation to learn more about Taskflow. To get involved:

  • See release notes to stay up-to-date with newest versions
  • Read the step-by-step tutorial at cookbook
  • Submit an issue at GitHub issues
  • Find out our technical details at references
  • Watch our technical talks at YouTube
CppCon20 Tech TalkMUC++ Tech Talk

We are committed to support trustworthy developments for both academic and industrial research projects in parallel and heterogeneous computing. If you are using Taskflow, please cite the following paper we publised at 2021 IEEE TPDS:

More importantly, we appreciate all Taskflow contributors and the following organizations for sponsoring the Taskflow project!

    
     

License

Taskflow is licensed with the MIT License. You are completely free to re-distribute your work derived from Taskflow.

 

 

 

 

 

中文 | English Readme

CGraph 说明文档 

CGraph is a cross-platform Directed Acyclic Graph framework based on pure C++ without any 3rd-party dependencies.

You, with it, can build your own operators simply, and describe any running schedules as you need, such as dependence, parallelling, aggregation and so on. Some useful tools and plugins are also provide to improve your project.

Tutorials and contact information are show as follows. Please get in touch with us for free if you need more about this repository.

一. 简介

CGraph中文名为【色丶图】,是一套无任何第三方依赖的跨平台图流程执行框架。通过GPipeline(流水线)底层调度,实现了依赖元素依次顺序执行、非依赖元素并发执行的调度功能。

使用者只需继承GNode(节点)类,实现子类的run()方法,并根据需要设定依赖关系,即可实现任务的图化执行。还可以通过设定各种包含多节点信息的GGroup(组),自行控制图的条件判断、循环和并发执行逻辑。

项目提供了丰富的Param(参数)类型,用于不同应用场景下的数据互通。此外,还可以通过添加GAspect(切面)的方式,实现以上各种元素功能的横向扩展;通过引入GAdapter(适配器)对单个节点功能进行加强;或者通过添加GEvent(信号),丰富和优化执行逻辑。

CGraph Skeleton

详细功能介绍和用法,请参考 推荐阅读 中的文章内容。项目相关视频在B站持续更新中,欢迎观看交流和一键三连:

二. 编译说明

  • 本工程支持MacOSLinuxWindowsAndroid系统,无任何第三方依赖。默认使用C++11版本,推荐使用C++17版本,暂不支持C++11以下的版本

  • 使用CLion(推荐)作为IDE的开发者,打开CMakeLists.txt文件作为工程,即可编译通过

  • Windows环境中,使用Visual Studio(2013版或以上版本)作为IDE的开发者,安装cmake之后,输入以下指令,即可生成CGraph.sln文件

    $ git clone https://github.com/ChunelFeng/CGraph.git
    $ cd CGraph
    $ cmake . -Bbuild    # 在 build 文件夹下,生成对应的 CGraph.sln 文件
     
  • MacOS环境中,使用Xcode作为IDE的开发者,安装cmake之后,输入以下指令,即可生成CGraph.xcodeproj文件

    $ git clone https://github.com/ChunelFeng/CGraph.git
    $ cd CGraph
    $ mkdir build && cd build
    $ cmake .. -G Xcode    # 在 build 文件夹下,生成对应的 CGraph.xcodeproj 文件
     
  • Linux环境开发者,在命令行模式下,输入以下指令,即可编译通过

    $ git clone https://github.com/ChunelFeng/CGraph.git
    $ cd CGraph
    $ cmake . -Bbuild
    $ cd build
    $ make -j8
     
  • 提供online版本的编译调试环境,点击进入页面:CGraph env online ,通过github账号登录。进入后,输入以下指令,即可编译通过,并查看执行结果

    $ sudo apt-get install cmake -y          # 安装cmake
    $ ./CGraph-build.sh                      # 编译CGraph工程,生成的内容在同级/build/文件夹中
    $ ./build/tutorial/T00-HelloCGraph       # 运行第一个实例程序,并且在终端输出 Hello, CGraph.
     

三. 使用Demo

MyNode.h

#include "CGraph.h"

class MyNode1 : public CGraph::GNode {
public:
    CStatus run() override {
        printf("[%s], Sleep for 1 second ...\n", this->getName().c_str());
        CGRAPH_SLEEP_SECOND(1)
        return CStatus();
    }
};

class MyNode2 : public CGraph::GNode {
public:
    CStatus run() override {
        printf("[%s], Sleep for 2 second ...\n", this->getName().c_str());
        CGRAPH_SLEEP_SECOND(2)
        return CStatus();
    }
};
 

main.cpp

#include "MyNode.h"

using namespace CGraph;

int main() {
    /* 创建一个流水线,用于设定和执行流图信息 */
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr a, b, c, d = nullptr;

    /* 注册节点之间的依赖关系 */
    pipeline->registerGElement<MyNode1>(&a, {}, "nodeA");
    pipeline->registerGElement<MyNode2>(&b, {a}, "nodeB");
    pipeline->registerGElement<MyNode1>(&c, {a}, "nodeC");
    pipeline->registerGElement<MyNode2>(&d, {b, c}, "nodeD");

    /* 执行流图框架 */
    pipeline->process();
    GPipelineFactory::remove(pipeline);

    return 0;
}
 

CGraph Demo
如上图所示,图结构执行的时候,首先执行a节点。a节点执行完毕后,并行执行bc节点。bc节点全部执行完毕后,再执行d节点。

四. 推荐阅读

五. 关联项目

    • GraphANNS : Graph-based Approximate Nearest Neighbor Search Working off CGraph
    • CThreadPool : 一个简单好用、功能强大、性能优异、跨平台的C++线程池
    • taskflow : A General-purpose Parallel and Heterogeneous Task Programming System
    • awesome-cpp : A curated list of awesome C++ (or C) frameworks, libraries, resources, and shiny things. Inspired by awesome-... stuff.
    • awesome-workflow-engines : A curated list of awesome open source workflow engines
                 

标签:Task,taskflow,并行计算,&&,tf,CGraph,Taskflow,parallel
From: https://www.cnblogs.com/sinferwu/p/17738703.html

相关文章

  • 三维模型OBJ格式轻量化压缩并行计算处理方法浅析
    三维模型OBJ格式轻量化压缩并行计算处理方法浅析   三维模型的轻量化是指通过一系列技术和算法来减小三维模型的文件大小,以提高模型在计算机中的加载、渲染和传输效率。并行计算是利用多个计算单元同时执行任务,以加速计算过程的一种技术。在三维模型的OBJ格式轻量化中,可......
  • 并行计算中的线程和进程:原理与实践
    目录1.引言2.技术原理及概念2.1基本概念解释2.2技术原理介绍3.实现步骤与流程3.1准备工作:环境配置与依赖安装3.2核心模块实现3.3集成与测试4.应用示例与代码实现讲解4.1应用场景介绍4.2应用实例分析4.3核心代码实现4.4代码讲解说明5.优化与改进5.1性能优化并行计算......
  • 并行计算之绪论01
    一、绪论1.1基本概念加速比:表示加速效果。单个处理器运行花费时间/P个处理器运行花费时间;\(S=\frac{T(1)}{T(p)}\)效率:\(E=\frac{S}{p}=\frac{T(1)}{T(p)\timesp}\)开销:\(C=T(p)\timesp\)可扩展性:处理器数目增多时并行程序的行为;计算通信比:计算花费时间/处理器......
  • GPU技术在大规模计算和并行计算中的应用和挑战
    目录1.引言2.技术原理及概念3.实现步骤与流程4.应用示例与代码实现讲解5.优化与改进GPU技术在大规模计算和并行计算中的应用和挑战随着计算机硬件的不断发展和计算能力的提高,大规模计算和并行计算已经成为了人工智能和机器学习领域的重要研究方向。而GPU(图形处理器)......
  • MPP大规模并行计算数据库与分布式数据库的区别
    最近调研分布式TP数据库。结合公司使用的MPP数据库,一度感觉两者很像,随着分布式的深入研究,结合行内MPP数据库使用过正中遇到的问题,简单的总结一下分布式数据库与MPP数据库的区别。分布式数据库系统与并行数据库系统MPPDB有许多相似点,如都有用网络连接各个数据处理结点的特点。网络中......
  • 并行计算部分总结
    1。计算机的峰值为主频x4。2。计算机读取数组时,一次会读入一行,要最大限度的利用已读入的数据,减少频繁读写的次数。3。多线程内存共享,多进程需要消息传递来交换变量。4。利用管道在不同程序之间传递内容(可以是管道符|或mkfifomypipe)5。平均不同节点的计算量,尽量做到负载平衡。6......
  • 并行计算、分布式计算、集群计算和网格计算的介绍,以及主要有哪些区别?
    并行计算(ParallelComputing)并行计算或称平行计算是相对于串行计算来说的。并行计算(ParallelComputing)是指同时使用多种计算资源解决计算问题的过程。为执行并行计算,计算资源应包括一台配有多处理机(并行处理)的计算机、一个与网络相连的计算机专有编号,或者两者结合使用。......
  • 0001笔记【并行计算】CUDA在现代C++中如何运用?看这一个就够了
    目录SM(流多处理器)和板块(block)一个板块会被调度到一个SM上,直到执行结束常用函数cudaMalloc在显存上分配内存cudaMallocHost在主存上分配锁页内存cudaMemcpy在主存和显存之间拷贝数据cudaMallocManagerd统一内存优化时间依赖和空间依赖线程太多不行:防止寄存器打翻(registerspill)......
  • 基于时间和空间的大规模电动汽车入网网损调度 建立MISOCP模型,分时段优化,并行计算(实时
    基于时间和空间的大规模电动汽车入网网损调度建立MISOCP模型,分时段优化,并行计算(实时优化)。并对比了优化和未优化结果,验证了调度的有效性。考虑到电动汽车的机动性,市区可分为三类功能区:住宅区,商业区和办公区。白天,大多数电动汽车都停在办公室区域。到了晚上,几乎所有的电动汽车......
  • python并行计算demo,用于求0~n之间的素数之和
     想试试服务器的并行计算能力,就让cpu慢慢计算,计算0~n之间所有素数之和设置target为结尾,num_of_processors为进程数,即可开始跑如下所示frommultiprocessingimportP......