服务间请求和响应
目录这节会主要是介绍在lua服务中使用skynet.call
函数
- snlua基本启动过程
- skynet.pack打包过程
- 协程基础知识
- main发送请求
- db处理请求
- main处理db返回的响应
snlua服务启动的基本知识
前面介绍过logger服务。也说起过snlua服务。其实 snlua服务 就说我们skynet常说的lua服务。这个 lua服务 跟 logger服务 的工作模式在c层看来都是一样的,即都有一个队列,然后不断的从队列中取出消息,然后处理掉消息。当然他们属于不同模块
现在而言,lua服务你可以认为跟logger服务最大的区别是:
- 取出消息->lua服务回调函数->skynet.dispatch_message
- 取出消息->logger服务回调函数
我们之前创建一个logger服务是在底层调用 skynet_context_new(const char * name, const char *param)
,name就是模块名字logger,param我们默认是NULL。那么实际上我们创建一个lua服务,也是调用这个函数,name就是snlua,param就是我们的一个lua文件名.
但实际上我们的服务创建是通过 skynet.newservice创建的。实际这个函数最终一样会调用底层的skynet_context_new来创建服务。我们skynet进程启动后,就会启动一个bootstrap服务,bootstrap服务就会调用skynet.newservice("main")创建main服务。那么我们怎么就执行到这个main服务配置lua代码来的呢?
--这是main服务指定的lua文件
--main.lua
local skynet = require "skynet"
skynet.start(function()--这个匿名函数可以认为是lua服务的入口函数
local db = skynet.newservice("db")--启动一个db服务
local key = "zhangsan"
local age = skynet.call(db, "lua", "GET",key) --发送 lua类型 的请求给db服务,然后等待对方回应
skynet.exit()
end)
--这是db服务指定的lua文件
--db.lua
local skynet = require "skynet"
require "skynet.manager" -- import skynet.register
local db = {--保存了年龄
zhangsan = 12,
lisi = 33,
wangwu = 4
}
local command = {}
function command.GET(key)
return db[key]
end
skynet.start(function() --skynet.start注册一个匿名的入口函数
skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
local f = command[cmd] --这里收到的 cmd 是"GET" 参数是 "zhangsan"
if f then
skynet.ret(skynet.pack(f(...)))--发送响应给main服务
end
end)
end)
这里我们先启动了一个main服务,然后在main服务里面启动了一个db服务。之后在main服务中向db服务发送了一个请求。请求的目的是 希望db服务告诉我们 zhangsan
的年龄。
skynet.start(main)
就是在注册一个初始化函数。我们在第一回就说过,可以往服务队列里面push定时器消息。实际上,我们lua服务调用skynet.start时会主动要求push一个定时器消息到自己的队列,定时器消息最终的处理会调用我们的这个初始化函数。所以你可以认为main就是一个服务的入口函数。
既然skynet.start
负责注册main函数,那skynet.start是怎么被调用的呢?
实际上skynet在底层启动一个lua服务的过程中,一定会先push一个消息到服务自己的队列中。我们说过,一个lua服务处理消息的过程就是调用服务的回调函数,然后在回调函数内部又调用一个lua函数。特例是,我们处理第一个消息时,我们直接在回调函数执行我们服务指定的lua文件即可。比如这里就是main.lua文件。那么在执行lua文件时,就会调用到 skynet.start
发送请求
上面的代码大致看来应该是可以理解的。那么具体 skynet.call是如何发送请求的呢。
function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"
local p = proto[typename]--根据消息类型获取对应的协议
local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil
return p.unpack(yield_call(addr, session))
end
我们发送消息时要指明消息类型。不同的消息类型对应着不同的协议。比如我们的请求发送出去一般都是要先打包的,不同的消息类型,对应的协议里面有自己的打包函数。lua协议类型的打包函数 skynet.pack主要做的事情是把一堆lua参数转变成一个 指针+长度。
具体可以查看skynet的pack函数 。
一般情况下,有几个协议默认就定义好了
----- register protocol
do
local REG = skynet.register_protocol
REG {
name = "lua",--协议类型名字
id = skynet.PTYPE_LUA,--协议id
pack = skynet.pack,--打包
unpack = skynet.unpack,--解包
}
REG {
name = "response",
id = skynet.PTYPE_RESPONSE,
}
REG {
name = "error",
id = skynet.PTYPE_ERROR,
unpack = function(...) return ... end,
dispatch = _error_dispatch,
}
end
发送消息除了指明消息类型,还需要指明 目标服务,以及session。
这里session是做什么用呢?先看个简单场景。你跑到肯德基,跟服务员说点一个 清蒸鲈鱼,服务员说ok,然后给你开了一个订单,流水号是 9527。之后你找个了位置坐下来,刷手机等待。大概过了十分钟,服务员喊 9527 您可以取餐了。此时你就过去把订单给服务员,服务员确认是9527后,把清蒸鲈鱼交给你了,并祝你用餐愉快。
这里的流水号9527就是session。主要是用来匹配请求和响应用的。
你可以认为skynet的每个lua服务内部都有一个产生session号的机器。当然唯一性只能在服务内提供保证。假设a服务往b服务发送了两个请求requst1, request2,现在a服务收到了一个响应,那么这个响应,是响应哪个请求的呢?实际a服务上发送request1 的时候附带了一个session1 ,b服务给出响应的时候也附带了一个session指明了是响应哪个请求的。
再看c.send函数。这个函数主要是把请求push到目标服务的队列。
LUAMOD_API int
luaopen_skynet_core(lua_State *L) {
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend },//next
};
}
/*
uint32 address
string address
integer type
integer session
string message
lightuserdata message_ptr
integer len
*/
static int
lsend(lua_State *L) {
return send_message(L, 0, 2);//next
}
注意传递的参数依次是 address type session message_ptr len 下面看看c层是怎么获取lua传递过来的参数的。
此时传入的 type是 "lua" ; session 是 nil
static int
send_message(lua_State *L, int source, int idx_type) {//idx_type表示type在传入参数队列中的位置
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));//获取代表当前所在服务
uint32_t dest = (uint32_t)lua_tointeger(L, 1);//获取目标服务地址
const char * dest_string = NULL;
int type = luaL_checkinteger(L, idx_type+0);//根据 指定位置 找到lua层传递的参数
int session = 0;
if (lua_isnil(L,idx_type+1)) {//session如果是nil 表示lua层希望分配一个session
type |= PTYPE_TAG_ALLOCSESSION;
}
int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TLIGHTUSERDATA: {//skynet.call的调用是走这里
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
//注意这里给 type 还加上了一个 PTYPE_TAG_DONTCOPY 标记 这里source是0
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);//next
break;
}
lua_pushinteger(L,session);
return 1;
}
上面的代码给type打了两个标记,PTYPE_TAG_ALLOCSESSION
和 PTYPE_TAG_DONTCOPY
。PTYPE_TAG_ALLOCSESSION
表示希望服务分配一个session,PTYPE_TAG_DONTCOPY
表示是否需要分配新内存。具体往下看skynet_send
static void
_filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
int needcopy = !(type & PTYPE_TAG_DONTCOPY);//检查type里面有没有 PTYPE_TAG_DONTCOPY标签
int allocsession = type & PTYPE_TAG_ALLOCSESSION;//检查type里面有没有 PTYPE_TAG_ALLOCSESSION标记
type &= 0xff;//只保留低八位 丢弃临时添加的 PTYPE_TAG_DONTCOPY PTYPE_TAG_ALLOCSESSION 这些标记
if (allocsession) {
assert(*session == 0);
*session = skynet_context_newsession(context);//需要分配session的时候 context不能为NULL
}
if (needcopy && *data) {//是否需要分配新内存 把data数据重新拷贝一份
char * msg = skynet_malloc(*sz+1);
memcpy(msg, *data, *sz);
msg[*sz] = '\0';
*data = msg;
}
*sz |= (size_t)type << MESSAGE_TYPE_SHIFT;//把type放置到sz的高八位上
}
int//next
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {//sz是当前系统最大无符号数 sz的高8位是给type用 此时高8位必须是全0
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -2;
}
_filter_args(context, type, &session, (void **)&data, &sz);//next
if (source == 0) {//当前是0
source = context->handle;//当前服务的handle
}
struct skynet_message smsg;
smsg.source = source;//当前服务handle
smsg.session = session;//新生成的session
smsg.data = data;
smsg.sz = sz;
if (skynet_context_push(destination, &smsg)) {//把消息push到目标服务的队列
skynet_free(data);
return -1;
}
return session;//最后返回session给lua层
}
sz的高八位是预留出来储存 消息类型的。_filter_args 做三件事情
- 获取session
- 是否需要分配新的内存。
- 给type打上 消息类型 的标记,并设置到sz的高八位。
现在回到lua层skynet.call的定义
function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"
local p = proto[typename]--根据消息类型获取对应的协议
local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil
return p.unpack(yield_call(addr, session))
end
此时session返回了。消息也已经push到目标服务了。接下来就是挂起当前协程,等待响应。
关于协程的基本知识,可以看这里了解协程 。
继续看代码yield_call(addr, session)
实际上 skynet.call是在某个协程中被调用的。所以才有我们说的当前协程。一般来说,业务代码都是在某个协程中执行的。框架代码就是调度各个协程。类似我们的操作系统,进程的使用者一般是不关心进程是怎么被操作系统具体调度的。
local function yield_call(service, session)
session_id_coroutine[session] = running_thread --通过session作为key保存当前协程
local succ, msg, sz = coroutine_yield "SUSPEND" --这里表示挂起当前协程
watching_session[session] = nil
return msg,sz
end
在挂起协程前,通过session记住了当前协程x。等响应消息到来时,通过响应消息里面的session可以找到当前协程x,然后唤醒这个协程,让他继续执行。
话分两头,此时db服务开始处理main服务发送的请求了。db服务处理请求
收到响应
大概db服务已经处理完了。现在收到db发送过来的响应消息了。我们看看当前是怎么唤醒挂起的协程x的。lua服务会把队列里面的消息转交给lua层的一个函数 skynet.dispatch_message
处理。我们知道它的内部先是调用raw_dispatch_message处理。这里是处理响应消息
每个snlua服务都会调用skynet.start(start_func)函数注册启动函数。skynet.start在内部都会向c层注册一个lua层的回调函数。lua服务会把消息队列里面的消息处理最终交给这个回调函数处理。这个回调函数就是 skynet.dispatch_message
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then --这里是处理响应
local co = session_id_coroutine[session] --响应消息的处理都是 通过seesion去得到协程 然后执行协程;因为session是本服务分配的 所以具有唯一性
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz, session))--next
end
end
也就是说我们会根据响应消息的 session找到协程x,唤醒协程x继续运行。再看一次 skynet.call
function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"
local p = proto[typename]--根据消息类型获取对应的协议
local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil
return p.unpack(yield_call(addr, session))
end
此时 第6行 yield_call返回了,同时skynet.call也接着返回了想要的年龄信息。
标签:服务,请求,lua,--,响应,session,skynet,type From: https://www.cnblogs.com/waittingforyou/p/16966095.html