首页 > 编程语言 >zookeeper源码(03)启动流程

zookeeper源码(03)启动流程

时间:2023-10-30 13:12:46浏览次数:34  
标签:03 配置 int zookeeper protected 线程 new config 源码

本文将从启动类开始详细分析zookeeper的启动流程:

  1. 加载配置的过程
  2. 集群启动过程
  3. 单机版启动过程

启动类

org.apache.zookeeper.server.quorum.QuorumPeerMain类。

用于启动zookeeper服务,第一个参数用来指定配置文件,配置文件properties格式,例如以下配置参数:

  • dataDir - 数据存储目录
  • dataLogDir - txnlog(事务日志)存储目录,默认dataDir
  • clientPort - 接收客户端连接的端口,例如2181
  • tickTime - leader做quorum验证的周期时长,默认3000ms
  • initLimit - leader等待follower连接、数据同步ack的最大tick数量
  • syncLimit - leader发送同步数据等待ack的最大tick数量
  • server.id - 用于quorum协议的host:port[:port]格式的server列表

加载配置

QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
    config.parse(args[0]);
}

args[0]是配置文件名。

  1. 加载普通配置
  2. 加载quorumPeer配置
  3. 从dynamic文件加载quorumVerifier配置,zk会在processReconfig时生成dynamic文件,默认文件名zoo.cfg.dynamic.${version}格式,默认不开启Reconfig功能
  4. 从dynamic文件加载lastQuorumPeer配置,同上,默认zoo.cfg.dynamic.next文件

加载普通配置

QuorumPeerConfig封装以下字段:

// 监听客户端连接的地址,使用clientPort和clientPortAddress配置确定,用来创建cnxnFactory
protected InetSocketAddress clientPortAddress;
// 用来创建secureCnxnFactory,使用secureClientPort和secureClientPortAddress配置确定
protected InetSocketAddress secureClientPortAddress;
// quorum使用ssl通信
protected boolean sslQuorum = false;
// portUnification配置,使用UnifiedServerSocket创建套接字
protected boolean shouldUsePortUnification = false;
// 用来创建ObserverMaster,该组件可以实现链式的数据复制,减小leader的负载
protected int observerMasterPort;
// 自动重新加载ssl文件
protected boolean sslQuorumReloadCertFiles = false;
// 数据目录和事务log目录
protected File dataDir;
protected File dataLogDir;
// dynamicConfig文件名
protected String dynamicConfigFileStr = null;
// 配置文件名,非配置参数
protected String configFileStr = null;
// leader做quorum验证的周期时长,默认300ms
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
// 单个addr的client最大连接数
protected int maxClientCnxns = 60;
// 超时时长
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// metricsProvider.className配置,MetricsProvider实现类全名
protected String metricsProviderClassName = DefaultMetricsProvider.class.getName();
// 封装metricsProvider.*下面的配置
protected Properties metricsProviderConfiguration = new Properties();
// 本地session配置
protected boolean localSessionsEnabled = false;
protected boolean localSessionsUpgradingEnabled = false;
// client连接的backlog数设置
protected int clientPortListenBacklog = -1;
// leader等待follower连接、数据同步ack的最大tick数量
protected int initLimit;
// leader发送同步数据等待ack的最大tick数量
protected int syncLimit;
// 大于0时用来计算连接leader的超时时长
protected int connectToLearnerMasterLimit;
// 必须是3
protected int electionAlg = 3;
// 未使用的配置
protected int electionPort = 2182;
// 监听所有IP地址
protected boolean quorumListenOnAllIPs = false;
// myid配置
protected long serverId = UNSET_SERVERID;

protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;

// autopurge.snapRetainCount配置
protected int snapRetainCount = 3;
// autopurge.purgeInterval配置
protected int purgeInterval = 0;
// 开启同步
protected boolean syncEnabled = true;

protected String initialConfig;

// PARTICIPANT|OBSERVER
protected LearnerType peerType = LearnerType.PARTICIPANT;

/**
 * Configurations for the quorumpeer-to-quorumpeer sasl authentication
 */
protected boolean quorumServerRequireSasl = false;
protected boolean quorumLearnerRequireSasl = false;
protected boolean quorumEnableSasl = false;
protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE;
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;

// multi address related configs
// multiAddress.enabled配置
private boolean multiAddressEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "false"));
// multiAddress.reachabilityCheckEnabled配置
private boolean multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true"));
// multiAddress.reachabilityCheckTimeoutMs配置
private int multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(
    System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
                       String.valueOf(1000)));

// 创建QuorumVerifier时使用,不为null时会创建QuorumOracleMaj
protected String oraclePath;

// Minimum snapshot retain count.
private final int MIN_SNAP_RETAIN_COUNT = 3;
// JVM Pause Monitor feature switch
protected boolean jvmPauseMonitorToRun = false;
// JVM Pause Monitor warn threshold in ms
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
// JVM Pause Monitor info threshold in ms
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
// JVM Pause Monitor sleep time in ms
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;

在parse方法的最后一个else分支,会将其他配置前缀zookeeper.之后设置到System环境变量中:

System.setProperty("zookeeper." + key, value);

比如一些SSL相关配置参数:

ssl.quorum.keyStore.location=/path/to/keystore.jks
ssl.quorum.keyStore.password=password
ssl.quorum.trustStore.location=/path/to/truststore.jks
ssl.quorum.trustStore.password=password

解析配置文件之后,会根据ssl相关参数做ssl配置:

if (this.secureClientPortAddress != null) {
    configureSSLAuth();
}

默认会将X509AuthenticationProvider作为ssl认证组件:

// key = "zookeeper.authProvider.x509"
System.setProperty(ProviderRegistry.AUTHPROVIDER_PROPERTY_PREFIX + "x509",
                   "org.apache.zookeeper.server.auth.X509AuthenticationProvider");

其余的都是参数验证代码,不详细说明。

加载quorumPeer配置

// backward compatibility - dynamic configuration in the same file as
// static configuration params see writeDynamicConfig()
if (dynamicConfigFileStr == null) {
    // 解析quorum配置
    setupQuorumPeerConfig(zkProp, true);
    if (isDistributed() && isReconfigEnabled()) { // 默认reconfigEnabled==false分支进不来
        // we don't backup static config for standalone mode.
        // we also don't backup if reconfig feature is disabled.
        // 备份zoo.cfg到zoo.cfg.bak
        backupOldConfig();
    }
}

解析quorum配置:

void setupQuorumPeerConfig(Properties prop,
                           boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(
        prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
    // 读取${dataDir}/myid文件,给serverId赋值
    setupMyId();
    // 对比clientPortAddress配置与quorum配置进行重新赋值
    setupClientPort();
    // 对比peerType配置与quorum配置进行重新赋值
    setupPeerType();
    checkValidity(); // 参数验证
}

parseDynamicConfig方法需要看一下:

public static QuorumVerifier parseDynamicConfig(
        Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        // group.*和weight.*的参数配置
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode &&
                   !key.startsWith("server.") && !key.equals("version")) {
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath);

    // 验证 略
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException {
    if (oraclePath == null) {
        return createQuorumVerifier(dynamicConfigProp, isHierarchical);
    } else {
        return new QuorumOracleMaj(dynamicConfigProp, oraclePath);
    }
}

private static QuorumVerifier createQuorumVerifier(
        Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        return new QuorumMaj(dynamicConfigProp);
    }
}

QuorumMaj

解析集群配置,配置形如:

server.1=server_config;client_config
server.2=server_config;client_config
server.3=server_config;client_config

# 配置两个也能启动,但是只能提供副本能力,无法保证高可用

# 使用分号分隔server_config和client_config

# 1. server_config格式:
#    host:quorumPort:electionPort 或 host:quorumPort:electionPort:type
#    可以配置多个,使用|分隔
#    例如:
#       127.0.0.1:2888:3888:PARTICIPANT

# 2. client_config可以没有,格式: port或host:port

构造方法:

public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            // 获取serverId
            long sid = Long.parseLong(key.substring(dot + 1));
            // 创建QuorumServer对象,解析value字符串
            // value格式: server_config或server_config;client_config
            // server_config格式是使用|分隔的列表,每个元素是:
            // host:quorumPort:electionPort或host:quorumPort:electionPort:type
            // client_config格式: port或host:port
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                votingMembers.put(Long.valueOf(sid), qs); // 投票成员
            } else {
                observingMembers.put(Long.valueOf(sid), qs); // observer成员
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    half = votingMembers.size() / 2; // 成员半数,例如5/2=2
}

QuorumHierarchical

比QuorumMaj多了group和weight等特性。

从dynamic文件加载quorumVerifier配置

用于加载quorumVerifier信息:从dynamicConfigFileStr参数指定的文件加载quorumPeer配置,方式与上一小节一样。

从dynamic文件加载lastQuorumPeer配置

用于加载lastSeenQuorumVerifier信息:从zoo.cfg.dynamic.next文件加载lastSeenQuorumVerifier配置,方式与上一小节一样。

创建并启动DatadirCleanupManager

默认配置时不启动。

// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
    config.getDataDir(),
    config.getDataLogDir(),
    config.getSnapRetainCount(), // 默认3
    config.getPurgeInterval());
purgeMgr.start();

启动周期任务:

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // 默认不启动
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

清理逻辑在PurgeTask的run方法:

public void run() {
    try {
        PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
    } catch (Exception e) {}
}

启动集群

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
}

isDistributed判断:

public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}

// standaloneEnabled默认true

创建并启动MetricsProvider

final MetricsProvider metricsProvider;
try {
    metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
        config.getMetricsProviderClassName(), // DefaultMetricsProvider
        config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
    throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
// 注册到全局
ServerMetrics.metricsProviderInitialized(metricsProvider);

创建ServerCnxnFactory

ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;

if (config.getClientPortAddress() != null) {
    // 默认使用NIOServerCnxnFactory实现类
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(),
                          config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}

if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                                config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}

ServerCnxnFactory有两个主要的子类:

  • NIOServerCnxnFactory
  • NettyServerCnxnFactory

默认使用NIOServerCnxnFactory实现类,可以使用-Dzookeeper.serverCnxnFactory=xx来修改:

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

ServerCnxnFactory

用于接收客户端连接、管理客户端session、处理客户端请求。

NIOServerCnxnFactory

基于NIO的非阻塞、多线程的ServerCnxnFactory实现类,多线程之间通过queue通信:

  • 1个accept线程,用来接收客户端连接,交给selector线程处理
  • 1-N个selector线程,每个线程会select 1/N个连接,多个selector线程的原因是,由于有大量连接,select()可能会成为性能瓶颈
  • 0-M个socket IO worker线程,做socket读写,如果配置为0则selector线程来做IO
  • 1个清理线程,用于关闭空闲连接

线程数量分配示例:32核的机器,1accept线程,1个清理线程,4个selector线程,64个worker线程。

configure方法:

  • 不支持ssl

  • 创建ConnectionExpirerThread线程

  • 根据核数确定各个线程的数量

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
  • 创建SelectorThread线程

  • 创建ServerSocketChannel、启动监听、设置非阻塞

  • 创建AcceptThread线程

start方法启动各种线程:

  • acceptThread
  • selectorThreads
  • workerPool
  • expirerThread

NettyServerCnxnFactory

基于Netty的ServerCnxnFactory实现,使用CnxnChannelHandler作为业务处理器。

后续会有文章详细分析。

创建并启动QuorumPeer

管理quorum协议,服务器可能处于以下三种状态:

  • Leader选举 - 每个服务器将选出一个leader,最初都会选自己
  • Follower节点 - 将与Leader同步并复制所有事务
  • Leader节点 - 处理请求并将其转发给Follower节点,大多数Follower节点必须同步,该请求才能被提交

创建QuorumPeer并使用QuorumPeerConfig为其设置属性:

public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<>();
    adminServer = AdminServerFactory.createAdminServer(); // http管理的服务,使用JettyAdminServer实现类
    x509Util = createX509Util();
    initialize();
    reconfigEnabled = QuorumPeerConfig.isReconfigEnabled(); // 默认false不开启Reconfig功能
}

下面记录一下重要的步骤。

创建FileTxnSnapLog

quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));

FileTxnSnapLog类:操作TxnLog和SnapShot的入口类。

此步骤会创建dataDir和snapDir目录、判断数据目录可写、创建txnLog和snapLog对象访问数据文件。

创建并初始化ZKDatabase

quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.initConfigInZKDatabase();

维护zookeeper服务器内存数据库,包括session、dataTree和committedlog数据,从磁盘读取日志和快照后启动。

内部使用DataTree存储数据,先看一下创建和初始化阶段的代码。

构造方法:创建DataTree对象,创建/zookeeper/quota、/zookeeper/config节点,创建dataWatches和childWatches对象(使用WatchManager实现类)。

initConfigInZKDatabase方法:

public synchronized void initConfigInZKDatabase() {
    if (zkDb != null) {
        zkDb.initConfigInZKDatabase(getQuorumVerifier());
    }
}

public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
    try {
        if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
            // should only happen during upgrade
            this.dataTree.addConfigNode();
        }
        // 把当前QuorumVerifier保存到/zookeeper/config中
        // qv.toString()格式如下:
        // server.1=host1:2888:3888:participant;host1:2181\n
        // server.2=host2:2888:3888:participant;host2:2181\n
        // ...
        // version=2
        this.dataTree.setData(ZooDefs.CONFIG_NODE,
            qv.toString().getBytes(UTF_8), // data
            -1, // version
            qv.getVersion(), // txid
            Time.currentWallTime());
    } catch (NoNodeException e) {}
}

设置QuorumVerifier

quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
    quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}

初始化启动QuorumPeer

// 初始化QuorumAuthServer
quorumPeer.initialize();
// 启动QuorumPeer
quorumPeer.start();
// 线程阻塞
quorumPeer.join();

启动QuorumPeer方法:

public synchronized void start() {
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {}
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

启动QuorumPeer流程

ZKDatabase加载

从txnlog和snapshot加载dataTree数据:

long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
  1. 倒序查找所有snapshot文件,从文件名解析snapZxid作为dataTree的lastProcessedZxid属性,文件内容解析到dataTree中
  2. 如果从snapshot文件未找到数据,则生成snapshot.0文件,将当前dataTree(空的)保存到里面
  3. 使用fastForwardFromEdits方法从txnlog加载数据

获取currentEpoch和acceptedEpoch的值:

// 当前zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// 当前epoch = zxid >> 32L
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);

从${dataDir}/currentEpoch文件读取currentEpoch值:

currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作为currentEpoch并保存到文件
  2. 如果currentEpoch比epochOfZxid小,则继续查找${dataDir}/currentEpoch.tmp文件作为currentEpoch保存到文件,如果文件不存在则抛数据异常

从${dataDir}/acceptedEpoch文件读取acceptedEpoch值:

acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
  1. 如果文件不存在,直接使用epochOfZxid作为acceptedEpoch并保存到文件
  2. 如果acceptedEpoch比currentEpoch小则抛数据异常

启动serverCnxnFactory

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start(); // NIOServerCnxnFactory在启动阶段会启动内部的4类线程
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

启动AdminServer

默认使用JettyAdminServer实现类,负责提供管理端的http接口。

启动选举

public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // 投自己一票,封装zxid和epoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType总是3
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    // TODO: use a factory rather than a switch
    // 可以使用策略模式替换switch语句
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // 关闭oldQcm
        if (oldQcm != null) {
            oldQcm.halt();
        }
        // 用来启动ServerSocket监听
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        }
        break;
    default:
        assert false;
    }
    return le;
}

创建QuorumCnxManager对象:

public QuorumCnxManager createCnxnManager() {
    // 默认tickTime * syncLimit
    // 按照zoo_sample.cfg文件配置是2000 * 5
    int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
    return new QuorumCnxManager(
        this,
        this.getMyId(),
        this.getView(), // serverId->quorumServer
        this.authServer,
        this.authLearner,
        timeout,
        this.getQuorumListenOnAllIPs(), // 是否监听所有IP默认false
        this.quorumCnxnThreadsSize, // 默认20
        this.isQuorumSaslAuthEnabled());
}

QuorumCnxManager类:

This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
  1. 维护leader选举时server之间的tcp连接
  2. 确保两个server之间存在一个连接,如果两个server同时建立连接,则始终保留id大的一方建立的连接
  3. 队列缓存待发送的消息

FastLeaderElection类:

  • 使用TCP实现leader选举
  • 使用QuorumCnxManager管理连接
  • 某些参数可以改变选举行为,比如finalizeWait参数决定leader确定之前需要等待的时间

启动线程

QuorumPeer继承了ZooKeeperThread类,最后会使用super.start()启动线程。run方法while循环,根据当前的ServerState执行不同的逻辑。

启动单机版服务

启动入口

在QuorumPeerMain的initializeAndRun阶段:

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    // 启动单机版服务
    ZooKeeperServerMain.main(args);
}

ZooKeeperServerMain.main方法:

ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
    main.initializeAndRun(args);
}
// 略

initializeAndRun方法:

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // 略

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]); // args[0]是配置文件
    } else {
        config.parse(args); // args = {clientPortAddress, dataDir, tickTime, maxClientCnxns}
    }

    runFromConfig(config);
}

启动流程

  1. 创建FileTxnSnapLog对象
  2. 创建ZooKeeperServer对象
  3. 创建并启动AdminServer组件
  4. 创建并启动cnxnFactory和secureCnxnFactory用于接受客户端连接、处理客户端请求,会启动ZooKeeperServer、ZXDatabase等核心组件
  5. 创建并启动ContainerManager组件

标签:03,配置,int,zookeeper,protected,线程,new,config,源码
From: https://www.cnblogs.com/xugf/p/17797584.html

相关文章

  • CF1889C2. Doremy's Drying Plan (Hard Version)
    容易想到dp:设\(dp_{i,p}\)表示前\(i\)天,强制第\(i\)天dry,并且一共消除了\(p\)个区间的答案。转移时可以考虑枚举前面的决策\(j\),此时有转移方程:\[dp_{i,p}=\max(dp_{j,p-w})+1\]其中\(w\)为满足\(l\in(j,i],r\in[i,n]\)的区间\([l,r]\)个数。显然可以考虑套......
  • CF1889B. Doremy's Connecting Plan
    一开始不会先跳C了!差点满盘皆输!设\(i<j\),则\(i,j\)合并可以看作\(a_i\leftarrowa_i+a_j\)后删掉\(j\)!此时和初始局面本质相同!所以不妨先只看初始局面!不等式右侧和下标有关!显然若右侧\(i,j\)中只要有一个是\(1\),就会让右侧的值大幅减小!设\(1\)和\(i\)合并!则需满......
  • java中native源码查找方法
    以Object的hashCode()方法为例:1.下载openjdk源码或从github中查找,这里以github中查找为例;2.GitHub中查找https://github.com/bpupadhyaya/openjdk-8/tree/master/hotspot源码;3.搜索到Object.c源码文件,并查找hashCode字眼,如下所示: 4.由上可知,hashCode方法实际是调用的jvm.c......
  • CS61A hw03 make_anoymous_factorial()
    CS61Ahw03make_anoymous_factorial()自问自答&写在前面​ 写这些是因为这道练习没写出来,刚开始看到官方的solution也没看明白,通过从答案反推之后,有了一些对lambda表达式的一些理解,在此分享,观看之前还是希望经过自己思考之后再看,毕竟聪明的你都来学cs61a了,应该已经学会独立思考......
  • Could not resolve placeholder '' in value "${}"
    Couldnotresolveplaceholder''invalue"${}"背景用Jenkins构建maven自动化打包时,因为需要从properties文件读取参数来区分是本地Debug版还是正式上线版,配置完就不能用了。FailedtoloadApplicationContext原因两个错误都出现,因为本来是用IDEA的默认配置的resources文......
  • 基于SpringBoot框架的教学评价系统-计算机毕业设计源码+LW文档
    摘要随着时代的发展,我国的教育水平在不断的提高,但是很多时候为了更好的提高教学的质量,会让学生对当前的教学进行评价,教育工作者根据学生的评价发现当下教学中的一些不足,从而更好的提高教学质量,为了让教学评价变的更加的方便我们开发了本次的教学评价系统。本系统从用户的角度出......
  • 基于ssm的奖学金评定系统-计算机毕业设计源码+LW文档
    摘 要 随着时代的发展,我国高校的信息化程度也越来越高,很多高校都实现了数据的信息化管理,但是在很多高校还是使用传统人工的方式在对奖学金评定相关内容进行管理,为了提高高校的整体奖学金评定管理水平,为此我开发了本基于SSM框架的奖学金评定系统管理网站本基于SSM框架的奖学金......
  • 基于Vue2+elementUI的二手书管理系统-计算机毕业设计源码+LW文档
    摘 要本设计完成了基于Vue2+elementUI的二手书管理系统的设计与实现。现代移动化网络发展下,不同于以往的短信、邮件、收音机传递信息,网页是向用户传输信息的主要媒介之一。书籍也是向人们传递信息和知识的媒介,如今书籍印刷和出版的快速发展,以及社会文化水平的进步,越来越多的读书......
  • MT4源码,MT4安卓,MT4苹果,MT4CRM源码,MT5CRM源码
    MT4的服务端部署后,安卓苹果手机进行连接,并且连接CRM,组成一个完整的系统。整个流程是先部署MT4服务端,然后通过admin配置修改参数,用manager创建经理账户,然后对接CRM系统进行测试,并且下载安卓手机端测试服务端是否可以正常运行,这个我发一个测试安卓版MetaTrader4管理员-用户指南远程......
  • 初看vue3源码
    因为工作的原因又回到了vue的领域,需要加深对vue和vue生态的了解也许平时比较多人手机看别人解析怎么看vue源码的,自己动手看vue源码的还是比较少,这次我想自己动手看看首先吧代码获取到本地跑起来vue仓库地址https://github.com/vuejs/vue开发环境搭建指南https://github.com/......