# 部署

# 版本

  • Kafka 的版本列表 (opens new window)
    • 例如 kafka_2.13-2.6.0.tgz ,前面的 2.13 是指 Scala 编译器的版本,后面的 2.6.0 是指 Kafka 的版本。
    • 使用 Kafka 时,应该尽量让客户端与服务器的版本一致,避免不兼容。
  • v0.10.0.0
    • 于 2016 年发布。新增了 Kafka Streams 的 API ,用于流处理。
  • v0.11.0.0
    • 于 2017 年发布。改进了消息格式,支持给 message 添加 headers 。支持事务、幂等性。
  • v1.0.0
    • 于 2017 年发布。
  • v1.1.0
    • 于 2018 年发布。
    • 改进了 Controller ,提高更改 leader replica 的效率,将 broker 正常终止的耗时从几分钟减少到几秒。
  • v2.0.0
    • 于 2018 年发布。
  • v2.8.0
    • 于 2021 年发布。
    • 增加 KRaft 协议,但只是试用阶段。
  • v3.0.0
    • 于 2021 年发布。
    • 生产者默认设置了 acks=all 和 enable.idempotence=true ,实现最强的交付保证。
    • 消费者的配置参数 session.timeout.ms 默认值从 10s 增加到 45s 。
    • 弃用配置参数 log.message.format.version 和 message.format.version ,停止兼容旧的消息格式 v0 和 v1 。
  • v3.3.0
    • 于 2022 年发布。
    • KRaft 成为正式功能。
  • v3.4.0
    • 于 2023 年发布。
    • 支持将已有的 Kafka 集群,从 Zookeeper 模式升级到 KRaft 模式。
  • v3.5.0
    • 于 2023 年发布。
    • 弃用 Zookeeper ,请用户改用 Kafka 内置的 KRaft 协议。
  • v3.8.0
    • 于 2024 年发布。
    • 开始提供官方 Docker 镜像,执行 docker pull apache/kafka 即可拉取。

# 部署

  • 下载二进制版:

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
    

    解压后运行:

    bin/zookeeper-server-start.sh config/zookeeper.properties   # 启动 zookeeper 服务器
    bin/kafka-server-start.sh     config/server.properties      # 启动 kafka broker 服务器
    
    • 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
      • Kafka 发行版包含了 zk 的可执行文件,可以同时启动 kafka、zk 服务器,也可以在其它地方启动 zk 服务器。
    • Kafka 会尽快将生产的消息写入磁盘,因此 JVM 占用的内存一般不超过 6G , CPU 平均负载为 2~4 核。
      • kafka-server-start.sh 中默认配置了以下环境变量,限制 JVM 占用的内存:
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        
      • 主机应该留出一些空闲内存,相当于 JVM 内存的 0.5~1 倍,用作 Page Cache ,提高 Kafka 读写磁盘的速度。
      • 上述配置的 Kafka 单节点每分钟就能生产 1 百万条消息,部署多节点还能横向扩容。因此一般用户不会触及 Kafka 的性能瓶颈,常见的性能问题是 rebalance 频繁、消费者的消费速度慢。
    • 声明以下环境变量,即可让 broker 开启 JMX 监控,监听一个端口:
      export JMX_PORT=9091
      # export JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false"
      
  • 或者用 docker-compose 部署:

    version: '3'
    
    services:
      kafka:
        container_name: kafka
        image: bitnami/kafka:3.4.0
        restart: unless-stopped
        # stop_grace_period: 3m     # 终止 kafka 可能耗时几分钟,需要等待。否则强制终止 kafka 会来不及保存数据,导致重启时花更久时间恢复
        environment:
          ALLOW_PLAINTEXT_LISTENER: 'yes'
          # KAFKA_HEAP_OPTS: -Xmx2G -Xms2G
        ports:
          - 9092:9092
        # volumes:
        #   - ./config:/bitnami/kafka/config
        #   - ./data:/bitnami/kafka/data
    
    • Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
      • 该镜像会根据环境变量配置 server.properties 文件,这里直接挂载配置目录,通过 CUSTOM_INIT_SCRIPT 执行命令还原配置文件。

# 配置

  • kafka 的配置目录示例:
    config/
    ├── consumer.properties       # 消费者的配置文件
    ├── log4j.properties          # Java 日志的配置文件
    ├── producer.properties       # 生产者的配置文件
    ├── server.properties         # broker 的配置文件
    └── zookeeper.properties      # zk 的配置文件
    
    • 启动 broker 时只需读取 server.properties、log4j.properties 文件。

# server.properties

配置示例:

# 关于 kafka 集群
broker.id=1                               # 该 broker 在 Kafka 集群中的唯一标识符,默认为 -1 ,必须赋值为一个非负整数
listeners=PLAINTEXT://0.0.0.0:9092        # broker 监听的 Socket 地址
advertised.listeners=PLAINTEXT://10.0.0.1:9092  # 当前 broker 供其它 broker 和客户端访问的地址,它会登记到 zk 中。默认采用 listeners 的值

# 关于 zk
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181   # 要连接的 zk 节点,多个地址之间用逗号分隔
# zookeeper.connection.timeout.ms=6000

# 保存数据日志
log.dirs=/data/kafka-logs                 # broker 存放数据日志的目录,如果有多个目录则用逗号分隔
# log.roll.ms=...                         # 这是 kafka 旧版的配置参数,后来改名为 segment.ms
# segment.ms=604800000                    # 每个 LogSegment 文件的最长写入时间,超过该值则会关闭该文件,然后创建一个新的 LogSegment 用于写入。默认为 7d
# segment.bytes=1073741824                # 每个 LogSegment 的最大体积,超过该值则会关闭该文件,然后创建一个新的 LogSegment 用于写入。默认为 1G
# log.flush.interval.messages=null        # 每个 partition ,每接收多少条消息就 flush 一次,即将内存中的数据写入磁盘
# log.flush.interval.ms=null              # 每个 partition ,每经过多少毫秒就 flush 一次
                                          # 默认由 broker 自动控制何时 flush ,效率更高。而且每个 partition 存在 replication ,也不怕 broker 突然故障,未将内存中的数据写入磁盘
# log.flush.offset.checkpoint.interval.ms=60000 # 每隔多少毫秒,刷新一次 checkpoint

# 清理数据日志
# log.cleanup.policy=delete               # LogSegment 文件的清理策略(只会清理已关闭的 LogSegment ,不会清理当前打开的 LogSegment)
                                          # 默认为 delete ,删除保存时长超过 log.retention.hours 的 LogSegment
                                          # 可改为 compact ,这会将旧 LogSegment 中的数据拷贝到新 LogSegment 中,并且对于各种取值的 key 只保留最新的一条 message ,删除之前的 message
                                          # 可改为 delete,compact ,同时采用两种策略
# log.retention.check.interval.ms=300000  # 每隔多久检查一次所有 LogSegment ,如果超过 log.retention.bytes、log.retention.hours 阈值则清理。默认为 5min
log.retention.bytes=10737418240           # 如果一个 partition 占用的磁盘空间超过该值,则清理其中的旧 LogSegment 。默认为 -1 ,即不限制
                                          # 假设配置 log.retention.bytes=1G、num.partitions=6、default.replication.factor=2 ,则一个 topic 理论上最多占用 12G 磁盘,实际上可能超过一些才清理
log.retention.hours=24                    # 如果一个 LogSegment 文件的 modify time 距今超过该时长,则清理该 LogSegment 。默认为 7d 。如果设置为 -1 ,则不限制
                                          # 如果一条消息的保存时长超过 log.retention.hours ,但所在的 LogSegment 尚未关闭,则该消息不会被清理
                                          # 综上,如果想更及时地清理 kafka 消息,则建议减小 segment.ms 或 segment.bytes ,从而创建更小粒度的 LogSegment
# log.retention.minutes=null              # 默认未设置,采用 log.retention.hours 的值
# log.retention.ms=null                   # 默认未设置,采用 log.retention.minutes 的值。
# delete.retention.ms=86400000            # 默认为 24h 。采用 log.cleanup.policy=compact 时,会将 tombstone 消息至少保留该时长才删除

# 关于 topic
# auto.create.topics.enable=true          # 当客户端生产、消费一个不存在的 topic 时,是否自动创建该 topic
# delete.topic.enable=true                # 是否允许删除 topic
# compression.type=producer               # broker 将消息存储到磁盘时的压缩格式。取值为 producer 则采用与生产者相同的压缩格式
message.max.bytes=10485760                # 允许接收的最大 batch.size ,默认为 1M ,这里设置为 10M 。该参数作用于所有 topic ,也可以对每个 topic 分别设置 max.message.bytes

# 关于 partition
num.partitions=6                          # 新建 topic 的默认 partition 数,默认为 3
default.replication.factor=2              # 新建 partition 的默认副本数,默认为 1
# min.insync.replicas=1                   # 当生产者配置了 acks=all 时,新消息至少写入多少个 replica 才算成功,否则生产者抛出异常 NotEnoughReplicas
# auto.leader.rebalance.enable=true           # 是否自动进行 partition rebalance
# leader.imbalance.check.interval.seconds=300 # Controller 每隔多久检查一次是否执行 partition rebalance
# leader.imbalance.per.broker.percentage=10   # 如果该 broker 上的非 preferred leader 超过该百分比,则进行 partition rebalance

# 关于 __consumer_offsets
# offsets.topic.num.partitions=50         # __consumer_offsets 主题的 partition 数
# offsets.topic.replication.factor=3      # __consumer_offsets 主题的每个 partition 的副本数。部署单节点 Kafka 时必须减至 1
# offsets.retention.minutes=10080         # kafka v2.0 将该参数的默认值从 1d 改为 7d
    # 如果一个 consumer group 超过该时长未提交新的 offset ,并且该 consumer group 当前未在线,或未订阅相关 topic ,则从 __consumer_offsets 中删除该 consumer group 的 offset

# 关于进程
# background.threads=10                   # broker 处理后台任务的线程数
# num.network.threads=3                   # broker 处理网络请求的线程数
# num.io.threads=8                        # broker 处理磁盘 IO 的线程数,应该不小于磁盘数

# 关于网络
session.timeout.ms=60000                  # 会话超时,默认为 10s 。如果在该时长内 broker 一直没有收到 consumer 的 heartbeat 请求,则认为 consumer 下线
# heartbeat.interval.ms=3000              # broker 等待 heartbeat 请求的超时时间。通常设置为 session.timeout.ms 的 1/3 ,以允许超时两次
# socket.send.buffer.bytes=102400         # socket 发送消息的缓冲区大小,默认为 100K
# socket.receive.buffer.bytes=102400      # socket 接收消息的缓冲区大小,默认为 100K 。网络延迟高于 1ms 时,增加 buffer 有利于提高传输效率,但会占用更多内存
# socket.request.max.bytes=104857600      # socket 允许接收的单条消息的最大大小,默认为 100M
  • default.replication.factor 建议设置为 2 或 3 。
    • 副本数大于 1 时能提高可用性。假设每个 partition 的副本数为 n ,则 Kafka 集群最多允许 n-1 个 broker 故障。
    • 副本数最多允许跟 broker 数一样大,但副本数较多时,会线性增加读写磁盘的数据量,也会占用更多磁盘空间。
  • num.partitions 建议设置为 6 等公倍数,当消费组包含 1、2、3 个消费者时都可以平均分配。
    • 分区数与 broker 数没关系,只是限制了一个消费组中同时工作的消费者数量。
      • 假设 num.partitions 为 2 ,则每个消费组中同时只能有 ≤2 个消费者工作,并发数太少,因此消费速度慢。
    • 增加分区数,可以允许一个消费组中有更多消费组并发消费,从而线性提高消费速度。
      • 但分区数太多会降低 broker 的性能。
        • 每当一个分区被生产者、消费者使用时,broker 需要打开该分区在磁盘的几个数据文件,因此打开的文件描述符变多,占用更多内存。
        • 每个分区的状态变化时,broker 都需要实时同步到 zk ,导致 zk 负载变大。严重情况下,会导致生产、消费消息的延迟增加,吞吐量降低。
      • Kafka v1.1.0 改进了 Controller ,每个 broker 建议最多存储 4k 个 partition(包括副本数),每个集群最多 20w 个。
      • 如果用 KRaft 协议取代 zookeeper ,则可以管理数百万的 partition 。

# producer.properties

配置示例:

bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092   # 初始连接的 broker 地址。先通过它获取所有 broker 的 advertised.listeners 地址,再实际连接
# client.id=''                  # 客户端的 ID

# acks=1                        # 判断消息发送成功的策略,这决定了 Kafka 的数据可靠性
  # 取值为 0 ,则不等待 broker 的回复,直接认为消息已发送成功
  # 取值为 1 ,则等待 leader replica 确认接收消息
  # 取值为 all 或 -1 ,则等待消息被同步到所有 replica
# retries=2147483647            # 发送消息失败时,如果不超过 delivery.timeout.ms 时长,则最多尝试重发多少次。有的客户端会配置 retries=0 ,这是不可靠的
# retry.backoff.ms=100          # 发送消息失败时,间隔多久才 retry 发送。该间隔时间会指数级递增
# enable.idempotence=false      # 是否在生产消息时保证幂等性,避免重复生产一条消息。如果启用该功能,需要设置 acks=all 且 retries 大于 0

# batch.size=16384              # 限制每个 batch 的大小,默认为 16K
# buffer.memory=33554432        # 限制生产者用于发送消息的缓冲区大小,默认为 32M
# compression.type=none         # 发送消息时,batch 采用的压缩格式。默认不压缩,建议采用 lz4
max.request.size=10485760       # 限制生产者向 broker 发送的每个请求的大小,这限制了每个请求包含的 batch (压缩之后)数量。默认为 1M ,这里设置为 10M
# max.block.ms=60000            # 生产者调用 send() 等方法时,如果 buffer.memory 不足或 metadata 获取不到,阻塞等待的超时时间

# linger.ms=0                   # 生产者创建每个 batch 时,等待多久才发送。调大该值,有助于让每个 batch 包含更多消息
# request.timeout.ms=30000      # 发送请求给 broker 时,等待响应的超时时间
# delivery.timeout.ms=120000    # 生产者调用 send() 方法发送消息的超时时间,该值应该不低于 linger.ms + request.timeout.ms
  • producer 向同一个 partition 生产多条消息时,会等内存中缓冲了一定数量之后,才打包为一批(batch)发送。

    • 如果生产的第一条消息就超过 batch.size 限制,则依然会打包成一个 batch ,但只包含该消息。
    • Kafka 生产消息、消费消息、同步 replica 时,都会将消息打包成 batch 再传输,从而提高传输效率。
    • 每个 batch 可能包含 1~N 条消息,每次网络请求可能传输 0~N 个 batch 。
  • batch.size 不能大于 producer 的 max.request.size ,否则过大的 batch 不能装载到请求中。

    • 不能大于 broker 的 message.max.bytes ,否则请求不能被接收。
    • 不能大于 consumer 的 fetch.max.bytes ,否则不能被消费。
  • Kafka 支持将生产的消息进行压缩。

    • 一般流程:
      1. producer 将消息 batch 以某种压缩格式发送。
        • 增加 batch.size 有利于增加重复的消息内容,加强压缩率。
      2. broker 接收消息,默认以同种压缩格式存储。
      3. consumer 拉取消息,自动识别其压缩格式,然后解压。
    • 优点:
      • 减少消息占用的磁盘空间。
      • 如果 Kafka 吞吐量不高,则启用压缩之后,每个 batch 的体积更小,因此会减少网络 IO 量、传输耗时。
      • 如果 Kafka 吞吐量达到瓶颈,则启用压缩之后,每个 batch 的体积不变,但能打包更多条消息,因此会提高 Kafka 吞吐量。
    • 缺点:
      • 因为压缩和解压,客户端会增加一点 CPU 负载,通常可忽略不计。
    • Kafka 支持多种压缩格式:
      none
      gzip
      lz4
      snappy
      zstd
      
      • none 格式表示不压缩。虽然没有压缩、解压的耗时,但消息体积大,通过网络传输的耗时久。
      • 用 Nginx 日志作为 Kafka 消息来测试, gzip 压缩率大概为 5% 。
      • 比较压缩消息的 CPU 耗时: gzip >> zstd > snappy > lz4 > none
      • 比较消息体积: none >> lz4 > snappy > zstd > gzip

# consumer.properties

配置示例:

bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092

# group.id=null                     # 消费者组的名称
# group.instance.id=null            # 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id
# allow.auto.create.topics=false    # 订阅或主动分配 topic 时,如果该 topic 不存在,是否自动创建
# auto.offset.reset=latest          # 如果 Coordinator 没有记录 Consumer Committed Offset (可能是未创建、已过期删除),则默认从哪个 offset 处开始消费
    # 可选的取值:
    # earliest : 采用 partition 可见范围内最早的 offset
    # latest   : 采用 partition 最新的 offset ,即 High Watemark
    # none     :让 consumer 抛出异常
# enable.auto.commit=true           # 是否自动在后台提交 Consumer Committed Offset 。可以关闭该功能,由用户主动提交,更可靠
# auto.commit.interval.ms=5000      # 自动提交的间隔时长

# max.poll.records=500              # consumer 每次调用 poll() 方法,最多拉取多少条消息。这会影响、但不决定 fetch 请求的数量
# max.poll.interval.ms=300000       # consumer 每次调用 poll() 方法,最多间隔多久就应该再次调用 poll() 。如果超时,则 broker 认为 consumer 下线,触发 rebalance
# max.partition.fetch.bytes=1048576 # 每次发出 fetch 请求时,从每个 partition 最多获取多少数据。默认为 1M 。如果获取的第一条消息就超过该限制,则只返回该消息
# fetch.min.bytes=1                 # 每次发出 fetch 请求时,broker 应该至少累积多少数据才返回响应
# fetch.max.wait.ms=500             # 每次发出 fetch 请求时,broker 如果没有满足 fetch.min.bytes 的数据,则最多等待指定时长就返回响应
# fetch.max.bytes=57671680          # 每次发出 fetch 请求时,预期 broker 最多返回的数据量。默认为 55M
  • 生产者、消费者都有 client.id 标识符,允许重复。消费者还有 group.id 标识符。
  • 用户调用 consumer 的 poll() 方法,即可消费消息。
    • consumer 会在后台向 broker 发送 fetch 请求,拉取数据,暂存到内存中的队列中,直到用户消费之后才删除。
    • consumer 占用的最大内存大概为 max.partition.fetch.bytes * partition_count ,或者 fetch.min.bytes * broker_count 。

# SASL

  • Kafka broker 支持通过 JAAS 框架启用 SASL 认证。

    • 默认不要求身份认证,可以被其它 broker、client 直接连接,因此不安全。
    • 可启用以下 SASL 认证机制:
      • PLAIN
      • GSSAPI (Kerberos)
      • OAUTHBEARER
      • SCRAM-SHA-256
  • 对于通过身份认证的用户,Kafka 支持配置 ACL 规则,控制用户的访问权限。

    • 默认的 ACL 规则为空,因此不允许用户访问任何资源,除非是 super 用户。
  • Kafka 传输数据时默认未加密,称为 PLAINTEXT 方式。

    • 可以启用 SSL 加密通信,但会增加通信延迟。
    • 是否启用 SSL ,与是否启用 SASL 认证无关。
    • 注意 PLAIN 是一种 SASL 认证机制,而 PLAINTEXT 是指数据未加密传输。
  • 例:启用 PLAIN 认证

    1. 修改 config/server.properties 配置文件,将通信协议从默认的 PLAINTEXT:// 改为 SASL_PLAINTEXT:// ,即采用 SASL 认证 + 未加密通信。

      listeners=INTERNAL_IP://0.0.0.0:9092,PUBLIC_IP://0.0.0.0:9093                 # 定义两个 listener 。当客户端通过某个 listener 连接时,就采用相应的 advertised.listeners
      advertised.listeners=INTERNAL_IP://10.0.0.1:9092,PUBLIC_IP://1.1.1.1:9093
      listener.security.protocol.map=INTERNAL_IP:PLAINTEXT,PUBLIC_IP:SASL_PLAINTEXT # advertised.listeners 的解析规则
      inter.broker.listener.name=INTERNAL_IP                          # broker 之间通信时采用的 listener ,默认为 PLAINTEXT
      sasl.enabled.mechanisms=PLAIN                                   # broker 启用哪些 SASL 认证机制,默认仅启用 GSSAPI
      sasl.mechanism.inter.broker.protocol=PLAIN                      # broker 之间通信时的 SASL 认证机制,默认为 GSSAPI
      sasl.mechanism.controller.protocol=PLAIN                        # controller 之间通信时的 SASL 认证机制,默认为 GSSAPI 。用 KRaft 取代 Zookeeper 时,需添加该配置参数
      authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer   # 选择一个提供 ACL 服务的 Java 类。Kafka v3 改为了 kafka.security.authorizer.AclAuthorizer
      super.users=User:broker;User:client                             # 将一些用户声明为超级用户
      
    2. 创建一个 jaas.conf 配置文件:

      KafkaServer {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          # 指定当前 broker 连接其它 broker 时使用的用户
          username="broker"
          password="******"
          # 按 user_<NAME>=<PASSWORD> 的格式定义用户。在当前 broker 被其它 broker 或 client 连接时,允许对方使用这些用户
          user_broker="******"
          user_client="******";   # 注意这是一条语句,末尾要加分号
      };
      
    3. 将 jaas.conf 拷贝到每个 broker 的配置目录下,并添加 java 启动参数来启用它:

      export KAFKA_OPTS='-Djava.security.auth.login.config=/kafka/config/jaas.conf'
      

      执行 kafka-server-start.sh、kafka-console-producer.sh 等脚本时会自动应用该配置。

    4. 客户端连接 broker 时,需要在 producer.properties、consumer.properties 中加入配置:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
          username="client" \
          password="******";
      

      其中的账号密码也可以配置在 jaas.conf 中:

      KafkaClient {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="client"
          password="******";
      };
      
  • 上述为 kafka 被其它 broker、client 连接时的身份认证。而 kafka 连接到 zk 时,也可启用 SASL 认证,配置方法见 zk 文档。

    • 另外建议在 kafka 的 server.properties 中加入:
      zookeeper.set.acl=true  # 对 Kafka 在 zk 中存储的数据启用 ACL 规则:允许被所有用户读取,但只允许被 Kafka 编辑
      

# KRaft

  • Kafka 原本使用 Zookeeper 维护 broker 系统。从 Kafka v3.5 开始弃用 Zookeeper ,请用户改用 Kafka 内置的 KRaft 协议。
    • Zookeeper 模式下,各个 broker 会将元数据写入 Zookeeper ,并选出一个 broker 担任 Controller ,独自管理整个 Kafka 集群。
    • KRaft 模式下,各个 broker 不再连接 Zookeeper ,而是由一些(建议 3 或 5 个) broker 担任 Controller ,按 KRaft 协议进行分布式系统的决策,从而管理整个 Kafka 集群。
    • 从 Zookeeper 改用 KRaft 的优点:简化 Kafka 集群的部署,大幅提高 broker 之间的通信效率,大幅提高管理 partition 的性能。

# 部署

重新部署一个采用 KRaft 模式的 Kafka 集群的流程:

  1. 为了与 Zookeeper 模式区分,Kraft 模式使用的配置文件位于 config/kraft/server.properties ,内容示例:
    node.id=1                               # 如果该 broker 担任 kraft 模式的 controller ,则必须设置 node.id
    process.roles=broker,controller         # 该 Kafka 进程在 kraft 模式下的角色,可以同时担任供生产者、消费组连接的 broker 、进行决策的 controller
    listeners=BROKER://:9092,CONTROLLER://:9093   # 监听 9092 端口供生产者、消费者连接,监听 9093 端口供其它 kraft 节点连接
    advertised.listeners=BROKER://10.0.0.1:9092
    inter.broker.listener.name=BROKER
    controller.quorum.voters=[email protected]:9093,[email protected]:9093,[email protected]:9093   # 声明所有 kraft controller 的 node.id、ip、port
    controller.listener.names=CONTROLLER    # controller 之间通信时采用的 listener ,不能与 inter.broker.listener.name 共用一个 listener
    listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
    
  2. 生成集群 ID :
    KAFKA_CLUSTER_ID=`bin/kafka-storage.sh random-uuid`
    
    初始化每个 broker 的 log.dirs 目录:
    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
    
    • Zookeeper 模式下,部署一个新的 Kafka 时,如果 log.dirs 目录为空,Kafka 会自动初始化。
    • KRaft 模式下,如果 log.dirs 目录为空,需要手动初始化,否则 Kafka 会启动失败,报错 INCONSISTENT_CLUSTER_ID 。取消自动初始化,是为了避免 Controller 在缺少数据的情况下做出决策。
    • 使用 bitnami/kafka 镜像时,默认启用 KRaft 模式,采用的配置文件是 config/kraft/server.properties 而不是 config/server.properties 。只需声明环境变量 KAFKA_KRAFT_CLUSTER_ID ,启动时会自动 format 数据目录。
  3. 启动 broker :
    bin/kafka-server-start.sh config/kraft/server.properties
    
  • 相关命令:
    bin/kafka-metadata-quorum.sh --bootstrap-server 127.0.0.1:9092 describe --status        # 查看 KRaft 状态
    bin/kafka-metadata-quorum.sh --bootstrap-server 127.0.0.1:9092 describe --replication   # 列出所有 Kafka 节点
    

# 从 ZK 迁移到 KRaft

如果已部署 Zookeeper 模式的旧 Kafka 集群,想升级到 Kraft 模式。则建议按蓝绿部署的方案,重新部署一个新 Kafka 集群,然后让客户端改用新 Kafka 。这样操作简单,只是需要确保旧 Kafka 中消息滞后量为 0 。

如果不支持蓝绿部署的方案,只能将旧 Kafka 滚动升级到 KRaft 模式,但操作复杂:

  1. 部署至少一个 KRaft 模式的 controller ,config/kraft/server.properties 配置文件示例:

    # 担任 KRaft 模式的 controller
    node.id=11    # Kraft 模式的 node.id 不能与 Zookpeer 模式的 broker.id 相同,因为共享一个 id 命名空间
    process.roles=controller
    listeners=CONTROLLER://:9093
    controller.quorum.voters=11@10.0.0.1:9093,[email protected]:9093,[email protected]:9093
    controller.listener.names=CONTROLLER
    inter.broker.listener.name=PLAINTEXT
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    
    # 连接 zookeeper
    zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181
    
    # 开启迁移模式
    zookeeper.metadata.migration.enable=true
    
    • 需要查看旧 kafka 集群的 cluseter.id ,将它配置为 controller 的 KAFKA_KRAFT_CLUSTER_ID :
      grep cluster.id data/meta.properties
      
    • 该 kafka 节点不能同时担任 KRaft 模式的 broker 和 controller ,否则之后迁移时,会与 Zookeeper 模式的 broker 弄混。
    • controller 启动成功之后,会等待 broker 连接到它,每秒打印一条日志:Still waiting for ZK brokers [1, 2, 3] to register with KRaft
  2. 修改所有 broker 的 config/server.properties 配置文件并滚动重启,使得它们的 listener.name 与 controller 一致,从而相互通信。

    inter.broker.listener.name=PLAINTEXT    # 旧版 Kafka 的 listener.name 默认为 PLAINTEXT ,用户也可改为 BROKER 等名称
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    
  3. 修改所有 broker 的 config/server.properties 配置文件,使它同时连接 Zookeeper 和 controller ,内容示例如下,然后以 v3.4.0 版本滚动重启所有 broker :

    # 保留 Zookeeper 模式的旧配置
    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://10.0.0.1:9092
    inter.broker.listener.name=PLAINTEXT
    zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181
    
    # 连接 controller
    controller.quorum.voters=11@10.0.0.1:9093
    controller.listener.names=CONTROLLER
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    
    # 声明协议版本
    inter.broker.protocol.version=3.4
    
    # 开启迁移模式
    zookeeper.metadata.migration.enable=true
    
    • 使用 bitnami/kafka:3.4.0 镜像时,需要声明环境变量 KAFKA_ENABLE_KRAFT=false 。
    • 该 kafka 节点不能同时担任 KRaft 模式的 broker 和 controller ,否则启动时因为 log.dirs 目录非空,不能初始化。
    • 迁移模式下,
      • broker 依然工作在 Zookeeper 模式。
      • 滚动重启 broker 时,有的 broker 采用新的 inter.broker.protocol.version ,有的 broker 采用旧的 inter.broker.protocol.version ,因此不能相互通信。
      • 等所有 broker 都重启并连接到 KRaft controller ,才会开始迁移。
      • KRaft controller 会取代 Zookeeper 模式中原来的 controller 节点,因此有权读写 topic、offset 等元数据,将它们迁移到 KRaft 数据目录。
      • 即使停止迁移,Zookeeper 中记录的 controller 节点依然是 KRaft controller ,因此不支持从 KRaft 模式回滚到 Zookeeper 模式。
    • 迁移完成之后, controller 节点会打印一条日志:Completed migration of metadata from Zookeeper to KRaft
  4. 弃用 config/server.properties 这个旧配置文件,改用纯 KRaft 模式的 config/kraft/server.properties 配置文件,重启 broker 。

    node.id=1       # node.id 等于原来的 broker.id
    process.roles=broker
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://10.0.0.1:9092
    controller.quorum.voters=11@10.0.0.1:9093
    controller.listener.names=CONTROLLER
    inter.broker.listener.name=PLAINTEXT
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    
  5. 删除 controller 配置文件中的 zookeeper.connect、zookeeper.metadata.migration.enable ,重启 controller 。

  6. 停止运行 Zookeeper 。