# 部署
# 版本
- Kafka 的版本列表 (opens new window)
- 例如 kafka_2.13-2.6.0.tgz ,前面的 2.13 是指 Scala 编译器的版本,后面的 2.6.0 是指 Kafka 的版本。
- 使用 Kafka 时,应该尽量让客户端与服务器的版本一致,避免不兼容。
- v0.10.0.0
- 于 2016 年发布。新增了 Kafka Streams 的 API ,用于流处理。
- v0.11.0.0
- 于 2017 年发布。改进了消息格式,支持给 message 添加 headers 。支持事务、幂等性。
- v1.0.0
- 于 2017 年发布。
- v1.1.0
- 于 2018 年发布。
- 改进了 Controller ,提高更改 leader replica 的效率,将 broker 正常终止的耗时从几分钟减少到几秒。
- v2.0.0
- 于 2018 年发布。
- v2.8.0
- 于 2021 年发布。
- 增加 KRaft 协议,但只是试用阶段。
- v3.0.0
- 于 2021 年发布。
- 生产者默认设置了 acks=all 和 enable.idempotence=true ,实现最强的交付保证。
- 消费者的配置参数 session.timeout.ms 默认值从 10s 增加到 45s 。
- 弃用配置参数 log.message.format.version 和 message.format.version ,停止兼容旧的消息格式 v0 和 v1 。
- v3.3.0
- 于 2022 年发布。
- KRaft 成为正式功能。
- v3.4.0
- 于 2023 年发布。
- 支持将已有的 Kafka 集群,从 Zookeeper 模式升级到 KRaft 模式。
- v3.5.0
- 于 2023 年发布。
# 部署
下载二进制版:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
解压后运行:
bin/zookeeper-server-start.sh config/zookeeper.properties # 启动 zookeeper 服务器 bin/kafka-server-start.sh config/server.properties # 启动 kafka broker 服务器
- 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
- Kafka 发行版包含了 zk 的可执行文件,可以同时启动 kafka、zk 服务器,也可以在其它地方启动 zk 服务器。
- Kafka 会尽快将生产的消息写入磁盘,因此 JVM 占用的内存一般不超过 6G , CPU 平均负载为 2~4 核。
- kafka-server-start.sh 中默认配置了以下环境变量,限制 JVM 占用的内存:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- 主机应该留出一些空闲内存,相当于 JVM 内存的 0.5~1 倍,用作 Page Cache ,提高 Kafka 读写磁盘的速度。
- 上述配置的 Kafka 单节点每分钟就能生产 1 百万条消息,部署多节点还能横向扩容。因此一般用户不会触及 Kafka 的性能瓶颈,常见的性能问题是 rebalance 频繁、消费者的消费速度慢。
- kafka-server-start.sh 中默认配置了以下环境变量,限制 JVM 占用的内存:
- 声明以下环境变量,即可让 broker 开启 JMX 监控,监听一个端口:
export JMX_PORT=9091 # export JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false"
- 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
或者用 docker-compose 部署:
version: '3' services: kafka: container_name: kafka image: bitnami/kafka:3.4.0 restart: unless-stopped # stop_grace_period: 3m # 终止 kafka 可能耗时几分钟,需要等待。否则强制终止 kafka 会来不及保存数据,导致重启时花更久时间恢复 environment: ALLOW_PLAINTEXT_LISTENER: 'yes' # KAFKA_HEAP_OPTS: -Xmx2G -Xms2G ports: - 9092:9092 # volumes: # - ./config:/bitnami/kafka/config # - ./data:/bitnami/kafka/data
- Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
- 该镜像会根据环境变量配置 server.properties 文件,这里直接挂载配置目录,通过 CUSTOM_INIT_SCRIPT 执行命令还原配置文件。
- Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
# 配置
- kafka 的配置目录示例:
config/ ├── consumer.properties # 消费者的配置文件 ├── log4j.properties # Java 日志的配置文件 ├── producer.properties # 生产者的配置文件 ├── server.properties # broker 的配置文件 └── zookeeper.properties # zk 的配置文件
- 启动 broker 时只需读取 server.properties、log4j.properties 文件。
# server.properties
配置示例:
# 关于 kafka 集群
broker.id=1 # 该 broker 在 Kafka 集群中的唯一标识符,默认为 -1 ,必须赋值为一个非负整数
listeners=PLAINTEXT://0.0.0.0:9092 # broker 监听的 Socket 地址
advertised.listeners=PLAINTEXT://10.0.0.1:9092 # 当前 broker 供其它 broker 和客户端访问的地址,它会登记到 zk 中。默认采用 listeners 的值
# 关于 zk
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181 # 要连接的 zk 节点,多个地址之间用逗号分隔
# zookeeper.connection.timeout.ms=6000
# 保存数据日志
log.dirs=/data/kafka-logs # broker 存放数据日志的目录,如果有多个目录则用逗号分隔
# log.roll.ms=... # 这是 kafka 旧版的配置参数,后来改名为 segment.ms
# segment.ms=604800000 # 每个 LogSegment 文件的最长写入时间,超过该值则会关闭该文件,然后创建一个新的 LogSegment 用于写入。默认为 7d
# segment.bytes=1073741824 # 每个 LogSegment 的最大体积,超过该值则会关闭该文件,然后创建一个新的 LogSegment 用于写入。默认为 1G
# log.flush.interval.messages=null # 每个 partition ,每接收多少条消息就 flush 一次,即将内存中的数据写入磁盘
# log.flush.interval.ms=null # 每个 partition ,每经过多少毫秒就 flush 一次
# 默认由 broker 自动控制何时 flush ,效率更高。而且每个 partition 存在 replication ,也不怕 broker 突然故障,未将内存中的数据写入磁盘
# log.flush.offset.checkpoint.interval.ms=60000 # 每隔多少毫秒,刷新一次 checkpoint
# 清理数据日志
# log.cleanup.policy=delete # LogSegment 文件的清理策略(只会清理已关闭的 LogSegment ,不会清理当前打开的 LogSegment)
# 默认为 delete ,删除保存时长超过 log.retention.hours 的 LogSegment
# 可改为 compact ,这会将旧 LogSegment 中的数据拷贝到新 LogSegment 中,并且对于各种取值的 key 只保留最新的一条 message ,删除之前的 message
# 可改为 delete,compact ,同时采用两种策略
# log.retention.check.interval.ms=300000 # 每隔多久检查一次所有 LogSegment ,如果超过 log.retention.bytes、log.retention.hours 阈值则清理。默认为 5min
log.retention.bytes=10737418240 # 如果一个 partition 占用的磁盘空间超过该值,则清理其中的旧 LogSegment 。默认为 -1 ,即不限制
# 假设配置 log.retention.bytes=1G、num.partitions=6、default.replication.factor=2 ,则一个 topic 理论上最多占用 12G 磁盘,实际上可能超过一些才清理
log.retention.hours=24 # 如果一个 LogSegment 文件的 modify time 距今超过该时长,则清理该 LogSegment 。默认为 7d 。如果设置为 -1 ,则不限制
# 如果一条消息的保存时长超过 log.retention.hours ,但所在的 LogSegment 尚未关闭,则该消息不会被清理
# 综上,如果想更及时地清理 kafka 消息,则建议减小 segment.ms 或 segment.bytes ,从而创建更小粒度的 LogSegment
# log.retention.minutes=null # 默认未设置,采用 log.retention.hours 的值
# log.retention.ms=null # 默认未设置,采用 log.retention.minutes 的值。
# delete.retention.ms=86400000 # 默认为 24h 。采用 log.cleanup.policy=compact 时,会将 tombstone 消息至少保留该时长才删除
# 关于 topic
# auto.create.topics.enable=true # 当客户端生产、消费一个不存在的 topic 时,是否自动创建该 topic
# delete.topic.enable=true # 是否允许删除 topic
# compression.type=producer # broker 将消息存储到磁盘时的压缩格式。取值为 producer 则采用与生产者相同的压缩格式
message.max.bytes=10485760 # 允许接收的最大 batch.size ,默认为 1M ,这里设置为 10M 。该参数作用于所有 topic ,也可以对每个 topic 分别设置 max.message.bytes
# 关于 partition
num.partitions=6 # 新建 topic 的默认 partition 数,默认为 3
default.replication.factor=2 # 新建 partition 的默认副本数,默认为 1
# min.insync.replicas=1 # 当生产者配置了 acks=all 时,新消息至少写入多少个 replica 才算成功,否则生产者抛出异常 NotEnoughReplicas
# auto.leader.rebalance.enable=true # 是否自动进行 partition rebalance
# leader.imbalance.check.interval.seconds=300 # Controller 每隔多久检查一次是否执行 partition rebalance
# leader.imbalance.per.broker.percentage=10 # 如果该 broker 上的非 preferred leader 超过该百分比,则进行 partition rebalance
# 关于 __consumer_offsets
# offsets.topic.num.partitions=50 # __consumer_offsets 主题的 partition 数
# offsets.topic.replication.factor=3 # __consumer_offsets 主题的每个 partition 的副本数。部署单节点 Kafka 时必须减至 1
# offsets.retention.minutes=10080 # kafka v2.0 将该参数的默认值从 1d 改为 7d
# 如果一个 consumer group 超过该时长未提交新的 offset ,并且该 consumer group 当前未在线,或未订阅相关 topic ,则从 __consumer_offsets 中删除该 consumer group 的 offset
# 关于进程
# background.threads=10 # broker 处理后台任务的线程数
# num.network.threads=3 # broker 处理网络请求的线程数
# num.io.threads=8 # broker 处理磁盘 IO 的线程数,应该不小于磁盘数
# 关于网络
session.timeout.ms=60000 # 会话超时,默认为 10s 。如果在该时长内 broker 一直没有收到 consumer 的 heartbeat 请求,则认为 consumer 下线
# heartbeat.interval.ms=3000 # broker 等待 heartbeat 请求的超时时间。通常设置为 session.timeout.ms 的 1/3 ,以允许超时两次
# socket.send.buffer.bytes=102400 # socket 发送消息的缓冲区大小,默认为 100K
# socket.receive.buffer.bytes=102400 # socket 接收消息的缓冲区大小,默认为 100K 。网络延迟高于 1ms 时,增加 buffer 有利于提高传输效率,但会占用更多内存
# socket.request.max.bytes=104857600 # socket 允许接收的单条消息的最大大小,默认为 100M
- default.replication.factor 建议设置为 2 或 3 。
- 副本数大于 1 时能提高可用性。假设每个 partition 的副本数为 n ,则 Kafka 集群最多允许 n-1 个 broker 故障。
- 副本数最多允许跟 broker 数一样大,但副本数较多时,会线性增加读写磁盘的数据量,也会占用更多磁盘空间。
- num.partitions 建议设置为 6 等公倍数,当消费组包含 1、2、3 个消费者时都可以平均分配。
- 分区数与 broker 数没关系,只是限制了一个消费组中同时工作的消费者数量。
- 假设 num.partitions 为 2 ,则每个消费组中同时只能有 ≤2 个消费者工作,并发数太少,因此消费速度慢。
- 增加分区数,可以允许一个消费组中有更多消费组并发消费,从而线性提高消费速度。
- 但分区数太多会降低 broker 的性能。
- 每当一个分区被生产者、消费者使用时,broker 需要打开该分区在磁盘的几个数据文件,因此打开的文件描述符变多,占用更多内存。
- 每个分区的状态变化时,broker 都需要实时同步到 zk ,导致 zk 负载变大。严重情况下,会导致生产、消费消息的延迟增加,吞吐量降低。
- Kafka v1.1.0 改进了 Controller ,每个 broker 建议最多存储 4k 个 partition(包括副本数),每个集群最多 20w 个。
- 如果用 KRaft 协议取代 zookeeper ,则可以管理数百万的 partition 。
- 但分区数太多会降低 broker 的性能。
- 分区数与 broker 数没关系,只是限制了一个消费组中同时工作的消费者数量。
# producer.properties
配置示例:
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092 # 初始连接的 broker 地址。先通过它获取所有 broker 的 advertised.listeners 地址,再实际连接
# client.id='' # 客户端的 ID
# acks=1 # 判断消息发送成功的策略,这决定了 Kafka 的数据可靠性
# 取值为 0 ,则不等待 broker 的回复,直接认为消息已发送成功
# 取值为 1 ,则等待 leader replica 确认接收消息
# 取值为 all ,则等待消息被同步到所有 replica
# retries=2147483647 # 发送消息失败时,如果不超过 delivery.timeout.ms 时长,则最多尝试重发多少次
# enable.idempotence=false # 是否在生产消息时保证幂等性,避免重复生产一条消息。如果启用该功能,需要设置 acks=all 且 retries 大于 0
# batch.size=16384 # 限制每个 batch 的大小,默认为 16K
# buffer.memory=33554432 # 限制生产者用于发送消息的缓冲区大小,默认为 32M
# compression.type=none # 发送消息时,batch 采用的压缩格式。默认不压缩,建议采用 lz4
max.request.size=10485760 # 限制生产者向 broker 发送的每个请求的大小,这限制了每个请求包含的 batch (压缩之后)数量。默认为 1M ,这里设置为 10M
# max.block.ms=60000 # 生产者调用 send() 等方法时,如果 buffer.memory 不足或 metadata 获取不到,阻塞等待的超时时间
# linger.ms=0 # 生产者创建每个 batch 时,等待多久才发送。调大该值,有助于让每个 batch 包含更多消息
# request.timeout.ms=30000 # 发送请求给 broker 时,等待响应的超时时间
# delivery.timeout.ms=120000 # 生产者调用 send() 方法发送消息的超时时间,该值应该不低于 linger.ms + request.timeout.ms
producer 向同一个 partition 生产多条消息时,会等内存中缓冲了一定数量之后,才打包为一批(batch)发送。
- 如果生产的第一条消息就超过 batch.size 限制,则依然会打包成一个 batch ,但只包含该消息。
- Kafka 生产消息、消费消息、同步 replica 时,都会将消息打包成 batch 再传输,从而提高传输效率。
- 每个 batch 可能包含 1~N 条消息,每次网络请求可能传输 0~N 个 batch 。
batch.size 不能大于 producer 的 max.request.size ,否则过大的 batch 不能装载到请求中。
- 不能大于 broker 的 message.max.bytes ,否则请求不能被接收。
- 不能大于 consumer 的 fetch.max.bytes ,否则不能被消费。
Kafka 支持将生产的消息进行压缩。
- 一般流程:
- producer 将消息 batch 以某种压缩格式发送。
- 增加 batch.size 有利于增加重复的消息内容,加强压缩率。
- broker 接收消息,默认以同种压缩格式存储。
- consumer 拉取消息,自动识别其压缩格式,然后解压。
- producer 将消息 batch 以某种压缩格式发送。
- 优点:
- 减少消息占用的磁盘空间。
- 如果 Kafka 吞吐量不高,则启用压缩之后,每个 batch 的体积更小,因此会减少网络 IO 量、传输耗时。
- 如果 Kafka 吞吐量达到瓶颈,则启用压缩之后,每个 batch 的体积不变,但能打包更多条消息,因此会提高 Kafka 吞吐量。
- 缺点:
- 因为压缩和解压,客户端会增加一点 CPU 负载,通常可忽略不计。
- Kafka 支持多种压缩格式:
none gzip lz4 snappy zstd
- none 格式表示不压缩。虽然没有压缩、解压的耗时,但消息体积大,通过网络传输的耗时久。
- 用 Nginx 日志作为 Kafka 消息来测试, gzip 压缩率大概为 5% 。
- 比较压缩消息的 CPU 耗时:
gzip >> zstd > snappy > lz4 > none
。 - 比较消息体积:
none >> lz4 > snappy > zstd > gzip
。
- 一般流程:
# consumer.properties
配置示例:
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092
# group.id=null # 消费者组的名称
# group.instance.id=null # 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id
# allow.auto.create.topics=false # 订阅或主动分配 topic 时,如果该 topic 不存在,是否自动创建
# auto.offset.reset=latest # 如果 Coordinator 没有记录 Consumer Committed Offset (可能是未创建、已过期删除),则默认从哪个 offset 处开始消费
# 可选的取值:
# earliest : 采用 partition 可见范围内最早的 offset
# latest : 采用 partition 最新的 offset ,即 High Watemark
# none :让 consumer 抛出异常
# enable.auto.commit=true # 是否自动在后台提交 Consumer Committed Offset 。可以关闭该功能,由用户主动提交,更可靠
# auto.commit.interval.ms=5000 # 自动提交的间隔时长
# max.poll.records=500 # consumer 每次调用 poll() 方法,最多拉取多少条消息。这会影响、但不决定 fetch 请求的数量
# max.poll.interval.ms=300000 # consumer 每次调用 poll() 方法,最多间隔多久就应该再次调用 poll() 。如果超时,则 broker 认为 consumer 下线,触发 rebalance
# max.partition.fetch.bytes=1048576 # 每次发出 fetch 请求时,从每个 partition 最多获取多少数据。默认为 1M 。如果获取的第一条消息就超过该限制,则只返回该消息
# fetch.min.bytes=1 # 每次发出 fetch 请求时,broker 应该至少累积多少数据才返回响应
# fetch.max.wait.ms=500 # 每次发出 fetch 请求时,broker 如果没有满足 fetch.min.bytes 的数据,则最多等待指定时长就返回响应
# fetch.max.bytes=57671680 # 每次发出 fetch 请求时,预期 broker 最多返回的数据量。默认为 55M
- 生产者、消费者都有 client.id 标识符,允许重复。消费者还有 group.id 标识符。
- 用户调用 consumer 的 poll() 方法,即可消费消息。
- consumer 会在后台向 broker 发送 fetch 请求,拉取数据,暂存到内存中的队列中,直到用户消费之后才删除。
- consumer 占用的最大内存大概为 max.partition.fetch.bytes * partition_count ,或者 fetch.min.bytes * broker_count 。
# SASL
Kafka broker 支持通过 JAAS 框架启用 SASL 认证。
- 默认不要求身份认证,可以被其它 broker、client 直接连接,因此不安全。
- 可启用以下 SASL 认证机制:
- PLAIN
- GSSAPI (Kerberos)
- OAUTHBEARER
- SCRAM-SHA-256
对于通过身份认证的用户,Kafka 支持配置 ACL 规则,控制用户的访问权限。
- 默认的 ACL 规则为空,因此不允许用户访问任何资源,除非是 super 用户。
Kafka 传输数据时默认未加密,称为 PLAINTEXT 方式。
- 可以启用 SSL 加密通信,但会增加通信延迟。
- 是否启用 SSL ,与是否启用 SASL 认证无关。
- 注意 PLAIN 是一种 SASL 认证机制,而 PLAINTEXT 是指数据未加密传输。
例:启用 PLAIN 认证
修改
config/server.properties
配置文件,将通信协议从默认的PLAINTEXT://
改为SASL_PLAINTEXT://
,即采用 SASL 认证 + 未加密通信。listeners=INTERNAL_IP://0.0.0.0:9092,PUBLIC_IP://0.0.0.0:9093 # 定义两个 listener 。当客户端通过某个 listener 连接时,就采用相应的 advertised.listeners advertised.listeners=INTERNAL_IP://10.0.0.1:9092,PUBLIC_IP://1.1.1.1:9093 listener.security.protocol.map=INTERNAL_IP:PLAINTEXT,PUBLIC_IP:SASL_PLAINTEXT # advertised.listeners 的解析规则 inter.broker.listener.name=INTERNAL_IP # broker 之间通信时采用的 listener ,默认为 PLAINTEXT sasl.enabled.mechanisms=PLAIN # broker 启用哪些 SASL 认证机制,默认仅启用 GSSAPI sasl.mechanism.inter.broker.protocol=PLAIN # broker 之间通信时的 SASL 认证机制,默认为 GSSAPI sasl.mechanism.controller.protocol=PLAIN # controller 之间通信时的 SASL 认证机制,默认为 GSSAPI 。用 KRaft 取代 Zookeeper 时,需添加该配置参数 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 选择一个提供 ACL 服务的 Java 类。Kafka v3 改为了 kafka.security.authorizer.AclAuthorizer super.users=User:broker;User:client # 将一些用户声明为超级用户
创建一个 jaas.conf 配置文件:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required # 指定当前 broker 连接其它 broker 时使用的用户 username="broker" password="******" # 按 user_<NAME>=<PASSWORD> 的格式定义用户。在当前 broker 被其它 broker 或 client 连接时,允许对方使用这些用户 user_broker="******" user_client="******"; # 注意这是一条语句,末尾要加分号 };
将 jaas.conf 拷贝到每个 broker 的配置目录下,并添加 java 启动参数来启用它:
export KAFKA_OPTS='-Djava.security.auth.login.config=/kafka/config/jaas.conf'
执行 kafka-server-start.sh、kafka-console-producer.sh 等脚本时会自动应用该配置。
客户端连接 broker 时,需要在 producer.properties、consumer.properties 中加入配置:
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="client" \ password="******";
其中的账号密码也可以配置在 jaas.conf 中:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="******"; };
上述为 kafka 被其它 broker、client 连接时的身份认证。而 kafka 连接到 zk 时,也可启用 SASL 认证,配置方法见 zk 文档。
- 另外建议在 kafka 的 server.properties 中加入:
zookeeper.set.acl=true # 对 Kafka 在 zk 中存储的数据启用 ACL 规则:允许被所有用户读取,但只允许被 Kafka 编辑
- 另外建议在 kafka 的 server.properties 中加入:
# KRaft
- Kafka 原本使用 Zookeeper 维护 broker 系统,从 Kafka v3.4 开始弃用 Zookeeper ,改用 Kafka 内置的 KRaft 协议。
- Zookeeper 模式下,各个 broker 会将元数据写入 Zookeeper ,并选出一个 broker 担任 Controller ,独自管理整个 Kafka 集群。
- KRaft 模式下,各个 broker 不再连接 Zookeeper ,而是由一些(建议 3 或 5 个) broker 担任 Controller ,按 KRaft 协议进行分布式系统的决策,从而管理整个 Kafka 集群。
- 从 Zookeeper 改用 KRaft 的优点:简化 Kafka 集群的部署,大幅提高 broker 之间的通信效率,大幅提高管理 partition 的性能。
# 部署
重新部署一个采用 KRaft 模式的 Kafka 集群的流程:
- 为了与 Zookeeper 模式区分,Kraft 模式使用的配置文件位于
config/kraft/server.properties
,内容示例:node.id=1 # 如果该 broker 担任 kraft 模式的 controller ,则必须设置 node.id process.roles=broker,controller # 该 Kafka 进程在 kraft 模式下的角色,可以同时担任供生产者、消费组连接的 broker 、进行决策的 controller listeners=BROKER://:9092,CONTROLLER://:9093 # 监听 9092 端口供生产者、消费者连接,监听 9093 端口供其它 kraft 节点连接 advertised.listeners=BROKER://10.0.0.1:9092 inter.broker.listener.name=BROKER controller.quorum.voters=[email protected]:9093,[email protected]:9093,[email protected]:9093 # 声明所有 kraft controller 的 node.id、ip、port controller.listener.names=CONTROLLER # controller 之间通信时采用的 listener ,不能与 inter.broker.listener.name 共用一个 listener listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT
- 生成集群 ID :初始化每个 broker 的 log.dirs 目录:
KAFKA_CLUSTER_ID=`bin/kafka-storage.sh random-uuid`
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
- Zookeeper 模式下,部署一个新的 Kafka 时,如果 log.dirs 目录为空,Kafka 会自动初始化。
- KRaft 模式下,如果 log.dirs 目录为空,需要手动初始化,否则 Kafka 会启动失败,报错 INCONSISTENT_CLUSTER_ID 。取消自动初始化,是为了避免 Controller 在缺少数据的情况下做出决策。
- 使用 bitnami/kafka 镜像时,默认启用 KRaft 模式,采用的配置文件是
config/kraft/server.properties
而不是config/server.properties
。只需声明环境变量 KAFKA_KRAFT_CLUSTER_ID ,启动时会自动 format 数据目录。
- 启动 broker :
bin/kafka-server-start.sh config/kraft/server.properties
- 相关命令:
bin/kafka-metadata-quorum.sh --bootstrap-server 127.0.0.1:9092 describe --status # 查看 KRaft 状态 bin/kafka-metadata-quorum.sh --bootstrap-server 127.0.0.1:9092 describe --replication # 列出所有 Kafka 节点
# 从 ZK 迁移到 KRaft
如果已部署 Zookeeper 模式的旧 Kafka 集群,想升级到 Kraft 模式。则建议按蓝绿部署的方案,重新部署一个新 Kafka 集群,然后让客户端改用新 Kafka 。这样操作简单,只是需要确保旧 Kafka 中消息滞后量为 0 。
如果不支持蓝绿部署的方案,只能将旧 Kafka 滚动升级到 KRaft 模式,但操作复杂:
部署至少一个 KRaft 模式的 controller ,
config/kraft/server.properties
配置文件示例:# 担任 KRaft 模式的 controller node.id=11 # Kraft 模式的 node.id 不能与 Zookpeer 模式的 broker.id 相同,因为共享一个 id 命名空间 process.roles=controller listeners=CONTROLLER://:9093 controller.quorum.voters=11@10.0.0.1:9093,[email protected]:9093,[email protected]:9093 controller.listener.names=CONTROLLER inter.broker.listener.name=PLAINTEXT listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT # 连接 zookeeper zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181 # 开启迁移模式 zookeeper.metadata.migration.enable=true
- 需要查看旧 kafka 集群的 cluseter.id ,将它配置为 controller 的 KAFKA_KRAFT_CLUSTER_ID :
grep cluster.id data/meta.properties
- 该 kafka 节点不能同时担任 KRaft 模式的 broker 和 controller ,否则之后迁移时,会与 Zookeeper 模式的 broker 弄混。
- controller 启动成功之后,会等待 broker 连接到它,每秒打印一条日志:
Still waiting for ZK brokers [1, 2, 3] to register with KRaft
- 需要查看旧 kafka 集群的 cluseter.id ,将它配置为 controller 的 KAFKA_KRAFT_CLUSTER_ID :
修改所有 broker 的
config/server.properties
配置文件并滚动重启,使得它们的 listener.name 与 controller 一致,从而相互通信。inter.broker.listener.name=PLAINTEXT # 旧版 Kafka 的 listener.name 默认为 PLAINTEXT ,用户也可改为 BROKER 等名称 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
修改所有 broker 的
config/server.properties
配置文件,使它同时连接 Zookeeper 和 controller ,内容示例如下,然后以 v3.4.0 版本滚动重启所有 broker :# 保留 Zookeeper 模式的旧配置 broker.id=1 listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://10.0.0.1:9092 inter.broker.listener.name=PLAINTEXT zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181 # 连接 controller controller.quorum.voters=11@10.0.0.1:9093 controller.listener.names=CONTROLLER listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT # 声明协议版本 inter.broker.protocol.version=3.4 # 开启迁移模式 zookeeper.metadata.migration.enable=true
- 使用 bitnami/kafka:3.4.0 镜像时,需要声明环境变量 KAFKA_ENABLE_KRAFT=false 。
- 该 kafka 节点不能同时担任 KRaft 模式的 broker 和 controller ,否则启动时因为 log.dirs 目录非空,不能初始化。
- 迁移模式下,
- broker 依然工作在 Zookeeper 模式。
- 滚动重启 broker 时,有的 broker 采用新的 inter.broker.protocol.version ,有的 broker 采用旧的 inter.broker.protocol.version ,因此不能相互通信。
- 等所有 broker 都重启并连接到 KRaft controller ,才会开始迁移。
- KRaft controller 会取代 Zookeeper 模式中原来的 controller 节点,因此有权读写 topic、offset 等元数据,将它们迁移到 KRaft 数据目录。
- 即使停止迁移,Zookeeper 中记录的 controller 节点依然是 KRaft controller ,因此不支持从 KRaft 模式回滚到 Zookeeper 模式。
- 迁移完成之后, controller 节点会打印一条日志:
Completed migration of metadata from Zookeeper to KRaft
弃用
config/server.properties
这个旧配置文件,改用纯 KRaft 模式的config/kraft/server.properties
配置文件,重启 broker 。node.id=1 # node.id 等于原来的 broker.id process.roles=broker listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://10.0.0.1:9092 controller.quorum.voters=11@10.0.0.1:9093 controller.listener.names=CONTROLLER inter.broker.listener.name=PLAINTEXT listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
删除 controller 配置文件中的 zookeeper.connect、zookeeper.metadata.migration.enable ,重启 controller 。
停止运行 Zookeeper 。