# 原理

# 架构

  • zk server 可以只部署单实例,也可以部署多个实例,组成集群。

    • 每个 server 都拥有整个集群的数据副本,客户端连接到任一 server 即可访问集群。
  • 部署多个 zk server 时,需要至少 Quorum 数量的节点正常工作,集群才可用。

    • 部署的 server 数量建议为奇数,为偶数时并不会提高可用性。
    • 部署 1 个 server 时,不能组成集群,只能工作在 standalone 模式。
    • 部署 2 个 server 时,能组成一个最小的 zk 集群,但存在单点故障的风险。
      • 任一 server 故障时,剩下的 server 不超过集群的半数,不能投票决策。
    • 部署 3 个 server 时,组成的 zk 集群最多允许 1 个 server 故障。
    • 部署 4 个 server 时,组成的 zk 集群也是最多允许 1 个 server 故障。
  • zk 集群中的 server 分为三种角色:

    • leader
      • :领导者,有权发出提议(Proposal)、投票。
      • 提议通过之后,就会更新集群的数据。
      • 可以处理客户端的读、写请求。
    • follower
      • :跟随者,有权投票。
      • 会复制 leader 的数据到本机,因此可以处理客户端的读请求。
      • 收到客户端的写请求时,会转发给 leader ,相当于反向代理。
    • observer
      • :观察者。
      • 无权投票,只能听从投票结果,因此不会影响集群的可用性,可以允许故障。
      • 可以像 follower 一样接收客户端的读写请求,因此能提高集群的并发量。
      • leader、follower 是 zk 集群在选举时自动分配的角色,而 observer 是由用户指定的可选角色。

# 事务

  • 每个事务拥有一个十六进制编号,称为 zxid 。
    • zxid 用 long 型变量存储,长度为 64 位。
    • zxid 的前 32 位用于记录当前的 epoch 编号,后 32 位用于记录这是当前 epoch 中的第几个事务,从 0 开始递增。例如 zxid=0x2000004b2 。
  • 当事务日志达到一定数量时,zk 会将当前全部 znode 的数据保存为磁盘中的一个快照文件,然后创建新的事务日志文件。
    • 创建快照文件、事务日志文件时,采用当前最新一个 zxid 作为文件后缀名。
    • 快照文件、事务日志文件默认一直不会删除,因此可以让 zk 数据回滚到历史状态。
  • 保存快照的过程中,如果新产生了事务,也会被加入快照。
    • 此时事务日志文件的 zxid 会略大于快照文件的 zxid 。
    • 该快照称为模糊快照,不过 zk 依然可以根据它恢复数据。

# ZAB

  • zk 采用的分布式共识协议是 ZAB(Zookeeper Atomic Broadcast),与 Raft 协议相似。

    • 默认采用 FastLeaderElection 选举算法。选举过程很快,一般低于 1 s 。
    • Leader 的任期称为 epoch 。
      • 每个 epoch 拥有一个从 0 开始递增的数字编号。
  • ZAB 协议为 zk server 划分了多个运行阶段:

    • 启动:每个 server 在启动时,不知道自己的角色,需要开始选举,选出 leader ,或者发现已有的 leader 。
    • election :选举
    • discovery :发现领导者
    • synchronization :同步跟随者
    • broadcast :广播事务

# election

  1. 有权投票的 server 进入 LOOKING 竞选状态。向其它 server 发送自己的投票,包含以下信息:
    • epoch :本轮投票的编号。
    • zxid :提议的 leader 的最新 zxid 。
    • leader :提议的 leader 的 ID 。默认推举自己。
  2. 每个 server 收到其它 server 的投票时,会与自己的投票进行对比。
    • 依次比较 epoch、zxid、leader 三种值的大小,取值越大则投票的权重越高。
    • 如果自己投票的权重更大,则发送给对方 server 。否则采用对方的投票作为自己的新投票。
    • 这样会尽量推举拥有最新事务的 server 作为 leader 。
  3. 重复 1-2 步,直到一个 server 发现有 Quorum 数量的 server 推举自己为 leader ,则将 epoch 加 1 ,向所有 server 发送自己成为 leader 的消息。
  4. 投票成功。各个 server 被分配了角色,进入 LEADING、FOLLOWING 或 OBSERVING 状态。
    • 但此时 follower 不一定能访问到 leader ,也尚未同步数据,要等到 broadcast 阶段才能供客户端访问。

# discovery

  1. follower 连接到 leader ,发送 FOLLOWERINFO 消息。
  2. 等 Quorum 数量的 follower 建立连接之后,leader 停止接受新连接。并向已有的 follower 发送 LEADERINFO(e) 消息,提议开始一个新的 epoch ,其 e 值大于这些 follower 的 acceptedEpoch 。
    • 统计的 Quorum 数量包括了 leader、follower ,不包括 observer 。
  3. follower 收到 LEADERINFO(e) 消息,进行判断:
    • 如果 e > acceptedEpoch ,则设置 acceptedEpoch = e ,并回复 ACKEPOCH(e) ,表示接受新的 epoch 。
    • 如果 e == acceptedEpoch ,则跳到执行下一步。
    • 如果 e < acceptedEpoch ,则断开连接,重新开始选举。
  4. 等 Quorum 数量的 follower 回复 ACKEPOCH 之后,leader 开始下一阶段。
    • 所有 follower 都必须满足以下两个条件之一,否则 leader 断开连接,重新开始选举:
      follower.currentEpoch <  leader.currentEpoch
      follower.currentEpoch == leader.currentEpoch && follower.lastZxid <= leader.lastZxid
      
    • 如果上述过程失败或超时,则重新开始选举。

# synchronization

  1. follower 连接到 leader ,加入 Learner 队列。
  2. leader 给 follower 同步数据。有多种同步模式:
    • 如果 follower 落后少量事务,则 leader 发送 DIFF 消息,同步缺少的事务。
    • 如果 follower 落后太多事务,则 leader 发送 SNAP 消息,同步全部 znode 的快照。
    • 如果 follower 的 zxid 比 leader 还大,则发送 TRUNC 消息,丢弃多出的事务。
  3. 当 follower 同步完数据之后,leader 向它发送 NEWLEADER(e) 消息,提议自己作为该 epoch 的 leader 。
  4. follower 收到 NEWLEADER(e) 消息,停止同步,设置 currentEpoch = e ,然后回复 ACK 消息。
  5. 等 Quorum 数量的 follower 回复 ACK 之后,leader 正式成为该 epoch 的 leader ,向所有 follower 发送 UPTODATE 消息,表示超过 Quorum 数量的 follower 已完成同步。
  6. follower 收到 UPTODATE 消息,开始接受客户端连接。

# broadcast

  • 提交一个新事务的流程如下,类似于两阶段提交:
    1. leader 开始一个新事务,广播 PROPOSE 消息给所有 follower ,提议该事务。
    2. follower 收到 PROPOSE 消息,将事务写入磁盘的事务日志文件。然后回复 ACK(zxid) 消息给 leader ,表示直到该 zxid 的事务都已经被接受。
    3. 等 Quorum 数量的 follower 回复 ACK 之后, leader 正式提交该事务。然后广播 COMMIT(zxid) 消息给所有 follower ,表示直到该 zxid 的事务都已经被提交。
  • leader 确保超过 Quorum 数量的 follower 同步了最新事务之后,才会开始下一个事务。
    • 尚未同步的 follower 必须按 zxid 顺序执行事务,即优先执行 zxid 最小的事务,且不允许跳过事务。
    • 如果 follower 与 PROPOSE 消息的 epoch 不同,则不会接受提议,而是先经历 discovery、synchronization 阶段。
  • 如果 follower 在一定时间内没有收到 leader 的 PROPOSE 或 ping 消息,则认为 leader 故障,状态从 FOLLOWING 变为 LOOKING ,重新选举。
    • 此时原 leader 可能依然处于 LEADING 状态,可以被客户端连接,但它没有 Quorum 数量的 follower ,不能提交事务。
    • 因此,集群中可能同时存在多个 leader ,但最多只有一个 leader 能拥有 Quorum 数量的 follower ,不会发生脑裂。
  • 如果 leader 在一定时间内没有等到 Quorum 数量的 follower 回复 ACK ,则状态从 LEADING 变为 LOOKING ,重新选举。

# 源代码

  • QuorumPeer 类负责根据 ZAB 协议管理服务器,定义如下:

    public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
        public static class QuorumServer {    // 服务器对象
            public long id;
            public String hostname;
            public LearnerType type = LearnerType.PARTICIPANT;
            ...
        }
        public enum ServerState {   // 服务器的状态
            LOOKING,
            FOLLOWING,
            LEADING,
            OBSERVING
        }
        public enum ZabState {      // 服务器在 ZAB 协议中的运行阶段
            ELECTION,
            DISCOVERY,
            SYNCHRONIZATION,
            BROADCAST
        }
        public enum SyncMode {      // synchronization 阶段的同步模式
            NONE,
            DIFF,
            SNAP,
            TRUNC
        }
        public enum LearnerType {   // server 类型。Learner 类是 Followers 类和 Observers 类的父类,用于同步 leader 的数据
            PARTICIPANT,            // 参选者,在选举之后可以变成 leader 或 follower
            OBSERVER                // 观察者
        }
        public void run() {         // zk 会创建一个子线程去执行 QuorumPeer.run()
            LOG.debug("Starting quorum peer");
            try {
                while (running) {   // 循环体,根据 server 的状态执行一些操作
                    switch (getPeerState()) {
                        case LOOKING:               // LOOKING 状态要进行选举
                            LOG.info("LOOKING");
                            ...
                        case OBSERVING:
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            observer.observeLeader();
                            ...
                        case FOLLOWING:
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                            ...
                        case LEADING:
                            LOG.info("LEADING");
                            setLeader(makeLeader(logFactory));
                            leader.lead();           // LEADING 状态则作为 leader 角色保持运行
                            ...
                    }
                }
    
            } finally {
                LOG.warn("QuorumPeer main thread exited");
                ...
            }
        }
        ...
    }
    
  • QuorumCnxManager 类负责管理服务器之间的通信。

    • 通信的消息采用以下数据结构:
      class QuorumPacket {
          int type;           // 消息的类型。例如取值为 3 时表示 ACK 类型
          long zxid;
          byte[] data;        // 消息的内容
          List<Id> authinfo;
      }
      
    • 收、发的消息会放在队列 recvQueue、queueSendMap 中,异步处理。
    • FastLeaderElection 选举算法的投票采用以下数据结构:
      public static class Notification {
          public static final int CURRENTVERSION = 0x2;
          int version;        // 投票格式的版本
      
          long sid;           // 发送该投票的 server 的 id
          QuorumPeer.ServerState state;   // 发送该投票的 server 的状态
          QuorumVerifier qv;
      
          long leader;        // 提议的 leader
          long electionEpoch; // 本轮选举的 epoch
          long peerEpoch;     // 提议的 leader 的 epoch
          long zxid;          // 提议的 leader 的最新 zxid
      }
      

# 日志

  • 在 3 节点的 zk 集群中,重启其中一个节点,日志示例如下:
    # 该 server 启动之后,默认监听 TCP 3888 端口,用于选举
    INFO  [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1065] - 1 is accepting connections now, my election bind port: /0.0.0.0:3888
    # 启动之后处于 LOOKING 状态,发起一轮投票
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@1374] - LOOKING
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):FastLeaderElection@944] - New election. My id = 1, proposed zxid=0x500000342
    # 发送一个投票给其它节点。其中 my state 表示当前节点的状态, n.* 表示 Notification 中的信息
    INFO  [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:1, n.state:LOOKING, n.leader:1, n.round:0x1, n.peerEpoch:0x5, n.zxid:0x500000342, message format version:0x2, n.config version:0x0
    # 投票时,各个 server 需要两两连接,但每对 server 之间只需建立一个 TCP 连接。因此只允许由 id 更大的 server 主动建立连接,断开其余连接
    INFO  [QuorumConnectionThread-[myid=1]-2:QuorumCnxManager@513] - Have smaller server identifier, so dropping the connection: (myId:1 --> sid:3)
    INFO  [QuorumConnectionThread-[myid=1]-1:QuorumCnxManager@513] - Have smaller server identifier, so dropping the connection: (myId:1 --> sid:2)
    INFO  [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1070] - Received connection request from /10.0.0.2:46728
    INFO  [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1070] - Received connection request from /10.0.0.3:35372
    # 收到其它 server 发来的投票。可见它们不处于 LOOKING 状态,已存在 leader
    INFO  [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:3, n.state:LEADING, n.leader:3, n.round:0xa9, n.peerEpoch:0x5, n.zxid:0x400000000, message format version:0x2, n.config version:0x0
    INFO  [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:2, n.state:FOLLOWING, n.leader:3, n.round:0xa9, n.peerEpoch:0
    x5, n.zxid:0x400000000, message format version:0x2, n.config version:0x0
    INFO  [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:3, n.state:LEADING, n.leader:3, n.round:0xa9, n.peerEpoch:0x5
    , n.zxid:0x400000000, message format version:0x2, n.config version:0x0
    ...
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@857] - Peer state changed: following
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@1456] - FOLLOWING
    ...
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Follower@75] - FOLLOWING - LEADER ELECTION TOOK - 23 MS
    # 该 server 结束选举,接着进入 discovery、synchronization 阶段
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - discovery
    INFO  [LeaderConnector-/10.0.0.3:2888:Learner$LeaderConnector@330] - Successfully connected to leader, using address: /10.0.0.3:2888
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - synchronization
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@511] - Getting a diff from the leader 0x500000342
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@868] - Peer state changed: following - synchronization - diff
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@677] - Learner received NEWLEADER message
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@661] - Learner received UPTODATE message
    INFO  [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - broadcast