skynet是一个轻量级的游戏服务器框架。
skynet的核心是服务,服务之间通过消息来通信,消息的来源主要有:
- 定时器
- 网络
- 服务之间的调用(
skynet.send
或skynet.call
)
skynet.send和skynet.call
假设我们有两个服务A和B,A发了两条消息给B:
这里skynet.send
和skynet.call
的主要区别,在于call
会阻塞,等待消息的返回值,而send
将消息发送出去之后,就继续执行后续的指令。那么这里skynet.call
之后,是怎么获取这个返回值的呢?我们来看看代码。
skynet.send的代码比较简单:
function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0, p.pack(...))
end
根据类型,将数据打包,然后调用底层的c.send
将消息发送给目标地址。
再来看看skynet.call
的代码:
function skynet.call(addr, typename, ...)
--调试相关代码
--...
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
这里我们看到,skynet.call
和skynet.send
在发送数据时,调用的底层函数是一样的,都是c.send,区别在于参数不同:
- skynet.send调用c.send时,第三个参数是0,表示不用分配会话(session)
- skynet.call调用c.send时,第三个参数是nil,表示需要分配会话ID(session)
这里的session,是系统一个自增的ID,每次分配时增加1,相当于给这一次的call分配一个唯一ID。
最后,skynet.call
的返回是p.unpack(yield_call(addr, session))
p.unpack
是解包数据,而yield_call
,看名字就知道,是一个挂起的调用:
local function yield_call(service, session)
watching_session[session] = service
session_id_coroutine[session] = running_thread
local succ, msg, sz = coroutine_yield "SUSPEND"
watching_session[session] = nil
if not succ then
error "call failed"
end
return msg,sz
end
这里,用到了刚刚分配的session
,记录了session
对应的服务地址和执行协程,然后,调用coroutine_yield
将线程挂起,参数是"SUSPEND",等到目标服务返回结果后,才重新回到这个协程。
处理消息并返回
服务A调用skynet.call
发送消息给服务B之后,A的协程挂起了,收到消息的服务B,是怎么处理这个消息,并返回给服务A的呢?
在skynet
的体系中,每个服务都有一个消息处理函数。对于skynet的lua服务,在启动时,skynet.start
的第一行代码,就是设置lua
层面的回调函数:
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
--...其他代码
end
而skynet.dispatch_message
中的第一句,则是以pcall的方式调用raw_dispatch_message
,这个函数一共有5个参数:
- ptototype: 消息类型
- msg: 消息体
- sz:消息长度
- session:会话ID,使用send的话,则是0
- source:消息来源的服务地址
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
--...处理响应消息
else
local p = proto[prototype]
if p == nil then
--...错误处理
end
local f = p.dispatch
if f then
local co = co_create(f) -- 取得一个协程
session_coroutine_id[co] = session -- 并关联协程和会话
session_coroutine_address[co] = source -- 以及来源
--... trace调试相关代码
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
--...错误处理
end
end
end
关键看这一句:suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
从里向外,有三个函数调用:
- p.unpack(msg, sz):根据消息类型预设好的unpack函数,来解析消息,返回解析后的参数。
- coroutine_resume(co, session, source, ...):执行协程,协程参数为session,source,以及解析后的参数。这里实际上就是执行到
skynet.dispatch
中设置的消息处理函数(上面示例代码中,serverB
的函数f
)。 - suspend(co, ...):处理完一条消息,挂起后的一些处理。
skynet.call的返回
从上面的消息处理来看,并没有对skynet.call
做特别的处理,实际上,对于skynet.call
的消息,我们必须手动调用skynet.retpack
来返回数据。
通常,在消息处理函数中,我们可以通过session,来判断要不要使用skynet.retpack:
if session > 0 then
skynet.retpack(func(...))
else
func(...)
end
skynet.retpack实际上是对skynet.ret的调用:
- 前面收到消息时,记录了当前协程对应的session,这里取出session。
- 如果session等于0,表示是
send
的消息,不需要返回。 - 前面收到消息时,还记录当前协程对应的消息来源,这里,给来源地址
source
发送一个PTYPE_RESPONSE
类型的消息,成功将数据返回。
上面这些返回的操作,是在服务B中,而在服务A中,就收到了一个PTYPE_RESPONSE
消息。此时前面发送skynet.call
时的协程co
还处于挂起的状态。
前面讲到raw_dispatch_message
的时候,略过了PTYPE_RESPONSE
的处理,现在再来看一下:
- 通过session取得处理协程,在skynet.call => yield_call中,挂起之前,记录的session对应哪个协程,这里取回挂起的协程。
RESPONSE
并不只是skynet.ret才会用到,还有可能是skynet.timeout的定时时间到了,也会发送RESPONSE
,这时co是一个字符串"BREAK"- 收到一个未知的response的处理。
- 正常的
skynet.call
在这里获得返回值,这里的coroutine_resume,执行co协程,就是回到前面的yield_call
- 挂起的协程
co
恢复执行后,接收succ
,msg
,sz
参数,最终yield_call
返回的是msg
和sz
。 - 将
yield_call
的返回值,通过unpack
解析之后,最终返回给调用者。至此,skynet.call
终于取到了返回值。
Maybe forgot response session ... from ...
假设消息B在收到一个skynet.call
的消息后,没有调用skynet.ret
返回,那么会输出一个报错:Maybe forgot response session ... from ...
,skynet
系统是怎么知道没有返回的呢?
前面在讲到消息处理raw_dispatch_message
函数中,有一个步骤是从协程池中获取一个协程,并调用设置好的dispatch
函数(示例中serviceB
的函数f
),实际上,这里并不是直接调用f
,而是加了一层封装,我们来看看co_create
的代码:
- 从池子里取出一条协程。
- 池子里没有协程时,创建协程。
- 协程的主函数,首先执行
f
(即传入的dispatch
函数)。 - 执行完成之后,判断当前协程是否记录着session,当调用
skynet.ret
时,会清掉这个session。如果此时的session
不等于0,就表示收到一个call
之后没有使用skynet.ret
返回,就在这里报个错。 - 清理数据。
- 将当前协程放入池子里,等待循环使用。
- 将协程挂起。
- 下一将调用
co_create
时,如果能从池子里找到co
,则在这里开始执行协程,传入f
,继续执行。
延迟返回
一般情况下,在处理call消息的协程中,我们必须调用skynet.retpack
来返回数据,否则的话,会报错误Maybe forgot response
。
但有些情况下,我们希望在其他协程中返回数据(例如skynet.newservice 简介:服务的启动讲到的launch),这时候,我们可以使用skynet.response
来生成一个响应函数。
function skynet.response(pack)
pack = pack or skynet.pack
local co_session = assert(session_coroutine_id[running_thread], "no session")
session_coroutine_id[running_thread] = nil
local co_address = session_coroutine_address[running_thread]
if co_session == 0 then
-- do not response when session == 0 (send)
return function() end
end
local function response(ok, ...)
if ok == "TEST" then
return unresponse[response] ~= nil
end
if not pack then
error "Can't response more than once"
end
local ret
if unresponse[response] then
if ok then
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, pack(...))
if ret == false then
-- If the package is too large, returns false. so we should report error back
c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
end
else
ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
end
unresponse[response] = nil
ret = ret ~= nil
else
ret = false
end
pack = nil
return ret
end
unresponse[response] = co_address
return response
end
这里实际上就是把返回需要用到的session
和source
用作一个函数的upValue
,并返回这个函数,同时,清除session_coroutine_id
中当前co
对应的session
,这样就不会触发到Maybe forgot response
的警告了。