首页 > 编程语言 >Swoole 源码分析之 Channel 通道模块

Swoole 源码分析之 Channel 通道模块

时间:2024-04-04 16:22:06浏览次数:24  
标签:co Swoole swoole 源码 channel msg 协程 Channel

原文首发链接:Swoole 源码分析之 Channel 通道模块
大家好,我是码农先森。

引言

通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗。

底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗 channel 基于引用计数实现,是零拷贝的。

源码拆解

Channel 通道需要在协程环境中使用,我们先看下面这段代码,使用 new Channel(1) 创建一个 channel 对象,然后在第一个协程中向通道中推送数据,在第二个协程获取到通道内的数据进行消费。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

run(function(){
    // 创建 channel 通道对象
    $channel = new Channel(1);
    Coroutine::create(function () use ($channel) {
        for($i = 0; $i < 10; $i++) {
            Coroutine::sleep(1.0);
            // 向通道内推送数据
            $channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
            echo "{$i}\n";
        }
    });
    Coroutine::create(function () use ($channel) {
        while(1) {
            // 从通道中获取数据
            $data = $channel->pop(2.0);
            if ($data) {
                var_dump($data);
            } else {
                assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
                break;
            }
        }
    });
});

在分析源代码之前,我们可以提前看一下源码整体的调用逻辑图,以便我们有个大致的印象。

这段代码主要是在 Swoole 的协程环境中创建 Channel 对象并初始化其容量的逻辑。

// swoole-src/ext-src/swoole-channel.cc:132
static PHP_METHOD(swoole_channel_coro, __construct) {
    zend_long capacity = 1;
	
	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_LONG(capacity)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (capacity <= 0) {
        capacity = 1;
    }

	// 当前对象对应的 ChannelObject 结构体指针
    ChannelObject *chan_t = php_swoole_channel_coro_fetch_object(Z_OBJ_P(ZEND_THIS));
    // 为该通道对象分配新的 Channel 实例,并设置其容量为传入的值。
    chan_t->chan = new Channel(capacity);
    zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("capacity"), capacity);
}

这段代码主要是在 Swoole 的协程环境中向通道中推送数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:149
static PHP_METHOD(swoole_channel_coro, push) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    zval *zdata;
    double timeout = -1;

	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 2)
    Z_PARAM_ZVAL(zdata)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    Z_TRY_ADDREF_P(zdata);
    zdata = sw_zval_dup(zdata);
    // 向通道中推入数据
    if (chan->push(zdata, timeout)) {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
        RETURN_TRUE;
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        Z_TRY_DELREF_P(zdata);
        efree(zdata);
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:105
bool Channel::push(void *data, double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭
    if (closed) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return false;
    }
    // 如果通道已满或生产者队列不为空,则设置超时消息,并根据传入的超时值添加定时器,等待生产者。
    if (is_full() || !producer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = PRODUCER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起生产者协程
        yield(PRODUCER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
	// 将数据压入数据队列。
    data_queue.push(data);
    swoole_trace_log(SW_TRACE_CHANNEL, "push data to channel, count=%ld", length());
    
    // 如果消费者队列不为空,则唤醒消费者协程。
    if (!consumer_queue.empty()) {
        Coroutine *co = pop_coroutine(CONSUMER);
        // 恢复消费者协程
        co->resume();
    }
    return true;
}

这段代码主要是在 Swoole 的协程环境中从通道中取出数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:175
static PHP_METHOD(swoole_channel_coro, pop) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    // 设置超时变量为-1
    double timeout = -1;
	
	// 解析一个超时参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

	// 从通道中取出数据,并返回一个 zval 指针
    zval *zdata = (zval *) chan->pop(timeout);
    // 如果返回的 zval 指针不为空
    if (zdata) {
    	// 将其返回给 PHP 脚本,并释放内存
        RETVAL_ZVAL(zdata, 0, 0);
        efree(zdata);
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:55
void *Channel::pop(double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭且为空
    if (closed && is_empty()) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return nullptr;
    }
    // 如果通道为空或者消费者队列不为空
    if (is_empty() || !consumer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = CONSUMER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起消费者协程
        yield(CONSUMER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
    // 从数据队列中弹出数据,并返回该数据。
    void *data = data_queue.front();
    data_queue.pop();

    // 如果生产者队列不为空,则唤醒生产者协程
    if (!producer_queue.empty()) {
        Coroutine *co = pop_coroutine(PRODUCER);
        // 恢复到生产者协程
        co->resume();
    }
    return data;
}

这段代码一是针对超时回调处理的处理逻辑,并恢复相关的协程操作。二是实现了协程的挂起操作,并根据不同的类型将当前协程放入不同的队列中,以便后续根据需要恢复执行。

// swoole-src/coroutine/channel.cc:22
void Channel::timer_callback(Timer *timer, TimerNode *tnode) {
    TimeoutMessage *msg = (TimeoutMessage *) tnode->data;
    msg->error = true;
    msg->timer = nullptr;
    if (msg->type == CONSUMER) {
    	// 从消费者队列中移除该协程
        msg->chan->consumer_remove(msg->co);
    } else {
    	// 从生产者队列中移除该协程
        msg->chan->producer_remove(msg->co);
    }
    // 恢复协程
    msg->co->resume();
}

// swoole-src/coroutine/channel.cc:34
void Channel::yield(enum Opcode type) {
	// 获取当前协程
    Coroutine *co = Coroutine::get_current_safe();
    if (type == PRODUCER) {
    	// 将当前协程放入到生产者队列
        producer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "producer cid=%ld", co->get_cid());
    } else {
    	// 将当前协程放入到消费者队列
        consumer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "consumer cid=%ld", co->get_cid());
    }
    
    // 挂起被取消,则调用该函数
    Coroutine::CancelFunc cancel_fn = [this, type](Coroutine *co) {
        if (type == CONSUMER) {
            consumer_remove(co);
        } else {
            producer_remove(co);
        }
        co->resume();
        return true;
    };

    // 挂起当前协程
    co->yield(&cancel_fn);
}

总结

  1. Channel 通道需要在协程的环境中进行使用,通道是纯内存操作,没有 IO 消耗,非常高效。
  2. 底层使用 Channel::yield 函数实现了协程的自动切换和调度,如果通道处理超时则会自动调用 Channel::timer_callback 函数。
  3. Channel 通道是跨协程直接通信的一大利器,在实际的场景中使用起来十分的便利、高效。

标签:co,Swoole,swoole,源码,channel,msg,协程,Channel
From: https://www.cnblogs.com/yxhblogs/p/18090624

相关文章

  • 最新社交相亲系统源码PHP
    最新社交相亲系统源码PHP安装环境:php7.2mysql5.7框架:后端thinkphp6前端:jquerylayuiPC移动端响应式线上案例:https://cjr.oemsun.com/主要页面及功能预览首页相亲资料详情页红娘跟进记录海报、一键复制分享img条件搜索红线互换微信动态发......
  • 约跑小程序源码(asp.net+vue+element++uniapp+sqlserver)
    开发语言:c#框架:后端asp.netmvcpc管理页面:vue+element数据库:sqlserver开发软件:eclipse/myeclipse/idea浏览器:谷歌浏览器小程序框架:uniapp小程序开发软件:HBuilderX一、前言介绍   约跑小程序是一个小程序。用户跑步可以用宜刻小程序进行记录打卡,也可以选择和其他......
  • 【全开源】任务悬赏源码+接单系统+淘客(APP+H5+公众号)_博纳软云
    任务悬赏源码+接单系统+淘客(APP+H5+公众号):一站式解决方案,开启您的赚钱新纪元在当今这个数字化、信息化的时代,如何快速、高效地获取收益,成为众多用户关注的焦点。为此,我们隆重推出“任务悬赏源码+接单系统+淘客(APP+H5+公众号)”一站式解决方案,让您轻松开启赚钱新纪元。我们的任......
  • 【全开源】外卖微信小程序京东拼多多外卖cps|外卖红包优惠券源码美团饿了么红包_博纳
    外卖微信小程序京东拼多多外卖cps|外卖红包优惠券源码美团饿了么红包,这一串词汇,实际上代表了当今外卖市场的多元化发展和技术创新的融合。随着移动互联网的普及,外卖小程序已经成为人们日常生活中不可或缺的一部分。京东、拼多多等电商巨头也纷纷涉足外卖领域,通过cps模式与各大......
  • 【全开源】外卖霸王餐免费吃外卖小程序+APP+公众号+H5多端霸王餐源码_博纳软云
    外卖霸王餐免费吃外卖小程序+APP+公众号+H5多端霸王餐源码:畅享美食新体验在这个快节奏的时代,外卖已经成为人们生活中不可或缺的一部分。为了满足广大食客对于美食的渴望,我们隆重推出“外卖霸王餐免费吃外卖小程序+APP+公众号+H5多端霸王餐源码”,让您畅享美食新体验。这款源码......
  • 【全开源】洗衣清洁服务同城清洗服务小程序源码_博纳软云
    洗衣清洁服务,同城清洗服务小程序源码,为现代生活带来便捷与舒适。在这个繁忙的时代,人们往往因工作、学习等原因,难以抽出时间处理家务琐事,尤其是洗衣清洁这样的日常任务。而有了这款同城清洗服务小程序,一切变得简单而高效。这款小程序源码,集成了先进的技术与智能算法,为用户提供......
  • 【全开源】知识付费教育付费资源付费平台公众号小程序源码_博纳软云
    知识付费教育付费资源付费平台,以公众号与小程序为媒介,承载着无尽的知识与智慧,让知识的价值得以彰显。在这个信息化时代,我们深知知识的力量,因此,我们倾力打造了这款集公众号与小程序于一体的知识付费平台,让知识的获取与传播变得更加高效与便捷。这个平台汇聚了海量的教育资源,无......
  • 【全开源】洗衣清洁服务同城清洗服务小程序源码_博纳软云
    洗衣清洁服务,同城清洗服务小程序源码,为现代生活带来便捷与舒适。在这个繁忙的时代,人们往往因工作、学习等原因,难以抽出时间处理家务琐事,尤其是洗衣清洁这样的日常任务。而有了这款同城清洗服务小程序,一切变得简单而高效。这款小程序源码,集成了先进的技术与智能算法,为用户提供......
  • 【全开源】运营版JAVA源码单商户PC源码小程序公众号APP源码_博纳软云
    运营版JAVA源码、单商户PC源码、小程序与公众号APP源码,这一系列的源码技术,仿佛一座座桥梁,连接着商家与消费者的心灵。它们以高效的性能、稳定的表现和灵活的扩展性,助力企业打造个性化的运营平台,实现商业价值的最大化。JAVA源码,作为运营版的核心,以其强大的跨平台性和安全性,为企......
  • 【全开源】多商户运营版JAVA商城小程序源码-地摊兄源码多商户源码社交电商源码_博纳软
    多商户运营版JAVA商城小程序源码-地摊兄源码多商户源码社交电商源码:开启智慧零售新篇章在数字化浪潮的推动下,智慧零售正成为新时代商业发展的重要方向。为满足广大商户的需求,我们隆重推出多商户运营版JAVA商城小程序源码——地摊兄源码,以及多商户源码和社交电商源码,助您轻松开......