首页 > 其他分享 >Dlang 并行化

Dlang 并行化

时间:2023-06-28 22:11:06浏览次数:31  
标签:Dlang int 并行 void worker Tid message main

Dlang 并行化

好难受,dlang 生态太差,没办法,学了半天才明白。

我尽量以精炼的语言解释。

采用 定义,例子(代码),解释 的步骤讲解。

所以你可能看到很多代码,一点解释……

我会省略一些 import,让代码短一些

目录

parallelism 并行

感觉好废物,这一小部分了解即可。

这部分只需要会 parallelmap & amap 其实就差不多了。

介绍比较实用的几种方法。

parallel 迭代

foreach (i; parallel(range, work_uint_size = 100)) {
    // do something here
}

其中 work_unit_size 表示最多同时运行的数量。

例子

import std.stdio, std.parallelism;
import core.thread;

struct Producer {
    void produce() {
        Thread.sleep(1.seconds);

        writeln("Process +1");
    }
};

void main() {
    auto prods = new Producer[](10);

    foreach (prod; parallel(prods)) {
        prod.produce();
    }
}

Task

创建任务:

auto theTask = task!anOperation(arguments);
// or
auto theTask = task(&someFunction, parameters...)

运行任务:theTask.executeInNewThread()

查看是否完成:if (theTask.done) { ... }

获取结果:auto result = theTask.yeildForce()


asyncBuf

感觉没啥用。

并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……

例子:

struct Producer {
    int i, total;

    bool empty() const {
        return total <= i;
    }

    int front() const {
        return i;
    }

    void popFront() {
        writefln("Producing product ID: %d", i);
        Thread.sleep(1.seconds / 2);
        ++i;
    }
};

void main() {
    auto prods = Producer(0, 10);
    foreach (prod; taskPool.asyncBuf(prods, 3)) {
        writef("Got product id: %d\n", prod);
        Thread.sleep(1.seconds);
        writeln("Used product...");
    }
}

map & amap

先看例子:

int increase(int x) {
    Thread.sleep(500.msecs);
    return x + 3;
}

void main() {
    int[] nums;
    foreach (i; 0 .. 10) {
        nums ~= i;
    }

    // auto results = taskPool.map!increase(nums);
    auto results = taskPool.amap!increase(nums);
    foreach (result; results) {
        writeln(result);
    }
}

可以类比 python 中的 map

两者的区别:

  • map 可以指定同时运行的数量,而 amap 是有多少运行多少。

  • map 会一定程度上按顺序执行,而 amap 并不是顺序执行,它依靠 RandomAccessRange,也就是随机顺序执行。


消息并发

我不知道怎么翻译,反正就是 Message Passing Concurrency

核心方法: spawn (唤起)

我们可以形象的认为,spawn 方法可以唤起一个新的工人(线程)来为我们工作。

并且这个工人与主线程是分开的(先看代码后面解释):

import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (worker) in ", thisTid);

    }

}
void main() {
    Tid myWorkerTid = spawn(&worker);
    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main) in ", thisTid);

    }

    writeln("main is done!");
}

最终输出:

0 (main) in Tid(7f0eb19bc0b0)
0 (worker) in Tid(7f0eb19bc000)
1 (main) in Tid(7f0eb19bc0b0)
2 (main) in Tid(7f0eb19bc0b0)
1 (worker) in Tid(7f0eb19bc000)
3 (main) in Tid(7f0eb19bc0b0)
2 (worker) in Tid(7f0eb19bc000)
4 (main) in Tid(7f0eb19bc0b0)
main is done!
3 (worker) in Tid(7f0eb19bc000)
4 (worker) in Tid(7f0eb19bc000)

实际输出可能略有差异。

解释

  • spawn(&worker) 唤起了一个新的线程运行 worker 函数,并返回了新的线程的 id 是一个结构体 Tid

  • thisTid 类似于一个宏,用于获取当前所在线程的 id


发送消息

先看代码后解释:

void worker() {
    int value = 0;
    while (value >= 0) {
        value = receiveOnly!int();
        double result = cast(double)value / 7;
        ownerTid.send(result);
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    foreach (val; 0 .. 10) {
        myWorker.send(val);
        double result = receiveOnly!double();
        writefln("Send %s got %s", val, result);
    }

    myWorker.send(-1); // terminate worker process
}

最终输出:

Send 0 got 0
Send 1 got 0.142857
Send 2 got 0.285714
Send 3 got 0.428571
Send 4 got 0.571429
Send 5 got 0.714286
Send 6 got 0.857143
Send 7 got 1
Send 8 got 1.14286
Send 9 got 1.28571

解释

  • ownerTid 类似于一个宏,用于取得唤醒自己的线程的 Tid,从而发送消息。

  • Tid.send(...) 可以向 Tid 代表的那个线程发送一条消息。

    • 如果同时要发送多个东西,在发送的地方是 Tid.send(a, b, c, ...)

    • 在接受的地方要变化为 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最终得到的是一个 tuple,可以通过下标访问。

  • receiveOnly!type() 表示只接受类型为 type 的消息。

  • 最后 myWorker.send(-1) 是根据代码逻辑结束的,并不属于通法。

如果我们需要更灵活的接受方法怎么办?

void workerFunc() {
    bool isDone = false;
    while (!isDone) {
        void intHandler(int message) {
            writeln("handling int message: ", message);

            if (message == -1) {
                writeln("exiting");
                isDone = true;
            }
        }

        void stringHandler(string message) {
            writeln("handling string message: ", message);
        }
        
        receive(&intHandler, &stringHandler);
    }    
}

我们可以指定多种 Handler 以处理不同的数据类型。利用 receive 注册 到处理类型消息的函数中。


更优雅的方式

处理更多的类型:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        receive(
            (int message) {
                writeln("int message ", message);
            },

            (string message) {
                writeln("string message", message);
            },

            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Unexpected message: ", message);
            }
        );
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    myWorker.send(10.1);
    myWorker.send(Exit());
}

主要是使用了匿名函数……

解释

  • 利用 std.variant.Variant 以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成 Variant

超时接受

我们可以定一个超时时间,超过这个时间就直接返回。

先看代码:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        bool received = receiveTimeout(600.msecs,
            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Some message: ", message);
            }
        );
        if (!received) {
            writeln("no message yet...");
        }
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    Thread.sleep(1.seconds);
    myWorker.send(10.1);
    myWorker.send(Exit());
}

最终输出

Some message: 10
Some message: hello
no message yet...
Some message: 10.1
Exit message

解释

  • receiveTimeout 只比 recieve 多了一个参数,用于指定超时时间。

  • 返回一个 bool 变量,如果为 false 则没有接收到任何消息。


等待所有线程结束thread_joinAll()

一般来说放在需要放的地方……即可。


数据共享

终于讲到这里了。

我们先考虑一个程序:

import std.stdio;
import std.concurrency;
import core.thread;

int variable;

void printInfo(string message) {
    writefln("%s: %s (@%s)", message, variable, &variable);
}

void worker() {
    variable = 42;
    printInfo("Before the worker is terminated");
}

void main() {
    spawn(&worker);
    thread_joinAll();
    printInfo("After the worker is terminated");
}

其输出是这样的:

Before the worker is terminated: 42 (@7F308C88C530)
After the worker is terminated: 0 (@7F308C98D730)

可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。

此时我们只需要修改:

shared int variable;

即可。

实际上写为 shared(int) variable; 会更标准,但是好麻烦……

当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。


Data Race

数据竞争是一个很常见的问题。

例子

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        *i = *i + 1;
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        spawn(&worker, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望输出 2000000,但是实际输出可能远小于此。

所以我们要考虑同步:

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

解释

  • synchronized 会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。

  • 有些时候,synchronized 会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。

  • synchronized 创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。


共用锁

void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i - 1;
        }
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        if (id & 1) spawn(&increase, &i);
        else spawn(&decrese, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望输出 0 但是实际输出……不知道。所以我们需要共用锁:

synchronized (lock_object) {
    // ...
}

修改后的代码

class Lock {}
shared Lock lock = new Lock();

void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i - 1;
        }
    }
}

现在就可以得到正确的答案了。


同步类

我们可以使用 synchronized 修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied

synchronized class Cls {
    void func() {
        // ...
    }
}

上面的等价于:

class Cls {
    void func() {
        synchronized (this) {
            // ...
        }
    }
}

同步初始化

我们考虑这份代码:

static this() {
    writeln("executing static this()");
}

void worker() {
}
void main() {
    spawn(&worker);
    thread_joinAll();
}

最终会输出两次 executing static this()

如果我们修改为 shared static this() { ... },那么最终只会输出一次。


原子操作

需要用到 core.atomic 库。

有代码:

atomic!"+="(var, x);
atomic!"-="(var, x);
// ... like *= /= ^= ...

这些都是原子操作。

有方法:

shared(int) *value;
bool is_mutated = cas(value, currentValue, newValue);

如果返回 true,那么值会改变,否则没有。

原子操作一般来说快于 synchronized

同时,原子操作也可以作用于结构体上,这里不作为讲解。

更多操作可以参考标准库:

  • core.sync.barrier

  • core.sync.condition

  • core.sync.config

  • core.sync.exception

  • core.sync.mutex

  • core.sync.rwmutex

  • core.sync.semaphore

标签:Dlang,int,并行,void,worker,Tid,message,main
From: https://www.cnblogs.com/jeefy/p/17512683.html

相关文章

  • 合并行的单元格 EXTJS
    在ExtJS4中,如何合并行的单元格,已经选取的时候只能选择某一列,期望的效果如下:在ExtJS中,合并表头的列有现成方案,但是合并行单元格不是extjs的现有功能,这个需要底层扩展,也就是使用table的跨行实现。而ExtJS7以及新版本和ExtJS4在最底层的Grid组成上又有差别,所以不同......
  • m基于FPGA的数据串并并串转换系统verilog实现,包含testbench,可以配置并行数量
    1.算法仿真效果 本系统进行了两个平台的开发,分别是: Vivado2019.2 Quartusii18.0+ModelSim-Altera6.6d StarterEdition 其中Vivado2019.2仿真结果如下: 分别进行2路,4路,8路,16路并行串行转换      Quartusii18.0+ModelSim-Altera6.6d Starter......
  • Dlang 与 C 语言交互(二)
    Dlang与C语言交互(二)随着需求不断增加,发现好像需要更多的东西了。在官网上找不到资料,四处拼凑才有了本文的分享。上一文(DLang与C语言交互(一)-jeefy-博客园)中说了非常简单了例子。本文试着向更高级的方法拓展。文章链接(防止机器搬运):https://www.cnblogs.com/jeefy/p/1......
  • 前后端分离,前端和后端是并行开发吗?还是前端先开发?后端在开发时能不能先看到项目前端
    前后端分离并不只是开发模式,而是web应用的一种架构模式。在开发阶段,前后端工程师约定好数据交互接口,实现并行开发和测试;在运行阶段前后端分离模式需要对web应用进行分离部署,前后端之前使用HTTP或者其他协议进行交互请求。并行开发:开发前,前后端定义接口规范定义好接口后,后端......
  • DLang 与 C 语言交互
    DLang与C语言交互很难受的是,这部分的文档太少了,根本没有教程向的文章。所以我写了此文以做分享。本文原址链接(防止机器搬运):https://www.cnblogs.com/jeefy/p/17501476.html阅读提示:请保证如下条件:会基本C语言使用,以及其编译命令。会基本D语言使用,以及其编译命令......
  • DLang 与 C 语言交互
    DLang与C语言交互很难受的是,这部分的文档太少了,根本没有教程向的文章。所以我写了此文以做分享。本文原址链接(防止机器搬运):https://www.cnblogs.com/jeefy/p/17499441.html阅读提示:请保证如下条件:会基本C语言使用,以及其编译命令。会基本D语言使用,以及其编译命令......
  • 解放计算力:使用并行处理提升python for循环速度
    Python是一门功能强大的编程语言,但在处理大规模数据或复杂计算任务时,性能可能成为一个瓶颈。幸运的是,Python提供了多种方法来提升性能,其中之一是利用并行处理来加速循环操作。本文将介绍如何使用并行处理技术来优化for循环,从而提高Python程序的执行速度。我们将讨论并行处......
  • 信息存储的层次与并行技术
    Cache存储体系功能操作的实现物理地址Cache地址变换这里首先要说明的是:主要是记住各个映像规则关系和变化方法同时根据映像规则需要能够写出主存地址,Cache地址,映像表的格式《全相联》 从图中我们可以看出相联度为Cb所谓相联度即......
  • 并行计算中的线程和进程:原理与实践
    目录1.引言2.技术原理及概念2.1基本概念解释2.2技术原理介绍3.实现步骤与流程3.1准备工作:环境配置与依赖安装3.2核心模块实现3.3集成与测试4.应用示例与代码实现讲解4.1应用场景介绍4.2应用实例分析4.3核心代码实现4.4代码讲解说明5.优化与改进5.1性能优化并行计算......
  • 并行的常见问题和注意事项
    关于Oracle中的并行,可以说是一把双刃剑,用得好,可以充分利用系统资源,提升数据库的处理能力,用得不好,可能会适得其反。并行的基本使用方法,对于大部分SQL开发者和DBA来说,并行的一些最基本的使用方法还没有完全掌握,为此老虎刘老师特意写了一篇文章《关于parallel(并行)的几个基本常识》,着重......