一文讲解zookeeper-数据同步源码的详情

开课吧开课吧锤锤2021-06-17 10:07

    在上一篇对zookeeper选举实现分析之后,我们知道zookeeper集群在选举结束之后,leader节点将进入LEADING状态,follower节点将进入FOLLOWING状态;此时集群中节点将进行数据同步操作,以保证数据一致。只有数据同步完成之后zookeeper集群才具备对外提供服务的能力。

Java

    LEADING

    当节点在选举后角色确认为leader后将会进入LEADING状态,源码如下:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

    QuorumPeer在节点状态变更为LEADING之后会创建leader实例,并触发lead过程。

void lead() throws IOException, InterruptedException {
    try {
		// 省略

        /**
         * 开启线程用于接收 follower 的连接请求
         */
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;

        /**
         * 阻塞等待计算新的 epoch 值,并设置 zxid
         */
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());          
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        
        /**
         * 阻塞等待接收过半的 follower 节点发送的 ACKEPOCH 信息; 此时说明已经确定了本轮选举后 epoch 值
         */
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            /**
             * 阻塞等待 超过半数的节点 follower 发送了 NEWLEADER ACK 信息;此时说明过半的 follower 节点已经完成数据同步
             */
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            // 省略
        }

        /**
         * 启动 zk server,此时集群可以对外正式提供服务
         */
        startZkServer();

        // 省略
}

    从lead方法的实现可得知,leader与follower在数据同步过程中会执行如下过程:

    接收follower连接

    计算新的epoch值

    通知统一epoch值

    数据同步

    启动zkserver对外提供服务

    FOLLOWING

    下面在看下follower节点进入FOLLOWING状态后的操作:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                // 省略
            case OBSERVING:
                // 省略
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

    QuorumPeer在节点状态变更为FOLLOWING之后会创建follower实例,并触发followLeader过程。

void followLeader() throws InterruptedException {
    // 省略
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            /**
             * follower 与 leader 建立连接
             */
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            /**
             * follower 向 leader 提交节点信息用于计算新的 epoch 值
             */
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            
            /**
             * follower 与 leader 数据同步
             */
            syncWithLeader(newEpochZxid);                
            
             // 省略

        } catch (Exception e) {
             // 省略
        }
    } finally {
        // 省略
    }
}

    从followLeader方法的实现可得知,follower与leader在数据同步过程中会执行如下过程:

    请求连接leader

    提交节点信息计算新的epoch值

    数据同步

    下面我们看下在各个环节的实现细节;

    LeaderFollower建立通信

    follower请求连接

protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}           
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

    follower会通过选举后的投票信息确认leader节点地址,并发起连接(总共有5次尝试连接的机会,若连接不通则重新进入选举过程)

    leader接收连接

class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                	/**
                	 * 接收 follower 的连接,并开启 LearnerHandler 线程用于处理二者之间的通信
                	 */
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    // 省略
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
}

    从LearnerCnxAcceptor实现可以看出leader节点在为每个follower节点连接建立之后都会为之分配一个LearnerHandler线程用于处理二者之间的通信。

    计算新的epoch值

    follower在与leader建立连接之后,会发出FOLLOWERINFO信息

long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

protected long registerWithLeader(int pktType) throws IOException{
    /**
     * 发送 follower info 信息,包括 last zxid 和 sid
     */
	long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    /**
	 * follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
	 */
    writePacket(qp, true);
    
    // 省略
} 

    接下来我们看下leader在接收到FOLLOWERINFO信息之后做什么(参考LearnerHandler)

public void run() {
	try {
	    // 省略
	    /**
	     * leader 接收 follower 发送的 FOLLOWERINFO 信息,包括 follower 节点的 zxid,sid,protocol version
	     * @see Learner.registerWithleader()
	     */
	    QuorumPacket qp = new QuorumPacket();
	    ia.readRecord(qp, "packet");

	    byte learnerInfoData[] = qp.getData();
	    if (learnerInfoData != null) {
	    	if (learnerInfoData.length == 8) {
	    		// 省略
	    	} else {
	            /**
	             * 高版本的 learnerInfoData 包括 long 类型的 sid, int 类型的 protocol version 占用 12 字节
	             */
	    		LearnerInfo li = new LearnerInfo();
	    		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
	    		this.sid = li.getServerid();
	    		this.version = li.getProtocolVersion();
	    	}
	    }

	    /**
	     * 通过 follower 发送的 zxid,解析出 foloower 节点的 epoch 值
	     */
	    long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	    
	    long peerLastZxid;
	    StateSummary ss = null;
	    long zxid = qp.getZxid();

	    /**
	     * 阻塞等待计算新的 epoch 值
	     */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
	  
	    // 省略
	}

    从上述代码可知,leader在接收到follower发送的FOLLOWERINFO信息之后,会解析出follower节点的acceptedEpoch值并参与到新的epoch值计算中。(具体计算逻辑参考方法getEpochToPropose)

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // epoch 用来记录计算后的选举周期值
        // follower 或 leader 的 acceptedEpoch 值与 epoch 比较;若前者大则将其加一
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch+1;
        }
        // connectingFollowers 用来记录与 leader 已连接的 follower
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // 判断是否已计算出新的 epoch 值的条件是 leader 已经参与了 epoch 值计算,以及超过一半的节点参与了计算
        if (connectingFollowers.contains(self.getId()) && 
                                        verifier.containsQuorum(connectingFollowers)) {
            // 将 waitingForNewEpoch 设置为 false 说明不需要等待计算新的 epoch 值了
            waitingForNewEpoch = false;
            // 设置 leader 的 acceptedEpoch 值
            self.setAcceptedEpoch(epoch);
            // 唤醒 connectingFollowers wait 的线程
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // 若未完成新的 epoch 值计算则阻塞等待
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");        
            }
        }
        return epoch;
    }
}

    从方法getEpochToPropose可知leader会收集集群中过半的followeracceptedEpoch信息后,选出一个最大值然后加1就是newEpoch值;在此过程中leader会进入阻塞状态直到过半的follower参与到计算才会进入下一阶段。

    通知新的epoch值

    leader在计算出新的newEpoch值后,会进入下一阶段发送LEADERINFO信息(同样参考LearnerHandler)

public void run() {
	try {
	    // 省略

	    /**
	     * 阻塞等待计算新的 epoch 值
	     */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            /**
             * 计算出新的 epoch 值后,leader 向 follower 发送 LEADERINFO 信息;包括新的 newEpoch
             */
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();

           	// 省略
        }
	}
	// 省略
}
protected long registerWithLeader(int pktType) throws IOException{
	// 省略

    /**
     * follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);

    /**
     * follower 接收 leader 发送的 LEADERINFO 信息
     */
    readPacket(qp);

    /**
     * 解析 leader 发送的 new epoch 值
     */        
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	if (qp.getType() == Leader.LEADERINFO) {
    	// we are connected to a 1.0 server so accept the new epoch and read the next packet
    	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    	byte epochBytes[] = new byte[4];
    	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

    	/**
    	 * new epoch > current accepted epoch 则更新 acceptedEpoch 值
    	 */
    	if (newEpoch > self.getAcceptedEpoch()) {
    		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
    		self.setAcceptedEpoch(newEpoch);
    	} else if (newEpoch == self.getAcceptedEpoch()) {   		
            wrappedEpochBytes.putInt(-1);
    	} else {
    		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
    	}

    	/**
    	 * follower 向 leader 发送 ACKEPOCH 信息,包括 last zxid
    	 */
    	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    	writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } 
} 

    从上述代码可以看出在完成newEpoch值计算后的leader与follower的交互过程:

    leader向follower发送LEADERINFO信息,告知follower新的epoch值

    follower接收解析LEADERINFO信息,若newepoch值大于currentacceptedepoch值则更新acceptedEpoch

    follower向leader发送ACKEPOCH信息,反馈leader已收到新的epoch值,并附带follower节点的lastzxid

    数据同步

    LearnerHandler中leader在收到过半的ACKEPOCH信息之后将进入数据同步阶段

public void run() {
        try {
            // 省略
            // peerLastZxid 为 follower 的 last zxid
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
           
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    /**
                     * follower 与 leader 的 zxid 相同说明 二者数据一致;同步方式为差量同步 DIFF,同步的zxid 为 peerLastZxid, 也就是不需要同步
                     */
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    // peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中间
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        /**
                         * 在遍历 proposals 时,用来记录上一个 proposal 的 zxid
                         */
                        long prevProposalZxid = minCommittedLog;

                        boolean firstPacket=true;
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // 跳过 follower 已经存在的提案
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                if (firstPacket) {
                                    firstPacket = false;
                                    if (prevProposalZxid < peerLastZxid) {
                                        /**
                                         * 此时说明有部分 proposals 提案在 leader 节点上不存在,则需告诉 follower 丢弃这部分 proposals
                                         * 也就是告诉 follower 先执行回滚 TRUNC ,需要回滚到 prevProposalZxid 处,也就是 follower 需要丢弃 prevProposalZxid ~ peerLastZxid 范围内的数据
                                         * 剩余的 proposals 则通过 DIFF 进行同步
                                         */
                                        packetToSend = Leader.TRUNC;                                        
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }

                                /**
                                 * 将剩余待 DIFF 同步的提案放入到队列中,等待发送
                                 */
                                queuePacket(propose.packet);
                                /**
                                 * 每个提案后对应一个 COMMIT 报文
                                 */
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {                    
                        /**
                         * follower 的 zxid 比 leader 大 ,则告诉 follower 执行 TRUNC 回滚
                         */
                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                    }
                } 

            } finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                 // 数据同步完成之后会发送 NEWLEADER 信息
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            /**
             * 发送数据同步方式信息,告诉 follower 按什么方式进行数据同步
             */
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                /**
                 * 如果是全量同步的话,则将 leader 本地数据序列化写入 follower 的输出流
                 */
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            /**
             * 开启个线程执行 packet 发送
             */
            sendPackets();
            
            /**
             * 接收 follower ack 响应
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            /**
             * 阻塞等待过半的 follower ack
             */
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            /**
             * leader 向 follower 发送 UPTODATE,告知其可对外提供服务
             */
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            // 省略
        } 
    }

    从上述代码可以看出leader和follower在进行数据同步时会通过peerLastZxid与maxCommittedLog,minCommittedLog两个值比较最终决定数据同步方式。

    DIFF(差异化同步)

    follower的peerLastZxid等于leader的peerLastZxid

    此时说明follower与leader数据一致,采用DIFF方式同步,也即是无需同步

    follower的peerLastZxid介于maxCommittedLog,minCommittedLog两者之间

    此时说明follower与leader数据存在差异,需对差异的部分进行同步;首先leader会向follower发送DIFF报文告知其同步方式,随后会发送差异的提案及提案提交报文

    交互流程如下:

    Leader                 Follower

      |          DIFF         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

    示例:假设leader节点的提案缓存队列对应的zxid依次是:

 0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005

    而follower节点的peerLastZxid为0x500000003,则需要将0x500000004,0x500000005两个提案进行同步;那么数据包发送过程如下表:

Java

    TRUNC+DIFF(先回滚再差异化同步)

    在上文DIFF差异化同步时会存在一个特殊场景就是虽然follower的peerLastZxid介于maxCommittedLog,minCommittedLog两者之间,但是follower的peerLastZxid在leader节点中不存在;此时leader需告知follower先回滚到peerLastZxid的前一个zxid,回滚后再进行差异化同步。

    交互流程如下:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

    示例:假设集群中三台节点A,B,C某一时刻A为Leader选举周期为5,zxid包括:(0x500000004,0x500000005,0x500000006);假设某一时刻leaderA节点在处理完事务为0x500000007的请求进行广播时leaderA节点服务器宕机导致0x500000007该事物没有被同步出去;在集群进行下一轮选举之后B节点成为新的leader,选举周期为6对外提供服务处理了新的事务请求包括0x600000001,0x600000002;

集群节点 ZXID 列表
A 0x500000004, 0x500000005, 0x500000006, 0x500000007
B 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002
C 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002

    此时节点A在重启加入集群后,在与leaderB节点进行数据同步时会发现事务0x500000007在leader节点中并不存在,此时leader告知A需先回滚事务到0x500000006,在差异同步事务0x600000001,0x600000002;那么数据包发送过程如下表:

报文类型 ZXID
TRUNC 0x500000006
PROPOSAL 0x600000001
COMMIT 0x600000001
PROPOSAL 0x600000002
COMMIT 0x600000002

    TRUNC(回滚同步)

    若follower的peerLastZxid大于leader的maxCommittedLog,则告知follower回滚至maxCommittedLog;该场景可以认为是TRUNC+DIFF的简化模式

    交互流程如下:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
         

    SNAP(全量同步)

    若follower的peerLastZxid小于leader的minCommittedLog或者leader节点上不存在提案缓存队列时,将采用SNAP全量同步方式。该模式下leader首先会向follower发送SNAP报文,随后从内存数据库中获取全量数据序列化传输给follower,follower在接收全量数据后会进行反序列化加载到内存数据库中。

    交互流程如下:

    Leader                 Follower

      |         SNAP          |  
      | --------------------> |
      |         DATA          |  
      | --------------------> |
         

    leader在完成数据同步之后,会向follower发送NEWLEADER报文,在收到过半的follower响应的ACK之后此时说明过半的节点完成了数据同步,接下来leader会向follower发送UPTODATE报文告知follower节点可以对外提供服务了,此时leader会启动zkserver开始对外提供服务。

    FOLLOWER数据同步

    下面我们在看下数据同步阶段FOLLOWER是如何处理的,参考Learner.syncWithLeader

protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        /**
         * 接收 leader 发送的数据同步方式报文
         */
        readPacket(qp);
        
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                
            }
            else if (qp.getType() == Leader.SNAP) {
                // 执行加载全量数据
            } else if (qp.getType() == Leader.TRUNC) {
                // 执行回滚
            }
            else {
            
            }
            
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    // 处理提案
                    break;
                case Leader.COMMIT:
                    // commit proposal
                    break;
                case Leader.INFORM:
                    // 忽略
                    break;
                case Leader.UPTODATE:
                    // 设置 zk server
                    self.cnxnFactory.setZooKeeperServer(zk);
                    // 退出循环                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    /**
                     * follower 响应 NEWLEADER ACK
                     */
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        // 启动 zk server
        zk.startup();
        
    }

    从上述代码中可以看出follower在数据同步阶段的处理流程如下:

    follower接收leader发送的数据同步方式(DIFF/TRUNC/SANP)报文并进行相应处理

    当follower收到leader发送的NEWLEADER报文后,会向leader响应ACK(leader在收到过半的ACK消息之后会发送UPTODATE)

    当follower收到leader发送的UPTODATE报文后,说明此时可以对外提供服务,此时将启动zkserver

    小结

    最后用一张图总结下zk在完成选举后数据同步的过程如下图所示:

java

    以上就是由开课吧老师断风雨提供的“一文讲解zookeeper-数据同步源码的详情”一文,更多Java教程相关内容尽在开课吧广场Java教程频道!

免责声明:本站所提供的内容均来源于网友提供或网络搜集,由本站编辑整理,仅供个人研究、交流学习使用。如涉及版权问题,请联系本站管理员予以更改或删除。
有用
分享