# 原理

# 架构

  • 基本架构:
    • 运行一些 Kafka 服务器,组成集群。
    • 用户运行客户端程序,连接到 Kafka 服务器,作为 Producer 生产消息,或者作为 consumer 消费消息。

# Broker

:Kafka 服务器,负责存储、管理消息。

  • broker 会将消息以日志文件的形式存储,存放在 logs/<topic>-<partition>/ 目录下,因此会受到文件系统的影响。

  • 虽然 broker 将消息保存在磁盘中,但采取以下措施提高了读写速度:

    • 生产消息、消费消息时,是按顺序读写磁盘,因此比随机读写的速度快很多。
    • 生产消息时,先将消息写入 MMAP 内存映射文件,然后再 flush 到磁盘。因此写入速度更快。
    • 消费消息时,用 sendfile 快速发送文件。
  • Kafka 集群中会选出一个 broker 作为 Controller 。

    • 每个 broker 在启动之后,都会尝试到 zk 中创建 /controller 节点,创建成功则当选。
    • Controller 负责管理整个集群,比如在每个 partition 的所有副本中选出一个作为 leader replica 。
    • 增加 broker 的数量,就可以对 Kafka 集群横向扩容。

# Producer

:生产者,即生产消息的客户端,负责发布消息到 broker 。

  • Producer 向某个 topic 发布消息时,默认会将消息随机分配到不同的 partition 。
    • 可以指定 partition ,也可以指定均衡策略来自动分配 partition 。

# Consumer

:消费者,即消费消息的客户端,负责从 broker 消费消息。

  • consumer 消费消息的步骤:
    1. 发送请求到 broker ,订阅一个或多个 topic 。
    2. 定时发送请求到 broker ,轮询自己订阅的 topic 。如果存在可消费的消息,则拉取到本机。
  • consumer 消费消息的过程类似于下载文件,消息被 "消费" 之后并不会被删除,除非超过 broker 的存储限制。

# Consumer Group

:消费者组,用于在逻辑上对 consumer 分组管理。

  • 客户端运行 consumer 实例时,可以指定其所属的 consumer group 。
    • 如果不指定,则该 consumer 的 group 为空,不会被 Coordinator 协调。
  • 一个 consumer 同时只能消费一个 partition ,因此通常用一组 consumer 同时消费一个 topic 下的不同 partition ,通过并行消费来提高消费速度。
    • 当一个 group 消费一个 topic 时,如果 partition 的数量小于 consumer 的数量,就会有 consumer 空闲。
      • 因此,最好将 partition 的数量设置成与 consumer 数量相同,或者为 consumer 数量的整数倍,以便于平均分配。
    • 不同 group 之间相互独立,即使同时消费同一个 topic 下的同一个 partition 也互不影响。

# Coordinator

  • Kafka 会根据 consumer group ID 的哈希值,分配 topic: __consumer_offsets 中的一个 partition ,用于存储该 group 的 offset 信息。

    • 该 partition 的 leader replica 所在的 broker 运行的 GroupCoordinator 类,负责管理该 group 的 consumer、offset 。
  • Coordinator 管理的 consumer group 分为多个状态(group state):

    PreparingRebalance    # 准备开始 Rebalance
    CompletingRebalance   # 即将完成 Rebalance ,正在发送分配方案
    Stable                # 已完成 Rebalance ,可以正常消费
    Empty                 # 组内没有成员
    Dead                  # 组内没有成员,且 offset 等 metadata 已删除,不能响应请求
    
  • consumer 启动时,如果指定了所属的 consumer group ,则会发送带有 UNKNOWN_MEMBER_ID 标志的 JoinGroup 请求给 Coordinator ,请求加入指定的 group ,并请求分配 member id 。

    • GroupCoordinator 会给每个 consumer 分配一个 member id ,格式为 client.id-UUID ,其中 UUID 为随机生成的十六进制编号。
      • 第一个加入 group 的 consumer 会担任组长(Group Leader)。
      • 每个 consumer 会定时向 Coordinator 发送 Heartbeat 请求,以维持在线。否则 Coordinator 会从该 group 删除该 consumer 。
    • 组长负责分配各个 consumer 消费的 partition ,该过程称为 Consumer Rebalance 。流程如下:
      1. 组长从 Coordinator 获取该组的 consumer 列表,分配各个 consumer 消费的 partition 。
      2. 组长发送 SyncGroup 请求给 Coordinator ,告知分配方案。
      3. Coordinator 等收到 consumer 的 Heartbeat 请求时,在响应中告知已发生 Rebalance 。
      4. consumer 删掉内存中的 UUID 等成员信息,重新加入该 group ,进入新的 generation 。
    • 当 group 的 consumer 或 partition 数量变化时,都会自动触发一次 Rebalance 。
    • 每次 Rebalance 时,group 就开始一个新时代(generation)。
      • 每个 generation 拥有一个从 0 递增的编号。
  • 日志示例:

    # broker 1 的 Coordinator 被分配了任务,开始管理 __consumer_offsets partition 对应的 consumer group
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 28
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 1
    # broker 1 的 Coordinator 被取消分配,停止管理一些 consumer group
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 3
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 0
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 24
    
    # 一个成员加入 stable 状态的 consumer group ,被分配了 member id
    INFO	[GroupCoordinator 1]: Dynamic Member with unknown member id joins group test_group_1 in Stable state. Created a new member id consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 for this member and add to the group.
    # 准备开始 rebalance ,原因是加入一个新成员,其 group instance id 为 None ,说明不是 Static Member
    INFO	[GroupCoordinator 1]: Preparing to rebalance group test_group_1 in state PreparingRebalance with old generation 38 (__consumer_offsets-21) (reason: Adding new member consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 with group instance id None)
    # group 进入 stable 状态,开始一个新 generation ,拥有 3 个成员
    INFO	[GroupCoordinator 1]: Stabilized group test_group_1 generation 39 (__consumer_offsets-21) with 3 members
    # Coordinator 收到 leader 发来的分配方案
    INFO	[GroupCoordinator 1]: Assignment received from leader for group test_group_1 for generation 39. The group has 3 members, 0 of which are static.
    
    • Preparing to rebalance 的几种 reason 示例:
      Adding new member $memberId with group instance id $groupInstanceId     # 加入一个新成员
      removing member $memberId on LeaveGroup                                 # 一个成员发出 LeaveGroup 请求,主动离开 group
      removing member $memberId on heartbeat expiration                       # 一个成员因为 Heartbeat 超时,被移出 group
      
  • Rebalance 期间,所有 consumer 都要暂停消费,开销较大。因此应该尽量避免触发 Rebalance ,比如重启 consumer 时。

    • consumer 重启之后,会发送 JoinGroup 请求重新加入 group ,被分配一个新的 member id , 触发一次 Rebalance 。
      • 而旧的 member id 不再使用,等到 Heartbeat 超时,又会触发一次 Rebalance 。
    • Kafka v2.3 开始,consumer 增加了配置参数 group.instance.id 。启用该参数时,consumer 会从默认的 Dynamic Member 变成 Static Member 类型。
      • consumer 重启之后发送 JoinGroup 请求时,Coordinator 会识别出它是 Static Member ,会分配一个新 UUID ,并删除之前的 member id 。因此不会触发 Rebalance ,除非 Heartbeat 超时。
      • 日志示例:
        INFO	[GroupCoordinator 1]: Static member Some(static_member_1) of group test_group_1 with unknown member id rejoins, assigning new member id static_member_1-cdf1c4ea-2f1c-4f4d-bc46-bf443e5f7322, while old member id static_member_1-8b5d89b3-0757-4441-aeaa-50e7f9f55cee will be removed.
        INFO	[GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.
        

# Zookeeper

  • Kafka 采用 Zookeeper 作为分布式框架,记录 broker、topic、consumer 等信息。
    • 每个 broker 启动时,会到 zk 记录自己的 IP 和端口,供其它 broker 发现和访问。
    • 根据 broker.id 识别每个 broker ,即使 IP 和端口变化也会自动发现。
    • Kafka 集群第一次启动时会生成一个随机的 cluster ID ,保存到 zk 中。
      • broker 每次连接 zk 时都会检查 cluster ID 是否一致,因此一个 zk 集群不能供多个 Kafka 集群使用。
  • Kafka 在 zk 中使用的 znode 示例:
    /
    ├── admin               # 用于传递一些管理命令
    │   └── delete_topics   # 默认为 null
    ├── cluster
    │   └── id              # 记录 Kafka 集群的 id
    ├── brokers
    │   ├── ids             # 记录 broker 的信息。各个 broker 会在其下创建临时节点,当 broker 下线时会被自动删除
    │   ├── seqid
    │   └── topics          # 记录 topic 的信息
    ├── config
    ├── consumers
    ├── controller          # 记录当前的 controller ,这是一个临时节点
    ├── controller_epoch    # 记录 controller 当选的 epoch
    ...
    
    • 例:
      [zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1
      {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":-1,"features":{},"host":"10.0.0.1","timestamp":"1627960927890","port":9092,"version":5}
      [zk: localhost:2181(CONNECTED) 1] get /brokers/topics/__consumer_offsets/partitions/0/state
      {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3,1]}
      [zk: localhost:2181(CONNECTED) 3] get /controller
      {"version":1,"brokerid":1,"timestamp":"1627960928034"}
      

# 消息

  • Kafka 处理数据的基本单位为消息(message),又称为 record 。
  • 消息采用 Kafka 自定的格式封装,类似于 TCP 报文。

# Topic

:主题,用于在逻辑上对消息分组管理。

  • 不同 topic 之间相互独立,它们的消息互不影响。

# Partition

:分区。

  • topic 在存储时会拆分成一个或多个 partition 。

    • 每个 partition 可以存储多个副本。
    • partition 存储到磁盘时,每隔一定时间或大小就划分出一个日志段(LogSegment)文件。
  • broker 每收到一条新消息时,先看它属于哪个 topic ,然后考虑分配到哪个 partition 中存储。

    • Kafka 会尽量将同一 topic 的各个 partition 存储到不同的 broker 上,从而分散负载。
    • Kafka 会尽量将同一 partition 的各个 replica 存储到不同的 broker 上,从而抵抗单点故障。
    • 客户端只需要连接 broker 就能生产、消费消息,不需要关心消息的实际存储位置。
  • Kafka 数据目录的结构如下:

    /data/kafka-logs/                     # 数据日志的存储目录,其下每个子目录的命名格式为 <topic>-<partition id>
    ├── test-0/                           # 该目录用于存储 test 主题的 0 号分区
    │   ├── 00000000000000000000.index    # 第一个 LogSegment 的索引文件,其中第一个消息的 offset 为 0
    │   ├── 00000000000000000000.log      # 第一个 LogSegment 的数据日志文件
    │   ├── 00000000000000367814.index    # 第二个 LogSegment 的索引文件,其中第一个消息的 offset 为 367814
    │   ├── 00000000000000367814.log
    │   ├── leader-epoch-checkpoint       # 记录每一任 leader 当选时的 epoch 和 offset
    │   └── partition.metadata
    ├── test-2/
    └── test-3/
    
    • partition 采用稀疏索引,避免单个索引文件的体积过大。读取某个 offset 的消息时,需要先找到它对应的 LogSegment ,再根据该 LogSegment 的 index 文件找到消息。

# Replica

  • 每个 partition 会存储多个副本(replica),从而备份数据。

  • Kafka 会自动在每个 partition 的所有副本中选出一个作为 leader replica ,而其它副本称为 follower replica

    • leader 可以进行读写操作,负责处理客户端的访问请求。
    • follower 只能进行读操作,负责与 leader 的数据保持一致,从而备份数据。
    • 客户端读写 partition 时看到的总是 leader ,看不到 follower 。
  • Assigned Replicas

    • :指一个 partition 拥有的所有 replica 。
  • Preferred replica

    • :指 assigned replicas 中的第一个 replica 。
    • 新建一个 partition 时,一般由 preferred replica 担任 leader replica 。
      • 当所在的 broker 故障时,会选出其它 replica 担任 leader 。
      • 当 preferred replica 恢复时,会担任普通的 replica 。但 kafka 会自动尝试让 preferred replica 重新担任 leader ,该过程称为 preferred-replica-election、Partition Rebalance 。
  • In Sync Replicas (ISR)

    • :指一个 partition 中与 leader 保持同步的所有 replica 。
    • 如果一个 follower 的滞后时间超过 replica.lag.time.max.ms ,或者 leader 连续这么长时间没有收到该 follower 的 fetch 请求,则认为它失去同步,从 ISR 中移除。
      • 例如:IO 速度过慢,使得 follower 从 leader 复制数据的速度,比 leader 新增数据的速度慢,就会导致 lastCaughtUpTimeMs 一直没有更新,最终失去同步。
    • leader 本身也属于 ISR 。
    • 只有 ISR 中的 replica 可以被选举为 leader 。
  • Under-replicated Replicas Set

    • :指一个 partition 中与 leader 没有同步的 replica 。
    • 当一个 follower 将 leader 的最后一条消息(Log End Offset)之前的日志全部成功复制之后,则认为该 follower 已经赶上了 leader ,记录此时的时刻作为该 follower 的 lastCaughtUpTimeMs
    • Kafka 的 ReplicaManager 会定期计算每个 follower 的 lastCaughtUpTimeMs 与当前时刻的差值,作为该 follower 对 leader 的滞后时间。

# Offset

  • partition 中存储的每个消息都有一个唯一的偏移量(offset),用于索引。

    • offset 的值采用 Long 型变量存储,容量为 64 bit 。
    • 生产者生产消息时、消费者消费消息时,offset 都会自动递增。
      • 因此,partition 类似于先入先出的队列,先生产的消息会先被消费。
  • Log Start Offset

    • :partition 中第一个消息的偏移量。刚创建一个 topic 时,该值为 0 。每次 broker 清理消息日志之后,该值会增大一截。
  • Log End Offset (LEO)

    • :partition 中最新一个消息的偏移量。
  • High Watemark (HW)

    • :partition 允许被 consumer 看到的最高偏移量。
    • partition 的 leader 新增一个消息时,会更新 LEO 的值,并传给 follower 进行同步。因此 HW 的值总是小于等于 LEO 。
    • consumer 只能看到 HW ,不知道 LEO 的值。
  • Consumer Committed Offset

    • :某个 consumer 在某个 partition 中最后一次消费的消息的偏移量。
    • 它由 Coordinator 记录,可以保证在 Rebalance 之后 consumer 不会重复消费。
    • Kafka 能确保一个消息成功生产,但不能确保消息被消费,需要客户端主动更新 Committed Offset 。
  • Consumer Current Offset

    • :某个 consumer 在某个 partition 中下一次希望消费的消息的偏移量。
    • 它由 consumer 自己记录,可以保证在多次调用 poll() 方法时不会重复消费。
      • 如果记录的 offset 偏小,就会重复消费。如果记录的 offset 偏大,就会遗漏消费。
  • Consumer Lag

    • :consumer 在消费某个 partition 时的滞后量,即还有多少个消息未消费。
    • 它的值等于 HW - Consumer Committed Offset