首页 > 其他分享 >服务间请求和响应

服务间请求和响应

时间:2022-12-08 15:01:41浏览次数:47  
标签:服务 请求 lua -- 响应 session skynet type

服务间请求和响应

目录

这节会主要是介绍在lua服务中使用skynet.call函数

  • snlua基本启动过程
  • skynet.pack打包过程
  • 协程基础知识
  • main发送请求
  • db处理请求
  • main处理db返回的响应

snlua服务启动的基本知识

前面介绍过logger服务。也说起过snlua服务。其实 snlua服务 就说我们skynet常说的lua服务。这个 lua服务 跟 logger服务 的工作模式在c层看来都是一样的,即都有一个队列,然后不断的从队列中取出消息,然后处理掉消息。当然他们属于不同模块

现在而言,lua服务你可以认为跟logger服务最大的区别是:

  • 取出消息->lua服务回调函数->skynet.dispatch_message
  • 取出消息->logger服务回调函数

我们之前创建一个logger服务是在底层调用 skynet_context_new(const char * name, const char *param),name就是模块名字logger,param我们默认是NULL。那么实际上我们创建一个lua服务,也是调用这个函数,name就是snlua,param就是我们的一个lua文件名.

但实际上我们的服务创建是通过 skynet.newservice创建的。实际这个函数最终一样会调用底层的skynet_context_new来创建服务。我们skynet进程启动后,就会启动一个bootstrap服务,bootstrap服务就会调用skynet.newservice("main")创建main服务。那么我们怎么就执行到这个main服务配置lua代码来的呢?

image-20220831154537139

--这是main服务指定的lua文件

--main.lua
local skynet = require "skynet"

skynet.start(function()--这个匿名函数可以认为是lua服务的入口函数 
	
	local db = skynet.newservice("db")--启动一个db服务

	local key = "zhangsan"
	 
	local age = skynet.call(db, "lua", "GET",key) --发送 lua类型 的请求给db服务,然后等待对方回应

	skynet.exit()
	
end)
--这是db服务指定的lua文件
--db.lua
local skynet = require "skynet"
require "skynet.manager"	-- import skynet.register


local db = {--保存了年龄
    zhangsan = 12,
    lisi = 33,
    wangwu = 4
}

local command = {}

function command.GET(key)
	return db[key]
end


skynet.start(function() --skynet.start注册一个匿名的入口函数
	
	skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
		
		local f = command[cmd] --这里收到的 cmd 是"GET" 参数是 "zhangsan"
		if f then
			skynet.ret(skynet.pack(f(...)))--发送响应给main服务
		end
	end)

end)

这里我们先启动了一个main服务,然后在main服务里面启动了一个db服务。之后在main服务中向db服务发送了一个请求。请求的目的是 希望db服务告诉我们 zhangsan 的年龄。

skynet.start(main) 就是在注册一个初始化函数。我们在第一回就说过,可以往服务队列里面push定时器消息。实际上,我们lua服务调用skynet.start时会主动要求push一个定时器消息到自己的队列,定时器消息最终的处理会调用我们的这个初始化函数。所以你可以认为main就是一个服务的入口函数。

既然skynet.start负责注册main函数,那skynet.start是怎么被调用的呢?

实际上skynet在底层启动一个lua服务的过程中,一定会先push一个消息到服务自己的队列中。我们说过,一个lua服务处理消息的过程就是调用服务的回调函数,然后在回调函数内部又调用一个lua函数。特例是,我们处理第一个消息时,我们直接在回调函数执行我们服务指定的lua文件即可。比如这里就是main.lua文件。那么在执行lua文件时,就会调用到 skynet.start

发送请求

上面的代码大致看来应该是可以理解的。那么具体 skynet.call是如何发送请求的呢。

function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"

	local p = proto[typename]--根据消息类型获取对应的协议
	local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil 

	return p.unpack(yield_call(addr, session))
end

我们发送消息时要指明消息类型。不同的消息类型对应着不同的协议。比如我们的请求发送出去一般都是要先打包的,不同的消息类型,对应的协议里面有自己的打包函数。lua协议类型的打包函数 skynet.pack主要做的事情是把一堆lua参数转变成一个 指针+长度

具体可以查看skynet的pack函数

一般情况下,有几个协议默认就定义好了

----- register protocol
do
	local REG = skynet.register_protocol

	REG {
		name = "lua",--协议类型名字
		id = skynet.PTYPE_LUA,--协议id
		pack = skynet.pack,--打包
		unpack = skynet.unpack,--解包
	}

	REG {
		name = "response",
		id = skynet.PTYPE_RESPONSE,
	}

	REG {
		name = "error",
		id = skynet.PTYPE_ERROR,
		unpack = function(...) return ... end,
		dispatch = _error_dispatch,
	}
end

发送消息除了指明消息类型,还需要指明 目标服务,以及session。

这里session是做什么用呢?先看个简单场景。你跑到肯德基,跟服务员说点一个 清蒸鲈鱼,服务员说ok,然后给你开了一个订单,流水号是 9527。之后你找个了位置坐下来,刷手机等待。大概过了十分钟,服务员喊 9527 您可以取餐了。此时你就过去把订单给服务员,服务员确认是9527后,把清蒸鲈鱼交给你了,并祝你用餐愉快。

这里的流水号9527就是session。主要是用来匹配请求和响应用的。

你可以认为skynet的每个lua服务内部都有一个产生session号的机器。当然唯一性只能在服务内提供保证。假设a服务往b服务发送了两个请求requst1, request2,现在a服务收到了一个响应,那么这个响应,是响应哪个请求的呢?实际a服务上发送request1 的时候附带了一个session1 ,b服务给出响应的时候也附带了一个session指明了是响应哪个请求的。

再看c.send函数。这个函数主要是把请求push到目标服务的队列。

LUAMOD_API int
luaopen_skynet_core(lua_State *L) {
	luaL_checkversion(L);

	luaL_Reg l[] = {
		{ "send" , lsend },//next
	
	};
}
/*
	uint32 address 
	 string address
	integer type
	integer session
	string message
	 lightuserdata message_ptr
	 integer len
 */
static int
lsend(lua_State *L) {
	return send_message(L, 0, 2);//next
}

注意传递的参数依次是 address type session message_ptr len 下面看看c层是怎么获取lua传递过来的参数的。

此时传入的 type是 "lua" ; session 是 nil

static int
send_message(lua_State *L, int source, int idx_type) {//idx_type表示type在传入参数队列中的位置
	struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));//获取代表当前所在服务
	uint32_t dest = (uint32_t)lua_tointeger(L, 1);//获取目标服务地址
	const char * dest_string = NULL;


	int type = luaL_checkinteger(L, idx_type+0);//根据 指定位置 找到lua层传递的参数
	int session = 0;
	if (lua_isnil(L,idx_type+1)) {//session如果是nil 表示lua层希望分配一个session 
		type |= PTYPE_TAG_ALLOCSESSION;
	}

	int mtype = lua_type(L,idx_type+2);
	switch (mtype) {

	case LUA_TLIGHTUSERDATA: {//skynet.call的调用是走这里
		void * msg = lua_touserdata(L,idx_type+2);
		int size = luaL_checkinteger(L,idx_type+3);
		//注意这里给 type 还加上了一个 PTYPE_TAG_DONTCOPY 标记 这里source是0
		session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);//next
		
		break;
	}


	lua_pushinteger(L,session);
	return 1;
}

上面的代码给type打了两个标记,PTYPE_TAG_ALLOCSESSIONPTYPE_TAG_DONTCOPYPTYPE_TAG_ALLOCSESSION 表示希望服务分配一个session,PTYPE_TAG_DONTCOPY表示是否需要分配新内存。具体往下看skynet_send

static void
_filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
	int needcopy = !(type & PTYPE_TAG_DONTCOPY);//检查type里面有没有 PTYPE_TAG_DONTCOPY标签 
	int allocsession = type & PTYPE_TAG_ALLOCSESSION;//检查type里面有没有 PTYPE_TAG_ALLOCSESSION标记 
	type &= 0xff;//只保留低八位  丢弃临时添加的 PTYPE_TAG_DONTCOPY PTYPE_TAG_ALLOCSESSION 这些标记

	if (allocsession) {
		assert(*session == 0);
		*session = skynet_context_newsession(context);//需要分配session的时候 context不能为NULL
	}

	if (needcopy && *data) {//是否需要分配新内存 把data数据重新拷贝一份
		char * msg = skynet_malloc(*sz+1);
		memcpy(msg, *data, *sz);
		msg[*sz] = '\0';
		*data = msg;
	}

	*sz |= (size_t)type << MESSAGE_TYPE_SHIFT;//把type放置到sz的高八位上
}

int//next
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
	if ((sz & MESSAGE_TYPE_MASK) != sz) {//sz是当前系统最大无符号数 sz的高8位是给type用 此时高8位必须是全0
		skynet_error(context, "The message to %x is too large", destination);
		if (type & PTYPE_TAG_DONTCOPY) {
			skynet_free(data);
		}
		return -2;
	}
	_filter_args(context, type, &session, (void **)&data, &sz);//next

	if (source == 0) {//当前是0
		source = context->handle;//当前服务的handle
	}

    struct skynet_message smsg;
    smsg.source = source;//当前服务handle
    smsg.session = session;//新生成的session
    smsg.data = data;
    smsg.sz = sz;

    if (skynet_context_push(destination, &smsg)) {//把消息push到目标服务的队列
        skynet_free(data);
        return -1;
    }
	
	return session;//最后返回session给lua层
}

sz的高八位是预留出来储存 消息类型的。_filter_args 做三件事情

  • 获取session
  • 是否需要分配新的内存。
  • 给type打上 消息类型 的标记,并设置到sz的高八位。

现在回到lua层skynet.call的定义

function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"

	local p = proto[typename]--根据消息类型获取对应的协议
	local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil 

	return p.unpack(yield_call(addr, session))
end

此时session返回了。消息也已经push到目标服务了。接下来就是挂起当前协程,等待响应。

关于协程的基本知识,可以看这里了解协程

继续看代码yield_call(addr, session)

实际上 skynet.call是在某个协程中被调用的。所以才有我们说的当前协程。一般来说,业务代码都是在某个协程中执行的。框架代码就是调度各个协程。类似我们的操作系统,进程的使用者一般是不关心进程是怎么被操作系统具体调度的。

local function yield_call(service, session)

	session_id_coroutine[session] = running_thread --通过session作为key保存当前协程 
	local succ, msg, sz = coroutine_yield "SUSPEND" --这里表示挂起当前协程
	watching_session[session] = nil
    
	return msg,sz
end

在挂起协程前,通过session记住了当前协程x。等响应消息到来时,通过响应消息里面的session可以找到当前协程x,然后唤醒这个协程,让他继续执行。

话分两头,此时db服务开始处理main服务发送的请求了。db服务处理请求

收到响应

大概db服务已经处理完了。现在收到db发送过来的响应消息了。我们看看当前是怎么唤醒挂起的协程x的。lua服务会把队列里面的消息转交给lua层的一个函数 skynet.dispatch_message 处理。我们知道它的内部先是调用raw_dispatch_message处理。这里是处理响应消息

每个snlua服务都会调用skynet.start(start_func)函数注册启动函数。skynet.start在内部都会向c层注册一个lua层的回调函数。lua服务会把消息队列里面的消息处理最终交给这个回调函数处理。这个回调函数就是 skynet.dispatch_message

local function raw_dispatch_message(prototype, msg, sz, session, source)
	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
	if prototype == 1 then --这里是处理响应
		local co = session_id_coroutine[session] --响应消息的处理都是 通过seesion去得到协程 然后执行协程;因为session是本服务分配的 所以具有唯一性
		
        session_id_coroutine[session] = nil
        suspend(co, coroutine_resume(co, true, msg, sz, session))--next
		end
	
    end

也就是说我们会根据响应消息的 session找到协程x,唤醒协程x继续运行。再看一次 skynet.call

function skynet.call(addr, typename, ...)--我们这里是typename是 "lua"

	local p = proto[typename]--根据消息类型获取对应的协议
	local session = c.send(addr, p.id , nil , p.pack(...))--seesion是通过当前服务分配的 注意第三个参数是 nil 

	return p.unpack(yield_call(addr, session))
end

此时 第6行 yield_call返回了,同时skynet.call也接着返回了想要的年龄信息。

标签:服务,请求,lua,--,响应,session,skynet,type
From: https://www.cnblogs.com/waittingforyou/p/16966095.html

相关文章