集群的使用
现在的游戏服务器框架中,分布式是一种常见的需求。一个游戏服务器组通常可以分成网关服务器、登录服务器、逻辑服务器、跨服服务器等等。
在skynet
中,我们可以通过cluster
来组建一个集群,实现分布式的部署。
示例
我们先来看一个简单的例子,在这里我们实现了两个skynet
结点(进程):一个称为center
,一个称为game
。
在center
中启动一个data
服务,然后game
结点向center
结点的data
服务获取数据。
center
结点有两个文件centerMain
和dataService
:
--centerMain.lua
local skynet = require("skynet")
require("skynet.manager")
local cluster = require("skynet.cluster")
skynet.start(function()
--打开结点:表示当前进程是center结点,监听center端口
cluster.open("center")
--启动dataService服务
local addr = skynet.newservice("dataService")
--注册名字,以便其他结点访问
skynet.name(".dataService", addr)
end)
--dataService.lua
local skynet = require "skynet"
local kv = {}
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, k, v)
if cmd == "set" then
kv[k] = v
elseif cmd == "get" then
skynet.retpack(kv[k])
end
end)
end)
game
结点连接center
结点,访问dataService
服务:
local skynet = require("skynet")
require("skynet.manager")
local cluster = require("skynet.cluster")
skynet.start(function()
--使用cluster.send/call
cluster.send("center", ".data", "set", "k1", "v1")
print(cluster.call("center", ".data", "get", "k1"))
--使用skynet.send/call
local addr = cluster.proxy("center", ".data")
skynet.send(addr, "lua", "set", "k1", "v2")
print(skynet.call(addr, "lua", "get", "k1"))
end)
在配置文件中,我们需要指定一个cluster
文件:
--config
cluster = "./config/clusterConfig.lua"
--config/clusterConfig.lua
__nowaiting = true
center="127.0.0.1:14880"
game="127.0.0.1:14881"
这样一个简单的skynet
集群就搭建好了
cluster
库
在示例中,我们看到集群的监听和发送,都是通过cluster
这个库来操作的。cluster
是一个封装的用来进行集群相关操作的函数库,通过local cluster = require("skynet.cluster")
引入
主要的函数有:
1. cluster.reload
cluster.reload
用来加载配置。除了通过配置文件之外,我们也可以使用cluster.reload
来加载配置:
cluster.reload({
center="127.0.0.1:14880",
game="127.0.0.1:14881",
})
如果不传参数的话,则表示重新从配置文件中加载:
cluster.reload()
reload
是新加配置,对于旧的结点,如果没有被覆盖到则没有影响。想要清除旧的结点,可以配置node=nil
可以通过配置节点名字对应的地址为 false 来明确指出这个节点已经下线
2. cluster.open
cluster.open
表示监听指定的端口。示例中,在center
结点,我们使用cluster.open('center')
来监听center
端口,在这里是监听127.0.0.1:14880
。
这里会发一条listen
指令给到clusterd
,clusterd
启动gate
服务,并通过gate
服务来监听端口。
clusterd
在收到listen
时,会启动一个gate
服务,所以如果你有自定义的网关服务,最好不要用gate
这个服务名。
在一个进程里,我们可以open
多个端口,如果其他结点不会主动连接本结点,那也可以不open
任何端口。
3. cluster.send / cluster.call
send
和call
可以向指定的结点发送lua
消息。
其效果和在目标结点上,发送lua
消息一样。
在game
结点上执行:
cluster.send("center", ".dataService", "set", "k1", "v1")
相当于在center
结点上执行:
skynet.send(".dataService", "lua", "set", "k1", "v1")
4. cluster.proxy
cluster.proxy
用于生成一个代理地址,使发送跨结点的消息看起来和发送本地消息一样:
local addr = cluster.proxy('center', '.dataService')
skynet.send(addr, 'lua', 'set', 'k1', 'v1')
和发送本地消息的区别在于,这种方式下,skynet.send只能支持lua消息。
5. cluster.register / cluster.query
cluster.register
用于给某个服务在当前结点注册个名字。
其他结点,则可以使用cluster.query
来判断目标结点是否存在某个服务。
clusterd
服务
当我们引入cluster
库时,会启动一个唯一服务clusterd
,这是在cluster.lua
文件中,通过skynet.init
的方式启动的:
--cluster.lua
skynet.init(function()
clusterd = skynet.uniqueservice("clusterd")
end)
cluster
库的函数,基本上都是通过发送lua
消息到clusterd
服务来进行的。
clusterd
会处理以下lua
消息:
1. reload
clusterd
维护一个node_address
表,用来记录每个结点对应的IP地址和端口。
2. listen
收到listen
消息时,clusterd
会启动一个gate
服务。然后根据结点名,获取结点地址,启动gate
监听网络端口。
当这个gate
服务收到网络连接时,会发送lua
消息socket
到clusterd
服务,然后由clusterd
启动一个clusteragent
服务,来处理这个连接的消息:
function command.socket(source, subcmd, fd, msg)
if subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
-- new cluster agent
cluster_agent[fd] = false
local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
--...其他代码
else
--...关闭和错误处理
end
end
3. register
clusterd
维护一个register_name
表,用来记录注册的服务名和地址。
4. proxy
clusterd
维护一个proxy
表,记录结点名.服务名
对应的代理地址,当代理地址不存在时,则创建一个clusterproxy
服务。
5. sender
clusterd
维护一个node_channel
表,记录结点对应的clustersender
地址,这里会返回结点对应的clustersender
地址,没有则先创建一个。
发送消息到其他结点
cluster.send
cluster.send
会通过一个clustersender
服务来发送消息。先看看代码:
function cluster.send(node, address, ...)
-- push is the same with req, but no response
local s = sender[node]
if not s then
table.insert(task_queue[node], skynet.packstring(address, ...))
else
skynet.send(sender[node], "lua", "push", address, skynet.pack(...))
end
end
在cluster
中维护着sender
列表,在同一个服务里,sender
记录每个node
对应的clustersender
服务。
当sender[node]
不存在的时候,没有直接创建一个clustersender
,而是将当前的参数打包,然后插入到一个task_queue
队列中,这是因为创建服务是一个阻塞的过程,在创建服务的过程中,可能会再次调用cluster.send
,所以这里将所有创建过程中的参数都缓存起来,等clustersender
创建完成后,再统一发送到目标结点。
而创建clustersender
,又是发送消息到clusterd
服务:local ok, c = pcall(skynet.call, clusterd, "lua", "sender", node)
skynet.packstring
可以将多个参数(字符串,数字,表,布尔值)序列化成一个字符串。可以使用skynet.unpack
反序列化将参数解出。
cluster.call
cluster.call
同样是通过clustersender
服务来发送消息。
function cluster.call(node, address, ...)
-- skynet.pack(...) will free by cluster.core.packrequest
local s = sender[node]
if not s then
local task = skynet.packstring(address, ...)
return skynet.call(get_sender(node), "lua", "req", repack(skynet.unpack(task)))
end
return skynet.call(s, "lua", "req", address, skynet.pack(...))
end
cluster.send
使用task_queue
可以立刻返回,不会阻塞,而cluster.call
本身就是会阻塞的,所以可以直接使用get_sender
,以阻塞的形式获取一个clustersender
。
注意这里先将参数序列化,等获取到sender
之后才重新化序列化,这是因为get_sender
这个过程是阻塞的,而参数有可能是个table
,在阻塞的过程中,这个table
中的值有可能发生变化,导致逻辑不符合预期,所以这里通过序列化来保证发送时的参数不会被改变。
clustersender
服务
clustersender
用来连接指定结点并发送数据。这个服务是在第一次发送数据时才创建的。
在集群中,结点A向结点B发送过消息,那么结点A就有一个指向结点B的clustersender
服务,且只有一个。
clustersender
服务是在clusterd
服务中创建的,clusterd
是一个唯一服务,在这个服务的管理下,每个目标结点只有一个clustersender
服务。
现在我们来看看clustersender
服务是怎么发送数据的。
启动服务时,我们传过来四个参数
node
: 连接的集群的结点名字
nodename
: 主机的hostname
init_host
: socket
连接的地址
init_port
: socket
连接的端口
在skynet.start
时,创建了一个skynet.socketchannel
,然后设置lua
消息处理函数,这里主要处理两种服务:
push
push
对应的是cluster.send
,只负责发送,不需要响应。
function command.push(addr, msg, sz)
local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
if padding then -- is multi push
session = new_session
end
channel:request(request, nil, padding)
end
这里的cluster
的C
层的库,不是lua
层的库。
cluster.packpush
按特定的协议,来打包数据。
cluster.packpush
返回三个值:
request
:打包后的二进制数据。
new_session
:session+1
。
padding
:如果数据过大,则将超过单包上限的二进制数据以table
数组的形式放在padding
里。
channel:request
则是将request
发送给对端主机,如果有padding
,则分多个包发出去。这里第二个参数是response
,这里为nil
表示不需要响应。
req
req
对应的是cluster.call
,需要等待响应的返回。
local function send_request(addr, msg, sz)
-- msg is a local pointer, cluster.packrequest will free it
local current_session = session
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
session = new_session
local tracetag = skynet.tracetag()
if tracetag then
if tracetag:sub(1,1) ~= "(" then
-- add nodename
local newtag = string.format("(%s-%s-%d)%s", nodename, node, session, tracetag)
skynet.tracelog(tracetag, string.format("session %s", newtag))
tracetag = newtag
end
skynet.tracelog(tracetag, string.format("cluster %s", node))
channel:request(cluster.packtrace(tracetag))
end
return channel:request(request, current_session, padding)
end
function command.req(...)
local ok, msg = pcall(send_request, ...)
if ok then
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
else
skynet.error(msg)
skynet.response()(false)
end
end
command.req
调用send_request
,返回值如果是table
表示这是一个大包切割成多个小包,需要使用cluster.concat
连接起来再返回,如果是string
则直接返回。
在send_request
中,cluster.packrequest
和cluster.packpush
类似,来打包request
类型的数据。
最后同样是交给channel:request
来发送socket
消息,和push
不同的是,这里第二个参数传入了current_session
,表示接收响应的会话ID。
这里简单的讲一下channel:request
是怎么发送和接收数据的。
channel:request
首先会检查当前的连接状态:
- 如果
socket
连接还没建立,则先建立连接,再发送数据; - 如果
socket
连接已断开,则会抛出异常; - 如果
socket
连接正常,则直接发送数据。
在发送数据的时候,如果数据包太大(超过32K
),则会切分成多个包来发送。
如果需要等待响应数据,那么会调用socketchannel
的__response
函数,这个函数是在socketchannel
初始化的时候传入的,在socketsender
这里,则是read_response
函数:
local function read_response(sock)
local sz = socket.header(sock:read(2))
local msg = sock:read(sz)
return cluster.unpackresponse(msg) -- session, ok, data, padding
end
skynet.start(function()
channel = sc.channel {
host = init_host,
port = tonumber(init_port),
response = read_response,
nodelay = true,
}
skynet.dispatch("lua", function(session , source, cmd, ...)
local f = assert(command[cmd])
f(...)
end)
end)
read_response
中,sock:read
是一个阻塞函数,接收对端socket
传回来的网络消息。
clsteragent
服务
前面提到过,当clustersender
第一次发送数据时,会先建立socket
连接,而当socket
连接建立时,对面的结点会创建一个clusteragent
服务,来处理收到的数据。
clusteragent
的创建是在clusterd
服务中:
function command.socket(source, subcmd, fd, msg)
if subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
-- new cluster agent
cluster_agent[fd] = false
local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
--...其他代码
else
--...其他代码
end
end
可以看到,创建clusteragent
的时候,传入了三个参数:clusterd
地址、source
地址(即gate
地址)、fd
文件描述符(代表这个socket
)。
clusteragent
在调用skynet.start
的时候,设置gate
服务的转发,将来自fd
的网络消息,都转发到这个clusteragent
地址,然后设置了对网络消息的处理:
skynet.register_protocol {
name = "client",
id = skynet.PTYPE_CLIENT,
unpack = cluster.unpackrequest,
dispatch = dispatch_request,
}
这里的cluster.unpackrequest
将clustersender
传过来的网络数据进行解析,然后分配给dispatch_request
处理:
dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
clustersender
中会将服务地址addr
打包,这里将addr
解析出来,addr
可以是字符串,也可以是数字。
当addr
是数字0
的时候,表示查询某个注册名字的数字地址:
--结点A
cluster.register("name", addr)
--结点B
local addr = cluster.call("NodeA", 0, "name") --返回NodeA中注册的"name"的数字地址
当addr
是字符串或大于0的数字,则判断addr
是不是通过cluster.register
注册过的,如果是则addr
转化成注册的地址。 然后再根据is_push
,来执行skynet.rawcall
或skynet.rawsend
,进行数据转发。 如果是call
,最后还需要将数据通过socket
返回给clustersender
。
clusterproxy服务
当我们调用cluster.proxy(node, addr)
时,会向clusterd
申请一个clusterproxy
服务。
这里的参数,有三种形式:
- cluster.proxy("center", ".data")
- cluster.proxy("center.addr") :等价于 cluster.proxy("center", ".data")
- cluster.proxy("center@addr") :等价于 cluster.proxy("center", "@data")
clusterd
会以node .. "." .. name
作为key
,保证同一个key
只有一个clusterproxy
服务。
而clusterproxy
服务很简单:
它会向clusterd
申请一个面向node
的clustersender
,然后就收到的lua
消息,转发到这个clustersender
上,参数使用服务初始化时的addr
。