6. RPC高级用法
6.1 动态监听handler
有些场景下,我们无法在编译阶段提前实现或注册好所有的handler,但在执行时可以通过一些动态规则动态分配handler。 mqant也支持这样的功能场景
6.1.1 handler监听器
type RPCListener interface {
/**
NoFoundFunction 当未找到请求的handler时会触发该方法
*FunctionInfo 选择可执行的handler
return error
*/
NoFoundFunction(fn string) (*FunctionInfo, error)
/**
BeforeHandle会对请求做一些前置处理,如:检查当前玩家是否已登录,打印统计日志等。
@session 可能为nil
return error 当error不为nil时将直接返回改错误信息而不会再执行后续调用
*/
BeforeHandle(fn string, callInfo *CallInfo) error
OnTimeOut(fn string, Expired int64)
one rror(fn string, callInfo *CallInfo, err error)
/**
fn 方法名
params 参数
result 执行结果
exec_time 方法执行时间 单位为 Nano 纳秒 1000000纳秒等于1毫秒
*/
OnComplete(fn string, callInfo *CallInfo, result *rpcpb.ResultInfo, exec_time int64)
}
6.1.2 设置监听器
func (self *HttpGateWay) OnInit(app module.App, settings *conf.ModuleSettings) {
self.SetListener(self)
}
6.1.3 示例
实现一个http网关路由示例,将handler转换为http请求的path路由
6.1.3.1 监听器实现
func (self *HttpGateWay) NoFoundFunction(fn string)(*mqrpc.FunctionInfo,error){
return &mqrpc.FunctionInfo{
Function:reflect.ValueOf(self.CloudFunction),
Goroutine:true,
},nil
}
func (self *HttpGateWay) BeforeHandle(fn string, callInfo *mqrpc.CallInfo) error{
return nil
}
func (self *HttpGateWay) OnTimeOut(fn string, Expired int64){
}
func (self *HttpGateWay) one rror(fn string, callInfo *mqrpc.CallInfo, err error){}
/**
fn 方法名
params 参数
result 执行结果
exec_time 方法执行时间 单位为 Nano 纳秒 1000000纳秒等于1毫秒
*/
func (self *HttpGateWay) OnComplete(fn string, callInfo *mqrpc.CallInfo, result *rpcpb.ResultInfo, exec_time int64){}
6.1.3.2 请求转发器实现
以下是一段伪代码
- 监听http网关的请求
- 解析http的path(url)
- 填充http请求参数
- 通过httptest模拟http请求
- 将结果返回http网关
func (self *HttpGateWay) CloudFunction(trace log.TraceSpan,request *go_api.Request) (*go_api.Response,error) {
e := echo.New()
ectest := httgatewaycontrollers.SetupRouter(self, e)
req, err := http.NewRequest(request.Method, request.Url, strings.NewReader(request.Body))
if err != nil {
return nil,err
}
for _,v:=range request.Header{
req.Header.Set(v.Key, strings.Join(v.Values,","))
}
rr := httptest.NewRecorder()
ectest.ServeHTTP(rr, req)
resp := &go_api.Response{
StatusCode: int32(rr.Code),
Body: rr.Body.String(),
Header: make(map[string]*go_api.Pair),
}
for key, vals := range rr.Header() {
header, ok := resp.Header[key]
if !ok {
header = &go_api.Pair{
Key: key,
}
resp.Header[key] = header
}
header.Values = vals
}
return resp,nil
}
6.2 全局监听handler
通常希望能监控handler的具体执行情况,例如做监控报警等等
6.2.1 应用级别handler监听
app := mqant.CreateApp(
module.SetClientRPChandler(func(app module.App, server registry.Node, rpcinfo rpcpb.RPCInfo, result interface{}, err string, exec_time int64) {
}),
module.SetServerRPCHandler(func(app module.App, server module.Module, callInfo mqrpc.CallInfo) {
}),
)
6.2.2 调用方监控
module.SetClientRPChandler(func(app module.App, server registry.Node, rpcinfo rpcpb.RPCInfo, result interface{}, err string, exec_time int64) {
})
6.2.3 服务方监控
module.SetClientRPChandler(func(app module.App, server registry.Node, rpcinfo rpcpb.RPCInfo, result interface{}, err string, exec_time int64) {
})
7. 长连接网关
7.1 网关介绍
mqant中的Gate网关模块相对来说非常重要,它支撑了服务器与客户端的长连接通信
7.1.1 特性
- 支撑tcp,websocket通信方式
- 默认支撑MQTT协议
- 可自定义通信协议
7.1.2 使用Gate网关模块
gate网关模块包含的功能虽然多,但在实际开发时并不需要做过多的二次开发, 开发者只需要继承basegate.Gate这个基础模块即可,示例如下:
type Gate struct {
basegate.Gate //继承
}
func (this *Gate) GetType() string {
//很关键,需要与配置文件中的Module配置对应
return "Gate"
}
func (this *Gate) Version() string {
//可以在监控时了解代码版本
return "1.0.0"
}
func (this *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
//注意这里一定要用 gate.Gate 而不是 module.BaseModule
this.Gate.OnInit(this, app, settings)
}
7.2 mqtt协议
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
- 对负载内容屏蔽的消息传输;
- 使用 TCP/IP 提供网络连接;
有三种消息发布服务质量:
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制;
总的说来MQTT协议是非常精简的通信协议,同时也有完善的【心跳包检测】和【重连机制】,很适合移动游戏环境使用
7.2.1 消息体概述
除去MQTT协议的实现,在实际游戏过程中我们可以只需要关注以下内容: MQTT协议消息体由两部分组成【topic】和【body】
7.2.1.1 主题topic
MQTT是通过主题对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用就是了。
主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而*只能出现在主题最后表示过滤任意级别的层级。 举个例子:
baidu/chatroom:代表百度公司的聊天室。
+/chatroom:代表任何公司的聊天室。
baidu/*:代表百度公司所有的子频道
7.2.1.2 消息体(body)
消息体是二进制数据流
7.2.2 如何使用MQTT协议实现游戏路由
由于mqant目前主要用于游戏开发,因此mqant只使用了mqtt协议的一小部分功能。
mqant网关将收到信息topic解析出moduleType和handler用来定位到后端模块的对应处理方法,然后进行远程RPC调用。msgid作为客户端是否需要消息应答的的标记
7.3 编写第一个网关
7.3.1 代码组织结构
重新组织了一下代码目录结构,新增了一个gate目录用来存放网关代码,robot目录用来存放访问网关的mqtt客户端代码
工程目录
|-bin
|-conf
|-server.conf
|-helloworld
|-module.go
|-xxx.go
|-gate
|-module.go
|-robot
|-test
|-manager.go
|-work.go
|-robot_task.go
|-main.go
7.3.2 编写第一个网关
package mgate
import (
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/gate/base"
"github.com/liangdas/mqant/module"
)
var Module = func() module.Module {
gate := new(Gate)
return gate
}
type Gate struct {
basegate.Gate //继承
}
func (this *Gate) GetType() string {
//很关键,需要与配置文件中的Module配置对应
return "Gate"
}
func (this *Gate) Version() string {
//可以在监控时了解代码版本
return "1.0.0"
}
func (this *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
//注意这里一定要用 gate.Gate 而不是 module.BaseModule
this.Gate.OnInit(this, app, settings,
gate.WsAddr(":3653"),
gate.TcpAddr(":3563"),
)
}
7.3.3.1 网关监听端口
func (this *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
//注意这里一定要用 gate.Gate 而不是 module.BaseModule
this.Gate.OnInit(this, app, settings,
gate.WsAddr(":3653"),
gate.TcpAddr(":3563"),
)
}
7.4 网络路由协议
7.4.1 通信模式
mqant是支持与客户端双向通信的长连接框架,与客户端通信有以下三种模式:
-
Request-Response模式
类似http的Request--Response模式,一问一答。
-
Request-NoResponse模式
客户端发出消息后不需要服务端回答,一问。Request-NoResponse模式通常与ServerPush模式配合使用,当后端异步响应客户端消息时非常有用
-
ServerPush模式
服务器主动给客户端发送消息与app的推送功能相似
7.4.2 默认路由协议
mqant网关是这样进行约定的
topic格式约定
[moduleType]/[handler]/[msgid]
moduleType 模块名称
handler 模块中的的方法
msgid 本次消息唯一ID [可选]
mqant网关将收到信息topic解析为以上三部分,moduleType、handler、msgid。 moduleType和handler其中用来定位到后端模块的对应处理方法,然后进行远程RPC调用。 msgid作为客户端是否需要消息应答的的标记,即类似http的Request-Response模式。 如果不设置msgid就是Request-NoResponse模式。
7.5 session
gate handler传参第一个参数是session gate.Session,本章节将详细阐述session的原理和用法
7.5.1 网关与后端通信
在客户端通过网关向后端业务模块发送消息时,网关会将session放在第一个参数,因此再设计handler是需要将第一个参数定义为session
func (self *HellWorld) gatesay(session gate.Session,msg map[string]interface{}) (r string, err error) {
session.Send("/gate/send/test",[]byte(fmt.Sprintf("send hi to %v", msg["name"])))
return fmt.Sprintf("hi %v 你在网关 %v", msg["name"],session.GetServerId()), nil
}
7.5.2 session定义
Session是由Gate网关模块维护的,代表客户端跟网关建立的一条连接,session封装了网关和客户端连接的信息
7.5.2.1 组成
- 网关信息
- 连接自定义信息
标签:网关,mqant,string,module,进阶篇,func,gate,Gate From: https://blog.csdn.net/peaceLT/article/details/140790378大致字段如下:
{ Userid string IP string Network string Sessionid string Serverid string Settings <key-value map> }
- Userid
需要调用Bind绑定来设置 默认为"" 当客户端登陆以后可以设置该参数,其他业务模块通过判断Userid来判断该连接是否合法- IP
客户端IP地址- Network
网络类型 TCP websocket ...- Sessionid
Gate网关模块生成的该连接唯一ID- Serverid
Gate网关模块唯一ID,后端模块可以通过它来与Gate网关进行RPC调用- Settings
可以给这个连接设置一些参数,例如当用户加入对战房间以后可以设置一个参数 roomName="mqant"