首页 > 其他分享 >db服务处理请求

db服务处理请求

时间:2024-02-27 19:12:25浏览次数:18  
标签:end 请求 处理 db queue -- session skynet local

新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X

系列博客的大纲

  • 处理请求的基本流程
  • 尾调用
  • 协程调度框架中的三个队列
    • wakeup_queue
    • error_queue
    • fork_queue
  • 协程调度
  • db服务处理main服务发送过来的请求
--db.lua
local skynet = require "skynet"
require "skynet.manager"	-- import skynet.register

local db = {--保存了年龄
    zhangsan = 12,
    lisi = 33,
    wangwu = 4
}

local command = {}

function command.GET(key)
	return db[key]
end


skynet.start(function()
	
	skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
		
		local f = command[cmd] --这里收到的 cmd 是"GET"
		if f then
			skynet.ret(skynet.pack(f(...)))--发送响应给main服务
		end
	end)

end)

snlua服务处理请求的基本流程

大多数 skynet 服务使用 lua 编写。lua服务把消息队列的消息取出后,调用c的回调函数,最终是把消息交给一个指定的lua函数处理。

snlua服务的这个lua函数就是 skynet.dispatch_message 。处理这个消息主要分为两大步 1. raw_dispatch_message 2. 不断的从 fork_queue 队列中把协程取出来做处理。

function skynet.dispatch_message(...)
	local succ, err = pcall(raw_dispatch_message,...)
	while true do
		if fork_queue.h > fork_queue.t then
			-- queue is empty
			fork_queue.h = 1  --head
			fork_queue.t = 0  --tail
			break
		end
		-- pop queue
		local h = fork_queue.h
		local co = fork_queue[h]
		fork_queue[h] = nil
		fork_queue.h = h + 1

		local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
		
	end
	assert(succ, tostring(err))
end

在lua服务中会收到请求消息和响应消息。a发送一个请求给b,然后等待b回应。b收到的这个请求,就是请求消息。当a收到b的回应消息时,这个消息就是响应消息。此时我们的db服务收到了一个lua类型的消息。我们先看 raw_dispatch_message

local function raw_dispatch_message(prototype, msg, sz, session, source)
	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
	if prototype == 1 then --这里是处理响应
		--略
	else --这里主要是处理lua text socket 等消息类型 
		local p = proto[prototype]
        
		local f = p.dispatch
		if f then
			local co = co_create(f)	--获取一个协程对象并设置任务函数f
			session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同的请求者发过来的session可以是相同的
			session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁
			
			suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))--next
	
		end
	end
end

根据 消息类型 我们的流程是 从 第行6开始。也就是”lua”类型的消息。首先根据消息类型找到对应的协议,然后获取任务处理函数 f。

之后我们获取一个协程,给协程设置这个函数,再唤醒这个协程就算是真正处理我们收到的消息了。当然唤醒协程开始执行前,必须先保存请求者的信息,不然后面处理完请求后,都不知道把结果发送给谁。注意这里是以 co为key保存请求者信息的。为什么不用session的原因是,不同的请求者发送过来的session可能是相同的。因为session是每个服务自己产生的。a服务可以产生一个session号9527,b服务也可以产生一个session号9527。

唤醒协程是通过 coroutine_resume(co, session,source, p.unpack(msg,sz))。这里的任务函数f是db服务在入口函数里设置的。当db的业务处理完成后,即协程挂起时,就会执行suspend。这个suspend接下来会唤醒其他协程。也就是说当我们lua服务收到一个消息时,就会有一个协程去处理,当这个协程处理后,挂起时,就会给执行权给其他协程。

function suspend(co, result, command)
	
	if command == "SUSPEND" then
		return dispatch_wakeup() --next
	elseif command == "QUIT" then
		coroutine.close(co)
		-- service exit
		return
	end
end

上面的代码 我们主要关注第3行。一般情况下 ,挂起时都会返回 ""SUSPEND"".我们继续看 dispatch_wakeup()

local function dispatch_wakeup()
	while true do
		local token = tremove(wakeup_queue,1)--从唤醒队列中不断取出协程
		if token then
			local session = sleep_session[token] -- 从sleep表中查找 注意 这个表和 唤醒队列 具有不同的意义
			if session then
				local co = session_id_coroutine[session]
				
				session_id_coroutine[session] = "BREAK"
				return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
			end
		else
			break
		end
	end
	return dispatch_error_queue()--这里是为了处理这种情况:当前服务在苦苦等待服务x响应,而服务x已经有错误,且已经把错误通知当前服务了 
end

当我们调用skynet.sleep或者skynet.wait时,就会把指定协程加入到sleep_session睡眠表中,标识协程是睡眠状态,同时当前协程会挂起。之后在合适的时机,我们调用skynet.wakeup就会把指定的睡眠协程加入到wakeup_queue唤醒队列中。之后这个唤醒队列里面的协程就会被调度,得到执行。

注意把协程加入唤醒队列,不代表马上就唤醒协程执行。

上面的dispatch_wakeup主要的处理过程是这样:

  1. 如果唤醒队列中有协程,转2;如果没有转 4
  2. 取出一个,并唤醒执行。
  3. 唤醒执行挂起后 回到 1
  4. dispatch_error_queue

看看dispatch_error_queue 代码

local function dispatch_error_queue()
	local session = tremove(error_queue,1)
	if session then
		local co = session_id_coroutine[session]
		session_id_coroutine[session] = nil
		return suspend(co, coroutine_resume(co, false, nil, nil, session))
	end
end

这几个函数的共同特点是最后都会调用suspend。有点像是递归调用。实际上,并不是。这是lua尾调用。去了解lua函数调用 也就是说 调用帧 是不会一直递增的。

这里出现了一个所谓的error_queue 错误队列。比如当我们的协程a发送请求给x服务后,会等待x服务给出响应。如果此时x服务退出或者是处理出现错误,那么就会给我们服务发送一个错误类型的消息。针对错误消息的处理函数会把协程a加入错误队列,等待时机唤醒执行,不然协程a就会一直挂起。看看错误消息的处理函数

	REG {
		name = "error",
		id = skynet.PTYPE_ERROR,
		unpack = function(...) return ... end,
		dispatch = _error_dispatch,
	}

local function _error_dispatch(error_session, error_source)
	skynet.ignoreret()	-- don't return for error
	if error_session == 0 then --收到一个即将下线的服务x的错误消息 
		-- error_source is down, clear unreponse set 
		for session, srv in pairs(watching_session) do --原本发出请求给服务x,等待x响应的协程需要另外处理了
			if srv == error_source then
				tinsert(error_queue, session)
			end
		end
	else
		-- capture an error for error_session
		if watching_session[error_session] then
			tinsert(error_queue, error_session)
		end
	end
end

错误类型消息处理函数 主要是把之前等待的协程加入错误队列。关于错误类型处理,暂时讲到这里。

skynet.dispatch_message第一步已经讲完,还有第二步是处理 fork_queue .

function skynet.dispatch_message(...)
	local succ, err = pcall(raw_dispatch_message,...)
	while true do --从这里开始处理fork_queue 
		if fork_queue.h > fork_queue.t then
			-- queue is empty
			fork_queue.h = 1  --head
			fork_queue.t = 0  --tail
			break
		end
		-- pop queue
		local h = fork_queue.h
		local co = fork_queue[h]
		fork_queue[h] = nil
		fork_queue.h = h + 1

		local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
		
	end
	assert(succ, tostring(err))
end

上面的代码主要是从 fork_queue 队列中不断的取出协程,然后依旧是调用suspend来处理。fork_queue 里面的协程是怎么来的?一般当我们在业务层需要协程的时候,我们就会调用skynet.fork(func,...)来创建一个。看代码

function skynet.fork(func,...)
	local n = select("#", ...)
	local co
	if n == 0 then
		co = co_create(func)
	else
		local args = { ... }
		co = co_create(function() func(table.unpack(args,1,n)) end)
	end
	local t = fork_queue.t + 1 --尾部递增
	fork_queue.t = t
	fork_queue[t] = co --push到队列的尾部
	return co
end

实际上,内部也是调用co_create()创建的协程。这里总结下分发流程图。

dispatch

db服务返回响应给main服务

我们最后看db服务是如何在处理完业务后,也就是拿到年龄信息后,怎么把信息发送给main服务的。回顾

--db.lua

local skynet = require "skynet"
require "skynet.manager"	-- import skynet.register

local db = {--保存了年龄
    zhangsan = 12,
    lisi = 33,
    wangwu = 4
}

local command = {}

function command.GET(key)
	return db[key]
end


skynet.start(function()
	
	skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求
		
		local f = command[cmd] --这里收到的 cmd 是"GET"
		if f then
			skynet.ret(skynet.pack(f(...)))--发送响应给main服务
		end
	end)

end)

上面的代码很容易找到 zhangsan的年龄是 12. 我们主要看 skynet.ret是怎么把返回数据发送给 main服务的。

function skynet.ret(msg, sz)
	msg = msg or ""

	local co_session = session_coroutine_id[running_thread] --找到session

	session_coroutine_id[running_thread] = nil
	if co_session == 0 then
		return false	-- send don't need ret
	end
	local co_address = session_coroutine_address[running_thread] --找到请求者地址
	local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz) --注意消息类型是 skynet.PTYPE_RESPONSE
	if ret then
		return true
	end
	
end

还记得吗,当我们db收到请求时,就把请求者的信息保存下来了。而且是以 当前协程为key,请求者信息为value保存的。现在把信息取出来,通过c.send发送给给main服务。

标签:end,请求,处理,db,queue,--,session,skynet,local
From: https://www.cnblogs.com/waittingforyou/p/18037602

相关文章

  • spring重点后置处理器
    1.DefaultListableBeanFactory的作用:默认实现了ListableBeanFactory和BeanDefinitionRegistry接口,基于beandefinition对象,是一个成熟的beanfactroy。最典型的应用是:在访问bean前,先注册所有的definition(可能从beandefinition配置文件中)。使用预先建立的bean定义元数......
  • 你是怎么处理vue项目中的错误的?
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助一、错误类型任何一个框架,对于错误的处理都是一种必备的能力在Vue 中,则是定义了一套对应的错误处理规则给到使用者,且在源代码级别,对部分必要的过程做了一定的错误处理。主要的错误来源包括:后端接口错误代码中......
  • springboot 统一处理请求非法参数
    通过拦截器和过滤器实现,话不多说上代码。1、重写HttpServletRequestWrapper读取body里面的内容。publicclassRequestWrapperextendsHttpServletRequestWrapper{privatefinalStringbody;publicRequestWrapper(HttpServletRequestrequest){super......
  • DbContext配置解析
    publicclassIdDbContext:IdentityDbContext<User,Role,long>{publicIdDbContext(DbContextOptions<IdDbContext>options):base(options){}protectedoverridevoidOnModelCreating(ModelBuildermodelBuilder){......
  • 云服务器转发动态请求(uwsgi+django项目)
    路飞后台部署本地操作上线前配置prod.py:上线的配置文件,内容拷贝dev.py,前身就是settings.py#关闭测试环境DEBUG=FalseALLOWED_HOSTS=['39.99.192.127'#公网ip地址]CORS_ORIGIN_ALLOW_ALL=True#允许所有跨域#静态文件配置:上线后还有额外配置,见下方......
  • C#程序全局异常处理—WPF和Web API两种模式
    C#程序的全局异常处理,网上搜下资料都是一大堆,我这里最近也是独立做一个B/S结构的小项目,后面又增加了需求用WPF实现相同的功能,这里将我所使用的全局异常处理方式做一个简短的总结分享。WebAPI项目的全局异常处理这种项目下,我们可以直接自定义一个全局异常的过滤器,用来处理全局......
  • Go语言精进之路读书笔记第37条——了解错误处理的4种策略
    C语言家族的经典错误机制:错误就是值。同时Go结合函数/方法的多返回值机制避免了像C语言那样在单一函数返回值种承载多重信息的问题。37.1构造错误值错误处理的策略与构造错误值的方法是密切关联的。错误是值,只是以error接口变量的形式统一呈现(按惯例,函数或方法通常将error类型......
  • 传统套路只能处理低维问题,机器学习数学理论的关键是高维函数
    与传统方法相比,机器学习解决的最基本的问题就是函数的表达和逼近。数学上有分片多项式、傅利叶级数、小波……这都是传统的表达函数的套路。但传统套路只能处理低维问题,难以处理高维问题。而机器学习,尤其是深度学习,解决的许多问题都是非常高维的,所以机器学习数学理论的关键是高维......
  • 在Winform界面中使用自定义控件,丰富界面的效果处理
    我们在《SqlSugar开发框架》中,Winform界面开发部分往往也用到了自定义的用户控件,对应一些特殊的界面或者常用到的一些局部界面内容,我们可以使用自定义的用户控件来提高界面的统一性,同时也增强了使用的便利性。如我们Winform界面中用到的分页控件、附件显示内容、以及一些公司、部......
  • python3的json数据库-TinyDB效率篇
    安装了这个TinyDB库后,我突然想到一般来说python执行的速度并不算高,那这个库写文件速度如何呢?测试代码如下:fromtinydbimportTinyDBimporttime#创建数据库对象db=TinyDB('db.json')milliseconds1=int(time.time()*1000)db.insert({'type':'apple','count':......