首页 > 编程语言 >elastic-job源码(2)-选举机制

elastic-job源码(2)-选举机制

时间:2023-04-26 19:34:01浏览次数:48  
标签:event elastic void jobname job 源码 election 节点 leader

选举机制:利用zookeeper分布式锁机制,每一个job都存在节点选举机制,用于job分片处理。    Job在初始化的时候就会实施选举机制 如下初始化的代码: 

public void registerStartUpInfo(final boolean enabled) {
    //开始所有的监听器
    listenerManager.startAllListeners();
    //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器
    leaderService.electLeader();
    //{namespace}/{ipservers} 设置enable处理
    serverService.persistOnline(enabled);
    //临时节点   /{namespave}/instances 放置运行服务实例信息
    instanceService.persistOnline();
    //开启一个异步服务
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

listenerManager.startAllListeners();会开启一个选举相关的listenerManager  ElectionListenerManager.class leaderService.electLeader();执行选举功能    第一步:执行选举功能
public void electLeader() {
    log.debug("Elect a new leader now.");
    this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}
public void executeInLeader(String key, LeaderExecutionCallback callback) {
    try {
        LeaderLatch latch = new LeaderLatch(this.client, key);


        try {
            latch.start();
            latch.await();
            callback.execute();
        } catch (Throwable var7) {
            try {
                latch.close();
            } catch (Throwable var6) {
                var7.addSuppressed(var6);
            }


            throw var7;
        }


        latch.close();
    } catch (Exception var8) {
        this.handleException(var8);
    }


}

{job name}/leader/election/latch节点加zk锁,在抢到锁之后,调用callback对象中的execute方法  
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        //{jobname}/leader/election/instance 不存在
        if (!hasLeader()) {
            //创建临时节点 {jobname}/leader/election/instance 值为 当前运行的实例值 例如:10.100.16.75@-@134642 前面是ip地址,后面是产生的随机数
            //当应用实例与zk断开重新连接时,该节点信息会清除
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}

第二步:ElectionListenerManager.class开启监听
@Override
public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}

执行start方法 有两个监听 LeaderElectionJobListener:用于leader宕机之后重新选举监听 LeaderAbdicationJobListener :用于监听leader宕机数据处理   LeaderElectionJobListener.java
@Override
public void onChange(final DataChangedEvent event) {
    //1.schedulerMap 和 jobInstanceMap 没有job信息
    //2.{jobname}/service/{ip} 节点数据为DISABLE 或者 ({jobname}/leader/election/instance 节点的类型为删除且{jobname}/servers 节点的值是ENABLED 且  {jobname}/instances 节点下有其他的在线实例)
    //当前运行的job实例宕机,并且有其他运行的实例
    if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) {
        //重新选举
        leaderService.electLeader();
    }
}

LeaderAbdicationJobListener.java

@Override
public void onChange(final DataChangedEvent event) {
    //{jobname}/leader/election/instance节点的实例id和JobRegistry对象中的实例id相等
    //{jobname}/service/{ip}/ 是DISABLED
    //就是实例下线
    if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) {
        //删除{jobname}/leader/election/instance 节点
        leaderService.removeLeader();
    }
}

 

 

 

 

 

标签:event,elastic,void,jobname,job,源码,election,节点,leader
From: https://www.cnblogs.com/lingyujuan/p/17357063.html

相关文章

  • 扩展 jol 源码包 打印 Mark Word
    参考: https://blog.csdn.net/qq_38505969/article/details/1234463101、下载源码并进行方法扩展2、pom.xml<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http......
  • 在线直播源码,自定义AlertDialog设置宽高并去掉默认的边框
    在线直播源码,自定义AlertDialog设置宽高并去掉默认的边框1、先写一个自定义的AlertDialog。 packagecom.phone.common_library.dialog; importandroid.annotation.SuppressLint;importandroid.content.Context;importandroid.content.DialogInterface;importandroid.vie......
  • go channel源码阅读
    gochannel源码阅读channel介绍channel是一个类型管道,通过它可以在groutine之间发送消息核心数据结构channel内部数据结构是固定长度的双向循环列表按顺序往里面写数据,写满之后又从0开始写chan中的两个重要组件是buf和waitq,所有的行为和实现都是围绕着两个组件进行的typ......
  • junit源码分析
    JUnit源码分析(一)——Command模式和Composite模式JUnit的源码相比于spring和hibernate来说比较简单,但麻雀虽小,五脏俱全,其中用到了比较多的设计模式。很多人已经在网上分享了他们对JUnit源码解读心得,我这篇小文谈不出什么新意,本来不打算写,可最近工作上......
  • Telegram 源码解读点滴记录
    1.test_callable_plain,作用:判断参数是否与函数声明一致usingfalse_t=char;structtrue_t{ false_tdata[2];};static_assert(sizeof(false_t)!=sizeof(true_t),"Ican'twork:(");template< typenameMethod, typename...Args, typename=decltype(......
  • Django之视图函数层 (必会三板斧 JsonResponse对象 request对象获取文件 FBV与CBV
    目录视图层之必会三板斧用来处理请求的视图函数都必须返回HttpResponse对象情况一:啥也不返回这里会报一个没有返回HttpResponse对象的错误,由此可见必须要返回一个HttpResponse对象情况二:返回HttpResponse对象点击Ctrl键查看源码可见是HttpResponse类,所以会返回一个值情......
  • jdk并发包 CopyOnWriteArrayList源码分析
    CopyOnWriteArrayList是jdk1.5并法包里面用于处理高并发下,读多写少的情况下,降低锁等待的集合类。下面对该类实现做一个简要的分析1,首先CopyOnWriteArrayList是实现了List接口,对=List接口的相关方法进行了实现。2,下面的它的add方法,会首先加锁,然后copy原List内部的数组,然后对新数组长......
  • ray-分布式计算框架-集群与异步Job管理
    0.ray简介ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包RayAIRuntimeML应用程序库集RayCore通用分布式计算库Task--Ray允许任意Python函数在单独的Pythonworker上运行,这些异步Python函数称为任务Actor......
  • Django框架——路由分发、名称空间、虚拟环境、视图层三板斧、JsonResponse对象、requ
    路由分发#Django支持每个应用都可以有自己独立的路由层、静态文件、模版层。基于该特性多人开发项目就可以完全解耦合,之后利用路由分发还可以整合到一起多个应用都有很多路由与视图函数的对应关系这个时候可以拆分到各自的路由层中使用路由分发之前总路由直接是路由与视图......
  • Turndown 源码解析:二、规则
    规则集包含一系列规则,决定各种标签如何反编译。单个规则的格式是:{ filter:String|String[]|function(node), replacement:function(node,content,options),}filter字段用于判断节点是否适用单条规则。如果它是字符串,则判断node.nodeName===filter;如果它是字符串数......