首页 > 其他分享 >pomelo广播的实现(chat例子分析)

pomelo广播的实现(chat例子分析)

时间:2023-06-04 18:32:00浏览次数:48  
标签:function connector route pomelo 广播 chat var 服务器 channel



      其实最开始要读pomelo框架无非是因为自己没有读过什么node.js框架的源码,不过后来就逐渐变成了想要知道pomelo框架是如何实现广播的,貌似这也是游戏服务器比较重要的功能吧。。。。

一开始会觉得这种广播在分布式的环境下实现会比较的复杂。。但是当搞明白了pomelo的实现之后,发现它是采用了一种折中的方法实现广播。。虽然没有刚开始自己想的那么牛逼,不过觉得也算是一种比较好的解决方案吧。。


那么接下来就用pomelo给的chat这个例子来分析吧,来看登录吧,首先会向gate服务器发起连接:


1. function
2. var route = 'gate.gateHandler.queryEntry';  
3.     pomelo.init({  
4.         host: window.location.hostname,  
5.         port: 3014,  
6. true
7. function() {  
8. //发起请求,用于获取用于连接的connector服务器的地址
9.             uid: uid  
10. function(data) {  
11.             pomelo.disconnect();  
12. if(data.code === 500) {  
13.                 showError(LOGIN_ERROR);  
14. return;  
15.             }  
16.             callback(data.host, data.port);  
17.         });  
18.     });  
19. };

这部分代码主要要完成的目的就是与gate进行通信,gate会返回该客户用于连接的connector服务器的地址,我们来看看gate服务器是怎么生成这个地址的吧:


1. //next是一个函数,用于执行一些操作,将返回的数据发送回去
2. handler.queryEntry = function(msg, session, next) {  
3. var
4. if(!uid) {  
5. null, {  
6.             code: 500  
7.         });  
8. return;  
9.     }  
10. // get all connectors
11. var connectors = this.app.getServersByType('connector');  //获取素有connector服务器的配置信息
12. if(!connectors || connectors.length === 0) {  
13. null, {  //第一个参error,第二个参数wie返回给客户端的信息
14.             code: 500  
15.         });  
16. return;  
17.     }  
18. // select connector
19. var res = dispatcher.dispatch(uid, connectors);   //选取一个connector服务器
20. null, {  
21.         code: 200,  
22.         host: res.host,  
23.         port: res.clientPort  
24.     });  
25. };  
26. var crc = require('crc');  
27.   
28. module.exports.dispatch = function(uid, connectors) {  
29. var
30. return
31. };



到这里就应该知道gate服务器是怎么挑选connector服务器的了吧。。。那么在获取了用于连接的connector之后,就应该建立与connector服务器的连接,进行登录了。。。代码如下:




1. //query entry of connection
2. queryEntry(username, function(host, port) {  
3.     pomelo.init({  
4. //这里是返回的用于连接的connector服务器的host与port
5.         port: port,  
6. true
7. function() {  
8. var route = "connector.entryHandler.enter";  //这里可以当做是进行登录吧
9.         pomelo.request(route, {  
10.             username: username,  
11.             rid: rid  
12. function(data) {  
13. if(data.error) {  
14.                 showError(DUPLICATE_ERROR);  
15. return;  
16.             }  
17.             setName();  
18.             setRoom();  
19.             showChat();  
20.             initUserList(data);  
21.         });  
22.     });  
23. });


可以看到这里调用的是connector服务器的handler的enter方法,然后传过去的参数是username和rid(房间的id),那么我们来看看这个connector服务器的enter方法干了些什么事情吧:





1. handler.enter = function(msg, session, next) {  
2. var self = this;  
3. var
4. var uid = msg.username + '*' + rid  //用户名字还要加上组名字
5. var sessionService = self.app.get('sessionService');  
6.   
7. //duplicate log in
8. if( !! sessionService.getByUid(uid)) {  //表示有相同的用户了
9. null, {  
10.             code: 500,  
11. true
12.         });  
13. return;  
14.     }  
15.   
16. //将这个session与uid绑定起来
17. 'rid', rid);  
18. 'rid', function(err) {  
19. if(err) {  
20. 'set rid for session service failed! error is : %j', err.stack);  
21.         }  
22.     });  
23. 'closed', onUserLeave.bind(null, self.app));  //设置closed事件的处理函数
24.   
25. //put user into channel
26. //这里session适用于挑选后台的chat服务器的,这里还要讲当前frontend服务器的serverID传送过去,因为后台要知道当前channel的用户都在哪些frontend服务器上面连接着
27. //这里挑选后台的chat服务器的时候,用的是rid,所以可以保证同一个房间的人分到同一个chatserver
28. 'serverId'), rid, true, function(users){  
29. null, {  
30. //远程服务器返回的当前channel里面的所有的用户
31.         });  
32.     });  
33. };



其实这部分的处理韩式很简单的,无非是处理一下从connector组件中分配的 session,设置一下rid,uid等基本的信息,最后有一个比较重要的操作,那就是进行chat的远程调用,在chat服务器中添加一个user, 这里上面的注释也已经很清楚了,其实到这里就已经知道pomelo是怎么实现广播的了,但是还是来看看究竟是怎么搞的吧。。。

那么我们来看看这个远程调用是怎么进行的,如果看过之前对pomelo框架proxy模块的分析,上面的实际上执行的是下面的方法:



1. /*
2. { namespace: 'sys',
3.     serverType: 'chat',
4.     path: '/home/fjs/Desktop/pomelo/game-server/node_modules/pomelo/lib/common/remote/backend/' },
5. */
6. null, serviceName, methodName, args, attach, invoke);  //调用proxyCB方法来处理数据


这里serviceName就是chatRemote,methodName是add,args就就是上面传进来的参数,attach就是这个远程调用的基本西溪,例如上面注释的那种形式,invoke可以忽略,那么我们在来看看proxyCB函数究竟干了些设么事情吧:




1. var proxyCB = function(client, serviceName, methodName, args, attach, invoke) {  
2. if(client.state !== STATE_STARTED) {  
3. throw new Error('[pomelo-rpc] fail to invoke rpc proxy for client is not running');  
4.   }  
5.   
6. if(args.length < 2) {  
7. '[pomelo-rpc] invalid rpc invoke, arguments length less than 2, namespace: %j, serverType, %j, serviceName: %j, methodName: %j',  
8.       attach.namespace, attach.serverType, serviceName, methodName);  
9. return;  
10.   }  
11.   
12. var routeParam = args.shift(); //用于route的参数,一般情况下是session
13. var cb = args.pop();  //用于处理返回消息的回调函数
14. //其实msg也就是pomelo定义的远程方法调用的消息格式,远程服务器会根据这个消息来解析需要调用的方法名字等信息
15. //namespace可以是sys和user,servicename是当前调用的js源码或者说模块的名字,method就是方法的名字,args为传给方法的参数
16. var
17.     service: serviceName, method: methodName, args: args};  
18. // do rpc message route caculate
19. var
20. if(typeof client.router === 'function') {  
21.     route = client.router;  
22. null;  
23. else if(typeof client.router.route === 'function') {  
24. //router函数,是用于在服务器中挑选一个,甚至可以理解为负载均衡吧
25.     target = client.router;  
26. else
27. '[pomelo-rpc] invalid route function.');  
28. return;  
29.   }  
30.   
31. //这里调用route函数获取serverID,想这个server发送消息
32. function(err, serverId) {  
33. if(err) {  
34.       utils.invokeCallback(cb, err, serverId);  
35. return;  
36.     }  
37.   
38.     client.rpcInvoke(serverId, msg, cb);  
39.   });  
40. };


其实这里之所以想要再将整个rpc的过程又弄出来,主要就是想要证明一个东西,那就是 同一个房间的用于将会被分配到同一个后台chat服务器,在这里我们可以看到一个route函数,不知道大家是否记得在application的时候的一 段代码:app.route('chat', routeUtil.chat);   //chat是server类型,第二个是route函数

这里就会为挑选后台的chat服务器提供一个route函数,那么将在这里使用,那么我们在这里来看看这个函数是怎么定义的吧:




1. var
2. var dispatcher = require('./dispatcher');  
3.   
4. exp.chat = function(session, msg, app, cb) {  
5. var chatServers = app.getServersByType('chat');  
6.   
7. if(!chatServers || chatServers.length === 0) {  
8. new Error('can not find chat servers.'));  
9. return;  
10.     }  
11.   
12. var res = dispatcher.dispatch(session.get('rid'), chatServers);  //这里可以保证相同的rid最后访问的是同一个chat服务器
13.   
14. null, res.id);  
15. };


这里将会会dispatch函数传入rid,也就是房间的id,这也就能够知道为什么同一个房间的用于将会被分配到同一个chat服务器了吧。。。好了那么这里对rpc的说明就到此了吧,那么接下来来看看调用的chat服务器的add方法究竟干了些什么事情吧:



1. //当有用户进来的时候会调用这个方法
2. //这里uid是用户的id,sid是前端的connector服务器id,那么是房间的id,
3. //由于这里是远程的rpc调用访问的方法,cb是用于将执行结果返回过rpc客户端
4. hatRemote.prototype.add = function(uid, sid, name, flag, cb) {  
5. var channel = this.channelService.getChannel(name, flag);  //这里的flag表示如果没有这个channel的时候要创建这个channel
6. var username = uid.split('*')[0];  
7. var
8. 'onAdd',  
9.     user: username  
10. };  
11. channel.pushMessage(param);  //想这个channel广播消息,表示当前有用户加入了channel
12.   
13. if( !! channel) {  
14. // 在这个channel中添加一个人,这里还将前段的connector服务器的serverid也传进去了
15. }  
16.   
17. cb(this.get(name, flag)); //获取当前channel所有的用户,返回回去
18. ;


这部分其实代码一看就基本上就能明白的差不多吧,无非是根据房间的名字来获取这个房间 的channel,然后再想这个channel广播有用户加进来的消息,接着还要在这个channel中设置新的用户,并且还要讲当前channel中所 有的用户返回,,,。。那么这里涉及到广播消息的就是channel.pushMessage(param);

但是在分析这个广播消息的方法之前,我们先来看看channel中是如何添加用户的吧,也就是channel.add(uid, sid);  // 在这个channel中添加一个人,这里还将前段的connector服务器的serverid也传进去了  




1. //在当前channel中添加一个新的user,uid是username*rid  ,sid就是这个user所属的前端connector服务器的id
2. Channel.prototype.add = function(uid, sid) {  
3. if(this.state > ST_INITED) {  
4. return false;  
5. else
6. //这里add说白了就是为了记录当前的前端connector服务器总所属于当前channel的user
7. var res = add(uid, sid, this.groups);  //用于添加用户,这里groups是用于记录当前前端connector服务器属于当前channel的所有的user
8. if(res) {  
9. this.records[uid] = {sid: sid, uid: uid};  //相当于是记录当前user的前端服务器
10.     }  
11. return
12.   }  
13. };  
14.   
15. /**


其实这个函数要执行的工作就两个:

(1)记录当前user的前端connector服务器

(2)记录这个前端connector服务器的user

到这里应该就更能够明白pomelo是怎么进行广播的了吧

那么接下来我们还是来看看这个广播方法究竟是怎么进行的广播的吧




1. //将数据发送给这个channel的所有用户
2. hannel.prototype.pushMessage = function(route, msg, cb) {  
3. if(this.state !== ST_INITED) {  
4. new Error('channel is not running now'));  
5. return;  
6.  }  
7.   
8. if(typeof route !== 'string') {  
9.    cb = msg;  
10.    msg = route;  
11.    route = msg.route;  
12.  }  
13. /这里group是保存了所有有当前用户的前端connector服务器  
14. this.__channelService__, route, msg, this.groups, cb);  
15. ;


好像没什么意思吧,那么继续来看这个sendMessageByGroup方法吧:



1. //这个函数用于向group的所有的用户发送消息,这里group保存的数据格式是
2. //key:前端的connector服务器的serverID
3. //value:[],一个数组保存这个服务器中要接受数据的user
4. var sendMessageByGroup = function(channelService, route, msg, groups, cb) {  
5. var
6. var namespace = 'sys';  //这里是进行rpc的参数
7. var service = 'channelRemote';  //服务
8. var method = 'pushMessage';  //方法
9. var
10. var successFlag = false;  
11. var
12.   
13. if(count === 0) {  
14. // group is empty
15.     utils.invokeCallback(cb);  
16. return;  
17.   }  
18.   
19. var latch = countDownLatch.createCountDownLatch(count, function(){  
20. if(!successFlag) {  
21. new Error('all uids push message fail'));  
22. return;  
23.     }  
24. null, failIds);  
25.   });  
26.   
27. var rpcCB = function(err, fails) {  
28. if(err) {  
29. '[pushMessage] fail to dispatch msg, err:'
30.       latch.done();  
31. return;  
32.     }  
33. if(fails) {  
34.       failIds = failIds.concat(fails);  
35.     }  
36. true;  
37.     latch.done();  
38.   };  
39.   
40. var
41. for(var sid in
42. //当前server要接受数据的用户
43. if(group && group.length > 0) {  
44. //向相应的服务器发送rpc消息,将数据发送给对应的用户
45. //挨个向所有的服务器发送消息
46.       app.rpcInvoke(sid, {namespace: namespace, service: service,  
47.         method: method, args: [route, msg, groups[sid]]}, rpcCB);  
48. else
49. // empty group
50.       process.nextTick(rpcCB);  
51.     }  
52.   }  
53. };


其实上面的方法无非就是调用前端connector服务器的方法,让他们将数据发送给最终的用户,那么到这里后端的chat服务器要做的事情就差不多了,工作又回到了前端的connector服务器,那么又来看看吧:




1. //uid是应该接受数据的user,msg就是要发送的数据,route就是route
2. emote.prototype.pushMessage = function(route, msg, uids, cb) {  
3. if(!msg){  
4. 'Can not send empty message! route : %j, compressed msg : %j',  
5.        route, msg);  
6. return;  
7.  }  
8.   
9. var connector = this.app.components.__connector__;  
10.   
11. var sessionService = this.app.get('sessionService');  
12. var
13. for(var
14. //获取这个用户的session
15. if(!sessions) {  //如果么有session,那么发送失败
16.      fails.push(uids[i]);     
17. else
18. for(j=0, k=sessions.length; j<k; j++) {  
19. //这里session的id其实就是connector组件中socket的id,其实这里session就已经有send方法了,为什么不调用?
20.      }  
21.    }  
22.  }  
23.   
24. null, route, msg, sids, {isPush: true}, function(err) {  //调用connector的send方法将数据发送给刚刚弄出来的socket
25.    cb(err, fails);  
26.  });  
27. ;

哈,到这里整个广播的过程我想基本上也就弄的比较的清楚了。。。最后再用一张图来总结一下吧:

pomelo广播的实现(chat例子分析)_服务器


好了,到这里就基本上搞清楚了我最开始想要搞清楚的问题。。那么整体上pomelo框架的内容就差不太多了,可能还有一些模块要做的事情我没有细看,不过也无所谓吧。。以后如果真有机会用pomelo框架的时候再去看也不迟,只要搞清楚了pomelo框架总体的脉络。。。

其实这里还有一种高并发网络系统的设计思想,叫做连接的离散化。。。这里将用户的连接 分不到不同的connector服务器上,但是如果他们属于同一个房间,则他们访问的依然是同一个后台chat服务器,那么也就做到了职责的分离,前台的 connector服务器主要负责维护用户的连接,用于尽可能多的连接最终的用户,而后台的chat服务器就专职处理一些业务逻辑,只需要维护较少的与前 端connector服务器的rpc连接就可以了。。。



 


标签:function,connector,route,pomelo,广播,chat,var,服务器,channel
From: https://blog.51cto.com/u_2700990/6411708

相关文章

  • chatofpomelo简析之二——聊天
     chatofpomelo简析之二——聊天 上一篇ChatofPomelo简析之一——用户登录分析客户端登陆的过程。当用户登陆成功后,聊天又是个什么过程呢?下面就来分析聊天时,客户端与服务器端的交互过程。客户端我们先来看看下,聊天发送消息的过程。当用户在文本框内输入文字,并回车就可以发送消息了......
  • 利用ChatGPT 和Milvus快速搭建智能问答机器人
    智能问答机器人是自然语言处理领域一个很经典的问题,它用于回答人们以自然语言形式提出的问题,有着广泛的应用。其经典应用场景包括:智能语音交互、在线客服、知识获取、情感类聊天等。常见的分类有:生成型、检索型问答系统;单轮问答、多轮问答系统;面向开放领域、特定领域的问答系统。......
  • ChatGPT 国内镜像网站独家汇总:发现最优秀的人工智能对话体验!
    欢迎来到我们的ChatGPT镜像网站汇总博客!在这个令人激动的人工智能时代,ChatGPT作为一款顶尖的语言模型,已经引起了全球范围内的热议。但是,您是否曾经为了找到最佳的ChatGPT使用体验而苦苦搜寻?别担心,我们为您精心准备了这个独家汇总,让您一站式探索最优秀的人工智能对话体验!如果......
  • ChatGPT 生态,毁灭人性的一次开源!
    原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,非公众号转载保留此声明。很多人可能对百度文心一言发布会的现场记忆犹新。几百亿身价的老板,像小学生一样汇报自己的PPT,充满了忐忑。其实大可不必,命令行更加富有科技感,也更有说服力。不管对程序员来说还是围观的群众来说,能干什么并不重要......
  • ChatGPT使用小技巧—如何快速制作一张表格?
    ChatGPT使用技巧—如何快速制作表格?一、背景:        在学习或工作中,时常会需要做一些表格进行数据统计、分析,通常我们会用word或者excel做出表格,然后把数据一个个复制进去,非常慢,用了ChatGPT之后,你会发现工作会变得如此简单。二、所需工具    ChatGPT平......
  • 文心一言 VS 讯飞星火 VS chatgpt (30)-- 算法导论5.2 2题
    二、在HIRE-ASSISTANT中,假设应聘者以随机顺序出现,你正好雇用两次的概率是多少?文心一言:我们可以先计算出在HIRE-ASSISTANT中,雇佣一个应聘者的概率为3/4,然后计算出雇佣两次应聘者的概率为3/4*3/4*2。已知在HIRE-ASSISTANT中,雇佣一个应聘者的概率为:0.75根据概率乘法原理,可计算出......
  • 文心一言 VS 讯飞星火 VS chatgpt (30)-- 算法导论5.2 2题
    二、在HIRE-ASSISTANT中,假设应聘者以随机顺序出现,你正好雇用两次的概率是多少?文心一言:我们可以先计算出在HIRE-ASSISTANT中,雇佣一个应聘者的概率为3/4,然后计算出雇佣两次应聘者的概率为3/4*3/4*2。已知在HIRE-ASSISTANT中,雇佣一个应聘者的概率为:0.75根据概率乘法原理,可......
  • 整合ChatGPT与Flutter高级技术
    整合ChatGPT与Flutter高级技术在当今快节奏的数字时代,人们对智能化应用的需求越来越高。ChatGPT是一种基于人工智能的语言模型,而Flutter是一种快速开发移动应用程序的框架。本文将介绍如何将这两种技术整合在一起,从而为用户提供更好的智能化移动应用体验。ChatGPT概述ChatGPT(Genera......
  • chat
    列表https://chatgptqdymyscn/:三小时只能用10次https://alllinkai.com/:体验完,要登录,不推荐https://gpttalk.live/:每天限制使用次数https://chatgpt-cn.co/:每天免费使用250个中文功能和使用......
  • 【ChatGPT功能挖掘】论文绘图救星!
    前言今天分享一个使用ChatGPT来解决绘图没有思路、不知如何下手的问题!注意看!全文干货,无废话!ChatGPT上下文记忆功能首先要分享的是ChatGPT上下文记忆功能,什么意思?就是ChatGPT是有记忆和你之前对话的能力,在一个会话中它可以基于你之前问过问题的答案对你下面的回答进行回复的。下面用......