# 原理

# 架构

  • 基本架构:
    • 运行一些 Kafka 服务器,组成集群。
    • 用户运行客户端程序,连接到 Kafka 服务器,作为 producer 生产消息,或者作为 consumer 消费消息。

# Broker

:Kafka 服务器,负责存储、管理消息。

  • Kafka 集群中会选出一个 broker 担任 Controller 。
    • 每个 broker 在启动之后,都会尝试到 zk 中创建 /controller 节点,创建成功则当选。
    • Controller 负责管理整个集群,比如在每个 partition 的所有副本中选出一个作为 leader replica 。其它 broker 跟随 Controller 的决策。
    • 增加 broker 的数量,就可以对 Kafka 集群横向扩容。

# Producer

:生产者,即生产消息的客户端,负责发布消息到 broker 。

  • producer 向某个 topic 发布消息时,默认会将消息随机分配到不同的 partition 。
    • 可以指定 partition ,也可以指定均衡策略来自动分配 partition 。

# Consumer

:消费者,即消费消息的客户端,负责从 broker 消费消息。

  • consumer 消费消息的步骤:
    1. 发送请求到 broker ,订阅一个或多个 topic 。
    2. 定时发送请求到 broker ,轮询自己订阅的 topic 。如果存在可消费的消息,则拉取到本机。
  • consumer 消费消息的过程类似于下载文件,消息被 "消费" 之后并不会被删除,除非超过 broker 的存储限制。

# Consumer Group

:消费者组,用于在逻辑上对 consumer 分组管理。

  • 客户端运行 consumer 实例时,可以指定其所属的 consumer group 。
    • 如果不指定,则该 consumer 的 group 为空,不会被 Coordinator 协调。
  • 一个 consumer 同时只能消费一个 partition ,因此通常用一组 consumer 同时消费一个 topic 下的不同 partition ,通过并行消费来提高消费速度。
    • 当一个 group 消费一个 topic 时,如果 consumer 的数量大于 partition 的数量,则部分 consumer 会空闲、不能消费。
      • 因此,最好将 partition 的数量设置成与 consumer 数量相同,或者为 consumer 数量的整数倍,以便于平均分配。
    • 不同 group 之间相互独立,即使同时消费同一个 topic 下的同一个 partition 也互不影响。

# Coordinator

  • Kafka 会根据 consumer group ID 的哈希值,分配 topic: __consumer_offsets 中的一个 partition ,用于存储该 group 的 offset 信息。

    • 该 partition 的 leader replica 所在的 broker 运行的 GroupCoordinator 类,负责管理该 group 的 consumer、offset 。
  • Coordinator 管理的 consumer group 分为多个状态(group state):

    PreparingRebalance    # 准备开始 rebalance
    CompletingRebalance   # 即将完成 rebalance ,正在发送分配方案
    Stable                # 已完成 rebalance ,可以正常消费
    Empty                 # 组内没有成员
    Dead                  # 组内没有成员,且 offset 等 metadata 已删除,不能响应请求
    
  • consumer 启动时,如果指定了所属的 consumer group ,则会发送带有 UNKNOWN_MEMBER_ID 标志的 JoinGroup 请求给 Coordinator ,请求加入指定的 group ,并请求分配 member id 。

    • GroupCoordinator 会给每个 consumer 分配一个 member id ,格式为 client.id-UUID ,其中 UUID 为随机生成的十六进制编号。
      • 第一个加入 group 的 consumer 会担任组长(Group Leader)。
      • 每个 consumer 会定时向 Coordinator 发送 Heartbeat 请求,以维持在线。否则 Coordinator 会从该 group 删除该 consumer 。
    • 组长负责分配各个 consumer 消费的 partition ,该过程称为 Consumer rebalance 。流程如下:
      1. 组长从 Coordinator 获取该组的 consumer 列表,分配各个 consumer 消费的 partition 。
      2. 组长发送 SyncGroup 请求给 Coordinator ,告知分配方案。
      3. Coordinator 等收到 consumer 的 Heartbeat 请求时,在响应中告知已发生 rebalance 。
      4. consumer 删掉内存中的 UUID 等成员信息,重新加入该 group ,进入新的 generation 。
    • 当 group 的 consumer 或 partition 数量变化时,都会触发一次 rebalance 。
    • 每次 rebalance 时,group 就开始一个新时代(generation)。
      • 每个 generation 拥有一个从 0 递增的编号。
  • 日志示例:

    # broker 1 的 Coordinator 被分配了任务,开始管理 __consumer_offsets partition 对应的 consumer group
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 28
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 1
    # broker 1 的 Coordinator 被取消分配,停止管理一些 consumer group
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 3
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 0
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 24
    
    # 一个成员加入 stable 状态的 consumer group ,被分配了 member id
    INFO	[GroupCoordinator 1]: Dynamic Member with unknown member id joins group test_group_1 in Stable state. Created a new member id consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 for this member and add to the group.
    # 准备开始 rebalance ,原因是加入一个新成员,其 group instance id 为 None ,说明不是 Static Member
    INFO	[GroupCoordinator 1]: Preparing to rebalance group test_group_1 in state PreparingRebalance with old generation 38 (__consumer_offsets-21) (reason: Adding new member consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 with group instance id None)
    # group 进入 stable 状态,开始一个新 generation ,拥有 3 个成员
    INFO	[GroupCoordinator 1]: Stabilized group test_group_1 generation 39 (__consumer_offsets-21) with 3 members
    # Coordinator 收到 leader 发来的分配方案
    INFO	[GroupCoordinator 1]: Assignment received from leader for group test_group_1 for generation 39. The group has 3 members, 0 of which are static.
    
    • Preparing to rebalance 的几种 reason 示例:
      Adding new member $memberId with group instance id $groupInstanceId     # 加入一个新成员
      removing member $memberId on LeaveGroup                                 # 一个成员发出 LeaveGroup 请求,主动离开 group
      removing member $memberId on heartbeat expiration                       # 一个成员因为 Heartbeat 超时,被移出 group
      
  • rebalance 期间,当前 consumer group 的所有 consumer 都要暂停消费,开销较大。因此应该尽量减少 rebalance ,而 relalance 的原因通常是 consumer 数量变化,常见的几种情况如下。

    1. 如果一个 consumer 刚启动,则会向 broker 发送 JoinGroup 请求,加入 group ,被分配一个 member id ,触发一次 rebalance 。
    2. 如果一个 consumer 终止,不再运行。则等到 Heartbeat 超时,broker 会认为该 consumer 下线,触发一次 rebalance 。
    3. 上述 consumer 启动、终止的情况通常不频繁,可以容忍它触发 rebalance 。但有的情况下,consumer 会频繁启动、终止,比如被 k8s HPA 改变 consumer 数量。
      • 解决方案:额外开发一个应用,称为 dispatcher ,让它作为唯一的 consumer 连接到 broker ,获取消息。而原本的应用连接到 dispatcher ,间接获取消息。
      • 使用 dispatcher 还能解决另一个问题:group 中的 consumer 数量,大于当前 topic 的 partition 数量,导致部分 consumer 空闲、不能消费。
    4. 如果一个 consumer 终止,然后又重启。则不记得自己之前的 member id ,依然会发送 JoinGroup 请求,加入 group ,被分配一个新的随机 member id ,触发一次 rebalance 。
      • 而旧的 member id 不再使用,等到 Heartbeat 超时,又会触发一次 rebalance 。因此 consumer 重启时会触发两次 rebalance 。
      • 解决方案:Kafka v2.3 开始,consumer 增加了配置参数 group.instance.id ,用于避免 consumer 重启时触发 rebalance 。
        • 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id。
          当 consumer 重启之后发送 JoinGroup 请求时,Coordinator 会识别出它是 Static Member ,属于 rejoin ,因此分配一个新 member id ,并删除旧的 member id 。
          这样不会触发 rebalance ,除非 consumer 重启太慢,导致 Heartbeat 超时。
        • 日志示例:
          INFO	[GroupCoordinator 1]: Static member Some(static_member_1) of group test_group_1 with unknown member id rejoins, assigning new member id static_member_1-cdf1c4ea-2f1c-4f4d-bc46-bf443e5f7322, while old member id static_member_1-8b5d89b3-0757-4441-aeaa-50e7f9f55cee will be removed.
          INFO	[GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.
          
        • 如果一个 group 中只运行了一个 consumer ,则用户手动配置一个固定的 group.instance.id 即可。
        • 如果一个 group 中运行了多个 consumer ,则用户需要在客户端增加一些代码,自动给每个 consumer 配置一个互不相同的、持久不变的 group.instance.id 。
    5. 如果一个 consumer 消费太慢,连续调用 poll() 的间隔超过 max.poll.interval.ms ,则会导致 Heartbeat 超时,触发 rebalance 。
      • 解决方案:增加 max.poll.interval.ms ,或者优化 consumer 客户端代码,比如减少每次拉取的数据量从而减少消费耗时、从同步消费改为异步消费。

# Zookeeper

  • Kafka 采用 Zookeeper 作为分布式系统的底层数据库,记录 broker、topic、consumer 等信息。
    • 每个 broker 启动时,会到 zk 记录自己的 IP 和端口,供其它 broker 发现和访问。
    • 根据 broker.id 识别每个 broker ,即使 IP 和端口变化也会自动发现。
    • Kafka 集群第一次启动时会生成一个随机的 cluster ID ,保存到 zk 中。
      • broker 每次连接 zk 时都会检查 cluster ID 是否一致,因此一个 zk 集群不能供多个 Kafka 集群使用。
  • Kafka 在 zk 中使用的 znode 示例:
    /
    ├── admin               # 用于传递一些管理命令
    │   └── delete_topics   # 默认为 null
    ├── cluster
    │   └── id              # 记录 Kafka 集群的 id
    ├── brokers
    │   ├── ids             # 记录 broker 的信息。各个 broker 会在其下创建临时节点,当 broker 下线时会被自动删除
    │   ├── seqid
    │   └── topics          # 记录 topic 的信息
    ├── config
    ├── consumers
    ├── controller          # 记录当前的 controller ,这是一个临时节点
    ├── controller_epoch    # 记录 controller 当选的 epoch
    ...
    
    • 例:
      [zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1
      {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":-1,"features":{},"host":"10.0.0.1","timestamp":"1627960927890","port":9092,"version":5}
      [zk: localhost:2181(CONNECTED) 1] get /brokers/topics/__consumer_offsets/partitions/0/state
      {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3,1]}
      [zk: localhost:2181(CONNECTED) 3] get /controller
      {"version":1,"brokerid":1,"timestamp":"1627960928034"}
      

# KRaft

  • Kafka v3.4 版本开始弃用 Zookeeper ,改用 broker 基于内置的 KRaft 协议维护分布式系统,从而大幅提高 broker 之间的通信效率,提高管理大量 partition 时的性能。
    • Zookeeper 模式下,各个 broker 会将元数据写入 Zookeeper ,并选出一个 broker 担任 Controller ,独自管理整个 Kafka 集群。
    • KRaft 模式下,各个 broker 不再连接 Zookeeper ,而是由一些(建议 3 或 5 个) broker 担任 Controller ,按 KRaft 协议进行分布式系统的决策,从而管理整个 Kafka 集群。

# 消息

  • Kafka 处理数据的基本单位为消息(message),又称为 record 。
  • 消息采用 Kafka 自定的格式封装,类似于 HTTP 报文。

# Topic

:主题,用于在逻辑上对消息分组管理。

  • 不同 topic 之间相互独立,它们的消息互不影响。

# Partition

:分区。

  • topic 在存储时会拆分成一个或多个 partition 。
    • 每个 partition 可以存储多个副本。
  • broker 每收到一条新消息时,先看它属于哪个 topic ,然后考虑分配到哪个 partition 中存储。
    • Kafka 会尽量将同一 topic 的各个 partition 存储到不同的 broker 上,从而分散负载。
    • Kafka 会强制将同一 partition 的各个 replica 存储到不同的 broker 上,从而抵抗单点故障。
    • 客户端只需要连接 broker 就能生产、消费消息,不需要关心消息的实际存储位置。

# LogSegment

  • broker 将数据以日志文件的形式存储在磁盘中。

    • broker 会在磁盘为每个 partition 分别创建一个目录,目录名格式为 <topic>-<partition> ,比如 __consumer_offsets-0、__consumer_offsets-1 。
    • 新增消息时,会保存到 partition 目录下的日志段(LogSegment)文件中。每隔一定时间或大小会创建一个新的 LogSegment 文件。
  • partition 目录的结构示例:

    /data/kafka-logs/                     # 数据日志的存储目录,其下每个子目录的命名格式为 <topic>-<partition id>
    ├── test-0/                           # 该目录用于存储 test 主题的 0 号分区
    │   ├── 00000000000000000000.index    # 第一个 LogSegment 的索引文件,其中第一个消息的 offset 为 0
    │   ├── 00000000000000000000.log      # 第一个 LogSegment 的数据日志文件
    │   ├── 00000000000000367814.index    # 第二个 LogSegment 的索引文件,其中第一个消息的 offset 为 367814
    │   ├── 00000000000000367814.log
    │   ├── leader-epoch-checkpoint       # 记录每一任 leader 当选时的 epoch 和 offset
    │   └── partition.metadata
    ├── test-2/
    ├── test-3/
    └── test-4/
    
    • 将消息写入 LogSegment 文件时,采用稀疏索引,避免单个 index 文件的体积过大。读取某个 offset 的消息时,需要先找到它所属的 LogSegment ,再根据该 LogSegment 的 index 文件找到消息。
  • 虽然 Kafka 将消息保存在磁盘中,比不上 Redis 内存数据库的读写速度,但 Kafka 采取了以下优化措施:

    • 生产消息时,每条消息以追加方式写入 LogSegment 文件,因此读写 LogSegment 文件时主要是顺序读写磁盘,比随机读写磁盘快了两个数量级。
    • 生产消息时,采用 MMAP 零拷贝技术,将 JVM 内存中的 LogSegment 数据拷贝到 Page Cache ,更快地将消息写入磁盘。
    • 消费消息时,采用 sendfile 零拷贝技术,将磁盘中的 LogSegment 数据拷贝到 Page Cache ,更快地将消息发送到 Socket 。

# Replica

  • 每个 partition 会存储多个副本(replica),从而备份数据。

  • Kafka 会自动在每个 partition 的所有副本中选出一个作为 leader replica ,而其它副本称为 follower replica

    • leader 可以进行读写操作,负责处理客户端的访问请求。
    • follower 不处理客户端的访问请求,只负责与 leader 的数据保持一致,从而备份数据。
    • 客户端读写 partition 时看到的总是 leader ,看不到 follower 。
  • Assigned Replicas

    • :指一个 partition 拥有的所有 replica 。
  • Preferred replica

    • :指 assigned replicas 中的第一个 replica 。
    • 新建一个 partition 时,一般由 preferred replica 担任 leader replica 。
      • 当所在的 broker 故障时,会选出其它 replica 担任 leader 。
      • 当 preferred replica 恢复时,会担任普通的 replica 。但 kafka 会自动尝试让 preferred replica 重新担任 leader ,该过程称为 preferred-replica-election、Partition rebalance 。
  • In Sync Replicas (ISR)

    • :指一个 partition 中与 leader 保持同步的所有 replica 。
    • 如果一个 follower 的滞后时间超过 replica.lag.time.max.ms ,或者 leader 连续这么长时间没有收到该 follower 的 fetch 请求,则认为它失去同步,从 ISR 中移除。
      • 例如:IO 速度过慢,使得 follower 从 leader 复制数据的速度,比 leader 新增数据的速度慢,就会导致 lastCaughtUpTimeMs 一直没有更新,最终失去同步。
    • leader 本身也属于 ISR 。
    • 只有 ISR 中的 replica 可以被选举为 leader 。
  • Under-replicated Replicas Set

    • :指一个 partition 中与 leader 没有同步的 replica 。
    • 当一个 follower 将 leader 的最后一条消息(Log End Offset)之前的日志全部成功复制之后,则认为该 follower 已经赶上了 leader ,记录此时的时刻作为该 follower 的 lastCaughtUpTimeMs
    • Kafka 的 ReplicaManager 会定期计算每个 follower 的 lastCaughtUpTimeMs 与当前时刻的差值,作为该 follower 对 leader 的滞后时间。

# Offset

  • partition 中存储的每个消息都有一个唯一的偏移量(offset),用于索引。

    • offset 的值采用 Long 型变量存储,容量为 64 bit 。
    • 生产者生产消息时、消费者消费消息时,offset 都会自动递增。
      • 因此,partition 类似于先入先出的队列,先生产的消息会先被消费。
  • Log Start Offset

    • :partition 中第一个消息的偏移量。刚创建一个 topic 时,该值为 0 。每次 broker 清理消息日志之后,该值会增大一截。
  • Log End Offset (LEO)

    • :partition 中最新一个消息的偏移量。
  • High Watemark (HW)

    • :partition 允许被 consumer 看到的最高偏移量。
    • partition 的 leader 新增一个消息时,会更新 LEO 的值,并传给 follower 进行同步,同步完之后才将该消息记作 HW 。
      • HW 取决于 ISR 中所有 replica 的最小 LEO 。
      • HW 的值总是小于等于 leader 的 LEO 。
      • consumer 只能看到 HW ,不知道 LEO 的值。
      • 综上,HW 机制保证了 Kafka 多节点的数据一致性。即使 leader 故障,选出其它 replica 担任新的 leader ,consumer 看到的最新消息也一样。
  • Consumer Committed Offset

    • :某个 consumer 在某个 partition 中最后一次消费的消息的偏移量。
    • 它由 Coordinator 记录,可以保证在 rebalance 之后 consumer 不会重复消费。
    • Kafka 能确保一个消息成功生产,但不能确保消息被消费,需要客户端主动更新 Committed Offset 。
  • Consumer Current Offset

    • :某个 consumer 在某个 partition 中下一次希望消费的消息的偏移量。
    • 它由 consumer 自己记录,可以保证在多次调用 poll() 方法时不会重复消费。
      • 如果记录的 offset 偏小,就会重复消费。如果记录的 offset 偏大,就会遗漏消费。
  • Consumer Lag

    • :consumer 在消费某个 partition 时的滞后量,即还有多少个消息未消费。
    • 它的值等于 HW - Consumer Committed Offset

# Kafka Streams

  • 早期的 Kafka 只支持批处理消息。消费者以 batch 为单位获取消息,处理完一个 batch 才获取下一个 batch 。
  • 后来的 Kafka 增加了 Kafka Streams 的 API ,用于流处理。
    • 优点:
      • 每生产一条消息,就立即消费、处理,不必等待凑够 batch 的数量,因此比批处理的延迟低、实时性好。
    • 缺点:
      • 比批处理的效率低、吞吐量低。就像 MySQL ,将 1000 条 INSERT 语句合并成一条 INSERT 语句再执行,总耗时会大幅降低。
      • Kafka Streams 目前的流处理功能比较简单,不如 Spark、Flink 等框架。