首页 > 其他分享 >SignalR循序渐进(三)简易的集群通讯组件

SignalR循序渐进(三)简易的集群通讯组件

时间:2022-12-02 22:44:50浏览次数:57  
标签:members id var Id member SignalR 循序渐进 组件 节点

上一篇演示了泛型Hub的实现,微软于6月17日更新了SignalR 2.1.0,然后自带了泛型Hub,于是就不需要自己去实现了…(微软你为啥不早一个月自带啊…)。不过没关系,SignalR出彩之处不在泛型Hub,本篇为各位观众带来了基于SignalR的简易集群通讯组件Demo,可用于分布式定时任务。

说到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制于成员数量,Cluster用数量堆高可用性,HPC太复杂。本着SignalR的双向异步通讯的特点,其实是可以用来玩弹性计算的。初始状态由一台计算任务分发节点,一台监控以及一台计算节点构成。随着任务分发队列中的任务数越来越多,一台执行节点无法及时消耗待执行任务,达到某个阈值的时候,动态的加入一个计算节点来增加计算吞吐量。同样的,当队列中的任务基本处于很低的数量的时候,自动移除一个计算节点来减少资源消耗。当然,如果是大型的计算量之下,分发节点,队列都应该是集群的,还要考虑各种计算节点故障之类的问题,这不在本篇考虑的范畴内,本篇以初始状态模型来一步步实现简易集群通讯组件。

好,废话不说了,正篇开始。

任务分发节点

image

任务分发节点只有一个公开的行为,就是接受计算节点任务执行完成的消息。

下面是实现。

复制代码
/// <summary>
    /// 集群交换器
    /// </summary>
    public class ClusterHub : Hub<IClusterClient>
    {
        /// <summary>
        /// 
        /// </summary>
        static ClusterHub()
        {
            aliveDictionary = new ConcurrentDictionary<string, Guid>();
        }
        
        /// <summary>
        /// 
        /// </summary>
        /// <param name="dispatcher"></param>
        public ClusterHub(IDispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            db = OdbFactory.Open(localDbFileName);
        }

        /// <summary>
        /// 本地数据库文件名
        /// </summary>
        const string localDbFileName = "ClusterStorage.dll";

        /// <summary>
        /// 监视器连接Id
        /// </summary>
        static string monitorConnectionId;

        /// <summary>
        /// 调度器
        /// </summary>
        IDispatcher dispatcher;

        /// <summary>
        /// 在线词典
        /// </summary>
        static ConcurrentDictionary<string, Guid> aliveDictionary;

        /// <summary>
        /// 
        /// </summary>
        static IOdb db;

        /// <summary>
        /// 完成任务
        /// </summary>
        /// <param name="jobResult"></param>
        public void Finished(Contracts.Messages.JobResultDto jobResult)
        {
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));
                if (member != null)
                {
                    member.UpdateStatisticsInfo(jobResult.ProcessedTime);
                    db.Store(member);
                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });
                    }
                }
            }
            Clients.Caller.RunJob(dispatcher.GetJobId());
        }

        /// <summary>
        /// 加入
        /// </summary>
        void Join()
        {
            object ip = string.Empty;
            var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";
            Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);
                if (member != null)
                {
                    member.MemberStatusType = MemberStatusTypeEnum.Connectioned;
                }
                else
                {
                    member = new MemberDo(ip.ToString(), isMonitor);
                    if (isMonitor)
                    {
                        monitorConnectionId = Context.ConnectionId;
                    }
                }
                db.Store(member);

                aliveDictionary.TryAdd(Context.ConnectionId, member.Id);
                if (!isMonitor)
                {
                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).MemberJoin(member.Id);
                    }
                    Clients.Caller.GetId(member.Id.ToString());
                    Clients.Caller.RunJob(dispatcher.GetJobId());
                }
            }
        }

        /// <summary>
        /// 离开
        /// </summary>
        void Leave()
        {
            var id = Guid.Empty;
            aliveDictionary.TryRemove(Context.ConnectionId, out id);
            lock (db)
            {
                var members = db.AsQueryable<MemberDo>();
                var member = members.SingleOrDefault(m => m.Id == id);
                if (member != null)
                {
                    member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;
                    db.Store(member);
                    if (member.IsMonitor)
                    {
                        monitorConnectionId = string.Empty;
                    }
                    else if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                    {
                        Clients.Client(monitorConnectionId).MemberLeave(id);
                    }
                }
            }
        }

        public override Task OnConnected()
        {
            Console.WriteLine(Context.ConnectionId+":Connected");
            Join();
            return base.OnConnected();
        }

        public override Task OnDisconnected()
        {
            Console.WriteLine(Context.ConnectionId + ":Disconnected");
            Leave();
            return base.OnDisconnected();
        }

        public override Task OnReconnected()
        {
            Console.WriteLine(Context.ConnectionId + ":Reconnected");
            return base.OnReconnected();
        }
    }
复制代码

ClusterHub承载着2种客户端角色的交互,计算节点和监控。

这边采用了一个轻量级的基于C#开发的无引擎对象数据库来存储客户端信息。

先说重载的部分:

OnConnected - 当有客户端连接的时候,执行Join方法。

OnDisconnected - 当有客户端离线的时候,执行Leave方法。

然后是私有方法:

Join - 根据QueryString来区分客户端类型是计算节点还是监视器,如果是计算节点,就直接通知监视器有成员加入,然后通过IDispatcher来获取任务Id,通知计算节点开始执行任务。

Leave -  计算节点离线的时候通知监视器。

公开方法:

Finished - 计算节点完成任务后就调用该方法,Hub将计算的一些统计信息更新到本地存储,同时通知监视器更新计算结果。

私有变量:

IDispatcher– 任务调度器接口,由外部组件来负责具体的实现。

计算节点

image

计算节点有两个行为:

GetId - 获取节点身份。

RunJob - 执行任务。

复制代码
/// <summary>
    /// 集群客户端
    /// </summary>
    public class ClusterClient
    {
        /// <summary>
        /// 
        /// </summary>
        /// <param name="jobProvider"></param>
        public ClusterClient(IJobProvider jobProvider)
        {
            this.jobProvider = jobProvider;
            url = ConfigurationManager.AppSettings["HubAddress"];
            var queryStrings = new Dictionary<string, string>();
            queryStrings.Add("ClientRole", "Normal");
            connection = new HubConnection(url, queryStrings);
            hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description);
            InitClientEvents();
            connection.Start().Wait();
        }

        string url;

        HubConnection connection;

        IHubProxy hubProxy;

        IJobProvider jobProvider;

        string id;

        /// <summary>
        /// 
        /// </summary>
        void InitClientEvents()
        {
            hubProxy.On("GetId", (id) => GetId(id));
            hubProxy.On("RunJob", (jobId) => RunJob(jobId));
        }

        /// <summary>
        /// 执行任务
        /// </summary>
        /// <param name="id"></param>
        void GetId(string id)
        {
            this.id = id;
        }

        /// <summary>
        /// 执行任务
        /// </summary>
        /// <param name="jobId"></param>
        void RunJob(string jobId)
        {
            var startTime = DateTime.Now;
            jobProvider.Invoke(jobId);
            var stopTime = DateTime.Now;
            hubProxy.Invoke("Finished", new JobResultDto() { Id = id, JobId = jobId, ProcessedTime = (stopTime - startTime).TotalMilliseconds });
        }
    }
复制代码

客户端的实现很简单,核心就是通过构造函数注入任务提供接口,由接口通过任务Id来执行任务。

 

监视器

image

监视器具有三个公开行为:

MemberJoin - 计算节点加入

MemberLeave - 计算节点离线

UpdateMemberStatisticsInfo - 更新节点统计信息

复制代码
/// <reference path="jquery-2.1.1.js" />
/// <reference path="jquery.signalR-2.1.0.js" />
(function ($) {

    var members = [];

    var methods = {
        reloadList: function () {
            var list = "";
            $.each(members, function (i, n) {
                list += "<li id='member_" + n.Id + "'>[" + n.Id + "]:AverageProcessedTime " + n.AverageProcessedTime + " Milliseconds</li>";
            });
            $('#members').html(list);
        }
    }

    var hubs = {
        clusterHub: $.connection.clusterHub,
        init: function () {
            $.connection.hub.logging = true;
            $.connection.hub.url = 'http://192.168.1.124:10086/signalr';
            $.connection.hub.qs = { "ClientRole": "Monitor" }
            $.connection.hub.start().done(function () { });
        }
    }

    var cluster = {
        on: {
            updateMemberStatisticsInfo: function (data) {
                $.each(members, function (i, n) {
                    if (n.Id == data.Id) {
                        n.AverageProcessedTime = data.AverageProcessedTime;
                        return;
                    }
                });
                methods.reloadList();
            },
            memberJoin: function (id) {
                members.push({ "Id": id, "AverageProcessedTime": 0 });
                methods.reloadList();
            },
            memberLeave: function (id) {
                members = $.grep(members, function (n) { return n.Id != id });
                methods.reloadList();
            }
        }
    }

    $(function () {
        hubs.clusterHub.client.UpdateMemberStatisticsInfo = cluster.on.updateMemberStatisticsInfo;
        hubs.clusterHub.client.MemberJoin = cluster.on.memberJoin;
        hubs.clusterHub.client.MemberLeave = cluster.on.memberLeave;
        hubs.init();
    });
})(jQuery);
复制代码 复制代码
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>集群监视器</title>
</head>
<body>
    <div>
        <ul id="members"></ul>
    </div>
    <script src="scripts/jquery-2.1.1.min.js"></script>
    <script src="scripts/jquery.signalR-2.1.0.min.js"></script>
    <script src="http://192.168.1.124:10086/signalr/hubs"></script>
    <script src="scripts/core.js"></script>
</body>
</html>
复制代码

监视器用real-time的Web平台实现,一共注册三个方法的实现。

最终效果

image

 

Hub端启动后,先启动监视器,然后在不同的机器上启动计算端,图上是2个计算节点,监视器上也显示着2个节点,每个节点执行一个JobId后,监视器上就会刷新结果。

进一步思考和扩展

简易集群组件就到这儿了,本篇演示的是一个思路,可以在这个基础上深度扩展成文章开头所描述的那样,高性能高可用的基于SignalR的集群组件。欢迎各位有兴趣的同学进行讨论和拍砖。

 

标签:members,id,var,Id,member,SignalR,循序渐进,组件,节点
From: https://www.cnblogs.com/webenh/p/16945881.html

相关文章

  • SignalR 循序渐进(五)多个Hub服务器下的消息订阅
    SignalR的通讯方式决定了其高性能,但是即便如此,当消息的并发量上来以后,单节点的Hub服务器依然可能无法承载总的消息吞吐量,那么如何对Hub服务器做水平扩展呢?从微软官方的文......
  • 小程序分包放置echarts组件
    app.json"subPackages":[{"root":"commonPackage","pages":[]}],pages同级目录加commonPackage分包中放入echarts组件信息使用xx.js......
  • ReactHook父组件调用子组件的方法,且子组件用了connect
    ReactHook父组件调用子组件的方法,且子组件用了connect子组件1、引入useImperativeHandle,forwardRef2、子组件由function改成let,接收prop和ref,并从props中结构出refI......
  • VUE组件之间的参数传递与方法调用
    父组件向子组件父组件向子组件传参:父组件中的子组件标签中增加:param="param"子组件中增加props接受参数(注意props需要与data同级)props:{param:{typ......
  • Flutter不常用组件(二)
    ColoredBox一般我们想要一个带有背景颜色的组件我们会使用哪个组件?当然第一个想到的就是Container。其实在Flutter中还要一个专门用来设置颜色的组件ColoredBox。该组......
  • vue3组件通信,网上一大堆,这里直说常用的两个,props,emit
    props(父传子,多用于子组件的数据渲染)【父】<div><comp-son:name="name":age="age"/>//向子组件传值</div>setup(){conststate=reactive({......
  • vue局部刷新 (组件重载)
     组件重载的运用是当组件的数据产生了变化之时,需要通过重载来实现组件页面的刷新,而不是重新刷新URL进行重新请求,也可以理解为是局部刷新 步骤:在父级页面通过......
  • Vue2适用的视频组件
    Vue2适用的视频组件官方文档路径西瓜视频官方文档:http://v2.h5player.bytedance.com/gettingStarted/随笔制作参考:https://juejin.cn/post/7023547598724136990#hea......
  • 组件
    <!DOCTYPEhtml><html> <head> <metacharset="utf-8"> <scriptsrc="js/vue.js"type="text/javascript"charset="utf-8"></script> <title></title> </head> ......
  • Flutter不常用组件(一)
    Flutter目前拥有400多个组件,其中常用的也就那么几个。大家学习Flutter一般都是看的其他人的教程,或者官网的文档教程,这些教程里用到的组件只有那些常用的,更多的组件需要......