首页 > 其他分享 >协程杂记

协程杂记

时间:2024-04-09 23:22:53浏览次数:28  
标签:return ++ void state 杂记 协程 id

协程杂记

协程是什么

协程是一种协作式多任务模型,不同于线程的抢占式多任务模型。协程是由程序员自己控制的,可以在任意时刻挂起或者恢复一个协程,更适合于用来实现彼此熟悉的程序组件。
在通常使用线程的情景中,负责处理不同任务的线程之间存在着数据竞争,使用加锁可以解决问题,但其实并不符合我们的本意。以生产者消费者为例,我们希望消费者可以获取到生产者生产的数据,但是我们并不想要线程带来的额外开销(如加锁解锁)。协程的出现就解决了这个问题,生产者产生数据后,主动挂起,消费者用完数据后,主动切换到生产者。主要的改变就协程实际上变成了串行执行,但是却不会带来额外的开销,本质还是一个用户态的线程(所以想要高效的利用CPU还是要使用线程),这样两个相互依赖的任务不存在数据竞争,减少了维护同步的开销。

var q := new 队列

coroutine 生产者
    loop
        while q 不满载
            建立某些新产品
            向 q 增加这些产品 
        yield 消费者

coroutine 消费者
    loop
        while q 不空载
            从 q 移除某些产品
            使用这些产品
        yield 生产者

适用场景

协程适用于IO密集型任务,如网络请求,文件读写等。在这些任务中,CPU的利用率很低,大部分时间都在等待IO操作完成。使用线程的话,会造成大量的线程切换,而协程的切换是用户自己控制的,这种情况下协程的性能会更好。

协程通常有yield和resume两个操作,yield用于挂起当前协程,resume用于恢复一个挂起的协程,从上一个yield的地方继续执行。

偶然看到的特殊实现

循环展开

循环展开作为一种优化手段,通过将循环体内的代码复制多次、减少循环的次数,进而减少分支预测次数,提高程序的性能(详见csapp)。
下面是一个循环展开的例子,展开次数为 8 次。

send(to, from, count)
register short *to, *from;
register count;
{
    register n = count % 8;
    while (n-- > 0) {
        *to = *from++;
    }
    n = count / 8;
    if (n == 0) return;     
    do {
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
    } while (--n > 0)
}

达夫设备(Duff's device)是一种循环展开的特殊形式。
C标准switch的规定:在switch控制语句内,条件标号(case)可以出现在任意子语句之前,充作其前缀(像label一样是Labeled statements);若未加入break语句,则在switch语句根据条件判定,跳转到对应标号并开始执行后,控制流会无视其余条件标号限定,一直执行到switch嵌套语句的末尾。
因此可以实现一种抽象的循环展开

send(to, from, count)
register short *to, *from;
register count;
{
	register n = (count + 7) / 8;
	switch (count % 8) {
	case 0:	do { *to = *from++;
	case 7:		 *to = *from++;
	case 6:		 *to = *from++;
	case 5:		 *to = *from++;
	case 4:	     *to = *from++;
	case 3:      *to = *from++;
	case 2:      *to = *from++;
	case 1:      *to = *from++;
	        } while (--n > 0);
	}
}

为什么要扯到循环展开,达夫设备呢?因为协程的实现中,有一种特殊的实现方式,就是使用达夫设备来实现协程的切换。

实现

int function(void) {
    static int i, state = 0;
    switch (state) {
        case 0: // 函数开始执行
        for (i = 0; i < 10; i++) {
            state = 1; // 我们会回到 "case 1" 的地方
            return i;
            case 1:; // 从上一次返回之后的地方开始执行
        }
    }

使用宏隐藏细节

#define crBegin static int state=0; switch(state) { case 0:
// 使用 do ... while(0) 可以确保当 crReturn 位于 if ... else 之间时,不需要用大括号将其扩起来
#define crReturn(i,x) do { state=i; return x; case i:; } while (0)
#define crFinish }
int function(void) {
    static int i;
    crBegin;
    for (i = 0; i < 10; i++) {
        crReturn(1, i);
    }
    crFinish;
}

在这种规则下,函数主体要用CrBegin和CrFinish包围,多次调用要保持的变量是static。另外,由于case的label属性,crReturn的第一个参数不能不能重复,否则会报错duplicate case value。可以用__LINE__来生成唯一的标号。

#define crReturn(x) do { state=__LINE__; return x; \
                         case __LINE__:; } while (0)

一个生产者消费者的例子

// 读取用户输入的20个字符,每次读取一行
#include<iostream>
using namespace std;
#define crBegin static int scrLine = 0; switch(scrLine) { case 0:;
#define crReturn(z)     \
        do {\
            scrLine=__LINE__;\
            return (z); case __LINE__:;\
        } while (0)
#define crReturnV       \
        do {\
            scrLine=__LINE__;\
            return; case __LINE__:;\
        } while (0)
#define crFinishV }
#define crFinish(z) } return (z)
char* produce(void) {
    static int num = 0, numpre = 0;
    static char buf[100];
    char c;
    crBegin;
    while(num < 20){
        while((c=getchar())!='\n'){
            buf[num++]=c;
            if(num >= 20){
                break;
            }
        }
        buf[num] = 0;
        crReturn(buf+numpre);
        numpre = num;
    }
    crFinish(nullptr);
}

int consume(){
    char* p = produce();
    if(p == nullptr) return -1;
    cout << p << endl;
}
int main(){
    while(consume()!=-1);
}

没有必要使用协程宏把生产者消费者两段代码都重写了。这里只重写了produce,consume则作为 caller 存在。

以上代码都依赖于static变量,所以不适合多线程重入。
如果要在多线程环境下使用,需要将所有本地变量放在一个上下文结构体中,在需要使用协程的地方先创建ccrContext z = 0,并将地址&z作为参数传递给协程。具体实现见coroutine.h。

#define ccrContParam     void **ccrParam

#define ccrBeginContext  struct ccrContextTag { int ccrLine
#define ccrEndContext(x) } *x = (struct ccrContextTag *)*ccrParam

#define ccrBegin(x)      if(!x) {x= *ccrParam=malloc(sizeof(*x)); x->ccrLine=0;}\
                         if (x) switch(x->ccrLine) { case 0:;
#define ccrFinish(z)     } free(*ccrParam); *ccrParam=0; return (z)
#define ccrFinishV       } free(*ccrParam); *ccrParam=0; return

#define ccrReturn(z)     \
        do {\
            ((struct ccrContextTag *)*ccrParam)->ccrLine=__LINE__;\
            return (z); case __LINE__:;\
        } while (0)
#define ccrReturnV       \
        do {\
            ((struct ccrContextTag *)*ccrParam)->ccrLine=__LINE__;\
            return; case __LINE__:;\
        } while (0)

#define ccrStop(z)       do{ free(*ccrParam); *ccrParam=0; return (z); }while(0)
#define ccrStopV         do{ free(*ccrParam); *ccrParam=0; return; }while(0)

#define ccrContext       void *
#define ccrAbort(ctx)    do { free (ctx); ctx = 0; } while (0)

int ascending (ccrContParam) {
   ccrBeginContext;
   int i;
   ccrEndContext(foo);

   ccrBegin(foo);
   for (foo->i=0; foo->i<10; foo->i++) {
      ccrReturn(foo->i);
   }
   ccrFinish(-1);
}

int main(void) {
   ccrContext z = 0;
   do {
      printf("got number %d\n", ascending(&z));
   } while (z);
}

参考
Coroutines in C
完整实现coroutine.h

较易懂的实现

仿照xv6的用户态线程实验(将汇编上下文切换改为使用ucontext.h),实现的简单对称协程库,协程不共享栈。
主要提供了create、yield、join、release等接口。主要代码100行左右,个人认为可读性还是比较高的。

  • create:创建一个协程,传入函数
  • yield:挂起当前协程,对称的,所有协程都可使用
  • join:主线程等待所有协程结束(好像也不那么对称了)
  • release:释放主线程资源,由创建的协程交替执行
#include <ucontext.h>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <vector>
using namespace std;

void wrapfunc();
typedef std::function<void()> Task;
enum State {
    FREE,        // 槽位空闲
    RUNNABLE,    // 等待运行
    RUNNING,     // 正在运行
};
enum { STACK_SIZE = 4096, MAX_CO = 24 };
struct coroutine {
    char stack[STACK_SIZE];
    State state;
    ucontext_t ctx;
    Task task;
    bool isDead() { return state == FREE; }
};
coroutine coroutines[MAX_CO];
coroutine *current = nullptr;
size_t cnt = 0;

void init()
{
    current = coroutines;
    current->state = RUNNING;
}

void schedule()
{
    // 在协程表中从当前协程的下一个协程开始找到下一个可运行的协程
    coroutine *t, *next = nullptr;
    t = current + 1;
    for (int i = 0; i < MAX_CO; i++) {
        if (t >= coroutines + MAX_CO) {
            t = coroutines;
        }
        if (t->state == RUNNABLE) {
            next = t;
            break;
        }
        t++;
    }
    if (!next) {
        return;
    }
    if (current != next) {
        next->state = RUNNING;
        t = current;
        current = next;
        swapcontext(&t->ctx, &next->ctx);
    }
}

coroutine *create(Task task)
{
    coroutine *t = nullptr;
    for (int i = 0; i < MAX_CO; i++) {
        if (coroutines[i].state == FREE) {
            t = coroutines + i;
            break;
        }
    }
    if (!t) {
        cout << "no free coroutine" << endl;
        exit(1);
    }
    t->state = RUNNABLE;
    t->task = task;
    getcontext(&t->ctx);
    t->ctx.uc_link = NULL;               // 当前上下文结束后返回的上下文
    t->ctx.uc_stack.ss_sp = t->stack;    // 设置栈
    t->ctx.uc_stack.ss_size = STACK_SIZE;// 设置栈大小
    makecontext(&t->ctx, wrapfunc, 0);
    cnt++;
    return t;
}

void yield()
{
    current->state = RUNNABLE;
    schedule();
}

void join()
{
    while (cnt != 0) {
        yield();
    }
}

void release(){
    current->state = FREE;
    schedule();
}

void wrapfunc()
{
    current->task();
    current->state = FREE;
    cnt--;
    schedule();
}

void fun(int id)
{
    for (int i = 0; i < 5; i++) {
        cout << "coroutine " << id << ": " << i << endl;
        yield();
    }
}

int main()
{
    cout << "main start" << endl;
    init();
    create(std::bind(fun, 1));
    create(std::bind(fun, 2));
    // 全部协程结束后在main函数中退出
    join();
    // 释放协程资源,由创建的协程交替
    // release();
    cout << "main end" << endl;
    return 0;
}

非对称协程实现

参考了github上云风的实现,同样使用的是ucontext.h,区别是非对称、共享栈(虽然原版作者说是共享栈,但实际上yield时保存协程栈还是为协程新开了内存,而且需要在共享栈和协程保存的栈之间拷贝,只是分配的新内存相对较小且延迟分配)以及接口不同。
主要对原协程库的一些c代码做了封装和修改,使其更加易用。

  • create:创建一个协程,传入函数
  • yield:挂起当前协程,由callee调用
  • resume:恢复一个指定id的协程
  • dead:判断一个协程是否结束
  • destroy:销毁一个协程
#include <assert.h>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <string.h>
#include <ucontext.h>
#include <vector>

namespace coroutine
{
class CoScheduler;
class Coroutine
{
private:
    enum State { FREE, RUNNABLE, RUNNING, SUSPEND };
    typedef std::function<void()> Task;
    friend class CoScheduler;
    char *stack;
    State state;
    ucontext_t context;
    Task task;
    size_t sz;
    size_t cap;
    void saveStack(char *sk, size_t sksz)
    {
        char pos;
        char *top = sk + sksz;
        assert(sk < &pos);
        assert(&pos < top);
        if (cap < top - &pos) {
            cap = top - &pos;
            if (stack) {
                delete[] stack;
            }
            stack = new char[cap];
        }
        sz = top - &pos;
        memcpy(stack, &pos, sz);
    }

public:
    Coroutine(Task tk)
    {
        task = tk;
        stack = nullptr;
        sz = 0;
        cap = 0;
    }
    void release()
    {
        if (stack) {
            delete[] stack;
            stack = nullptr;
        }
        sz = 0;
        cap = 0;
        state = FREE;
    }
    ~Coroutine()
    {
        if (stack) {
            delete[] stack;
        }
    }
    bool dead() { return state == FREE; }
};

class CoScheduler
{
private:
    enum { STACK_SIZE = 4096, MAX_CO = 16 };
    char stack[STACK_SIZE];
    std::vector<Coroutine> coroutines;
    int curId;
    int cnt;
    ucontext_t main;
    CoScheduler()
    {
        cnt = 0;
        curId = -1;
        getcontext(&main);
    }
    ~CoScheduler()
    {
        for (size_t i = 0; i < coroutines.size(); i++) {
            if (coroutines[i].stack) {
                delete[] coroutines[i].stack;
            }
        }
    }

public:
    CoScheduler(const CoScheduler &) = delete;
    CoScheduler &operator=(const CoScheduler &) = delete;
    static CoScheduler &getInstance()
    {
        static CoScheduler sch;
        return sch;
    }
    int createCoroutine(Coroutine::Task tk)
    {
        if (cnt >= MAX_CO) {
            return -1;
        }
        int id = -1;
        for (size_t i = 0; i < coroutines.size(); i++) {
            if (coroutines[i].state == Coroutine::FREE) {
                coroutines[i].task = tk;
                id = i;
                break;
            }
        }
        if (id == -1) {
            coroutines.emplace_back(tk);
            id = coroutines.size() - 1;
        }
        cnt++;
        Coroutine &co = coroutines[id];
        co.state = Coroutine::RUNNABLE;
        getcontext(&co.context);
        co.context.uc_stack.ss_sp = stack;
        co.context.uc_stack.ss_size = STACK_SIZE;
        co.context.uc_link = &main;
        makecontext(&co.context, (void (*)(void)) & CoScheduler::wrapFunc, 2, this, id);
        return id;
    }
    void yield()
    {
        Coroutine &cur = coroutines[curId];
        cur.saveStack(stack, STACK_SIZE);
        cur.state = Coroutine::SUSPEND;
        swapcontext(&cur.context, &main);
    }
    void resume(size_t id)
    {
        if (id >= coroutines.size() || coroutines[id].state == Coroutine::FREE) {
            return;
        }
        if (id == curId) {
            return;
        }
        Coroutine &next = coroutines[id];
        Coroutine &cur = coroutines[curId];
        curId = id;
        if (next.state == Coroutine::SUSPEND) {
            memcpy(stack + STACK_SIZE - next.cap, next.stack, next.cap);
        }
        next.state = Coroutine::RUNNING;
        swapcontext(&main, &next.context);
    }
    void destroy(size_t id)
    {
        if (id >= coroutines.size() || coroutines[id].state == Coroutine::FREE) {
            return;
        }
        coroutines[id].release();
        cnt--;
    }
    static void wrapFunc(CoScheduler *sch, size_t id)
    {
        Coroutine &co = sch->coroutines[id];
        co.task();
        co.release();
        sch->cnt--;
        sch->curId = -1;
    }
    bool dead(size_t id) { return coroutines[id].dead(); }
    void run()
    {
        while (cnt > 0) {
        }
    }
};
CoScheduler &sch = CoScheduler::getInstance();
int create(std::function<void()> tk)
{
    return sch.createCoroutine(tk);
}
void resume(size_t id)
{
    sch.resume(id);
}
void yield()
{
    sch.yield();
}
bool dead(size_t id)
{
    return sch.dead(id);
}
void destroy(size_t id)
{
    sch.destroy(id);
}
} // namespace coroutine

void func(int id)
{
    for (int i = 0; i < 10; i++) {
        std::cout << "coroutine " << id << " : " << i << std::endl;
        coroutine::yield();
    }
}

void test()
{
    int id1 = coroutine::create(std::bind(func, 1));
    int id2 = coroutine::create(std::bind(func, 2));
    assert(id1 != -1);
    assert(id2 != -1);
    std::cout << "main start" << std::endl;
    while (!coroutine::dead(id1) && !coroutine::dead(id2)) {
        coroutine::resume(id1);
        coroutine::resume(id2);
    }

    int id3 = coroutine::create(std::bind(func, 3));
    assert(id3 != -1);
    int id4 = coroutine::create(std::bind(func, 4));
    assert(id4 != -1);
    coroutine::resume(id3);
    coroutine::destroy(id3);
    int id5 = coroutine::create(std::bind(func, 5));
    assert(id5 != -1);

    while (!coroutine::dead(id4) && !coroutine::dead(id5)) {
        coroutine::resume(id4);
        coroutine::resume(id5);
    }

    std::cout << "main end" << std::endl;
}
int main()
{
    test();
}

后续还要补充一下协程在各语言中的使用

标签:return,++,void,state,杂记,协程,id
From: https://www.cnblogs.com/wangerblog/p/18125104

相关文章

  • 协程操作
    协程操作一、asyncio模块asyncio模块是Python中实现异步的一个模块,该模块在Python3.4的时候发布async和await关键字在Python3.5中引入。因此,想要使用asyncio模块,建议Python解释器的版本不要低于Python3.5。二、事件循环所谓的事件循环,我们可以把它当作是一......
  • 协程理论
    协程理论一、单线程下的并发本节的主题是基于单线程来实现并发即只用一个主线程(很明显可利用的CPU只有一个)情况下实现并发为此我们需要先回顾下并发的本质:切换+保存状态当CPU正在运行一个任务会在两种情况下切走去执行其他的任务该任务发生了阻塞该任务计算的时间......
  • Python中协程(coroutine)详解
    一、协程和线程的比较及其适用场景1共用变量问题多线程中可能出现多个线程争抢变量,所以变量需要加锁;协程中任一时刻都只有一个线程,所以变量不需要加锁。但是协程虽然不像多线程争抢变量但仍是和多线程一样共用变量的,即共用变量在某处改变在另外一处引用时也会发生改变。2协......
  • 数据采集技术综合项目实战(协程式网络爬虫+数据预处理+数据可视化)附带详细步骤说明,干货
    数据采集部分:目标网址:https://item.jd.com/100066896338.html#none爬虫思路分析:1.确定采集目标:爬取“苹果15”的评论包括好评、差评、中评以及不同的评论对应的用户名、设备颜色、设备内存大小、版本号、评论发布时间等字段,共3000条以上的评论数目,如下图所示:2.查看评论来......
  • 数学杂记(技)
    经典组合数求和等于斐波那契等式\(\sum\limits_{i=0}^n\dbinom{n-i}{i}=f_{n+1}\),其中\(f\)为斐波那契数列。证法\(1\):\(\sum\limits_{i=0}^n\dbinom{n-i}{i}=\sum\limits_{i=0}^n[x^i](1+x)^{n-i}=[x^n]\sum\limits_{i=0}^nx^{n-i}(1+x)^{n-i}=[x^n]\dfrac{1}{1-x-x^2......
  • RN杂记
    1. getNativeAuthenticationWithType(type:Int32):Promise<boolean>;返回是Promise类型变量的函数:用一般方式用if取值if会读错,应该是因为Promise是个异步操作 可以用下面这种方式处理Promise返回值getNativeAuthenticationWithType(7)//假设7是类型参数......
  • lua的协程
      lua协程的创建通常是通过coroutine.create(f),其中f就是协程的主体程序,它必须是一个函数。coroutine.create返回一个类型为thread(lua的8种内建类型之一)的变量。------Createsanewcoroutine,withbody`f`.`f`mustbeaLuafunction.Returns---thisnewcorouti......
  • 协程中Flow的一些特性(冷流,流的连续型、构建器、上下文、指定协程中收集流、流的取消)
    一、冷流Flow是一种类似序列的冷流,flow构建器中的代码知道流被收集的时候才运行。惰性生成:冷流只有在被订阅时才会开始生成数据。在订阅之前,它不会执行任何操作,也不会产生任何数据项。简单来讲就是现学现用,什么时候使用什么时候才调用。比如使用collect就是启动的标志。......
  • 协程&异步编程
    协程,也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行协程一般应用在有IO操作的程序中,因为协程可以利用IO等待的时间去执行一些其他的代码,从而提升代码执行效率。async事件循环事件循环,可以理解为while循环,在周期性......
  • 在Python中如何使用协程进行并发操作
    在Python中使用协程进行并发操作是一种高效的方式来处理I/O密集型任务或者在单个Python程序内部执行多个操作。本文将详细介绍如何在Python中使用协程进行并发操作,包括协程的基本概念、如何创建和运行协程、如何使用任务来管理多个协程,以及如何利用协程进行并发网络请求等。最......