# 原理

# 架构

  • 基本架构:
    • 运行一些 Kafka 服务器,组成集群。
    • 用户运行客户端程序,连接到 Kafka 服务器,作为 producer 生产消息,或者作为 consumer 消费消息。

# Broker

:Kafka 服务器,负责存储、管理消息。

  • Kafka 集群中会选出一个 broker 担任 Controller 。
    • 每个 broker 在启动时,都会尝试到 zk 中创建 /controller 节点,创建成功则当选 Controller 。
    • Controller 负责管理整个集群,比如在每个 partition 的所有副本中选出一个作为 leader replica 。其它 broker 跟随 Controller 的决策。
    • 增加 broker 的数量,即可对 Kafka 集群横向扩容。

# Producer

:生产者,即生产消息的客户端,负责写入消息到 broker 。

# Consumer

:消费者,即消费消息的客户端,负责从 broker 读取消息。

  • consumer 消费消息的步骤:
    1. 发送请求到 broker ,订阅一个或多个 topic 。
    2. 定时发送请求到 broker ,轮询自己订阅的 topic 。如果存在可消费的消息,则拉取到本机。
  • broker 的消息可供多个 consumer 同时消费,因此消息被消费之后并不会被删除,除非超过 broker 的存储限制。

# Consumer Group

:消费者组,用于在逻辑上对 consumer 分组管理。

  • 客户端运行 consumer 实例时,可以指定其所属的 consumer group 。
    • 如果不指定,则该 consumer 的 group 为空,不会被 Coordinator 协调。
  • 一个 consumer 同时只能消费一个 partition ,因此通常用一组 consumer 同时消费一个 topic 下的不同 partition ,通过并行消费来提高消费速度。
    • 当一个 group 消费一个 topic 时,如果 consumer 的数量大于 partition 的数量,则部分 consumer 会空闲、不能消费。
      • 因此,最好将 partition 的数量设置成与 consumer 数量相同,或者为 consumer 数量的整数倍,以便于平均分配。
    • 不同 group 之间相互独立,即使同时消费同一个 topic 下的同一个 partition 也互不影响。

# Coordinator

# __consumer_offsets

  • Kafka 会自动创建一个名为 __consumer_offsets 的内部主题,用于存储所有 consumer group 的 Committed Offset 。

    • 每当一个 consumer group 提交 offset 到 Kafka 时, Kafka 会根据 group.id 的哈希值,分配 __consumer_offsets 中的某个 partition ,用于记录该 group 的 offset 。
    • 该 partition 的 leader replica 所在的 broker 运行的 GroupCoordinator 类,负责管理该 group 的 consumer、offset 。简称为 Coordinator 。
    • __consumer_offsets 主题中每条消息的格式:
      • key 为 {"Group":"xx", "Topic":"xx", "Partition":xx, ...}
      • value 为 {"Offset":xx, "CommitTimestamp":xx, ...}
  • __consumer_offsets 的删除规则。

    • 它默认配置了 log.cleanup.policy=compact ,对于存储的 offset 数据不会定时删除,而是压缩,从而尽量记录每个 consumer group 对每个 topic 的最新一次 Committed Offset 。
    • 如果 consumer group 取消订阅 topic ,则其 offset 依然会被 Kafka 记录在 __consumer_offsets 中,导致该 consumer group 的 Lag 越来越大,触发监控告警。用户可用 kafka-consumer-groups.sh 脚本来修改、删除 offset 。
    • 如果一个 consumer group 的任一成员在线,则不允许用户修改、删除其 offset ,避免出错。但允许删除 consumer group 或 topic ,此时会自动删除其 offset 。
    • 如果一个 consumer group 长时间未提交新的 offset(可能是未运行、运行了但没有新消息),
      • 当前 topic 可能已将 log.retention.hours 时长之前的消息删除,导致 topic 中的 earliest offset ,大于 consumer group 在 __consumer_offsets 中记录的旧 offset 。此时 consumer 调用 poll() 会消费失败,报错:Offset out of range ,然后自动触发 auto.offset.reset 策略,使得下一次调用 poll() 时能正常消费。
      • 如果满足 offsets.retention.minutes 条件,则 kafka 会自动从 __consumer_offsets 中删除该 consumer group 的 offset ,相当于该 consumer group 从未消费过该 topic 。此时 consumer 调用 poll() 会自动触发 auto.offset.reset 策略。
    • 即使长时间没有生产消息、消费消息,一个 topic 永远不会被 Kafka 自动删除,只会被用户手动删除。
  • Coordinator 管理的 consumer group 分为多种状态(group state):

    PreparingRebalance    # 准备开始 rebalance
    CompletingRebalance   # 即将完成 rebalance ,正在发送分配方案
    Stable                # 已完成 rebalance ,可以正常消费
    Empty                 # 组内没有成员
    Dead                  # 组内没有成员,且 offset 等 metadata 已删除,不能响应请求
    

# rebalance

  • consumer 启动时,如果指定了所属的 consumer group ,则会发送带有 UNKNOWN_MEMBER_ID 标志的 JoinGroup 请求给 Coordinator ,请求加入指定的 group ,并请求分配 member id 。

    • GroupCoordinator 会给每个 consumer 分配一个 member id ,格式为 client.id-UUID ,其中 UUID 为随机生成的十六进制编号。
      • 第一个加入 group 的 consumer 会担任组长(Group Leader)。
      • 每个 consumer 会定时向 Coordinator 发送 Heartbeat 请求,以维持在线。否则 Coordinator 会从该 group 删除该 consumer 。
    • 组长负责分配各个 consumer 消费的 partition ,该过程称为 Consumer rebalance 。流程如下:
      1. 组长从 Coordinator 获取该组的 consumer 列表,分配各个 consumer 消费的 partition 。
      2. 组长发送 SyncGroup 请求给 Coordinator ,告知分配方案。
      3. Coordinator 等收到 consumer 的 Heartbeat 请求时,在响应中告知已发生 rebalance 。
      4. consumer 删掉内存中的 UUID 等成员信息,重新加入该 group ,进入新的 generation 。
    • 当 group 的 consumer 或 partition 数量变化时,都会触发一次 rebalance 。
    • 每次 rebalance 时,group 就开始一个新时代(generation)。
      • 每个 generation 拥有一个从 0 递增的编号。
  • 日志示例:

    # broker 1 的 Coordinator 被分配了任务,开始管理 __consumer_offsets partition 对应的 consumer group
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 28
    INFO	[GroupCoordinator 1]: Elected as the group coordinator for partition 1
    # broker 1 的 Coordinator 被取消分配,停止管理一些 consumer group
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 3
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 0
    INFO	[GroupCoordinator 1]: Resigned as the group coordinator for partition 24
    
    # 一个成员加入 stable 状态的 consumer group ,被分配了 member id
    INFO	[GroupCoordinator 1]: Dynamic Member with unknown member id joins group test_group_1 in Stable state. Created a new member id consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 for this member and add to the group.
    # 准备开始 rebalance ,原因是加入一个新成员,其 group instance id 为 None ,说明不是 Static Member
    INFO	[GroupCoordinator 1]: Preparing to rebalance group test_group_1 in state PreparingRebalance with old generation 38 (__consumer_offsets-21) (reason: Adding new member consumer-1-5ee75316-16c0-474f-9u2d-6e57f4b238b3 with group instance id None)
    # group 进入 stable 状态,开始一个新 generation ,拥有 3 个成员
    INFO	[GroupCoordinator 1]: Stabilized group test_group_1 generation 39 (__consumer_offsets-21) with 3 members
    # Coordinator 收到 leader 发来的分配方案
    INFO	[GroupCoordinator 1]: Assignment received from leader for group test_group_1 for generation 39. The group has 3 members, 0 of which are static.
    
    • Preparing to rebalance 的几种 reason 示例:
      Adding new member $memberId with group instance id $groupInstanceId     # 加入一个新成员
      removing member $memberId on LeaveGroup                                 # 一个成员发出 LeaveGroup 请求,主动离开 group
      removing member $memberId on heartbeat expiration                       # 一个成员因为 Heartbeat 超时,被移出 group
      
  • rebalance 期间,当前 consumer group 的所有 consumer 都要暂停消费,开销较大。因此应该尽量减少 rebalance ,而 relalance 的原因通常是 consumer 数量变化,常见的几种情况如下:

    1. 如果一个 consumer 刚启动,则会向 broker 发送 JoinGroup 请求,加入 group ,被分配一个 member id ,触发一次 rebalance 。
    2. 如果一个 consumer 终止,不再运行。则等到 Heartbeat 超时,broker 会认为该 consumer 下线,触发一次 rebalance 。
    3. 上述 consumer 启动、终止的情况一般不频繁,可以容忍它触发 rebalance 。但有的情况下,consumer 会频繁启动、终止,比如被 k8s HPA 改变 consumer 数量。
      • 解决方案:额外开发一个应用,称为 dispatcher ,让它作为唯一的 consumer 连接到 broker ,获取消息。而原本的应用连接到 dispatcher ,间接获取消息。
      • 使用 dispatcher 还能解决另一个问题:group 中的 consumer 数量,大于当前 topic 的 partition 数量,导致部分 consumer 空闲、不能消费。
    4. 如果一个 consumer 消费太慢,连续调用 poll() 的时间间隔超过 max.poll.interval.ms ,也会导致 Heartbeat 超时,触发 rebalance 。
      • 解决方案:增加 max.poll.interval.ms 阈值,或者优化 consumer 客户端代码,例如减少每次拉取的数据量从而减少消费耗时、更快地开始下一次消费,例如从同步消费改为异步消费。
    5. 如果一个 consumer 终止,然后又重启。则不记得自己之前的 member id ,依然会发送 JoinGroup 请求,加入 group ,被分配一个新的随机 member id ,触发一次 rebalance 。
      • 而旧的 member id 不再使用,等到 Heartbeat 超时,又会触发一次 rebalance 。因此 consumer 重启时会触发两次 rebalance 。
      • 解决方案:Kafka v2.3 开始,consumer 增加了配置参数 group.instance.id ,用于避免 consumer 重启时触发 rebalance 。
        • 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id。
          当 consumer 重启之后发送 JoinGroup 请求时,Coordinator 会识别出它是 Static Member ,属于 rejoin ,因此分配一个新 member id ,并删除旧的 member id 。
          这样不会触发 rebalance ,除非 consumer 重启太慢,导致 Heartbeat 超时。
        • 日志示例:
          INFO	[GroupCoordinator 1]: Static member Some(static_member_1) of group test_group_1 with unknown member id rejoins, assigning new member id static_member_1-cdf1c4ea-2f1c-4f4d-bc46-bf443e5f7322, while old member id static_member_1-8b5d89b3-0757-4441-aeaa-50e7f9f55cee will be removed.
          INFO	[GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.
          
        • 如果一个 group 中只运行了一个 consumer ,则用户可以配置一个固定的 group.instance.id 值。
        • 如果一个 group 中运行了多个 consumer ,则用户需要在客户端增加一些代码,给每个 consumer 配置一个互不相同的、长期不变的 group.instance.id 值。
          例如以 k8s StatefulSet 方式部署多个 consumer ,它们的 Pod 名称会从 0 开始编号。可让每个 consumer 通过环境变量读取自己的 POD_NAME ,用作 group.instance.id 。

# Zookeeper

  • Kafka 采用 Zookeeper 作为分布式系统的底层数据库,记录 broker、topic、consumer 等信息。
    • 每个 broker 启动时,会到 zk 记录自己的 IP 和端口,供其它 broker 发现和访问。
    • 根据 broker.id 识别每个 broker ,即使 IP 和端口变化也会自动发现。
    • Kafka 集群第一次启动时会生成一个随机的 cluster.id ,记录到 data/meta.properties 文件和 zk 中。
      • broker 每次连接 zk 时,都会检查自己 data/meta.properties 文件中记录的 cluster.id ,是否与 zk 中记录的一致。因此一个 zk 集群不能供多个 Kafka 集群使用。
  • Kafka 在 zk 中使用的 znode 示例:
    /
    ├── admin               # 用于传递一些管理命令
    │   └── delete_topics   # 默认为 null
    ├── cluster
    │   └── id              # 记录 Kafka 集群的 id
    ├── brokers
    │   ├── ids             # 记录 broker 的信息。各个 broker 会在其下创建临时节点,当 broker 下线时会被自动删除
    │   ├── seqid
    │   └── topics          # 记录 topic 的信息
    ├── config
    ├── consumers
    ├── controller          # 记录当前的 controller ,这是一个临时节点
    ├── controller_epoch    # 记录 controller 当选的 epoch
    ...
    
    • 例:
      [zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1
      {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":-1,"features":{},"host":"10.0.0.1","timestamp":"1627960927890","port":9092,"version":5}
      [zk: localhost:2181(CONNECTED) 1] get /brokers/topics/__consumer_offsets/partitions/0/state
      {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3,1]}
      [zk: localhost:2181(CONNECTED) 3] get /controller
      {"version":1,"brokerid":1,"timestamp":"1627960928034"}
      

# 消息

  • Kafka 处理数据的基本单位为消息(message),又称为 record 。

  • Kafka 采用自定的格式封装 message ,然后基于 TCP 协议传输,类似于 HTTP 报文。每条 message 的内容分为几个部分:

    • value
      • :message 负责传输的主体内容,可以是 text、json、binary 等格式,只要 consumer 能识别。
      • 如果 value 取值为空(不管 key 的取值),则这条 message 称为墓碑(tombstone),用途如下:
        • 配置 log.cleanup.policy=compact 时,对于各种取值的 key 只保留最新的一条 message 。如果用户想把最新一条 message 也删除,则可以发送具有相同 key 的 tombstone 消息,让它成为该 key 的最新一条 message 。
        • tombstone 消息会像普通消息一样被 consumer 消费。而 consumer 收到 tombstone 消息,就知道该 key 的历史消息已被删除。
        • Kafka 会将 tombstone 消息至少保留 delete.retention.ms=24h 时长再删除,从而尽量让 consumer 收到 tombstone 消息。
    • key
      • :一个字符串。
      • 如果 key 不为空,则会根据 key 的哈希值选择将 message 写入第几个 partition 。
      • 如果 key 为空值,则会采用 Round-robin 算法,为 message 分配一个 partition 。
    • headers
      • :一组键值对,用于添加用户自定义的元数据、注释,类似于 HTTP headers 。
      • 默认情况下,每条 message 的 value、key、headers 为空值,因此 producer 可以发送内容为空的 message 。
  • 生产消息时,broker 将每条消息以追加方式写入 LogSegment 文件。为了尽量顺序读写磁盘,不支持修改已生产的消息。

    • 不支持删除单条消息,但可用 bin/kafka-delete-records.sh 删除某个 partition 在某个 offset 之前的所有消息。

# Topic

:主题,用于在逻辑上分组管理消息。

  • 不同 topic 之间相互独立,它们的消息互不影响。

# Partition

:分区。

  • Kafka 在逻辑上用 topic 分组管理消息,而物理存储时每个 topic 会拆分成一个或多个 partition 。

    • 每个 partition 存储该 topic 数据的一个子集。
    • 每个 partition 单独工作。即使一个 partition 故障,其它 partition 也可用于读写消息。
  • 例:假设一个名为 test 的 topic ,分成 3 个 partition 。简单的生产、消费流程如下:

    1. producer 发送 10 条消息到该 topic ,分散存储到 3 个 partition 。此时,3 个 partition 分别存储 4、3、3 条消息,记录的 offset 分别为 3、2、2 。
    2. consumer 获取该 topic 的全部消息,即从 3 个 partition 分别获取消息。
    3. consumer 知道自己目前在 3 个 partition 的消费进度分别为 3、2、2 ,于是记录到 Committed Offset 变量。等之后 producer 发送了新消息时,就从 3、2、2 位置处继续消费。
  • 客户端只要连接到 Kafka 集群的任一 broker ,就能查询到所有 broker 的 IP 、各个 partition 存储在哪个 broker 。

    • 当用户向某个 partition 生产、消费消息时,客户端会自动查询到储该 partition 的 leader replica 存储在哪个 broker ,然后向该 broker 建立 TCP 连接。
    • producer 发送每条消息时,需要先决定将这条消息写入哪个 topic 的哪个 partition ,再查询到存储该 partition 的 broker IP ,然后以 batch 为单位发送消息到 broker 。该过程默认会自动完成,用户也可以主动调整。
  • producer 决定将消息写入 topic 的第几个 partition 时,有多种分配策略:

    • 默认情况下,不指定 partition、key 。此时采用 Round-robin 算法,将消息轮流写入各个 partition ,实现负载均衡。
    • 不指定 partition ,但指定了消息的 key 。此时采用 Key-ordering 算法,根据 key 的哈希值选择将 message 写入第几个 partition 。如果几条消息的 key 相同,则会被写入同一个 partition 。
    • 指定 partition 的编号。此时将消息写入这个 partition ,不会写入其它 partition 。
  • 关于顺序性。

    • 单个 partition 中的所有消息按照 offset 顺序被写入,也按照 offset 顺序被读取。属于先入先出的队列,先生产的消息的 offset 更小,会先被消费。
      • 例如 producer 写入两条消息 A、B 到一个 partition 中。等 consumer 读取消息时,顺序一定也是 A、B 。
    • 如果一个 topic 分成多个 partition ,则这些 partition 之间的消息不能控制顺序。
      • 例如 producer 写入两条消息 A、B 到两个 partition 中。等 consumer 读取消息时,不能确定先读取哪个 partition ,因此顺序可能是 A、B ,也可能是 B、A 。
    • 如果用户希望 consumer 读取消息的顺序,与 producer 写入消息的顺序一致,则可以在 topic 之下只创建一个 partition ,实现全局顺序。
      • 缺点:这样不支持并行消费,大幅降低了消费速度。
      • 也可以实现局部顺序:由 producer 的业务代码将消息分成不同类型,同种类型的消息使用相同的 key ,从而根据 key 的哈希值写入同一个 partition 。
    • 如果用户希望实现严格的顺序,则还需要给 producer 配置 max.in.flight.requests.per.connection=1 ,限制已发送但未收到 ack 的 batch 同时只有 1 个。
      • 默认配置为 5 。如果同时发送多个 batch ,则先发送的 batch 可能因为 retries 而延后到达 broker ,导致顺序出错。
      • 缺点:这样大幅降低了生产速度。

# LogSegment

  • broker 将数据以日志文件的形式存储在磁盘中。

    • broker 会在磁盘为每个 partition 分别创建一个目录,目录名格式为 <topic>-<partition> ,比如 topic_A-0、topic_A-1 。
    • 新增消息时,会保存到 partition 目录下的日志段(LogSegment)文件中。每隔一定时间或大小会创建一个新的 LogSegment 文件。
  • partition 目录的结构示例:

    /data/kafka-logs/                     # 数据日志的存储目录,其下每个子目录的命名格式为 <topic>-<partition id>
    ├── test-0/                           # 该目录用于存储 test 主题的 0 号分区
    │   ├── 00000000000000000000.index    # 第一个 LogSegment 的索引文件,其中第一条消息的 offset 为 0
    │   ├── 00000000000000000000.log      # 第一个 LogSegment 的数据日志文件
    │   ├── 00000000000000367814.index    # 第二个 LogSegment 的索引文件,其中第一条消息的 offset 为 367814
    │   ├── 00000000000000367814.log
    │   ├── leader-epoch-checkpoint       # 记录每一任 leader 当选时的 epoch 和 offset
    │   └── partition.metadata
    ├── test-2/
    ├── test-3/
    └── test-4/
    
    • 将消息写入 LogSegment 文件时,采用稀疏索引,避免单个 index 文件的体积过大。读取某个 offset 的消息时,需要先找到它所属的 LogSegment ,再根据该 LogSegment 的 index 文件找到消息。
  • 虽然 Kafka 将消息保存在磁盘中,读写速度比不上 Redis 等内存数据库,但 Kafka 采取了以下优化措施:

    • 生产消息时,每条消息以追加方式写入 LogSegment 文件,因此读写 LogSegment 文件时主要是顺序读写磁盘,比随机读写磁盘快了两个数量级。
    • 生产消息时,采用 MMAP 零拷贝技术,将 JVM 内存中的 LogSegment 数据拷贝到 Page Cache ,更快地将消息写入磁盘。
    • 消费消息时,采用 sendfile 零拷贝技术,将磁盘中的 LogSegment 数据拷贝到 Page Cache ,更快地将消息发送到 Socket 。

# Replica

  • 每个 partition 可以存储一份或多份全量数据,每份称为一个副本(replica)。

  • Kafka 会自动在每个 partition 的所有副本中选出一个作为 leader replica ,而其它副本称为 follower replica

    • leader 可以进行读写操作,负责处理客户端的访问请求。
    • follower 不处理客户端的访问请求,只负责与 leader 的数据保持一致,从而备份数据。
    • 客户端读写 partition 时看到的总是 leader ,看不到 follower 。
  • 关于可用性。

    • 如果一个 replica 故障,则 broker 会从其它 replica 中选出一个担任新的 leader ,供客户端读取消息。也能继续写入消息,因为 broker 通常配置了 min.insync.replicas=1
    • 如果一个 broker 故障,其上存储的所有 partition 的 replica 不可用,则 Kafka 并不会创建新的 replica 。
    • Kafka 会尽量将同一 topic 的各个 partition 存储到不同的 broker 上,从而分散负载。
    • Kafka 会强制将同一 partition 的各个 replica 存储到不同的 broker 上,从而抵抗单点故障。
  • Assigned Replicas

    • :指一个 partition 拥有的所有 replica 。
  • Preferred replica

    • :指 assigned replicas 中的第一个 replica 。
    • 新建一个 partition 时,一般由 preferred replica 担任 leader replica 。
      • 当所在的 broker 故障时,会选出其它 replica 担任 leader 。
      • 当 preferred replica 恢复时,会担任普通的 replica 。但 kafka 会自动尝试让 preferred replica 重新担任 leader ,该过程称为 preferred-replica-election、Partition rebalance 。
  • In Sync Replicas (ISR)

    • :指一个 partition 中与 leader 保持同步的所有 replica 。
    • 如果一个 follower 的滞后时间超过 replica.lag.time.max.ms ,或者 leader 连续这么长时间没有收到该 follower 的 fetch 请求,则认为它失去同步,从 ISR 中移除。
      • 例如:IO 速度过慢,使得 follower 从 leader 复制数据的速度,比 leader 新增数据的速度慢,就会导致 lastCaughtUpTimeMs 一直没有更新,最终失去同步。
    • leader 本身也属于 ISR 。
    • 只有 ISR 中的 replica 可以被选举为 leader 。
  • Under-replicated Replicas Set

    • :指一个 partition 中与 leader 没有同步的 replica 。
    • 当一个 follower 将 leader 的最后一条消息(Log End Offset)之前的日志全部成功复制之后,则认为该 follower 已经赶上了 leader ,记录此时的时刻作为该 follower 的 lastCaughtUpTimeMs
    • Kafka 的 ReplicaManager 会定期计算每个 follower 的 lastCaughtUpTimeMs 与当前时刻的差值,作为该 follower 对 leader 的滞后时间。

# Offset

  • partition 中存储的每条消息都有一个唯一的偏移量(offset),用于索引。

    • offset 存储为 Long 型变量,容量为 64 bit ,从 0 开始递增。
  • Log Watermark

    • :partition 中最旧一条消息的 offset 。
    • 新建一个 partition 时,该值为 0 。每次执行 log.cleanup.policy 之后,该值会增大一截。
  • Log End Offset (LEO)

    • :等于 partition 中最新一条消息的 offset 加 1 ,即预计下一条新消息的 offset 。
  • High Watermark (HW)

    • :partition 允许被 consumer 看到的最高 offset 。
    • partition 的 leader 新增一条消息时,会更新 LEO 的值,并传给 follower 进行同步,同步完之后才将该 LEO 记作 HW 。
      • ISR 可能包含多个 replica ,分别记录了一个 LEO ,而 HW 等于最小的哪个 LEO ,小于等于 leader 的 LEO 。
      • consumer 只能看到 HW ,不知道 LEO 的值。
      • 综上,HW 机制保证了 Kafka 多节点的数据一致性。即使 leader 故障,选出其它 replica 担任新的 leader ,consumer 看到的最新消息也一样。
  • Consumer Current Offset

    • :等于 consumer 在某个 partition 中下一条希望消费的消息的 offset 。
    • 它由 consumer 记录,从而避免重复消费。
  • Consumer Committed Offset

    • :等于 consumer 已提交到 broker 的 Current Offset 值。
    • 它由 Coordinator 记录,使得 rebalance 之后新的 Coordinator 也知道 consumer 的消费进度,避免重复消费。
    • 当客户端调用 poll() 拉取消息之后,Kafka 不知道消息是否被成功消费,需要客户端主动提交 Committed Offset 。
  • Consumer Lag

    • :consumer 在消费某个 partition 时的滞后量,即还剩多少条消息未消费。
    • 它的值等于 HW - Consumer Committed Offset
  • 例:

    1. 新建一个 partition ,此时 LEO 和 HW 都为 0 。
    2. 生产者写入 3 条消息,它们的 offset 依次为 0、1、2 。此时 LEO 和 HW 都为 3 。
    3. 消费者消费 3 条消息,此时 Committed Offset 追上 HW ,等于 3 。

# Kafka Streams

  • 早期的 Kafka 只支持批处理消息。 consumer 以 batch 为单位获取消息,处理完一个 batch 才获取下一个 batch 。
  • 后来的 Kafka 增加了 Kafka Streams 的 API ,用于流处理。
    • 优点:
      • 每生产一条消息,就立即消费、处理,不必等待凑够 batch 的数量,因此比批处理的延迟低、实时性好。
    • 缺点:
      • 比批处理的效率低、吞吐量低。就像 MySQL ,将 1000 条 INSERT 语句合并成一条 INSERT 语句再执行,总耗时会大幅降低。
      • Kafka Streams 目前的流处理功能比较简单,不如 Spark、Flink 等框架。