ZooKeeper Source Code (07): Leader, Follower, and Observer

Published: 2024-01-30 13:11:07
Tags: qp, 07, observer, zk, self, long, source code, new, Leader

Leader

Constructor

public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();

    // Get the addresses used for communication between quorum peers
    Set<InetSocketAddress> addresses;
    if (self.getQuorumListenOnAllIPs()) {
        addresses = self.getQuorumAddress().getWildcardAddresses();
    } else {
        addresses = self.getQuorumAddress().getAllAddresses();
    }

    // Create a ServerSocket bound to each address and add it to serverSockets;
    // LearnerCnxAcceptor uses these sockets when it starts
    addresses.stream()
      .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
      .filter(Optional::isPresent)
      .map(Optional::get)
      .forEach(serverSockets::add);

    this.zk = zk;
}

lead method

QuorumPeer starts a leader node by calling lead. This section follows the leader workflow from lead and then examines the important methods it calls:

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        // Load the database with ZooKeeperServer.loadData:
        // load data, clean up sessions, and take a snapshot (takeSnapshot)
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start a thread that accepts learner connections and creates a LearnerHandler to talk to each learner
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        // Get the last accepted epoch and compute the epoch and zxid for this term
        long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());
        // Set the new zxid
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            // qv.version == 0
            try {
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

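        // Wait for enough ACKEPOCH packets, which means a quorum of followers has acknowledged newEpoch.
        // Each follower sends its lastLoggedZxid and currentEpoch; the leader uses them to decide how to sync data
        waitForEpochAck(self.getMyId(), leaderStateSummary);
        self.setCurrentEpoch(epoch); // set the new currentEpoch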
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

        try {
            // Wait for the followers' ACKs of NEWLEADER
            waitForNewLeaderAck(self.getMyId(), zk.getZxid());
        } catch (InterruptedException e) {
            // omitted
            return;
        }
        // Start the ZooKeeperServer
        startZkServer();

        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);

        // We ping twice a tick, so we only update the tick every other iteration
        boolean tickSkip = true;
        String shutdownMessage = null;

        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                // Wait for tickTime / 2 milliseconds
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }

                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }

                // Tracks which learners are currently in sync
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null &&
                    self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }

                syncedAckSet.addAck(self.getMyId());

                // Collect the sync state of every learner
                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }

                if (!this.isRunning()) { // shutdown
                    break;
                }

                // Check that a quorum of learners is still in sync;
                // this check runs once per tickTime
                if (!tickSkip && !syncedAckSet.hasAllQuorums() &&
                    !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) &&
                      self.getQuorumVerifier().revalidateOutstandingProp(
                          this, new ArrayList<>(outstandingProposals.values()), lastCommitted))) {
                    // Lost quorum of last committed and/or last proposed
                    shutdownMessage = "Not sufficient followers synced";
                    break;
                }
                tickSkip = !tickSkip;
            }
            // Ping the learners;
            // this happens twice per tickTime
            for (LearnerHandler f : getLearners()) {
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            // leader goes in looking state
            shutdown(shutdownMessage);
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
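
The loop above wakes every tickTime / 2 milliseconds, so learners are pinged twice per tick while the tick counter and the quorum check advance only on every other iteration. The following is a minimal sketch of that cadence with the waiting and networking removed. The class and method names are hypothetical; only the tickSkip toggle is taken from lead().

```java
// Hypothetical sketch: counts pings, quorum checks, and ticks for a given
// number of half-tick iterations of the leader's main loop.
final class TickCadenceSketch {

    /** Returns {pings, quorumChecks, ticks} after the given number of half-tick iterations. */
    static int[] simulate(int halfTicks) {
        boolean tickSkip = true; // same initial value as in lead()
        int pings = 0;
        int quorumChecks = 0;
        int ticks = 0;
        for (int i = 0; i < halfTicks; i++) {
            if (!tickSkip) {
                ticks++;        // corresponds to self.tick.incrementAndGet()
                quorumChecks++; // the quorum condition is only evaluated when !tickSkip
            }
            tickSkip = !tickSkip;
            pings++;            // every iteration ends by pinging the learners
        }
        return new int[] {pings, quorumChecks, ticks};
    }

    public static void main(String[] args) {
        int[] r = simulate(6); // six half-ticks correspond to three full ticks
        System.out.println("pings=" + r[0] + " quorumChecks=" + r[1] + " ticks=" + r[2]);
    }
}
```

Six half-tick iterations give six pings but only three quorum checks, which matches the "ping twice a tick" comment in the source.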

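getEpochToPropose method

Collects the accepted epochs reported by the connecting peers and returns the new epoch (the highest accepted epoch plus one); lead() then derives the new zxid from it: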

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // advance to the newest epoch
        }
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        // A quorum of participants (including this leader) has connected
        if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch); // set the new acceptedEpoch
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            if (sid == self.getMyId()) {
                timeStartWaitForEpoch = start;
            }
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            // Wait up to initLimit * tickTime milliseconds; if still waitingForNewEpoch, throw, which triggers a new election
            while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}
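
The epoch returned here becomes the high 32 bits of every zxid in the new term, while the low 32 bits hold a per-epoch counter (the article relies on this later with "zxid >> 32" and "zxid & 0xffffffffL"). The sketch below re-implements that arithmetic and the "highest accepted epoch plus one" rule in a standalone class. ZxidSketch and its method names are hypothetical stand-ins for illustration, not the library's ZxidUtils.

```java
// Hypothetical sketch of the zxid layout: epoch in the high 32 bits,
// counter in the low 32 bits.
final class ZxidSketch {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Mirrors the update rule in getEpochToPropose: one past the highest accepted epoch seen. */
    static long proposeEpoch(long... acceptedEpochs) {
        long epoch = -1; // assumed starting value before any peer has reported
        for (long accepted : acceptedEpochs) {
            if (accepted >= epoch) {
                epoch = accepted + 1;
            }
        }
        return epoch;
    }

    public static void main(String[] args) {
        long epoch = proposeEpoch(3, 5, 4);
        long zxid = makeZxid(epoch, 0);
        System.out.println("epoch=" + epoch + " zxid=0x" + Long.toHexString(zxid));
    }
}
```

With accepted epochs 3, 5, and 4 the proposed epoch is 6, and the first zxid of the term has the counter part set to zero.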

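waitForEpochAck method

Waits for enough ACKEPOCH packets, which means a quorum of followers has acknowledged newEpoch. Each follower sends its lastLoggedZxid and currentEpoch, and the leader uses them to decide how to synchronize data: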

public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
            return;
        }
        // omitted
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (electingFollowers.contains(self.getMyId()) && verifier.containsQuorum(electingFollowers)) {
            electionFinished = true;
            electingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (!electionFinished && cur < end) {
                electingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!electionFinished) {
                throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
            }
        }
    }
}

waitForNewLeaderAck method

Waits until enough Leader.ACK packets for NEWLEADER have arrived; only then can normal communication begin:

public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {

    synchronized (newLeaderProposal.qvAcksetPairs) {

        if (quorumFormed) {
            return;
        }

        long currentZxid = newLeaderProposal.packet.getZxid();
        if (zxid != currentZxid) {
            LOG.error("NEWLEADER ACK from sid: {} is from a different epoch - current 0x{} received 0x{}",
                      sid, Long.toHexString(currentZxid), Long.toHexString(zxid));
            return;
        }

        // Note that addAck already checks that the learner is a PARTICIPANT.
        newLeaderProposal.addAck(sid);

        if (newLeaderProposal.hasAllQuorums()) {
            quorumFormed = true;
            newLeaderProposal.qvAcksetPairs.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (!quorumFormed && cur < end) {
                newLeaderProposal.qvAcksetPairs.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!quorumFormed) {
                throw new InterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");
            }
        }
    }
}
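
getEpochToPropose, waitForEpochAck, and waitForNewLeaderAck all follow the same pattern: each caller registers its sid under a lock, the caller that completes the quorum sets a flag and calls notifyAll, and everyone else waits against a deadline. The sketch below isolates that pattern. It assumes a simple majority quorum and returns false on timeout instead of throwing; QuorumBarrierSketch and its members are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the "wait until a quorum has arrived" pattern.
final class QuorumBarrierSketch {
    private final Set<Long> acked = new HashSet<>();
    private final int ensembleSize;
    private final long timeoutMs;
    private boolean quorumFormed = false;

    QuorumBarrierSketch(int ensembleSize, long timeoutMs) {
        this.ensembleSize = ensembleSize;
        this.timeoutMs = timeoutMs;
    }

    private static long nowMs() {
        return System.nanoTime() / 1_000_000L;
    }

    /** Registers sid and blocks until a majority has registered; returns false on timeout. */
    boolean await(long sid) throws InterruptedException {
        synchronized (acked) {
            if (quorumFormed) {
                return true;
            }
            acked.add(sid);
            if (acked.size() > ensembleSize / 2) {
                quorumFormed = true;
                acked.notifyAll(); // wake every caller that is still waiting
                return true;
            }
            long cur = nowMs();
            long end = cur + timeoutMs;
            while (!quorumFormed && cur < end) {
                acked.wait(end - cur); // the loop guards against spurious wakeups
                cur = nowMs();
            }
            return quorumFormed;
        }
    }

    /** Runs two callers against a three-member ensemble; true if both saw the quorum form. */
    static boolean demo() {
        QuorumBarrierSketch barrier = new QuorumBarrierSketch(3, 2000);
        boolean[] first = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                first[0] = barrier.await(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        try {
            boolean second = barrier.await(2L);
            t.join();
            return first[0] && second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("quorum formed: " + demo());
    }
}
```

In the leader, one thread per LearnerHandler plus the leader's own thread call the waiting method, which is why the leader passes self.getMyId() for itself.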

LearnerCnxAcceptor class

Starting the LearnerCnxAcceptor thread:

// Start thread that waits for connection requests from new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

The run method of LearnerCnxAcceptor:

public void run() {
    if (!stop.get() && !serverSockets.isEmpty()) {
        ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
        CountDownLatch latch = new CountDownLatch(serverSockets.size());

        // Start one LearnerCnxAcceptorHandler per server socket
        serverSockets.forEach(serverSocket ->
                executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));

        try {
            latch.await();
        } catch (InterruptedException ie) {
        } finally {
            // Close the sockets and shut down the thread pool
        }
    }
}

The LearnerCnxAcceptorHandler class listens on its socket and accepts connections:

class LearnerCnxAcceptorHandler implements Runnable {
    private ServerSocket serverSocket;
    private CountDownLatch latch;

    LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) {
        this.serverSocket = serverSocket;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            while (!stop.get()) {
                acceptConnections(); // accept connections
            }
        } catch (Exception e) {
            // shut down
        } finally {
            latch.countDown(); // when the count reaches 0, LearnerCnxAcceptor wakes up
        }
    }

    private void acceptConnections() throws IOException {
        Socket socket = null;
        boolean error = false;
        try {
            socket = serverSocket.accept(); // accept a learner connection
            socket.setSoTimeout(self.tickTime * self.initLimit); // timeout
            socket.setTcpNoDelay(nodelay);

            BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
            // Wrap the socket in a LearnerHandler, which communicates with the learner
            LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
            fh.start();
        } catch (Exception e) {
            // omitted
        } finally {
            // omitted
        }
    }
}

LearnerHandler

Handles the communication with one learner (follower or observer).

Key fields

protected final Socket sock; // socket connected to the learner
// The Leader object
final LearnerMaster learnerMaster;
// Unique id (sid) of the learner
protected long sid = 0;
// Outgoing packet queue
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
// zxid
protected volatile long lastZxid = -1;
// Input and output streams
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
// Learner type: PARTICIPANT or OBSERVER
private LearnerType learnerType = LearnerType.PARTICIPANT;

run method

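  1. Receive the Leader.FOLLOWERINFO or Leader.OBSERVERINFO packet, parse key fields such as the learner type and sid, and compute newEpoch and newLeaderZxid

  2. Send the Leader.LEADERINFO packet, which carries newLeaderZxid

  3. Read the Leader.ACKEPOCH packet and parse the peer's epoch and zxid

  4. Use the peer's zxid to decide whether data must be synchronized and how (txnlog/committedlog/snapshot)

    peerLastZxid = ss.getLastZxid(); // the peer's last logged zxid, taken from ACKEPOCH
    
    // Queue txnlog or committedlog data for the sync, or return true to sync with a snapshot (SNAP)
    boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
    
    // syncFollower compares maxCommittedLog and minCommittedLog with peerLastZxid to choose
    // between a txnlog/committedlog sync and a SNAP sync.
    // The committedlog lives in memory, so it performs better
    
  5. Synchronize txnlog and committedlog data (inside syncFollower)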

    if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
        // The peer's lastZxid lies between minCommittedLog and maxCommittedLog:
        // sync directly from the committedlog
        Iterator<Proposal> itr = db.getCommittedLog().iterator();
        currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
        needSnap = false;
    } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
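        // Sync from the txnlog and then from the committedLog
    
        // Defaults to "size in bytes of the latest snapshot file * 0.33"
        long sizeLimit = db.calculateTxnLogSizeLimit();
        // Read proposals from the txnlog; if they exceed sizeLimit bytes, an empty iterator is returned, which forces a SNAP sync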
        Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
        if (txnLogItr.hasNext()) {
            // Sync from the txnlog
            currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
    
            // If the txnlog sync did not reach minCommittedLog, there is a gap between the txnlog and the committedLog,
            // so fall back to a SNAP sync
            if (currentZxid < minCommittedLog) {
                currentZxid = peerLastZxid;
                // Clear out currently queued requests and revert to sending a snapshot
                queuedPackets.clear();
                needOpPacket = true;
            } else {
                // Continue the sync from the committedlog
                Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                needSnap = false;
            }
        }
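        // omitted
    }
    
  6. Start forwarding

    // Start forwarding
    leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
    
    // startForwarding queues the proposals held in toBeApplied (committed but not yet applied)
    // and adds this handler to the forwardingFollowers or observingLearners set
    
  7. If needSnap is true, send a SNAP packet so that the learner reads the input stream and loads its dataTree from it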

    long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
    // Send the SNAP packet
    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
    messageTracker.trackSent(Leader.SNAP);
    bufferedOutput.flush();
    
    // Serialize the dataTree and send it to the learner
    learnerMaster.getZKDatabase().serializeSnapshot(oa);
    oa.writeString("BenWasHere", "signature");
    bufferedOutput.flush();
    
  8. Send the NEWLEADER packet

    if (getVersion() < 0x10000) {
        QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
        oa.writeRecord(newLeaderQP, "packet");
    } else {
        QuorumPacket newLeaderQP = new QuorumPacket(
            Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
        queuedPackets.add(newLeaderQP);
    }
    
  9. Start the sendPackets thread, which takes packets from queuedPackets and sends them to the learner

  10. Wait for the ACK of NEWLEADER

    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    
    messageTracker.trackReceived(qp.getType());
    if (qp.getType() != Leader.ACK) {
        return;
    }
    
    learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
    
  11. Wait for the ZooKeeperServer to finish starting

  12. Send the UPTODATE packet, which tells the follower that it is up to date and may start serving clients

    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    
  13. Enter a while loop that keeps communicating with the learner and handles ACK, PING, REVALIDATE, REQUEST, and other packets
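
The branch structure quoted in step 5 can be condensed into a small decision function. The sketch below covers only the two branches shown in that excerpt plus the snapshot fallback; the real syncFollower handles further cases (for example the TRUNC packet that syncWithLeader processes), so a peer zxid above maxCommittedLog is reported as outside the excerpt rather than guessed. SyncDecisionSketch, Mode, and choose are hypothetical names.

```java
// Hypothetical, simplified sketch of the sync-mode decision from step 5.
final class SyncDecisionSketch {

    enum Mode {
        COMMITTED_LOG,              // replay from the in-memory committedlog only
        TXN_LOG_THEN_COMMITTED_LOG, // replay from the on-disk txnlog, then the committedlog
        SNAP,                       // send a full snapshot
        OUTSIDE_EXCERPT             // case not covered by the code quoted in the article
    }

    static Mode choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog,
                       boolean txnLogSyncEnabled) {
        if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
            return Mode.COMMITTED_LOG;
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            // The excerpt can still fall back to SNAP here when the txnlog is
            // too large or does not reach minCommittedLog.
            return Mode.TXN_LOG_THEN_COMMITTED_LOG;
        } else if (peerLastZxid > maxCommittedLog) {
            return Mode.OUTSIDE_EXCERPT;
        }
        return Mode.SNAP; // needSnap is never cleared in the remaining case
    }

    public static void main(String[] args) {
        System.out.println(choose(5, 3, 10, true));
        System.out.println(choose(1, 3, 10, true));
        System.out.println(choose(1, 3, 10, false));
    }
}
```

The order of preference follows the comments in the excerpt: the in-memory committedlog is cheapest, the txnlog requires disk reads, and a snapshot transfers the whole dataTree.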

Follower

Contains the follower logic.

followLeader method

The main method called by the follower to follow the leader.

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

    long connectionTime = 0;
    boolean completedSync = false;

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader server
        QuorumServer leaderServer = findLeader();
        try {
            // Connect to the leader server
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            // Register with the leader and get the zxid that carries the new epoch
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            // zxid >> 32L yields the epoch
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // Synchronize data with the leader
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            if (self.getObserverMasterPort() > 0) {
                // Create an ObserverMaster for chained replication; not analyzed here
                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // Keep communicating with the leader
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp); // handle the packet from the leader
            }
        } catch (Exception e) {
            // ...
        }
    } finally {
        // ...
    }
}

connectToLeader method

protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
    this.leaderAddr = multiAddr;
    Set<InetSocketAddress> addresses;
    if (self.isMultiAddressReachabilityCheckEnabled()) {
        addresses = multiAddr.getAllReachableAddressesOrAll();
    } else {
        addresses = multiAddr.getAllAddresses();
    }
    ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
    CountDownLatch latch = new CountDownLatch(addresses.size());
    AtomicReference<Socket> socket = new AtomicReference<>(null);
    // Connect asynchronously with one LeaderConnector per address; this covers the multi-address case
    addresses.stream()
        .map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);

    try {
        latch.await();
    } catch (InterruptedException e) {
    } finally {
        // Shut down the executor
    }

    if (socket.get() == null) {
        throw new IOException("Failed connect to " + multiAddr);
    } else {
        sock = socket.get();
        sockBeingClosed.set(false);
    }
    // Authentication, details omitted
    self.authLearner.authenticate(sock, hostname);

    // Get the input and output streams
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    // Start the sender thread, a producer-consumer setup built on a BlockingQueue
    if (asyncSending) {
        startSendingThread();
    }
}
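
connectToLeader tries every address in parallel and keeps whichever attempt succeeds first. The sketch below shows that race with the same building blocks (a fixed thread pool, a CountDownLatch, and an AtomicReference) but replaces real sockets with Supplier objects so it runs without a network. FirstConnectionSketch and connectAny are hypothetical names, and unlike a real connector the sketch does not close the redundant results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: race several connection attempts and keep the first success.
final class FirstConnectionSketch {

    /** Runs all attempts in parallel and returns the first successful result, if any. */
    static <T> Optional<T> connectAny(List<Supplier<Optional<T>>> attempts) {
        if (attempts.isEmpty()) {
            return Optional.empty();
        }
        ExecutorService executor = Executors.newFixedThreadPool(attempts.size());
        CountDownLatch latch = new CountDownLatch(attempts.size());
        AtomicReference<T> winner = new AtomicReference<>(null);
        for (Supplier<Optional<T>> attempt : attempts) {
            executor.submit(() -> {
                try {
                    // Only the first successful attempt is stored; later ones lose the CAS.
                    attempt.get().ifPresent(result -> winner.compareAndSet(null, result));
                } finally {
                    latch.countDown(); // count down on success and on failure
                }
            });
        }
        try {
            latch.await(); // wait until every attempt has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return Optional.ofNullable(winner.get());
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> attempts = new ArrayList<>();
        attempts.add(() -> Optional.<String>empty());   // simulated unreachable address
        attempts.add(() -> Optional.of("addr-2"));      // simulated reachable address
        System.out.println(connectAny(attempts).orElse("no connection"));
    }
}
```

An empty result corresponds to the "Failed connect to" IOException in the method above.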

registerWithLeader method

protected long registerWithLeader(int pktType) throws IOException {
    // 1. First send a packet of type pktType (Leader.FOLLOWERINFO for a follower):
    // Leader.FOLLOWERINFO, zxid, sid, protocolVersion, quorumVersion
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

    LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());

    writePacket(qp, true); // write the packet out

    // 2. Read the Leader.LEADERINFO packet from the leader
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // extract newEpoch
    if (qp.getType() == Leader.LEADERINFO) { // protocol version 1.0
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte[] epochBytes = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch); // set acceptedEpoch
        } else if (newEpoch == self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("...");
        }
        // 3. Send an ACKEPOCH packet,
        // which carries self.lastLoggedZxid and self.currentEpoch
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        // Branch for older protocol versions, omitted
    }
}
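
The ACKEPOCH payload built in step 3 is a 4-byte big-endian integer: the follower's currentEpoch when the leader's epoch is newer, or -1 when the epochs are equal. The sketch below isolates that encoding and shows how a receiver could read it back with ByteBuffer. AckEpochSketch is a hypothetical class, and the decode side is an assumption about the receiver rather than code quoted in this article.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the 4-byte epoch payload carried by ACKEPOCH.
final class AckEpochSketch {

    /** Follower side: currentEpoch if the leader's epoch is newer, -1 if it is equal. */
    static byte[] encode(long newEpoch, long acceptedEpoch, long currentEpoch) {
        byte[] epochBytes = new byte[4];
        ByteBuffer wrapped = ByteBuffer.wrap(epochBytes); // writes go straight into epochBytes
        if (newEpoch > acceptedEpoch) {
            wrapped.putInt((int) currentEpoch);
        } else if (newEpoch == acceptedEpoch) {
            wrapped.putInt(-1);
        } else {
            throw new IllegalArgumentException(
                "leader epoch " + newEpoch + " is lower than accepted epoch " + acceptedEpoch);
        }
        return epochBytes;
    }

    /** Receiving side (assumed): reads the 4-byte epoch back out of the payload. */
    static int decode(byte[] payload) {
        return ByteBuffer.wrap(payload).getInt();
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(6, 5, 5))); // newer leader epoch
        System.out.println(decode(encode(5, 5, 5))); // equal epochs
    }
}
```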

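syncWithLeader method

  1. Read the first packet from the leader

    • DIFF - the follower needs neither a snapshot nor a truncation; the leader sends only the proposals the follower is missing

    • SNAP - deserialize the leader's input stream (a snapshot of the leader's dataTree) into zkDb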

      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      
    • TRUNC - truncate the local log to the given zxid

      boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      
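  2. Keep reading packets from the leader; the leader may sync from its txnlog or committedlog

  3. Buffer the synced data and commit it:

    • PROPOSAL - the proposal is added to packetsNotCommitted for later processing

    • COMMIT/COMMITANDACTIVATE - the committed zxid is added to packetsCommitted for later processing

    • INFORM/INFORMANDACTIVATE - same as above

    • NEWLEADER - the leader has finished sending sync data; the follower calls takeSnapshot and setCurrentEpoch, hands everything in packetsNotCommitted to zk, and replies with ACK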

      // fzk.logRequest(p.hdr, p.rec, p.digest);
      
      public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
          Request request = new Request(
              hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
          request.setTxnDigest(digest);
          if ((request.zxid & 0xffffffffL) != 0) {
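              pendingTxns.add(request); // transactions waiting for their COMMIT
          }
          syncProcessor.processRequest(request); // persist to disk
      }
      
    • UPTODATE - once enough followers have replied with ACK and the leader's components have started, the leader sends an UPTODATE packet; it tells the follower that it is in sync, so the follower stops syncing and leaves the loop

  4. Process packetsNotCommitted and packetsCommitted: write the transactions to disk and commit them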

    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
    for (PacketInFlight p : packetsNotCommitted) {
        fzk.logRequest(p.hdr, p.rec, p.digest);
    }
    for (Long zxid : packetsCommitted) {
        fzk.commit(zxid);
    }
    
    // Requests are handled by the RequestProcessor chain,
    // which a later article covers in detail
    
  5. An Observer runs the following code instead

    ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
    for (PacketInFlight p : packetsNotCommitted) {
        Long zxid = packetsCommitted.peekFirst();
        if (p.hdr.getZxid() != zxid) {
            // log warning message if there is no matching commit
            // old leader send outstanding proposal to observer
            continue;
        }
        packetsCommitted.remove();
        Request request = new Request(
            p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
        request.setTxnDigest(p.digest);
        ozk.commitRequest(request);
    }
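
The observer loop in step 5 applies a buffered proposal only when its zxid equals the next buffered commit and skips it otherwise. The sketch below reproduces that matching on plain zxid values so the effect is easy to see. ObserverReplaySketch and replay are hypothetical names, and the explicit null check for an empty commit queue is an addition of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of matching buffered proposals against buffered commits.
final class ObserverReplaySketch {

    /** Returns the zxids that would be applied, consuming matched entries from committedZxids. */
    static List<Long> replay(List<Long> proposedZxids, Deque<Long> committedZxids) {
        List<Long> applied = new ArrayList<>();
        for (long proposed : proposedZxids) {
            Long nextCommit = committedZxids.peekFirst();
            if (nextCommit == null || nextCommit != proposed) {
                continue; // no matching commit: an outstanding proposal that is skipped
            }
            committedZxids.remove();
            applied.add(proposed);
        }
        return applied;
    }

    public static void main(String[] args) {
        Deque<Long> commits = new ArrayDeque<>(Arrays.asList(1L, 3L));
        System.out.println(replay(Arrays.asList(1L, 2L, 3L), commits));
    }
}
```

With proposals 1, 2, 3 and commits 1, 3, the proposal with zxid 2 is skipped because the next buffered commit at that point is 3.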
    

processPacket method

Once the connection is established and the data is in sync, the follower blocks reading packets from the leader and handles each one with processPacket:

// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}

The processPacket method:

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x{} expected 0x{}",
                     Long.toHexString(hdr.getZxid()), Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();

        if (hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
            self.setLastSeenQuorumVerifier(qv, true);
        }

        // Wrap the transaction in a Request and write it to disk via syncProcessor.processRequest(request)
        fzk.logRequest(hdr, txn, digest);
        // omitted
        break;
    case Leader.COMMIT:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        // Commit the request via commitProcessor.commit(request)
        fzk.commit(qp.getZxid());
        // omitted
        break;
    case Leader.COMMITANDACTIVATE:
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);

        // omitted
        break;
    case Leader.UPTODATE:
        // Under normal replication this packet type should not arrive here
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}
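
On the follower, PROPOSAL and COMMIT form a two-step exchange: logRequest records the transaction in pendingTxns (and writes it to disk), and a later COMMIT with the same zxid releases it. The sketch below models only the pending queue. It assumes that commits arrive in proposal order and simply rejects a mismatch, which is a simplification made for illustration; PendingTxnSketch and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the follower's pending-transaction queue.
final class PendingTxnSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();
    private final List<Long> committed = new ArrayList<>();

    /** PROPOSAL: remember the zxid until its COMMIT arrives. */
    void logRequest(long zxid) {
        // Same filter as logRequest in the article: a zero counter is not queued.
        if ((zxid & 0xffffffffL) != 0) {
            pendingTxns.add(zxid);
        }
    }

    /** COMMIT: assumed to be valid only for the oldest pending proposal. */
    boolean commit(long zxid) {
        Long head = pendingTxns.peek();
        if (head == null || head != zxid) {
            return false; // nothing pending, or the commit is out of order
        }
        committed.add(pendingTxns.remove());
        return true;
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        PendingTxnSketch follower = new PendingTxnSketch();
        follower.logRequest(0x100000001L);
        follower.logRequest(0x100000002L);
        System.out.println(follower.commit(0x100000002L)); // out of order
        System.out.println(follower.commit(0x100000001L)); // matches the head
    }
}
```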

Observer

observeLeader method

void observeLeader() throws Exception {
    zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
    long connectTime = 0;
    boolean completedSync = false;
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        // Find the leader or an ObserverMaster server
        QuorumServer master = findLearnerMaster();
        try {
            // Connect to the leader or the ObserverMaster
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }

            final long startTime = Time.currentElapsedTime();
            self.setLeaderAddressAndId(master.addr, master.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newLeaderZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            final long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().OBSERVER_SYNC_TIME.add(syncTime);
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            closeSocket();
            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        currentLearnerMaster = null;
        zk.unregisterJMX(this);
        if (connectTime != 0) {
            long connectionDuration = System.currentTimeMillis() - connectTime;
            messageTracker.dumpToLog(leaderAddr.toString());
        }
    }
}

processPacket method

protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request); // commit
        break;
    case Leader.INFORMANDACTIVATE:
        // Used by the reconfig feature
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

Summary of Leader-Follower communication

Leader                                                                           Follower
                  FOLLOWERINFO/OBSERVERINFO packet carrying acceptedEpoch
         <-------------------------------------------------------------------
                          leader computes newEpoch and newZxid

                          LEADERINFO packet carrying the new zxid
         ------------------------------------------------------------------->
                               follower accepts newEpoch

                  ACKEPOCH packet carrying lastLoggedZxid and currentEpoch
         <-------------------------------------------------------------------
                             leader chooses the sync method

                      DIFF/TRUNC/SNAP followed by the sync data (loop)
         ------------------------------------------------------------------->

                                    NEWLEADER packet
         ------------------------------------------------------------------->

                                       ACK packet
         <-------------------------------------------------------------------

                                    UPTODATE packet
         ------------------------------------------------------------------->


                                    PROPOSAL packet
         ------------------------------------------------------------------->

                                       ACK packet
         <-------------------------------------------------------------------

                                     COMMIT packet
         ------------------------------------------------------------------->


From: https://www.cnblogs.com/xugf/p/17996887
