新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
系列博客的大纲
- 处理请求的基本流程
- 尾调用
- 协程调度框架中的三个队列
- wakeup_queue
- error_queue
- fork_queue
- 协程调度
- db服务处理main服务发送过来的请求
--db.lua
local skynet = require "skynet"
require "skynet.manager" -- import skynet.register
local db = {--保存了年龄
zhangsan = 12,
lisi = 33,
wangwu = 4
}
local command = {}
function command.GET(key)
return db[key]
end
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
local f = command[cmd] --这里收到的 cmd 是"GET"
if f then
skynet.ret(skynet.pack(f(...)))--发送响应给main服务
end
end)
end)
snlua服务处理请求的基本流程
大多数 skynet 服务使用 lua 编写。lua服务把消息队列的消息取出后,调用c的回调函数,最终是把消息交给一个指定的lua函数处理。
snlua服务的这个lua函数就是 skynet.dispatch_message
。处理这个消息主要分为两大步 1. raw_dispatch_message 2. 不断的从 fork_queue 队列中把协程取出来做处理。
function skynet.dispatch_message(...)
local succ, err = pcall(raw_dispatch_message,...)
while true do
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1 --head
fork_queue.t = 0 --tail
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
end
assert(succ, tostring(err))
end
在lua服务中会收到请求消息和响应消息。a发送一个请求给b,然后等待b回应。b收到的这个请求,就是请求消息。当a收到b的回应消息时,这个消息就是响应消息。此时我们的db服务收到了一个lua类型的消息。我们先看 raw_dispatch_message
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then --这里是处理响应
--略
else --这里主要是处理lua text socket 等消息类型
local p = proto[prototype]
local f = p.dispatch
if f then
local co = co_create(f) --获取一个协程对象并设置任务函数f
session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同的请求者发过来的session可以是相同的
session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))--next
end
end
end
根据 消息类型 我们的流程是 从 第行6开始。也就是”lua”类型的消息。首先根据消息类型找到对应的协议,然后获取任务处理函数 f。
之后我们获取一个协程,给协程设置这个函数,再唤醒这个协程就算是真正处理我们收到的消息了。当然唤醒协程开始执行前,必须先保存请求者的信息,不然后面处理完请求后,都不知道把结果发送给谁。注意这里是以 co为key保存请求者信息的。为什么不用session的原因是,不同的请求者发送过来的session可能是相同的。因为session是每个服务自己产生的。a服务可以产生一个session号9527,b服务也可以产生一个session号9527。
唤醒协程是通过 coroutine_resume(co, session,source, p.unpack(msg,sz))
。这里的任务函数f是db服务在入口函数里设置的。当db的业务处理完成后,即协程挂起时,就会执行suspend。这个suspend接下来会唤醒其他协程。也就是说当我们lua服务收到一个消息时,就会有一个协程去处理,当这个协程处理后,挂起时,就会给执行权给其他协程。
function suspend(co, result, command)
if command == "SUSPEND" then
return dispatch_wakeup() --next
elseif command == "QUIT" then
coroutine.close(co)
-- service exit
return
end
end
上面的代码 我们主要关注第3行。一般情况下 ,挂起时都会返回 ""SUSPEND"".我们继续看 dispatch_wakeup()
local function dispatch_wakeup()
while true do
local token = tremove(wakeup_queue,1)--从唤醒队列中不断取出协程
if token then
local session = sleep_session[token] -- 从sleep表中查找 注意 这个表和 唤醒队列 具有不同的意义
if session then
local co = session_id_coroutine[session]
session_id_coroutine[session] = "BREAK"
return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
end
else
break
end
end
return dispatch_error_queue()--这里是为了处理这种情况:当前服务在苦苦等待服务x响应,而服务x已经有错误,且已经把错误通知当前服务了
end
当我们调用skynet.sleep或者skynet.wait时,就会把指定协程加入到sleep_session睡眠表中,标识协程是睡眠状态,同时当前协程会挂起。之后在合适的时机,我们调用skynet.wakeup就会把指定的睡眠协程加入到wakeup_queue唤醒队列中。之后这个唤醒队列里面的协程就会被调度,得到执行。
注意把协程加入唤醒队列,不代表马上就唤醒协程执行。
上面的dispatch_wakeup主要的处理过程是这样:
- 如果唤醒队列中有协程,转2;如果没有转 4
- 取出一个,并唤醒执行。
- 唤醒执行挂起后 回到 1
- dispatch_error_queue
看看dispatch_error_queue 代码
local function dispatch_error_queue()
local session = tremove(error_queue,1)
if session then
local co = session_id_coroutine[session]
session_id_coroutine[session] = nil
return suspend(co, coroutine_resume(co, false, nil, nil, session))
end
end
这几个函数的共同特点是最后都会调用suspend。有点像是递归调用。实际上,并不是。这是lua尾调用。去了解lua函数调用 也就是说 调用帧 是不会一直递增的。
这里出现了一个所谓的error_queue 错误队列。比如当我们的协程a发送请求给x服务后,会等待x服务给出响应。如果此时x服务退出或者是处理出现错误,那么就会给我们服务发送一个错误类型的消息。针对错误消息的处理函数会把协程a加入错误队列,等待时机唤醒执行,不然协程a就会一直挂起。看看错误消息的处理函数
REG {
name = "error",
id = skynet.PTYPE_ERROR,
unpack = function(...) return ... end,
dispatch = _error_dispatch,
}
local function _error_dispatch(error_session, error_source)
skynet.ignoreret() -- don't return for error
if error_session == 0 then --收到一个即将下线的服务x的错误消息
-- error_source is down, clear unreponse set
for session, srv in pairs(watching_session) do --原本发出请求给服务x,等待x响应的协程需要另外处理了
if srv == error_source then
tinsert(error_queue, session)
end
end
else
-- capture an error for error_session
if watching_session[error_session] then
tinsert(error_queue, error_session)
end
end
end
错误类型消息处理函数 主要是把之前等待的协程加入错误队列。关于错误类型处理,暂时讲到这里。
skynet.dispatch_message第一步已经讲完,还有第二步是处理 fork_queue .
function skynet.dispatch_message(...)
local succ, err = pcall(raw_dispatch_message,...)
while true do --从这里开始处理fork_queue
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1 --head
fork_queue.t = 0 --tail
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
end
assert(succ, tostring(err))
end
上面的代码主要是从 fork_queue 队列中不断的取出协程,然后依旧是调用suspend来处理。fork_queue 里面的协程是怎么来的?一般当我们在业务层需要协程的时候,我们就会调用skynet.fork(func,...)来创建一个。看代码
function skynet.fork(func,...)
local n = select("#", ...)
local co
if n == 0 then
co = co_create(func)
else
local args = { ... }
co = co_create(function() func(table.unpack(args,1,n)) end)
end
local t = fork_queue.t + 1 --尾部递增
fork_queue.t = t
fork_queue[t] = co --push到队列的尾部
return co
end
实际上,内部也是调用co_create()创建的协程。这里总结下分发流程图。
db服务返回响应给main服务
我们最后看db服务是如何在处理完业务后,也就是拿到年龄信息后,怎么把信息发送给main服务的。回顾
--db.lua
local skynet = require "skynet"
require "skynet.manager" -- import skynet.register
local db = {--保存了年龄
zhangsan = 12,
lisi = 33,
wangwu = 4
}
local command = {}
function command.GET(key)
return db[key]
end
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
local f = command[cmd] --这里收到的 cmd 是"GET"
if f then
skynet.ret(skynet.pack(f(...)))--发送响应给main服务
end
end)
end)
上面的代码很容易找到 zhangsan的年龄是 12. 我们主要看 skynet.ret是怎么把返回数据发送给 main服务的。
function skynet.ret(msg, sz)
msg = msg or ""
local co_session = session_coroutine_id[running_thread] --找到session
session_coroutine_id[running_thread] = nil
if co_session == 0 then
return false -- send don't need ret
end
local co_address = session_coroutine_address[running_thread] --找到请求者地址
local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz) --注意消息类型是 skynet.PTYPE_RESPONSE
if ret then
return true
end
end
还记得吗,当我们db收到请求时,就把请求者的信息保存下来了。而且是以 当前协程为key,请求者信息为value保存的。现在把信息取出来,通过c.send发送给给main服务。
标签:end,请求,处理,db,queue,--,session,skynet,local From: https://www.cnblogs.com/waittingforyou/p/18037602