协程杂记
协程是什么
协程是一种协作式多任务模型,不同于线程的抢占式多任务模型。协程是由程序员自己控制的,可以在任意时刻挂起或者恢复一个协程,更适合于用来实现彼此熟悉的程序组件。
在通常使用线程的情景中,负责处理不同任务的线程之间存在着数据竞争,使用加锁可以解决问题,但其实并不符合我们的本意。以生产者消费者为例,我们希望消费者可以获取到生产者生产的数据,但是我们并不想要线程带来的额外开销(如加锁解锁)。协程的出现就解决了这个问题,生产者产生数据后,主动挂起,消费者用完数据后,主动切换到生产者。主要的改变就协程实际上变成了串行执行,但是却不会带来额外的开销,本质还是一个用户态的线程(所以想要高效的利用CPU还是要使用线程),这样两个相互依赖的任务不存在数据竞争,减少了维护同步的开销。
var q := new 队列
coroutine 生产者
loop
while q 不满载
建立某些新产品
向 q 增加这些产品
yield 消费者
coroutine 消费者
loop
while q 不空载
从 q 移除某些产品
使用这些产品
yield 生产者
适用场景
协程适用于IO密集型任务,如网络请求,文件读写等。在这些任务中,CPU的利用率很低,大部分时间都在等待IO操作完成。使用线程的话,会造成大量的线程切换,而协程的切换是用户自己控制的,这种情况下协程的性能会更好。
协程通常有yield和resume两个操作,yield用于挂起当前协程,resume用于恢复一个挂起的协程,从上一个yield的地方继续执行。
偶然看到的特殊实现
循环展开
循环展开作为一种优化手段,通过将循环体内的代码复制多次、减少循环的次数,进而减少分支预测次数,提高程序的性能(详见csapp)。
下面是一个循环展开的例子,展开次数为 8 次。
send(to, from, count)
register short *to, *from;
register count;
{
register n = count % 8;
while (n-- > 0) {
*to = *from++;
}
n = count / 8;
if (n == 0) return;
do {
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
} while (--n > 0)
}
达夫设备(Duff's device)是一种循环展开的特殊形式。
C标准switch的规定:在switch控制语句内,条件标号(case)可以出现在任意子语句之前,充作其前缀(像label一样是Labeled statements);若未加入break语句,则在switch语句根据条件判定,跳转到对应标号并开始执行后,控制流会无视其余条件标号限定,一直执行到switch嵌套语句的末尾。
因此可以实现一种抽象的循环展开
send(to, from, count)
register short *to, *from;
register count;
{
register n = (count + 7) / 8;
switch (count % 8) {
case 0: do { *to = *from++;
case 7: *to = *from++;
case 6: *to = *from++;
case 5: *to = *from++;
case 4: *to = *from++;
case 3: *to = *from++;
case 2: *to = *from++;
case 1: *to = *from++;
} while (--n > 0);
}
}
为什么要扯到循环展开,达夫设备呢?因为协程的实现中,有一种特殊的实现方式,就是使用达夫设备来实现协程的切换。
实现
int function(void) {
static int i, state = 0;
switch (state) {
case 0: // 函数开始执行
for (i = 0; i < 10; i++) {
state = 1; // 我们会回到 "case 1" 的地方
return i;
case 1:; // 从上一次返回之后的地方开始执行
}
}
使用宏隐藏细节
#define crBegin static int state=0; switch(state) { case 0:
// 使用 do ... while(0) 可以确保当 crReturn 位于 if ... else 之间时,不需要用大括号将其扩起来
#define crReturn(i,x) do { state=i; return x; case i:; } while (0)
#define crFinish }
int function(void) {
static int i;
crBegin;
for (i = 0; i < 10; i++) {
crReturn(1, i);
}
crFinish;
}
在这种规则下,函数主体要用CrBegin和CrFinish包围,多次调用要保持的变量是static。另外,由于case的label属性,crReturn的第一个参数不能不能重复,否则会报错duplicate case value。可以用__LINE__来生成唯一的标号。
#define crReturn(x) do { state=__LINE__; return x; \
case __LINE__:; } while (0)
一个生产者消费者的例子
// 读取用户输入的20个字符,每次读取一行
#include<iostream>
using namespace std;
#define crBegin static int scrLine = 0; switch(scrLine) { case 0:;
#define crReturn(z) \
do {\
scrLine=__LINE__;\
return (z); case __LINE__:;\
} while (0)
#define crReturnV \
do {\
scrLine=__LINE__;\
return; case __LINE__:;\
} while (0)
#define crFinishV }
#define crFinish(z) } return (z)
char* produce(void) {
static int num = 0, numpre = 0;
static char buf[100];
char c;
crBegin;
while(num < 20){
while((c=getchar())!='\n'){
buf[num++]=c;
if(num >= 20){
break;
}
}
buf[num] = 0;
crReturn(buf+numpre);
numpre = num;
}
crFinish(nullptr);
}
int consume(){
char* p = produce();
if(p == nullptr) return -1;
cout << p << endl;
}
int main(){
while(consume()!=-1);
}
没有必要使用协程宏把生产者消费者两段代码都重写了。这里只重写了produce,consume则作为 caller 存在。
以上代码都依赖于static变量,所以不适合多线程重入。
如果要在多线程环境下使用,需要将所有本地变量放在一个上下文结构体中,在需要使用协程的地方先创建ccrContext z = 0,并将地址&z作为参数传递给协程。具体实现见coroutine.h。
#define ccrContParam void **ccrParam
#define ccrBeginContext struct ccrContextTag { int ccrLine
#define ccrEndContext(x) } *x = (struct ccrContextTag *)*ccrParam
#define ccrBegin(x) if(!x) {x= *ccrParam=malloc(sizeof(*x)); x->ccrLine=0;}\
if (x) switch(x->ccrLine) { case 0:;
#define ccrFinish(z) } free(*ccrParam); *ccrParam=0; return (z)
#define ccrFinishV } free(*ccrParam); *ccrParam=0; return
#define ccrReturn(z) \
do {\
((struct ccrContextTag *)*ccrParam)->ccrLine=__LINE__;\
return (z); case __LINE__:;\
} while (0)
#define ccrReturnV \
do {\
((struct ccrContextTag *)*ccrParam)->ccrLine=__LINE__;\
return; case __LINE__:;\
} while (0)
#define ccrStop(z) do{ free(*ccrParam); *ccrParam=0; return (z); }while(0)
#define ccrStopV do{ free(*ccrParam); *ccrParam=0; return; }while(0)
#define ccrContext void *
#define ccrAbort(ctx) do { free (ctx); ctx = 0; } while (0)
int ascending (ccrContParam) {
ccrBeginContext;
int i;
ccrEndContext(foo);
ccrBegin(foo);
for (foo->i=0; foo->i<10; foo->i++) {
ccrReturn(foo->i);
}
ccrFinish(-1);
}
int main(void) {
ccrContext z = 0;
do {
printf("got number %d\n", ascending(&z));
} while (z);
}
参考
Coroutines in C
完整实现coroutine.h
较易懂的实现
仿照xv6的用户态线程实验(将汇编上下文切换改为使用ucontext.h),实现的简单对称协程库,协程不共享栈。
主要提供了create、yield、join、release等接口。主要代码100行左右,个人认为可读性还是比较高的。
- create:创建一个协程,传入函数
- yield:挂起当前协程,对称的,所有协程都可使用
- join:主线程等待所有协程结束(好像也不那么对称了)
- release:释放主线程资源,由创建的协程交替执行
#include <ucontext.h>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <vector>
using namespace std;
void wrapfunc();
typedef std::function<void()> Task;
enum State {
FREE, // 槽位空闲
RUNNABLE, // 等待运行
RUNNING, // 正在运行
};
enum { STACK_SIZE = 4096, MAX_CO = 24 };
struct coroutine {
char stack[STACK_SIZE];
State state;
ucontext_t ctx;
Task task;
bool isDead() { return state == FREE; }
};
coroutine coroutines[MAX_CO];
coroutine *current = nullptr;
size_t cnt = 0;
void init()
{
current = coroutines;
current->state = RUNNING;
}
void schedule()
{
// 在协程表中从当前协程的下一个协程开始找到下一个可运行的协程
coroutine *t, *next = nullptr;
t = current + 1;
for (int i = 0; i < MAX_CO; i++) {
if (t >= coroutines + MAX_CO) {
t = coroutines;
}
if (t->state == RUNNABLE) {
next = t;
break;
}
t++;
}
if (!next) {
return;
}
if (current != next) {
next->state = RUNNING;
t = current;
current = next;
swapcontext(&t->ctx, &next->ctx);
}
}
coroutine *create(Task task)
{
coroutine *t = nullptr;
for (int i = 0; i < MAX_CO; i++) {
if (coroutines[i].state == FREE) {
t = coroutines + i;
break;
}
}
if (!t) {
cout << "no free coroutine" << endl;
exit(1);
}
t->state = RUNNABLE;
t->task = task;
getcontext(&t->ctx);
t->ctx.uc_link = NULL; // 当前上下文结束后返回的上下文
t->ctx.uc_stack.ss_sp = t->stack; // 设置栈
t->ctx.uc_stack.ss_size = STACK_SIZE;// 设置栈大小
makecontext(&t->ctx, wrapfunc, 0);
cnt++;
return t;
}
void yield()
{
current->state = RUNNABLE;
schedule();
}
void join()
{
while (cnt != 0) {
yield();
}
}
void release(){
current->state = FREE;
schedule();
}
void wrapfunc()
{
current->task();
current->state = FREE;
cnt--;
schedule();
}
void fun(int id)
{
for (int i = 0; i < 5; i++) {
cout << "coroutine " << id << ": " << i << endl;
yield();
}
}
int main()
{
cout << "main start" << endl;
init();
create(std::bind(fun, 1));
create(std::bind(fun, 2));
// 全部协程结束后在main函数中退出
join();
// 释放协程资源,由创建的协程交替
// release();
cout << "main end" << endl;
return 0;
}
非对称协程实现
参考了github上云风的实现,同样使用的是ucontext.h,区别是非对称、共享栈(虽然原版作者说是共享栈,但实际上yield时保存协程栈还是为协程新开了内存,而且需要在共享栈和协程保存的栈之间拷贝,只是分配的新内存相对较小且延迟分配)以及接口不同。
主要对原协程库的一些c代码做了封装和修改,使其更加易用。
- create:创建一个协程,传入函数
- yield:挂起当前协程,由callee调用
- resume:恢复一个指定id的协程
- dead:判断一个协程是否结束
- destroy:销毁一个协程
#include <assert.h>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <string.h>
#include <ucontext.h>
#include <vector>
namespace coroutine
{
class CoScheduler;
class Coroutine
{
private:
enum State { FREE, RUNNABLE, RUNNING, SUSPEND };
typedef std::function<void()> Task;
friend class CoScheduler;
char *stack;
State state;
ucontext_t context;
Task task;
size_t sz;
size_t cap;
void saveStack(char *sk, size_t sksz)
{
char pos;
char *top = sk + sksz;
assert(sk < &pos);
assert(&pos < top);
if (cap < top - &pos) {
cap = top - &pos;
if (stack) {
delete[] stack;
}
stack = new char[cap];
}
sz = top - &pos;
memcpy(stack, &pos, sz);
}
public:
Coroutine(Task tk)
{
task = tk;
stack = nullptr;
sz = 0;
cap = 0;
}
void release()
{
if (stack) {
delete[] stack;
stack = nullptr;
}
sz = 0;
cap = 0;
state = FREE;
}
~Coroutine()
{
if (stack) {
delete[] stack;
}
}
bool dead() { return state == FREE; }
};
class CoScheduler
{
private:
enum { STACK_SIZE = 4096, MAX_CO = 16 };
char stack[STACK_SIZE];
std::vector<Coroutine> coroutines;
int curId;
int cnt;
ucontext_t main;
CoScheduler()
{
cnt = 0;
curId = -1;
getcontext(&main);
}
~CoScheduler()
{
for (size_t i = 0; i < coroutines.size(); i++) {
if (coroutines[i].stack) {
delete[] coroutines[i].stack;
}
}
}
public:
CoScheduler(const CoScheduler &) = delete;
CoScheduler &operator=(const CoScheduler &) = delete;
static CoScheduler &getInstance()
{
static CoScheduler sch;
return sch;
}
int createCoroutine(Coroutine::Task tk)
{
if (cnt >= MAX_CO) {
return -1;
}
int id = -1;
for (size_t i = 0; i < coroutines.size(); i++) {
if (coroutines[i].state == Coroutine::FREE) {
coroutines[i].task = tk;
id = i;
break;
}
}
if (id == -1) {
coroutines.emplace_back(tk);
id = coroutines.size() - 1;
}
cnt++;
Coroutine &co = coroutines[id];
co.state = Coroutine::RUNNABLE;
getcontext(&co.context);
co.context.uc_stack.ss_sp = stack;
co.context.uc_stack.ss_size = STACK_SIZE;
co.context.uc_link = &main;
makecontext(&co.context, (void (*)(void)) & CoScheduler::wrapFunc, 2, this, id);
return id;
}
void yield()
{
Coroutine &cur = coroutines[curId];
cur.saveStack(stack, STACK_SIZE);
cur.state = Coroutine::SUSPEND;
swapcontext(&cur.context, &main);
}
void resume(size_t id)
{
if (id >= coroutines.size() || coroutines[id].state == Coroutine::FREE) {
return;
}
if (id == curId) {
return;
}
Coroutine &next = coroutines[id];
Coroutine &cur = coroutines[curId];
curId = id;
if (next.state == Coroutine::SUSPEND) {
memcpy(stack + STACK_SIZE - next.cap, next.stack, next.cap);
}
next.state = Coroutine::RUNNING;
swapcontext(&main, &next.context);
}
void destroy(size_t id)
{
if (id >= coroutines.size() || coroutines[id].state == Coroutine::FREE) {
return;
}
coroutines[id].release();
cnt--;
}
static void wrapFunc(CoScheduler *sch, size_t id)
{
Coroutine &co = sch->coroutines[id];
co.task();
co.release();
sch->cnt--;
sch->curId = -1;
}
bool dead(size_t id) { return coroutines[id].dead(); }
void run()
{
while (cnt > 0) {
}
}
};
CoScheduler &sch = CoScheduler::getInstance();
int create(std::function<void()> tk)
{
return sch.createCoroutine(tk);
}
void resume(size_t id)
{
sch.resume(id);
}
void yield()
{
sch.yield();
}
bool dead(size_t id)
{
return sch.dead(id);
}
void destroy(size_t id)
{
sch.destroy(id);
}
} // namespace coroutine
void func(int id)
{
for (int i = 0; i < 10; i++) {
std::cout << "coroutine " << id << " : " << i << std::endl;
coroutine::yield();
}
}
void test()
{
int id1 = coroutine::create(std::bind(func, 1));
int id2 = coroutine::create(std::bind(func, 2));
assert(id1 != -1);
assert(id2 != -1);
std::cout << "main start" << std::endl;
while (!coroutine::dead(id1) && !coroutine::dead(id2)) {
coroutine::resume(id1);
coroutine::resume(id2);
}
int id3 = coroutine::create(std::bind(func, 3));
assert(id3 != -1);
int id4 = coroutine::create(std::bind(func, 4));
assert(id4 != -1);
coroutine::resume(id3);
coroutine::destroy(id3);
int id5 = coroutine::create(std::bind(func, 5));
assert(id5 != -1);
while (!coroutine::dead(id4) && !coroutine::dead(id5)) {
coroutine::resume(id4);
coroutine::resume(id5);
}
std::cout << "main end" << std::endl;
}
int main()
{
test();
}
后续还要补充一下协程在各语言中的使用
标签:return,++,void,state,杂记,协程,id From: https://www.cnblogs.com/wangerblog/p/18125104