# 原理
# 架构
zk server 可以只部署单实例,也可以部署多个实例,组成集群。
- 每个 server 都拥有整个集群的数据副本,客户端连接到任一 server 即可访问集群。
部署多个 zk server 时,需要至少 Quorum 数量的节点正常工作,集群才可用。
- 部署的 server 数量建议为奇数,为偶数时并不会提高可用性。
- 部署 1 个 server 时,不能组成集群,只能工作在 standalone 模式。
- 部署 2 个 server 时,能组成一个最小的 zk 集群,但存在单点故障的风险。
- 任一 server 故障时,剩下的 server 不超过集群的半数,不能投票决策。
- 部署 3 个 server 时,组成的 zk 集群最多允许 1 个 server 故障。
- 部署 4 个 server 时,组成的 zk 集群也是最多允许 1 个 server 故障。
zk 集群中的 server 分为三种角色:
- leader
- :领导者,有权发出提议(Proposal)、投票。
- 提议通过之后,就会更新集群的数据。
- 可以处理客户端的读、写请求。
- follower
- :跟随者,有权投票。
- 会复制 leader 的数据到本机,因此可以处理客户端的读请求。
- 收到客户端的写请求时,会转发给 leader ,相当于反向代理。
- observer
- :观察者。
- 无权投票,只能听从投票结果,因此不会影响集群的可用性,可以允许故障。
- 可以像 follower 一样接收客户端的读写请求,因此能提高集群的并发量。
- leader、follower 是 zk 集群在选举时自动分配的角色,而 observer 是由用户指定的可选角色。
- leader
# 事务
- 每个事务拥有一个十六进制编号,称为 zxid 。
- zxid 用 long 型变量存储,长度为 64 位。
- zxid 的前 32 位用于记录当前的 epoch 编号,后 32 位用于记录这是当前 epoch 中的第几个事务,从 0 开始递增。例如 zxid=0x2000004b2 。
- 当事务日志达到一定数量时,zk 会将当前全部 znode 的数据保存为磁盘中的一个快照文件,然后创建新的事务日志文件。
- 创建快照文件、事务日志文件时,采用当前最新一个 zxid 作为文件后缀名。
- 快照文件、事务日志文件默认一直不会删除,因此可以让 zk 数据回滚到历史状态。
- 保存快照的过程中,如果新产生了事务,也会被加入快照。
- 此时事务日志文件的 zxid 会略大于快照文件的 zxid 。
- 该快照称为模糊快照,不过 zk 依然可以根据它恢复数据。
# ZAB
zk 采用的分布式共识协议是 ZAB(Zookeeper Atomic Broadcast),与 Raft 协议相似。
- 默认采用 FastLeaderElection 选举算法。选举过程很快,一般低于 1 s 。
- Leader 的任期称为 epoch 。
- 每个 epoch 拥有一个从 0 开始递增的数字编号。
ZAB 协议为 zk server 划分了多个运行阶段:
- 启动:每个 server 在启动时,不知道自己的角色,需要开始选举,选出 leader ,或者发现已有的 leader 。
- election :选举
- discovery :发现领导者
- synchronization :同步跟随者
- broadcast :广播事务
# election
- 有权投票的 server 进入 LOOKING 竞选状态。向其它 server 发送自己的投票,包含以下信息:
- epoch :本轮投票的编号。
- zxid :提议的 leader 的最新 zxid 。
- leader :提议的 leader 的 ID 。默认推举自己。
- 每个 server 收到其它 server 的投票时,会与自己的投票进行对比。
- 依次比较 epoch、zxid、leader 三种值的大小,取值越大则投票的权重越高。
- 如果自己投票的权重更大,则发送给对方 server 。否则采用对方的投票作为自己的新投票。
- 这样会尽量推举拥有最新事务的 server 作为 leader 。
- 重复 1-2 步,直到一个 server 发现有 Quorum 数量的 server 推举自己为 leader ,则将 epoch 加 1 ,向所有 server 发送自己成为 leader 的消息。
- 投票成功。各个 server 被分配了角色,进入 LEADING、FOLLOWING 或 OBSERVING 状态。
- 但此时 follower 不一定能访问到 leader ,也尚未同步数据,要等到 broadcast 阶段才能供客户端访问。
# discovery
- follower 连接到 leader ,发送 FOLLOWERINFO 消息。
- 等 Quorum 数量的 follower 建立连接之后,leader 停止接受新连接。并向已有的 follower 发送 LEADERINFO(e) 消息,提议开始一个新的 epoch ,其 e 值大于这些 follower 的 acceptedEpoch 。
- 统计的 Quorum 数量包括了 leader、follower ,不包括 observer 。
- follower 收到 LEADERINFO(e) 消息,进行判断:
- 如果 e > acceptedEpoch ,则设置 acceptedEpoch = e ,并回复 ACKEPOCH(e) ,表示接受新的 epoch 。
- 如果 e == acceptedEpoch ,则跳到执行下一步。
- 如果 e < acceptedEpoch ,则断开连接,重新开始选举。
- 等 Quorum 数量的 follower 回复 ACKEPOCH 之后,leader 开始下一阶段。
- 所有 follower 都必须满足以下两个条件之一,否则 leader 断开连接,重新开始选举:
follower.currentEpoch < leader.currentEpoch follower.currentEpoch == leader.currentEpoch && follower.lastZxid <= leader.lastZxid
- 如果上述过程失败或超时,则重新开始选举。
- 所有 follower 都必须满足以下两个条件之一,否则 leader 断开连接,重新开始选举:
# synchronization
- follower 连接到 leader ,加入 Learner 队列。
- leader 给 follower 同步数据。有多种同步模式:
- 如果 follower 落后少量事务,则 leader 发送 DIFF 消息,同步缺少的事务。
- 如果 follower 落后太多事务,则 leader 发送 SNAP 消息,同步全部 znode 的快照。
- 如果 follower 的 zxid 比 leader 还大,则发送 TRUNC 消息,丢弃多出的事务。
- 当 follower 同步完数据之后,leader 向它发送 NEWLEADER(e) 消息,提议自己作为该 epoch 的 leader 。
- follower 收到 NEWLEADER(e) 消息,停止同步,设置 currentEpoch = e ,然后回复 ACK 消息。
- 等 Quorum 数量的 follower 回复 ACK 之后,leader 正式成为该 epoch 的 leader ,向所有 follower 发送 UPTODATE 消息,表示超过 Quorum 数量的 follower 已完成同步。
- follower 收到 UPTODATE 消息,开始接受客户端连接。
# broadcast
- 提交一个新事务的流程如下,类似于两阶段提交:
- leader 开始一个新事务,广播 PROPOSE 消息给所有 follower ,提议该事务。
- follower 收到 PROPOSE 消息,将事务写入磁盘的事务日志文件。然后回复 ACK(zxid) 消息给 leader ,表示直到该 zxid 的事务都已经被接受。
- 等 Quorum 数量的 follower 回复 ACK 之后, leader 正式提交该事务。然后广播 COMMIT(zxid) 消息给所有 follower ,表示直到该 zxid 的事务都已经被提交。
- leader 确保超过 Quorum 数量的 follower 同步了最新事务之后,才会开始下一个事务。
- 尚未同步的 follower 必须按 zxid 顺序执行事务,即优先执行 zxid 最小的事务,且不允许跳过事务。
- 如果 follower 与 PROPOSE 消息的 epoch 不同,则不会接受提议,而是先经历 discovery、synchronization 阶段。
- 如果 follower 在一定时间内没有收到 leader 的 PROPOSE 或 ping 消息,则认为 leader 故障,状态从 FOLLOWING 变为 LOOKING ,重新选举。
- 此时原 leader 可能依然处于 LEADING 状态,可以被客户端连接,但它没有 Quorum 数量的 follower ,不能提交事务。
- 因此,集群中可能同时存在多个 leader ,但最多只有一个 leader 能拥有 Quorum 数量的 follower ,不会发生脑裂。
- 如果 leader 在一定时间内没有等到 Quorum 数量的 follower 回复 ACK ,则状态从 LEADING 变为 LOOKING ,重新选举。
# 源代码
QuorumPeer 类负责根据 ZAB 协议管理服务器,定义如下:
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider { public static class QuorumServer { // 服务器对象 public long id; public String hostname; public LearnerType type = LearnerType.PARTICIPANT; ... } public enum ServerState { // 服务器的状态 LOOKING, FOLLOWING, LEADING, OBSERVING } public enum ZabState { // 服务器在 ZAB 协议中的运行阶段 ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST } public enum SyncMode { // synchronization 阶段的同步模式 NONE, DIFF, SNAP, TRUNC } public enum LearnerType { // server 类型。Learner 类是 Followers 类和 Observers 类的父类,用于同步 leader 的数据 PARTICIPANT, // 参选者,在选举之后可以变成 leader 或 follower OBSERVER // 观察者 } public void run() { // zk 会创建一个子线程去执行 QuorumPeer.run() LOG.debug("Starting quorum peer"); try { while (running) { // 循环体,根据 server 的状态执行一些操作 switch (getPeerState()) { case LOOKING: // LOOKING 状态要进行选举 LOG.info("LOOKING"); ... case OBSERVING: LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); ... case FOLLOWING: LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); ... case LEADING: LOG.info("LEADING"); setLeader(makeLeader(logFactory)); leader.lead(); // LEADING 状态则作为 leader 角色保持运行 ... } } } finally { LOG.warn("QuorumPeer main thread exited"); ... } } ... }
QuorumCnxManager 类负责管理服务器之间的通信。
- 通信的消息采用以下数据结构:
class QuorumPacket { int type; // 消息的类型。例如取值为 3 时表示 ACK 类型 long zxid; byte[] data; // 消息的内容 List<Id> authinfo; }
- 收、发的消息会放在队列 recvQueue、queueSendMap 中,异步处理。
- FastLeaderElection 选举算法的投票采用以下数据结构:
public static class Notification { public static final int CURRENTVERSION = 0x2; int version; // 投票格式的版本 long sid; // 发送该投票的 server 的 id QuorumPeer.ServerState state; // 发送该投票的 server 的状态 QuorumVerifier qv; long leader; // 提议的 leader long electionEpoch; // 本轮选举的 epoch long peerEpoch; // 提议的 leader 的 epoch long zxid; // 提议的 leader 的最新 zxid }
- 通信的消息采用以下数据结构:
# 日志
- 在 3 节点的 zk 集群中,重启其中一个节点,日志示例如下:
# 该 server 启动之后,默认监听 TCP 3888 端口,用于选举 INFO [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1065] - 1 is accepting connections now, my election bind port: /0.0.0.0:3888 # 启动之后处于 LOOKING 状态,发起一轮投票 INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@1374] - LOOKING INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):FastLeaderElection@944] - New election. My id = 1, proposed zxid=0x500000342 # 发送一个投票给其它节点。其中 my state 表示当前节点的状态, n.* 表示 Notification 中的信息 INFO [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:1, n.state:LOOKING, n.leader:1, n.round:0x1, n.peerEpoch:0x5, n.zxid:0x500000342, message format version:0x2, n.config version:0x0 # 投票时,各个 server 需要两两连接,但每对 server 之间只需建立一个 TCP 连接。因此只允许由 id 更大的 server 主动建立连接,断开其余连接 INFO [QuorumConnectionThread-[myid=1]-2:QuorumCnxManager@513] - Have smaller server identifier, so dropping the connection: (myId:1 --> sid:3) INFO [QuorumConnectionThread-[myid=1]-1:QuorumCnxManager@513] - Have smaller server identifier, so dropping the connection: (myId:1 --> sid:2) INFO [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1070] - Received connection request from /10.0.0.2:46728 INFO [ListenerHandler-/0.0.0.0:3888:QuorumCnxManager$Listener$ListenerHandler@1070] - Received connection request from /10.0.0.3:35372 # 收到其它 server 发来的投票。可见它们不处于 LOOKING 状态,已存在 leader INFO [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:3, n.state:LEADING, n.leader:3, n.round:0xa9, n.peerEpoch:0x5, n.zxid:0x400000000, message format version:0x2, n.config version:0x0 INFO [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:2, n.state:FOLLOWING, n.leader:3, n.round:0xa9, n.peerEpoch:0 x5, n.zxid:0x400000000, message format version:0x2, n.config version:0x0 INFO [WorkerReceiver[myid=1]:FastLeaderElection$Messenger$WorkerReceiver@389] - Notification: my state:LOOKING; n.sid:3, n.state:LEADING, n.leader:3, n.round:0xa9, n.peerEpoch:0x5 , n.zxid:0x400000000, message format version:0x2, n.config version:0x0 ... INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@857] - Peer state changed: following INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@1456] - FOLLOWING ... INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Follower@75] - FOLLOWING - LEADER ELECTION TOOK - 23 MS # 该 server 结束选举,接着进入 discovery、synchronization 阶段 INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - discovery INFO [LeaderConnector-/10.0.0.3:2888:Learner$LeaderConnector@330] - Successfully connected to leader, using address: /10.0.0.3:2888 INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - synchronization INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@511] - Getting a diff from the leader 0x500000342 INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@868] - Peer state changed: following - synchronization - diff INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@677] - Learner received NEWLEADER message INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):Learner@661] - Learner received UPTODATE message INFO [QuorumPeer[myid=1](plain=0.0.0.0:2181)(secure=disabled):QuorumPeer@863] - Peer state changed: following - broadcast