首页 > 编程语言 >PasteSpider的集群组件PasteCluster(让你的项目快速支持集群模式)的思路及实现(含源码)

PasteSpider的集群组件PasteCluster(让你的项目快速支持集群模式)的思路及实现(含源码)

时间:2024-06-12 20:14:22浏览次数:23  
标签:node PasteSpider current host 源码 集群 var master 节点

PasteSpider是什么?

一款使用.net编写的开源的Linux容器部署助手,支持一键发布,平滑升级,自动伸缩,

Key-Value配置,项目网关,环境隔离,运行报表,差量升级,私有仓库,集群部署,版本管理等!

30分钟上手,让开发也可以很容易的学会在linux上部署你得项目!

[从需求角度介绍PasteSpider(K8S平替部署工具适合于任何开发语言)]

(https://blog.csdn.net/Apeart/article/details/138819197?spm=1001.2014.3001.5501)

PasteCluster又是什么?


一套使用.net编写的中间件,如果你的API项目是.NET的,那么通过引入这个组件可以让你的项目快速支持集群模式!

(部署是部署工具和写法的问题,模式是模式哈,有主从的那个是模式)

image

接入现有项目的案例

image
如上载入必要的配置,关于ClusterConfig的内容有啥,需要查看源码PasteCloveConfig.cs这个文件,大概有十多个配置项,也可以使用默认值哈!
这个配置在appsettings.json是这样的
image
PasteClusterHandlr这个逻辑他会产生一个Channel的异步队列,业务代码上处理这个业务队列即可,比方说我们启动一个HostedService来处理这个队列
image
上面核心代码其实只有2行

            //如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中
            // -e ClusterConfig:CurrentHost="http://192.168.1.100"
            //如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
            //_cluster_handler.Register("http://192.168.1.100",0,0);
            //启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)
            _cluster_handler.StartCluster();

            //读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息
            ReadClusterChannel();

其他部分概括下就是写入集群节点列表,写入当前节点的HOST信息的!
你只要关注你的业务代码如下:
image
就是这个default的分支,严格来说应该是msg_type=0或者大于10的(1-10组件用于通知特定消息给业务,至于业务用不用要基于自己的业务需求,比如我上面这个案例就是节点掉线后把他从集群节点列表种删除(记录在redis种))

在最后将会贴出源码

集群需求过程

如果说K8S是对运维人员的部署工具,那么PasteSpider应该就是针对开发人员的部署助手了!
K8S不是有那个主从集群模式么,一开始那会思考也整一个集群模式,这样一个PasteSpider宕机了,还有其他的来承担任务!

去中心化集群

一开始的思路是去中心化的集群模式,说白点就是数据库都是一个节点一个的,当然这里的去中心化和市面上的那个链不是一回事哈,至少没有链衔接和链校验的问题!
假设一个节点就是一套完整的PasteSpider,包括数据库,对应的api和web管理端等

如何保证集群安全性?

集群内的所有节点通讯使用一个统一的密钥,这样可以防止其他集群乱入!

数据防重和数据唯一?

首先一个,数据库的表我分几种:
1.基础表,比如会员信息表,这种基础表肯定每一个节点都要同步的
2.统计表,这个明显的代表就是报表,比如日报表,这样要报表的时候不需要从日志表中读取统计
3.关系表,一般的是基础表和基础表的关系的描述的表,也是要同步的
4.日志表,记录一些操作的,视情况而定,不一定要同步完全
每个表添加3个字段,节点,创建时间戳,更新时间戳,这样在同步数据的时候可以基于这个时间戳获取数据差,然后根据ID规则同步数据
由于对管理端来说,只要一个PasteSpider在工作,也就是Nginx中配置主备用模式,这样也就是说主要的数据都是通过一个节点新建的,这样可以排除数据重复的问题
比如说,整个系统的项目的代码需要唯一!

辅助组件如何做到集群

比如项目使用到了nginx,使用到了redis等,如果要完整的集群,甚至是多台服务器,其实严格的集群我个人认为是不可能的!
如果可能,那么每年就不会有那么多这个服务宕机多久,那个宕机多久了!
市面上没有100%的灾备方案,因为在追求100%的路上,你会发觉一直在套娃... .. .

普通的集群的现状

image

稍微查询下目前比较流程的一些中间件,用到的集群模式大概有以下要点:
1.集群选举过程中,不对外提供服务,可以理解成集群还没有协商一致好!那么引申的问题就是集群选举越快越好!
2.几乎的集群选举采用的是单数,过半模式!看过各种解释,说白点就是要少数服从多数,我个人觉得这个问题很扯的!
集群的数量发生变动了,这个是不可控的吧!
居然不可控,你咋保证宕机的数量一定是单数?那肯定单数复数都有可能,而且概率还很大,那这半数了个寂寞!
3.很多集群模式引入了多角色,比如蜂王,工蜂,监控蜂... .. . 啥意思?难不成只有工蜂和封王会宕机???如果你说不同角色宕机的概率不一样,那我还是信的,毕竟负载不一样!
如果都会宕机,那角色越多是不是意味着集群恢复的逻辑越复杂?角色切换来切换去好玩?
4.还有一种半通的集群情况,比如A,B,C,D,E,其中A只和B,C能通,其他的都能互通!
这什么神奇组合,这种的我思考来思考去,还不如你搞2个集群,然后弄一个中转,或者是业务需求变更下,因为为了实现这个半通,付出的代价太大了!

PasteCluster的方式

1.采用N个节点模式,N大于等于1即可,1的时候表示自己是节点,自己担任任何角色(其实也就2个角色,master和cluster角色)
这里2个角色的区别主要是在业务上,比方说你集群中的某些任务,需要有一个人专门管理,那么这个任务就交给master来处理

2.不采用选举模式,采用竞选模式,当发生2个竞选冲突的时候,基于vote_time时间戳和id来确定谁是master!
假设有A,B,C,D,E节点,如果当前A为master!
停止A,则会发生

B,C,D,E在不同时间点会发觉A不在了,这里的发现可能并发也可能是按照一定的顺序,取决于他们的轮询线
假设B先发现A出问题了,立马给自己的vote_time赋值时间戳ms,然后问(除了自己外的其他节点)你有master么?
假设被问的是C,他这个时候还没有发现A挂了,
他立马健康检查A,
如果通了,则返回A的信息给B,告诉他当前的master是A

如果不通,他则给自己的vote_time赋值,然后告诉B,C就是master!B收到返回后,就把C设为master
C这个时候会进入选举阶段,会把自己的master(C)信息群发给其他节点
其他节点收到后,会和自己的master做健康检查,如果接通的话,再做vote_time对比,结合节点Id选举出最小的返回给C,
C这个时候收到的结果,如果返回的master(E)(这个是对比vote_time的结果)不是C他这个节点,则C放弃自己作为节点,接受返回的master(E)为master!自己做cluster的角色
上述居然有返回E,那么表示E也启动了选举,只是在和C的对比中E胜出了,C由于在遍历节点的时候失败了,所以中途退出了,也就是E在遍历的过程中完成了一整个遍历,没有被中断!

E在完成遍历后,群发一条消息给所有节点,告知我是新的Master(E)
以上整个选举过程就算完成,从时间上来说理论上是一个集群遍历的是时间,不需要其他集群模式的反复选举!
一般的是谁先发现,然后下一个节点选举胜出!主要是这个问的环节,如果没有问,直接进入竞,复杂度还会提升!

集群问答

1.集群节点保活问题

所有的集群节点都有心跳模式,因为你没办法知道对方下线没有,所以就有一个轮询,一定时间问对方在线否,PasteCluster采用双心跳,心跳复用模式!
双心跳是指master会定期向其他节点轮询是否在线
节点也会问轮询问master是否在线
复用模式在于,每次检查,只有他们之间的交互时间大于设定的时间后才会发送这个心跳检查,比如设定10秒钟!如果近期10秒钟这2个节点有通讯,则这一次心跳不在问询的任务中!

2.如何防止小集群出现(脑裂?)

之前第一次写的时候,后面测试发现会出现有多个master的情况,其实把整个选举过程拆开,每一个步骤延长时间,就很好理解,出现多个master是可能的!
在节点的交互过程中,比如A节点发送信息给B节点,A的请求会附带至少2个信息,A的master是谁,A的所在集群节点的总数,发送给B后,B会对这2个信息进行校验,如何和A的不一致,则B会抛弃自己的master信息,转而进入“问”阶段,再根据这个问的结果,加入集群!

3.啥时候发生加入集群这个动作

加入集群有2种情况,
1.是刚刚启动的时候,肯定得找个集群去加入,虽然有可能整个系统就你一个节点,过程嘛总要走的,说不定有其他节点(节点能通)呢!
2.在数据交互过程中,发现信息不对称,抛弃自己得信息加入到现有得一个主节点

4.啥时候移除节点

1.节点离线的时候主动触发,主动向master告知我这个节点离线了,master重新整理节点信息后,群发给所有其他在线的节点,从而更新整个集群!
2.在节点交互过程中,通讯错误次数超过设定的次数,这个交互包括数据通讯或者定时的健康检查,最终在master健康检查的时候移除这个节点,然后广播

5.给集群分配节点列表信息

1.通过配置文件的方式在启动的时候导入
2.通过AddNodeToList在启动后添加,比如你通过读取Redis获得节点列表,然后导入

6.找到当前节点的IP等信息

节点之间是通过HOSTIP等通讯的,所以最基本的就是需要知道当前节点是哪个IP,有2种模式
1.已知节点列表有10个IP,现在是不知道哪个节点是哪个,把节点列表写入到当前节点信息后,然后调用FindMe(),系统会去遍历列表,从而找到哪个节点信息是当前节点,你可以使用
/api/cluster/status查看自己当前节点是多少来确定是否查找完成!
2.在服务启动的时候给导入IP信息,比如如果你使用PasteSpider部署的项目,而且项目配置了项目网关,则可以在对应服务的配置中添加如下代码即可:
image
如果是非80端口要自己修改下!看配置其实就是在启动的额时候修改配置项ClusterConfig:CurrentHost的值!

7.手动加入一个现有集群

加入一个集群有2个步骤,一个是给当前自己打标签,也就是为自己设定host等信息,然后是ScanMaster();也就是当前节点知道自己是谁后,然后去查找当前的集群信息,然后加入这个集群!

8.主动退出一个集群

image
其实你也可以主动调用Leave函数实现离开集群!

PasteCluste组件的源码

你可以从Gitee拉取最新源码 PasteCluster组件源码和使用案例
拉取后打开项目,只需要关注如下代码
image
其他的子项目是因为这个使用案例是使用PasteTemplate生成的,懒得精简了,看下案例应该就知道如何使用到自己项目中了!

源码解说

基于上图,主要文件是2个

1.PasteClusterController.cs

看文件命名就知道,他是一个Controller,用于节点对外通讯的,每一个接口都有添加header校验,用于校验请求是否属于这个集群,在最后代码处:
image
要安全点的话,你可以加入那个time_temptoken,加密校验,类似OAUTH那个协议模式,过期了丢弃这个token
上图的fromnode用于修改最后交互时间,减少心跳中不必要的检查,
masternode则用于校验是否脑裂?也就是是否有多个master的情况,发现后及时处理!
image
在配置路由转发的时候(nginx的配置),需要为这个路径做代理
源码如下:

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;

namespace PasteCluster
{

    /// <summary>
    /// 
    /// </summary>
    [ApiController]
    [Route("/api/cluster/[action]")]
    public class PasteClusterController : Controller
    {

        private PasteClusterHandler _handler;
        private PasteSloveConfig _config;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="handler"></param>
        /// <param name="config"></param>
        public PasteClusterController(PasteClusterHandler handler, IOptions<PasteSloveConfig> config)
        {
            _config = config.Value;
            _handler = handler;
        }

        /// <summary>
        /// 健康检查
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public PasteApiResponse health()
        {
            CheckHeader();
            return _handler.Health();
        }

        /// <summary>
        /// 读取当前状态
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public string status()
        {
            return _handler.Status();
        }

        /// <summary>
        /// 问询master信息
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public PasteApiResponse ask()
        {
            CheckHeader();
            return _handler.Ask();
        }

        /// <summary>
        /// 查找自己
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> find(PasteNodeModel input)
        {
            CheckHeader();
            return await _handler.Find(input);
        }

        /// <summary>
        /// 有节点加入
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public PasteApiResponse join(PasteNodeModel input)
        {
            CheckHeader();
            _handler.Join(input);
            return _handler.State();
        }

        /// <summary>
        /// 转发消息
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> msg(PasteSloveMessage input)
        {
            CheckHeader();
            input.from_api = true;
            return await _handler.Msg(input);
        }

        /// <summary>
        /// 往集群中发送消息
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public PasteApiResponse push(PasteSloveMessage input)
        {
            CheckHeader();
            _handler.PushMsgToNode(input.body);
            return _handler.State();
        }

        /// <summary>
        /// 这个一般是PasteSpider推送过来的信息
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public PasteApiResponse run()
        {
            JObject info = null;
            if (base.Request.Body != null)
            {
                if (base.Request.Body != null)
                {
                    using var stream = new StreamReader(base.Request.Body);
                    var bodystr = stream.ReadToEndAsync().GetAwaiter().GetResult();
                    //Console.WriteLine(bodystr);
                    info = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(bodystr);
                }
            }
            if (info != null)
            {
                var _host = string.Empty;
                var _port = 80;
                JObject input = info;
                //if(info is JObject)
                //{
                //    input = input as JObject;
                //}
                //else
                //{
                //    Console.WriteLine("input is not JContainer");
                //    return _handler.State();
                //}
                if (input.ContainsKey("proAddress"))
                {
                    _host = input["proAddress"].ToString();
                }
                if (input.ContainsKey("extendService"))
                {
                    var _ser = input["extendService"];
                    if (_ser.Type == JTokenType.Object)
                    {
                        var _serobj = _ser as JObject;
                        if (_serobj.ContainsKey("listenPorts"))
                        {
                            var ports = _serobj["listenPorts"].ToString();
                            if (!String.IsNullOrEmpty(ports))
                            {
                                int.TryParse(ports.Split(',')[0], out var _pp);
                                if (_pp > 0)
                                {
                                    _port = _pp;
                                }
                            }
                        }
                    }
                }
                if (!String.IsNullOrEmpty(_host))
                {
                    if (_port != 80)
                    {
                        _handler.Register($"http://{_host}:{_port}", 0, 0);
                    }
                    else
                    {
                        _handler.Register($"http://{_host}", 0, 0);
                    }
                }
                else
                {
                    Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
                }
            }
            return _handler.State();
        }

        /// <summary>
        /// 手动注册自己
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpGet]
        public string joinin([FromQuery] PasteNodeModel input, string token)
        {
            if (token == _config.SloveToken)
            {
                _handler.Register(input.host, input.id, input.group);
            }
            else
            {
                return "token参数错误!";
            }
            return "手动加入!";
        }

        /// <summary>
        /// 加入某一个节点,当前需要为master
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpGet]
        public async Task<string> addone([FromQuery] PasteNodeModel input, string token)
        {
            if (token == _config.SloveToken)
            {
                if (_handler.CurrentIsMaster || _handler.IsSingle)
                {
                    var node = new PasteNodeModel
                    {
                        host = input.host,
                        id = input.id,
                        group = input.group,

                    };
                    await _handler.AddNodeToList(node);
                }
                else
                {
                    return "当前节点不是Master,无法执行这个命令";
                }
            }
            else
            {
                return "token参数错误!";
            }
            return "手动加入!";
        }

        /// <summary>
        /// 节点集合
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public PasteNodeModel[] nodes()
        {
            return _handler.Nodes();
        }

        /// <summary>
        /// 执行选举
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> vote(PasteNodeModel input)
        {
            CheckHeader();
            return await _handler.Vote(input);
        }

        /// <summary>
        /// 被通知,有成为master
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> master(PasteMasterResponse input)
        {
            CheckHeader();
            //Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
            return await _handler.Master(input);
        }

        /// <summary>
        /// 有节点离开
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> leave(PasteNodeModel input)
        {
            CheckHeader();
            return await _handler.Leave(input);
        }

        /// <summary>
        /// 移除某节点
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<PasteApiResponse> remove(PasteNodeModel input)
        {
            CheckHeader();
            return await _handler.Remove(input);
        }

        /// <summary>
        /// 检查信息是否合法
        /// </summary>
        /// <exception cref="Exception"></exception>
        private void CheckHeader()
        {
            if (base.Request.Headers.ContainsKey("slovetoken"))
            {
                if (base.Request.Headers["slovetoken"] != _config.SloveToken)
                {
                    throw new Exception("slovetoken错误,请重新输入!");
                }
                else
                {
                    if (base.Request.Headers.ContainsKey("fromnode"))
                    {
                        _handler.TargetMasterLast(base.Request.Headers["fromnode"].ToString());
                    }
                    if (base.Request.Headers.ContainsKey("masternode"))
                    {
                        _handler.ConfirmMaster(base.Request.Headers["masternode"].ToString());
                    }
                }
            }
            else
            {
                throw new Exception("slovetoken不能为空,请重新输入!");
            }
        }

    }
}

2.PasteClusterHandler.cs

这个文件就是集群选举,维护等的主要逻辑,里面使用了2个Channel异步队列,一个是这个集群组件内部使用的,有写入数据和读取数据,另外一个就是给业务用的了,在上面的案例中的HostedService中有体现!
整个选举过程和修复等,可以查看这个文件的源码,有疑问的麻烦在评论中回复!!!
源码内容:

using System.Net;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace PasteCluster
{
    /// <summary>
    /// 注意用IOC单例注入系统,多处调用
    /// </summary>
    public class PasteClusterHandler : IDisposable
    {

        /// <summary>
        /// 
        /// </summary>
        private readonly IHttpClientFactory _httpClientFactory;

        /// <summary>
        /// 用于内部消息中转等 节点中的任务
        /// </summary>
        private Channel<PasteEventModel> _channel_slove;

        /// <summary>
        /// 业务队列 表示队列中的消息要业务逻辑处理,当前可能为slove也可能为master
        /// 业务监听队列
        /// 表示业务要处理的消息
        /// 当前节点可能是slove节点
        /// 也可能是master节点
        /// </summary>
        public Channel<PasteSloveMessage> ChannelCluster;

        /// <summary>
        /// 当前节点信息
        /// </summary>
        private PasteNodeModel _current_node = null;

        /// <summary>
        /// 当前的Master是哪个
        /// </summary>
        private PasteNodeModel _current_master = null;

        /// <summary>
        /// 
        /// </summary>
        private ILogger<PasteClusterHandler> _logger = null;

        /// <summary>
        /// 当前是否是主节点
        /// </summary>
        public bool CurrentIsMaster
        {
            get
            {
                if (_current_master != null && _current_node != null)
                {
                    return _current_master.host == _current_node.host;
                }
                return false;
            }
        }

        /// <summary>
        /// 是否是单例模式
        /// 当前节点为null
        /// 或者没有节点列表
        /// </summary>
        public bool IsSingle
        {
            get
            {
                if (_current_master == null || _current_node == null)
                {
                    return true;
                }
                if (node_list == null || node_list.Count <= 1)
                {
                    return true;
                }
                return false;
            }
        }

        /// <summary>
        /// 当前节点的名称 如果为空表示没有或者本身配置就是空
        /// </summary>
        public string CurrentName
        {
            get
            {
                if (_current_node != null)
                {
                    return _current_node.name;
                }
                return "";
            }
        }

        /// <summary>
        /// 获取或设置当前集群的密钥 如果要修改这个需要在所有的调用之前配置
        /// </summary>
        public string SloveToken
        {
            get
            {
                return _config.SloveToken;
            }
            set
            {
                _config.SloveToken = value;
            }
        }

        /// <summary>
        /// 当前节点的名称 如果为空表示没有或者本身配置就是空
        /// </summary>
        public string CurrentHost
        {
            get
            {
                if (_current_node != null)
                {
                    return _current_node.host;
                }
                return "";
            }
        }

        /// <summary>
        /// 当前节点
        /// </summary>
        public PasteNodeModel CurrentNode
        {
            get
            {
                return _current_node;
            }
        }

        /// <summary>
        /// 当前节点的名称 如果为空表示没有或者本身配置就是空
        /// </summary>
        public int CurrentId
        {
            get
            {
                if (_current_node != null)
                {
                    return _current_node.id;
                }
                return 0;
            }
        }

        /// <summary>
        /// 当前配置信息
        /// </summary>
        private PasteSloveConfig _config;

        /// <summary>
        /// 
        /// </summary>
        private List<PasteNodeModel> node_list = null;

        /// <summary>
        /// 是否在选举中
        /// </summary>
        private bool voting = false;

        /// <summary>
        /// 
        /// </summary>
        private System.Timers.Timer _timer;

        /// <summary>
        /// 当前节点的代码,初始值为空,可以被赋值,被赋值后不能再次被赋值
        /// </summary>
        private string node_code = String.Empty;

        /// <summary>
        /// 暂存发往节点的消息,这个是指消息,业务上的消息
        /// </summary>
        private List<PasteEventModel> ziplist = null;

        /// <summary>
        /// 
        /// </summary>
        private System.Threading.SemaphoreSlim _semaphoreSlim;

        /// <summary>
        /// 
        /// </summary>
        private long msg_count = 0;

        /// <summary>
        /// 
        /// </summary>
        public PasteClusterHandler(ILogger<PasteClusterHandler> logger, IOptions<PasteSloveConfig> config, IHttpClientFactory httpClientFactory)
        {
            _logger = logger;
            _config = config.Value;
            _httpClientFactory = httpClientFactory;
            _semaphoreSlim = new SemaphoreSlim(1);
            ziplist = new List<PasteEventModel>();
            _channel_slove = Channel.CreateBounded<PasteEventModel>(_config.SloveChannelMsgCapacity);
            ChannelCluster = Channel.CreateBounded<PasteSloveMessage>(_config.MasterChannelMsgCapacity);

            _timer = new System.Timers.Timer(1000);
            _timer.Elapsed += _timer_Elapsed;
            _timer.AutoReset = true;
            _timer.Start();
            ActionSloveEvent();
        }

        /// <summary>
        /// 当前计数的次数
        /// </summary>
        private int _tick_index = 0;

        /// <summary>
        /// 给当前节点赋值code 
        /// </summary>
        /// <param name="code"></param>
        /// <returns>是否赋值成功</returns>
        public bool SetNodeCode(string code)
        {
            if (String.IsNullOrEmpty(node_code))
            {
                node_code = code;
            }
            else
            {
                return false;
            }
            return true;
        }

        /// <summary>
        /// 本地用于读取节点列表
        /// </summary>
        /// <returns></returns>
        public PasteNodeModel[] Nodes()
        {
            if (node_list != null)
            {
                return node_list.ToArray();
            }
            return null;
        }

        /// <summary>
        /// 启动集群
        /// 读取ClusterHost作为集群列表
        /// 读取CurrentHost作为当前节点得Host
        /// 如果有NodeList和NodeCode(CurrentCode)触发查找自己
        /// 否则触发查找Master
        /// </summary>
        public async void StartCluster()
        {
            if (!string.IsNullOrEmpty(_config.CurrentHost))
            {
                Register(_config.CurrentHost, 0, 0);
            }
            if (!String.IsNullOrEmpty(_config.ClusterHost))
            {
                var _list = new List<PasteNodeModel>();
                var strs = _config.ClusterHost.Split(";");
                foreach (var str in strs)
                {
                    if (!string.IsNullOrEmpty(str))
                    {
                        var its = str.Split(',');
                        var _one = new PasteNodeModel { };
                        _one.host = its[0];
                        if (its.Length > 1)
                        {
                            int.TryParse(its[1], out var _id);
                            _one.id = _id;
                        }
                        if (its.Length > 2)
                        {
                            int.TryParse(its[2], out var _group);
                            _one.group = _group;
                        }
                        if (!_list.Contains(_one))
                        {
                            _list.Add(_one);
                        }
                    }
                }
                if (_list.Count > 0)
                {
                    foreach (var _one in _list)
                    {
                        await AddNodeToList(_one);
                    }
                }
            }

            if (!String.IsNullOrEmpty(_config.CurrentCode))
            {
                SetNodeCode(_config.CurrentCode);
            }
            if (!String.IsNullOrEmpty(node_code))
            {
                if (node_list != null && node_list.Count > 0)
                {
                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
                }
            }
            else
            {
                Console.WriteLine("begin to join cluster:" + node_list.Select(x => x.host).ToString());
                await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
            }
        }

        /// <summary>
        /// 需要确保先调用SetNodeCode 表示给自己打一个标记
        /// 然后会启动一个任务去遍历集合,查找这个NodeCode,找到后就知道自己的HOST信息了
        /// </summary>
        public async void FindMe()
        {
            if (!String.IsNullOrEmpty(node_code))
            {
                if (node_list != null && node_list.Count > 0)
                {
                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
                }
            }
        }

        /// <summary>
        /// 开始查找自己 需要有节点集合(通过AddNodeToList赋值) 需要有当前节点得node_code(通过SetNodeCode赋值)
        /// </summary>
        /// <returns></returns>
        public async Task<bool> StartFindMe()
        {
            if (!String.IsNullOrEmpty(node_code))
            {
                if (node_list != null && node_list.Count > 0)
                {
                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// 如果消息来自Master则更新我和Master的最后交互时间
        /// 这个功能用于减少不必要的心跳资源浪费
        /// </summary>
        /// <param name="host"></param>
        public void TargetMasterLast(string host)
        {
            if (_current_master != null)
            {
                if (_current_master.host == host)
                {
                    _current_master.last_time = DateTime.Now.ToUnixTimeSeconds();
                    //Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Node:{CurrentHost} Update Master Last Time!");
                }
            }
        }

        /// <summary>
        /// 确认对方的Master和我的是否一致!
        /// 如果不一致,则我进去查找Master阶段
        /// </summary>
        /// <param name="host"></param>
        public void ConfirmMaster(string host)
        {
            if (_current_master != null)
            {
                if (_current_master.host != host)
                {
                    _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                }
            }
        }

        /// <summary>
        /// 拥有集群的地址列表,去找到自己
        /// 这个是给内部调用的,请勿调用
        /// </summary>
        /// <param name="input"></param>
        public async Task<PasteApiResponse> Find(PasteNodeModel input)
        {
            if (!String.IsNullOrEmpty(input.node_code))
            {
                if (node_code == input.node_code)
                {
                    //找到自己了
                    _current_node = new PasteNodeModel
                    {
                        node_code = node_code,
                        group = input.group,
                        host = input.host,
                        id = input.id,
                    };
                    await AddNodeToList(_current_node);
                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
                }
            }
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 定时检查
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            try
            {
                if (_current_node != null)
                {
                    _tick_index++;
                    //健康检查 被检查等!
                    if (_tick_index % _config.TickSloveHealth == 0)
                    {
                        var _ago = DateTime.Now.ToUnixTimeSeconds() - _config.TickScanSloveHealth;
                        if (CurrentIsMaster)
                        {
                            //健康检查 检查节点多久没通讯了
                            if (node_list != null && node_list.Count > 0)
                            {
                                var _post = new PasteMasterResponse { };
                                _post.master = _current_node;
                                _post.nodes = node_list?.ToArray();
                                var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
                                try
                                {
                                    await _semaphoreSlim.WaitAsync();
                                    foreach (var _node in node_list)
                                    {
                                        if (_node.host != _current_node.host)
                                        {
                                            if (_node.last_time < _ago)
                                            {
                                                var _api = await slove_get(_node, _config.ApiHealth);
                                                if (_api.success)
                                                {
                                                    if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
                                                    {
                                                        //告知对方,你的集群数据有误!
                                                        await slove_post(_node, _postBody, _config.ApiMaster);
                                                    }
                                                }
                                            }
                                        }
                                        else
                                        {
                                            _node.last_time = DateTime.Now.ToUnixTimeSeconds();//自己直接就是最新的
                                        }
                                    }
                                }
                                catch (Exception exl)
                                {
                                    _logger.LogError("line.341" + exl.Message);
                                }
                                finally
                                {
                                    _semaphoreSlim.Release();
                                }
                                //错误次数超过多少次的 执行移除操作!
                                var removes = node_list.Where(x => x.error_time > _config.RemoveByTime).ToList();
                                if (removes != null && removes.Count > 0)
                                {
                                    foreach (var _node in removes)
                                    {
                                        await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.removenode, message = Newtonsoft.Json.JsonConvert.SerializeObject(_node) });
                                    }
                                }
                            }
                        }
                        else
                        {
                            if (_current_master != null)
                            {
                                if (_current_master.last_time < _ago)
                                {
                                    //Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} node:{_current_node?.host} master:{_current_master?.host} timeout!");
                                    var _api = await slove_get(_current_master, _config.ApiHealth);
                                    if (_api.success)
                                    {
                                        //我记录的master和master记录的master不一致
                                        if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
                                        {
                                            //告知对方,你的集群数据有误!
                                            await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                                        }
                                    }
                                    else
                                    {
                                        //master失联了
                                        await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                                    }
                                }
                            }
                            else
                            {
                                //选举?
                                await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                            }
                        }
                    }
                }
                if (_tick_index > 3600)
                {
                    _tick_index = 0;
                }
            }
            catch (Exception exlelapsed)
            {
                _logger.LogError("line.396" + exlelapsed.Message);
            }
        }

        /// <summary>
        /// 业务端调用 表示把消息打入到集群 发给所有节点或者某些节点
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="node_id"></param>
        /// <param name="group"></param>
        public void PushMsgToNode(string msg, int msg_type = 0, int node_id = 0, int group = 0, string name = "")
        {
            var _msg = new PasteSloveMessage
            {
                body = msg,
                msg_type = msg_type,
                from = _current_node
            };
            var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
            if (!voting)
            {
                //如果当前在选举中,则暂存!
                _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
            }
            else
            {
                ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
            }
        }

        /// <summary>
        /// 把消息发送给所有节点,包括自己
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="node_id"></param>
        /// <param name="group"></param>
        public void PushMsgToAll(string msg, int msg_type = 0)
        {
            var _msg = new PasteSloveMessage
            {
                body = msg,
                msg_type = msg_type,
                from = _current_node
            };
            var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
            if (!voting)
            {
                //如果当前在选举中,则暂存!
                _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
            }
            else
            {
                ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
            }
        }

        /// <summary>
        /// 发送消息给master所在的节点的业务处理,如果当前为master则变更channel,切换到给业务处理
        /// </summary>
        /// <param name="msg"></param>
        public void PushMsgToMaster(string msg, int msg_type = 0)
        {
            var _msg = new PasteSloveMessage
            {
                body = msg,
                from = _current_node,
                msg_type = msg_type
            };
            var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
            if (CurrentIsMaster || _current_master == null)
            {
                //切换队列
                ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
            }
            else
            {
                if (!voting)
                {
                    _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
                }
                else
                {
                    ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
                }
            }
        }

        /// <summary>
        /// 直接推送消息,主要作用在于立马反馈
        /// 如果没有对象,则会发送给master
        /// 可以从from中判断是不是回环了
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="msg_type"></param>
        /// <param name="id"></param>
        /// <param name="name"></param>
        /// <param name="group"></param>
        /// <param name="host"></param>
        /// <returns></returns>
        public async Task<bool> PushDirectionMsg(string msg, int msg_type = 0, int id = 0, string name = "", int group = 0, string host = "")
        {
            var _message = new PasteSloveMessage
            {
                body = msg,
                msg_type = msg_type,
            };
            return await PushDirectionMsg(_message, id, name, group, host);
        }

        /// <summary>
        /// 直接发送消息给某一个节点
        /// 如果没有命中则发送给Master
        /// 也就是说,可能发送给当前!注意不要回环了!!!
        /// </summary>
        /// <param name="message"></param>
        /// <param name="id"></param>
        /// <param name="name"></param>
        /// <param name="group"></param>
        /// <param name="host">示例http://192.168.1.5</param>
        /// <returns>是否发送成功,发送给某一个节点了?</returns>
        public async Task<bool> PushDirectionMsg(PasteSloveMessage message, int id = 0, string name = "", int group = 0, string host = "")
        {
            message.from = _current_node;
            var _send = false;
            if (id != 0)
            {
                if (CurrentId == id)
                {
                    message.to = _current_node;
                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_postBody);
                    await ChannelCluster.Writer.WriteAsync(_message);
                    return true;
                }
                else
                {
                    var find = node_list.Where(x => x.id == id).FirstOrDefault();
                    if (find != null && find != default)
                    {
                        message.to = find;
                        var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                        var _api = await slove_post(find, _postBody, _config.ApiMsg);
                        return _api.success;
                    }
                }
            }

            if (!string.IsNullOrEmpty(host) && node_list != null)
            {
                var find = node_list.Where(x => x.host == host).FirstOrDefault();
                if (find != null && find != default)
                {
                    message.to = find;
                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var _api = await slove_post(find, _postBody, _config.ApiMsg);
                    return _api.success;
                }
            }

            if (!string.IsNullOrEmpty(name) && node_list != null)
            {
                var find = node_list.Where(x => x.name == name).FirstOrDefault();
                if (find != null && find != default)
                {
                    message.to = find;
                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var _api = await slove_post(find, _postBody, _config.ApiMsg);
                    return _api.success;
                }
            }
            if (group != 0 && node_list != null)
            {
                var find = node_list.Where(x => x.group == group).FirstOrDefault();
                if (find != null && find != default)
                {
                    message.to = find;
                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var _api = await slove_post(find, _postBody, _config.ApiMsg);
                    return _api.success;
                }
            }

            if (!_send)
            {
                if (CurrentIsMaster || IsSingle)
                {
                    message.to = _current_node;
                    await ChannelCluster.Writer.WriteAsync(message);
                    return true;
                }
                else
                {
                    if (_current_master != null)
                    {
                        message.to = _current_master;
                        var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                        var _api = await slove_post(_current_master, _postBody, _config.ApiMsg);
                        return _api.success;
                    }
                }
            }
            return _send;
        }

        /// <summary>
        /// 如果当前不是master则不执行,直接抛弃,如果当前是master则执行
        /// </summary>
        /// <param name="msg"></param>
        public async Task<bool> PushMsgToOnlyMaster(string msg, int msg_type = 0)
        {
            var _msg = new PasteSloveMessage
            {
                body = msg,
                msg_type = msg_type,
                from = _current_node
            };
            var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
            if (CurrentIsMaster || _current_master == null)
            {
                await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
                return true;
            }
            return false;
        }

        /// <summary>
        /// 处理节点中队列的消息
        /// </summary>
        private async void ActionSloveEvent()
        {
            try
            {
                var _read = await _channel_slove.Reader.ReadAsync();
                if (_read != null && _read != default)
                {
                    //_logger.LogInformation("Cluster.Event:" + Newtonsoft.Json.JsonConvert.SerializeObject(_read));
                    switch (_read.action)
                    {
                        case PasteSloveEvent.votemaster:
                            {
                                //告知别人,我是master
                                if (_current_node == null)
                                {
                                    return;
                                }
                                _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
                                if (node_list != null && node_list.Count > 0)
                                {
                                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_current_node);
                                    //必须大于1,有2个及以上节点,才有master的说法
                                    foreach (var _node in node_list)
                                    {
                                        if (_node.host != _current_node.host)
                                        {
                                            var _api = await slove_post(_node, _postBody, _config.ApiVote);
                                            if (_api.success)
                                            {
                                                //_node.last_time = DateTime.Now.ToUnixTimeSeconds();
                                                if (_api.master != null)
                                                {
                                                    if (_api.master.host != _current_node.host)
                                                    {
                                                        //找到一个竞争者? 有节点说他才是master 而且时间比我小 我只好退位让贤
                                                        _current_master = _api.master;
                                                        break;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    //选举完成了,我是不是master?
                                    if (_current_master == null)
                                    {
                                        _current_master = _current_node;
                                        await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
                                    }
                                }
                            }
                            break;
                        case PasteSloveEvent.scanmaster:
                            {
                                if (_current_node == null)
                                {
                                    return;
                                }
                                _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
                                if (node_list != null && node_list.Count > 0)
                                {
                                    voting = true;
                                    //必须大于1,有2个及以上节点,才有master的说法
                                    foreach (var _node in node_list)
                                    {
                                        var _api = await slove_get(_node, _config.ApiAsk);
                                        if (_api.success && _api.master != null)
                                        {
                                            _current_master = _api.master;
                                            if (_current_node.host == _current_master.host)
                                            {
                                                await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
                                            }
                                            break;
                                        }
                                    }
                                    if (_current_master != null)
                                    {
                                        if (_current_node?.host != _current_master?.host)
                                        {
                                            //告知 master 我要加入集群
                                            var _join = await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), _config.ApiJoinSlove);
                                            if (!_join.success)
                                            {
                                                //这里失败了,咋办??? 未完待续
                                                voting = true;
                                                _current_master = null;
                                            }
                                        }
                                    }
                                    else
                                    {
                                        //饶了一圈,全部不能访问
                                        _current_master = _current_node;
                                        voting = false;
                                        await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
                                    }
                                }
                                else
                                {
                                    //节点信息不完整 还是只有这么一个节点? 未完待续
                                    _current_master = _current_node;
                                    voting = false;
                                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
                                }
                            }
                            break;
                        case PasteSloveEvent.suremaster:
                            {
                                if (_current_node == null)
                                {
                                    return;
                                }
                                if (node_list != null && node_list.Count > 0)
                                {
                                    var _post = new PasteMasterResponse { };
                                    _post.master = _current_node;
                                    _post.nodes = node_list?.ToArray();
                                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
                                    //必须大于1,有2个及以上节点,才有master的说法
                                    foreach (var _node in node_list)
                                    {
                                        if (_node.host != _current_node.host)
                                        {
                                            var _api = await slove_post(_node, _postBody, _config.ApiMaster);
                                        }
                                    }
                                    await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 1, body = _postBody, from = _current_node, time = DateTime.Now.ToUnixTimeSeconds() });
                                }
                            }
                            break;
                        case PasteSloveEvent.msg_to_master:
                            {
                                var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
                                if (CurrentIsMaster)
                                {
                                    _message.from = _current_node;
                                    _message.to = _current_node;
                                    await ChannelCluster.Writer.WriteAsync(_message);
                                }
                                else
                                {
                                    //通过 api 发送给 master
                                    if (_current_master != null)
                                    {
                                        _message.from = _current_node;
                                        _message.to = _current_master;
                                        var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
                                        //如果发送失败,暂停业务,进行vote阶段
                                        var _join = await slove_post(_current_master, _postBody, _config.ApiMsg);
                                        if (!_join.success)
                                        {
                                            if (_read.try_time > 0)
                                            {
                                                _read.try_time = _read.try_time - 1;
                                                //这里失败了,咋办??? 未完待续
                                                await _channel_slove.Writer.WriteAsync(_read);
                                            }
                                        }
                                    }
                                }
                            }
                            break;
                        case PasteSloveEvent.msg_to_node:
                            {
                                if (node_list == null || node_list.Count == 0)
                                {
                                    return;
                                }
                                var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
                                if (_read.node_id != 0)
                                {
                                    var nodes = node_list.Where(x => x.id == _read.node_id).ToList();
                                    if (nodes != null && nodes.Count > 0)
                                    {
                                        foreach (var _node in nodes)
                                        {
                                            _message.to = _node;
                                            var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
                                            var _api = await slove_post(_node, _postBody, _config.ApiMsg);
                                        }
                                    }
                                }
                                if (_read.group != 0)
                                {
                                    var nodes = node_list.Where(x => x.group == _read.group).ToList();
                                    if (nodes != null && nodes.Count > 0)
                                    {
                                        foreach (var _node in nodes)
                                        {
                                            _message.to = _node;
                                            var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
                                            var _api = await slove_post(_node, _postBody, _config.ApiMsg);
                                        }
                                    }
                                }
                                if (_read.group == 0 && _read.node_id == 0)
                                {
                                    foreach (var _node in node_list)
                                    {
                                        _message.to = _node;
                                        var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
                                        var _api = await slove_post(_node, _postBody, _config.ApiMsg);
                                    }
                                }
                            }
                            break;
                        case PasteSloveEvent.msg_to_all_node:
                            {
                                if (node_list == null || node_list.Count == 0)
                                {
                                    return;
                                }
                                var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
                                foreach (var _node in node_list)
                                {
                                    _message.to = _node;
                                    var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
                                    var _api = await slove_post(_node, _postBody, _config.ApiMsg);
                                }
                            }
                            break;
                        case PasteSloveEvent.removenode:
                            {
                                if (CurrentIsMaster)
                                {
                                    var _node = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteNodeModel>(_read.message);
                                    await RemoveCurrentNode(_node);
                                    //下发群发消息
                                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
                                    var _messge = new PasteSloveMessage
                                    {
                                        body = _read.message,
                                        from = _current_node,
                                        msg_type = 4,
                                    };
                                    await ChannelCluster.Writer.WriteAsync(_messge);
                                }
                            }
                            break;
                        case PasteSloveEvent.unziplist:
                            {
                                if (ziplist != null && ziplist.Count > 0)
                                {
                                    var _count = ziplist.Count;
                                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}将从队列ZIPLIST中发送{ziplist.Count}条消息");
                                    for (var k = 0; k < _count; k++)
                                    {
                                        if (!voting)
                                        {
                                            if (ziplist.Count > 0)
                                            {
                                                var _first = ziplist[0];
                                                await _channel_slove.Writer.WriteAsync(_first);
                                                ziplist.RemoveAt(0);
                                            }
                                        }
                                    }
                                }
                                break;
                            }
                        case PasteSloveEvent.findnode:
                            {
                                var _find = false;
                                if (!string.IsNullOrEmpty(node_code))
                                {
                                    if (node_list != null && node_list.Count > 0)
                                    {
                                        foreach (var _node in node_list)
                                        {
                                            if (String.IsNullOrEmpty(_node.node_code))
                                            {
                                                var _build = new PasteNodeModel
                                                {
                                                    group = _node.group,
                                                    id = _node.id,
                                                    host = _node.host,
                                                    node_code = node_code
                                                };
                                                var _api = await slove_post(_build, Newtonsoft.Json.JsonConvert.SerializeObject(_build), _config.ApiFind);
                                                if (_api.success)
                                                {
                                                    if (_api.node != null)
                                                    {
                                                        if (_api.node.node_code == node_code)
                                                        {
                                                            _find = true;
                                                            break;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                                if (_find)
                                {
                                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
                                }
                            }
                            break;
                    }
                }
            }
            catch (Exception exl)
            {
                _logger.LogError(exl.Message);
                await Task.Delay(1000);
            }
            finally
            {
                ActionSloveEvent();
            }
        }

        /// <summary>
        /// 知道当前节点信息后,注册到节点或者更新当前节点信息
        /// </summary>
        /// <param name="host"></param>
        /// <param name="id"></param>
        /// <param name="group"></param>
        public async void Register(string host, int id, int group = 0, string name = "")
        {
            Console.WriteLine($"Register At:{host}");
            var _current = new PasteNodeModel
            {
                group = group,
                host = host,
                id = id,
                name = name,
                last_time = DateTime.Now.ToUnixTimeSeconds()
            };
            _current_node = _current;
            await AddNodeToList(_current);
            //准备问询
            await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
        }

        /// <summary>
        /// 有新的节点加入 可以是业务上读取节点列表,然后遍历调用这个
        /// </summary>
        /// <param name="input"></param>
        public async void Join(PasteNodeModel input)
        {
            await AddNodeToList(input);
            //群发
            if (CurrentIsMaster)
            {
                await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });

                //告知系统有新的节点
                var _messge = new PasteSloveMessage
                {
                    body = Newtonsoft.Json.JsonConvert.SerializeObject(input),
                    from = _current_node,
                    msg_type = 3,
                };
                await ChannelCluster.Writer.WriteAsync(_messge);
            }
        }

        /// <summary>
        /// 添加节点
        /// </summary>
        /// <param name="input"></param>
        /// <param name="scan">加入后,是否搜索master</param>
        public async Task AddNodeToList(PasteNodeModel input)
        {
            try
            {
                await _semaphoreSlim.WaitAsync();
                //Console.WriteLine($"--join--:{input.host}");
                if (node_list == null)
                {
                    node_list = new List<PasteNodeModel> { input };
                }
                else
                {
                    var find = node_list.Where(x => x.host == input.host).FirstOrDefault();
                    if (find != null && find != default)
                    {
                        find.id = input.id;
                        find.group = input.group;
                    }
                    else
                    {
                        node_list.Add(input);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex.Message);
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        }

        /// <summary>
        /// 扫描master 这个发生在加入新的节点信息后 或者启动后
        /// </summary>
        public async void ScanMaster()
        {
            await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
        }

        /// <summary>
        /// 健康检查,返回当前信息
        /// </summary>
        /// <returns></returns>
        public PasteApiResponse Health()
        {
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node, node_count = node_list != null ? node_list.Count : 0 };
        }

        /// <summary>
        /// 被问询 当前有没有master
        /// </summary>
        public PasteApiResponse Ask()
        {
            if (_current_node != null)
            {
                if (_current_master == null)
                {
                    _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
                    //_current_master = _current_node;
                    //CurrentIsMaster = true;
                    //进入分发流程,告知我是master
                    _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.votemaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
                }
            }
            //if (_current_node != null)
            //{
            //    _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
            //}
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 集群间的消息
        /// </summary>
        /// <param name="msg"></param>
        /// <returns></returns>
        public async Task<PasteApiResponse> Msg(PasteSloveMessage msg)
        {
            await ChannelCluster.Writer.WriteAsync(msg);
            msg_count++;
            if (msg.msg_type == 2)
            {

            }
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 有master分发消息,说他是master
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        public async Task<PasteApiResponse> Vote(PasteNodeModel input)
        {
            if (_current_node != null)
            {
                //排除当前节点问当前节点
                if (_current_node.host != input.host)
                {
                    if (_current_master == null)
                    {
                        //当前没有master 那来的就是master吧
                        _current_master = input;
                    }
                    else
                    {
                        //当前有master
                        if (_current_master.host != input.host)
                        {
                            if (_current_master.vote_time < input.vote_time)
                            {
                                var _api = await slove_get(_current_master, _config.ApiHealth);
                                if (_api.success)
                                {
                                    return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
                                }
                                else
                                {
                                    _current_master = input;
                                }
                            }
                        }
                        return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
                    }
                }
            }
            //返回 哪个是master
            return new PasteApiResponse { success = true, master = input, node = _current_node };
        }

        /// <summary>
        /// master选举完成,master广播说他是master
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        public async Task<PasteApiResponse> Master(PasteMasterResponse input)
        {
            var _old_is_master = CurrentIsMaster;

            _current_master = input.master;
            if (input.nodes != null)
            {
                foreach (var _node in input.nodes)
                {
                    await AddNodeToList(_node);
                }
            }
            voting = false;
            //如果队列有暂存消息,则发送!
            await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.unziplist });
            var _new_is_master = CurrentIsMaster;
            if (_old_is_master && !_new_is_master)
            {
                await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 2, body = Newtonsoft.Json.JsonConvert.SerializeObject(input), from = _current_node });
            }
            //返回 哪个是master
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        public async Task<PasteApiResponse> Leave(PasteNodeModel input)
        {
            if (_current_node != null)
            {
                if (_current_node.host != input.host)
                {
                    //谁离线了
                    //如果自己是master 则应该是节点离线 通知其他节点,节点列表变更
                    //如果自己是node 离线的是master 则自己进去选择
                    if (CurrentIsMaster)
                    {
                        if (node_list != null && node_list.Count > 0)
                        {
                            await RemoveCurrentNode(input);
                            if (node_list.Count > 0)
                            {
                                var _post = new PasteMasterResponse { };
                                _post.master = _current_node;
                                _post.nodes = node_list?.ToArray();
                                var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
                                foreach (var _node in node_list)
                                {
                                    if (_node.host != _current_node.host && _node.host != input.host)
                                    {
                                        var _api = await slove_post(_node, _postBody, _config.ApiMaster);
                                    }
                                }
                            }
                        }
                        //告知系统有新的节点
                        var _messge = new PasteSloveMessage
                        {
                            body = Newtonsoft.Json.JsonConvert.SerializeObject(input),
                            from = _current_node,
                            msg_type = 4,
                        };
                        await ChannelCluster.Writer.WriteAsync(_messge);
                    }
                    else
                    {
                        if (_current_master != null && input.host == _current_master.host)
                        {
                            _current_master = null;
                            await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                        }
                    }
                }
            }
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        public async Task<PasteApiResponse> Remove(PasteNodeModel input)
        {
            if (_current_node != null)
            {
                if (_current_node.host == input.host)
                {
                    //我居然被移除了,赶紧加回去
                    await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
                }
                await RemoveCurrentNode(input);
            }
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 移除节点
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        private async Task RemoveCurrentNode(PasteNodeModel input)
        {
            try
            {
                await _semaphoreSlim.WaitAsync();
                if (node_list != null)
                {
                    var find = node_list.Where(x => x.host == input.host).FirstOrDefault();
                    if (find != null && find != default)
                    {
                        node_list.Remove(find);
                    }
                }
            }
            catch (Exception exl)
            {
                _logger.LogError(exl.Message);
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        }

        /// <summary>
        /// 打印当前状态
        /// </summary>
        /// <returns></returns>
        public string Status()
        {
            return $"总节点数:{node_list?.Count} Master:{_current_master?.host} Current:{_current_node?.host} Count:{msg_count}";
        }

        /// <summary>
        /// 打印当前状态
        /// </summary>
        /// <returns></returns>
        public PasteApiResponse State()
        {
            return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
        }

        /// <summary>
        /// 离线 告诉除了自己的第一个节点,我要离线了!
        /// </summary>
        private async Task _leave()
        {
            if (CurrentIsMaster)
            {
                if (node_list != null && node_list.Count > 0)
                {
                    if (_current_node != null)
                    {
                        foreach (var _node in node_list)
                        {
                            if (_node.host != _current_node?.host)
                            {
                                var _api = await slove_post(_node, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");
                                if (_api.success)
                                {
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            else
            {
                if (_current_master != null && _current_node != null)
                {
                    await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");
                }
            }
        }

        #region

        /// <summary>
        /// 
        /// </summary>
        /// <param name="node"></param>
        /// <param name="path"></param>
        /// <returns></returns>
        private async Task<PasteApiResponse> slove_get(PasteNodeModel node, string path)
        {
            var _read = await get($"{node.host}{path}", node.host);
            if (_read.code == 200)
            {
                if (node.error_time > 0)
                {
                    node.error_time = 0;
                }
                node.last_time = DateTime.Now.ToUnixTimeSeconds();
                return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);
            }
            else
            {
                node.error_time++;
                return new PasteApiResponse { success = false };
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="node"></param>
        /// <param name="body"></param>
        /// <param name="path"></param>
        /// <returns></returns>
        private async Task<PasteApiResponse> slove_post(PasteNodeModel node, string body, string path)
        {
            var _read = await post($"{node.host}{path}", body, node.host);
            if (_read.code == 200)
            {
                if (node.error_time > 0)
                {
                    node.error_time = 0;
                }
                node.last_time = DateTime.Now.ToUnixTimeSeconds();
                return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);
            }
            else
            {
                node.error_time++;
                return new PasteApiResponse { success = false };
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="url"></param>
        /// <returns></returns>
        private async Task<(int code, string response)> get(string url, string host)
        {
            try
            {
                var request = new HttpRequestMessage(HttpMethod.Get, url);
                request.Headers.Add("slovetoken", _config.SloveToken);
                if (_current_node != null)
                {
                    request.Headers.Add("fromnode", _current_node.host);
                }
                if (_current_master != null)
                {
                    request.Headers.Add("masternode", _current_master.host);
                }
                request.Headers.Add("ContentType", "application/json;charset=utf-8");
                var client = _httpClientFactory.CreateClient();
                client.Timeout = TimeSpan.FromSeconds(4);
                client.DefaultRequestHeaders.Add("ContentType", "application/json;charset=utf-8");
                var response = await client.SendAsync(request);
                using var stream = new StreamReader(await response.Content.ReadAsStreamAsync(), Encoding.UTF8);
                var backstream = stream.ReadToEnd();
                if (response.StatusCode == HttpStatusCode.OK)
                {
                    _logger.LogInformation(backstream);
                }
                else
                {
                    _logger.LogError(backstream);
                }
                return ((int)response.StatusCode, backstream);
            }
            catch (Exception exl)
            {
                _logger.LogError(exl.Message);
                return (500, exl.Message);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="url"></param>
        /// <param name="postdata"></param>
        /// <returns></returns>
        private async Task<(int code, string response)> post(string url, string postdata, string host)
        {
            try
            {
                using (HttpContent httpcontent = new StringContent(postdata))
                {
                    httpcontent.Headers.Add("slovetoken", _config.SloveToken);
                    if (_current_node != null)
                    {
                        httpcontent.Headers.Add("fromnode", _current_node.host);
                    }
                    if (_current_master != null)
                    {
                        httpcontent.Headers.Add("masternode", _current_master.host);
                    }
                    httpcontent.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json;chartset=utf-8");
                    var client = _httpClientFactory.CreateClient();
                    client.Timeout = TimeSpan.FromSeconds(4);
                    client.DefaultRequestHeaders.Referrer = new Uri(url);
                    var response = await client.PostAsync(url, httpcontent);
                    var backstream = await response.Content.ReadAsStringAsync();
                    if (response.StatusCode != HttpStatusCode.OK && response.StatusCode != HttpStatusCode.NoContent && response.StatusCode != HttpStatusCode.Created)
                    {
                        _logger.LogError($"URL:{url} CODE:{response.StatusCode} POST:{postdata} BACK:{backstream}");
                    }
                    return ((int)response.StatusCode, backstream);
                }
            }
            catch (Exception exl)
            {
                _logger.LogError(exl.Message);
                return (500, exl.Message);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        public void Dispose()
        {

            //离开集合
            _leave().Wait();

            if (_timer != null)
            {
                _timer.Stop();
                _timer.Dispose();
            }
            //Task.CompletedTask;
        }

        #endregion
        //是否要压入队列 
        //统计数据打印 某一个消息的次数!
    }

}

标签:node,PasteSpider,current,host,源码,集群,var,master,节点
From: https://www.cnblogs.com/pastespider/p/18244253

相关文章

  • ThreadLocal源码分析
    目录0x00ThreadLocal0x01ThreadLocalMap0x02ThreadLocal内存泄漏0x00ThreadLocalThreadLocal提供了线程局部的变量,但和普通局部变量不同,同一个ThreadLocal变量可以被多个线程共享,而不是线程私有的。在ThreadLocal源代码中有一个使用例子,代码如下:importjava.ut......
  • Chromium源码阅读:深入理解Mojo框架的设计思想,并掌握其基本用法(2)
    我们继续分析Chromium的Mojo模块。DispatcherDispatcher是MojoIPC系统中的一个关键概念。它是一个虚基类类(或接口),用于实现与特定MojoHandle相关联的Mojo核心API调用。在Mojo系统中,应用程序通过这些API与各种类型的IPC机制进行交互,如消息管道、共享缓冲区......
  • 青否数字人直播源码代理端后台操作步骤!
    青否数字人直播源码代理端后台,我们将详细介绍一下数字人的代理端后台的详细操作步骤!1.代理端入口2.代理后台预览基本设置,账号管理,资金管理,克隆端。2.1基本设置设置一些账号的基本信息包括名称,logo,二维码等等,点击“提交”即可2.2账号管理打开账号管理,可搜索已创建的......
  • 云原生Kubernetes系列项目实战-k8s集群+高可用负载均衡层+防火墙
    一、Kubernetes区域可采用Kubeadm方式进行安装:名称主机部署服务master192.168.91.10docker、kubeadm、kubelet、kubectl、flannelnode01192.168.91.11docker、kubeadm、kubelet、kubectl、flannelnode02192.168.91.20docker、kubeadm、kubelet、kubectl、flannel1.系统初......
  • C#(asp.net) 非物质文化遗产建档管理系统设-计算机毕业设计源码91695
    摘 要信息化社会内需要与之针对性的信息获取途径,但是途径的扩展基本上为人们所努力的方向,由于站在的角度存在偏差,人们经常能够获得不同类型信息,这也是技术最为难以攻克的课题。针对非物质文化遗产建档管理系统等问题,对非物质文化遗产建档管理系统进行研究分析,然后开发设计出......
  • 最新AI系统+ChatGPT网站H5源码+AI绘画系统,DALL-E3文生图,详细图文搭建教程/文档分析/识
    目录一、文章前言系统文档 二、系统演示三、系统功能模块3.1AI全模型支持/插件系统AI模型提问文档分析​识图理解能力3.2GPts应用3.2.1GPTs应用3.2.2GPTs工作台3.2.3自定义创建预设应用3.3AI专业绘画3.3.1文生图/图生图(垫图)3.3.2 局部编辑重绘3.3.3 ......
  • 源码编译构建LAMP
    Apache起源源于APatchyServer,著名的开源Web服务软件1995年时,发布Apache服务程序的1.0版本由Apache软件基金会(ASF)负责维护最新的名称为“ApacheHTTPServer”官方站点:http://httpd.apache.org/主要特点开发源代码/跨平台应用支持多种网页编程语言模块化涉及、运行稳定、......
  • 短剧对接广告联盟app平台开发源码
    短剧对接广告联盟APP平台开发的源码涉及多方面的技术整合与策略设计。以下是一些关键步骤和考虑因素:需求分析:深入理解短剧APP的市场定位、目标用户群体以及广告联盟的具体需求。这有助于确定系统的基本功能和特色服务,确保开发的APP能够满足用户和广告主的实际需求1。系统......
  • SSM-小区物业管理系统-48954(免费领源码+开发文档)可做计算机毕业设计JAVA、PHP、爬虫、
    基于SSM小区物业管理系统摘要随着计算机科学技术日渐成熟,人们已经深刻认识到了计算机功能的强大,计算机已经进入到了人类社会发展的各个领域,并且发挥着十分重要的作用。每个社区的物业管理是一项系统而复杂的工作,它需要一个团队互相配合、分工协作。在该领域,传统的手工存取......
  • 【Jenkins+K8s】持续集成与交付 (二十):K8s集群通过Deployment方式部署安装Jenkins
    ......