# 部署

# 版本

  • Kafka 的版本列表 (opens new window)
    • 例如 kafka_2.13-2.6.0.tgz ,前面的 2.13 是指 Scala 编译器的版本,后面的 2.6.0 是指 Kafka 的版本。
    • 使用 Kafka 时,应该尽量让客户端与服务器的版本一致,避免不兼容。
  • v0.10.0.0
    • 于 2016 年发布。新增了 Kafka Streams API ,用于流处理。
  • v0.11.0.0
    • 于 2017 年发布。改进了消息格式。支持事务、幂等性。
  • v1.0.0
    • 于 2017 年发布。
  • v1.1.0
    • 于 2018 年发布。
    • 改进了 Controller ,提高更改 leader replica 的效率,将 broker 正常终止的耗时从几分钟减少到几秒。
  • v2.0.0
    • 于 2018 年发布。
  • v2.8.0
    • 于 2021 年发布。
    • 增加 KRaft 协议,但只是试用阶段。
  • v3.0.0
    • 于 2021 年发布。
    • 生产者默认设置了 acks=all 和 enable.idempotence=true ,实现最强的交付保证。
    • 消费者的配置参数 session.timeout.ms 默认值从 10s 增加到 45s 。
    • 弃用配置参数 log.message.format.version 和 message.format.version ,停止兼容旧的消息格式 v0 和 v1 。
  • v3.3.0
    • 于 2022 年发布。
    • KRaft 成为正式功能。
      • 计划在 Kafka v3.4 弃用 zookeeper 模式。在 Kafka v3.5 支持将 broker 数据从 zookeeper 模式升级到 KRaft 模式。

# 部署

  • 下载二进制版:

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
    

    解压后运行:

    bin/zookeeper-server-start.sh config/zookeeper.properties   # 启动 zookeeper 服务器
    bin/kafka-server-start.sh     config/server.properties      # 启动 kafka broker 服务器
    
    • 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
      • Kafka 发行版包含了 zk 的可执行文件,可以同时启动 kafka、zk 服务器,也可以在其它地方启动 zk 服务器。
    • Kafka 会尽快将生产的消息写入磁盘,因此 JVM 占用的内存一般不超过 6G , CPU 平均负载为 2~4 核。
      • kafka-server-start.sh 中默认配置了以下环境变量,限制 JVM 占用的内存:
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        
      • 主机应该留出一些空闲内存,相当于 JVM 内存的 0.5~1 倍,用作 Page Cache ,提高 Kafka 读写磁盘的速度。
      • 上述配置的单节点 Kafka ,每分钟最多能生产 1 百万条消息。
    • 声明以下环境变量,即可让 broker 开启 JMX 监控,监听一个端口:
      export JMX_PORT=9093
      # export JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false"
      
  • 或者用 docker-compose 部署:

    version: '3'
    
    services:
      kafka:
        container_name: kafka
        image: bitnami/kafka:2.8.0
        restart: unless-stopped
        stop_grace_period: 1m     # 终止 broker 时可能要花几分钟时间,强制杀死则会来不及保存数据
        environment:
          # KAFKA_HEAP_OPTS: -Xmx1G -Xms1G
          ALLOW_PLAINTEXT_LISTENER: 'yes'
        ports:
          - 9092:9092
        volumes:
          - ./config:/bitnami/kafka/config
          - ./data:/bitnami/kafka/data
    
    • Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
      • 该镜像会根据环境变量配置 server.properties 文件,这里直接挂载配置目录,通过 CUSTOM_INIT_SCRIPT 执行命令还原配置文件。

# 配置

  • kafka 的配置目录示例:
    config/
    ├── consumer.properties       # 消费者的配置文件
    ├── log4j.properties          # Java 日志的配置文件
    ├── producer.properties       # 生产者的配置文件
    ├── server.properties         # broker 的配置文件
    └── zookeeper.properties      # zk 的配置文件
    
    • 启动 broker 时只需读取 server.properties、log4j.properties 文件。

# server.properties

配置示例:

# 关于 kafka 集群
broker.id=0                               # 该 broker 在 Kafka 集群中的唯一标识符,默认为 -1 ,必须赋值为一个非负整数
listeners=PLAINTEXT://0.0.0.0:9092        # broker 监听的 Socket 地址
advertised.listeners=PLAINTEXT://10.0.0.1:9092  # 当前 broker 供其它 broker 和客户端访问的地址。它会在 zk 中公布,默认采用 listeners 的值

# 关于 zk
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181   # 要连接的 zk 节点,多个地址之间用逗号分隔
# zookeeper.connection.timeout.ms=6000

# 保存数据日志
log.dirs=/data/kafka-logs                 # broker 存放数据日志的目录,如果有多个目录则用逗号分隔
# log.roll.ms=null                        # 每个 LogSegment 的最长写入时间,超过该值则会创建一个新的 LogSegment 用于写入。默认未设置,采用 log.roll.hours 的值
# log.roll.hours=168                      # 默认为 7*24h
# log.segment.bytes=1073741824            # 每个 LogSegment 的最大体积,超过该值则会创建一个新的 LogSegment 用于写入。默认为 1G
# log.flush.interval.messages=null        # 每个日志分区,每接收多少个消息就 flush 一次,即将内存中的数据写入磁盘
# log.flush.interval.ms=null              # 每个日志分区,每经过多少毫秒就 flush 一次
# log.flush.offset.checkpoint.interval.ms=60000 # 每隔多少毫秒,刷新一次 checkpoint

# 清理数据日志
# log.cleanup.policy=delete               # LogSegment 的清理策略,可以是 delete、compact
# log.retention.check.interval.ms=300000  # 每隔多久检查一次各个 LogSegment 是否应该清理。默认为 5min
log.retention.bytes=10737418240           # 限制单个 partition 的大小,超过则删除其中最旧的 LogSegment 。默认为 -1 ,即不限制
# log.retention.ms=null                   # 限制单个 LogSegment 的保存时长,超过则删除。默认未设置,采用 log.retention.minutes 的值。如果设置为 -1 ,则不限制
# log.retention.minutes=null              # 默认未设置,采用 log.retention.hours 的值
log.retention.hours=24                    # 默认为 7*24h

# 关于 topic
# auto.create.topics.enable=true          # 当客户端生产、消费一个不存在的 topic 时,是否自动创建该 topic
# delete.topic.enable=true                # 是否允许删除 topic
# compression.type=producer               # broker 对消息的压缩格式。取值为 producer 则采用与生产者相同的压缩格式
message.max.bytes=10485760                # 允许接收的最大 batch.size ,默认为 1M ,这里设置为 10M 。该参数作用于所有 topic ,也可以对每个 topic 分别设置 max.message.bytes

# 关于 partition
num.partitions=6                          # 新建 topic 的默认 partition 数,默认为 3
default.replication.factor=2              # 新建 partition 的默认副本数,默认为 1
# offsets.topic.num.partitions=50         # __consumer_offsets 主题的 partition 数
# offsets.topic.replication.factor=3      # __consumer_offsets 主题的每个 partition 的副本数。部署单节点时必须减至 1
# auto.leader.rebalance.enable=true           # 是否自动进行 Partition Rebalance
# leader.imbalance.check.interval.seconds=300 # Controller 每隔多久检查一次是否执行 Partition Rebalance
# leader.imbalance.per.broker.percentage=10   # 如果该 broker 上的非 preferred leader 超过该百分比,则进行 Partition Rebalance

# 关于进程
# background.threads=10                   # broker 处理后台任务的线程数
# num.network.threads=3                   # broker 处理网络请求的线程数
# num.io.threads=8                        # broker 处理磁盘 IO 的线程数,应该不小于磁盘数

# 关于网络
session.timeout.ms=60000                  # 会话超时,默认为 10s 。如果在该时长内 broker 一直没有收到 consumer 的 heartbeat 请求,则认为 consumer 下线
# heartbeat.interval.ms=3000              # broker 等待 heartbeat 请求的超时时间。通常设置为 session.timeout.ms 的 1/3 ,以允许超时两次
# socket.send.buffer.bytes=102400         # socket 发送消息的缓冲区大小,默认为 100K
# socket.receive.buffer.bytes=102400      # socket 接收消息的缓冲区大小,默认为 100K 。网络延迟高于 1ms 时,增加 buffer 有利于提高传输效率,但会占用更多内存
# socket.request.max.bytes=104857600      # socket 允许接收的单个消息的最大大小,默认为 100M
  • 建议 default.replication.factor 设置为 2 或 3 。
    • 副本数大于 1 时可提高可用性。假设每个 partition 的副本数为 n ,则 Kafka 集群最多允许 n-1 个 broker 故障。
    • 副本数最多允许跟 broker 数一样大,但副本数较多时,会线性增加读写磁盘的数据量,也会占用更多磁盘空间。
  • num.partitions 建议设置为 6 等公倍数,当消费组包含 1、2、3 个消费者时都可以平均分配。
    • 分区数与 broker 数没关系,只是限制了一个消费组中同时工作的消费者数量。
      • 假设 num.partitions 为 2 ,则每个消费组中同时只能有 ≤2 个消费者工作,并发数太少,因此消费速度慢。
    • 增加分区数,可以允许一个消费组中有更多消费组并发消费,从而线性提高消费速度。
      • 但分区数太多会降低 broker 的性能。
        • 每当一个分区被生产者、消费者使用时,broker 需要打开该分区在磁盘的几个数据文件,因此打开的文件描述符变多,占用更多内存。
        • 每个分区的状态变化时,broker 都需要实时同步到 zk ,导致 zk 负载变大。严重情况下,会导致生产、消费消息的延迟增加,吞吐量降低。
      • Kafka v1.1.0 改进了 Controller ,每个 broker 建议最多存储 4k 个 partition(包括副本数),每个集群最多 20w 个。
      • 如果用 KRaft 协议取代 zookeeper ,则可以管理数百万的 partition 。
  • Kafka 会根据 log.retention 配置,自动清理 topic 中的消息。
    • 即使没有一条消息,也不会删除该 topic 。
    • 如果客户端不取消订阅 topic ,则 Kafka 会一直记录 offset 信息。
  • 改用以下配置时,会启用 KRaft 模式:
    node.id=1                             # 如果该 broker 担任 kraft 模式的 controller ,则必须设置 node.id
    process.roles=broker,controller       # 该 Kafka 进程在 kraft 模式下的角色,可以同时担任供生产者、消费组连接的 broker 、进行决策的 controller
    inter.broker.listener.name=PLAINTEXT
    controller.listener.names=CONTROLLER
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093   # 监听 9092 端口供生产者、消费者连接,监听 9093 端口供其它 kraft 节点连接
    log.dirs=/data/kraft-logs
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    controller.quorum.voters=[email protected]:9093,[email protected]:9093,[email protected]:9093   # 声明所有 kraft controller 的 node.id、ip、port
    

# producer.properties

配置示例:

bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092   # 初始连接的 broker 地址。先通过它获取所有 broker 的 advertised.listeners 地址,再实际连接
# client.id=''                  # 客户端的 ID

# acks=1                        # 判断消息发送成功的策略
  # 取值为 0 ,则不等待 broker 的回复,直接认为消息已发送成功
  # 取值为 1 ,则等待 leader replica 确认接收消息
  # 取值为 all ,则等待消息被同步到所有 replica
# retries=2147483647            # 发送消息失败时,如果不超过 delivery.timeout.ms 时长,则最多尝试重发多少次
# enable.idempotence=false      # 是否在生产消息时保证幂等性,避免重复生产一条消息。如果启用该功能,需要设置 acks=all 且 retries 大于 0

# batch.size=16384              # 限制每个 batch 的大小,默认为 16K
# buffer.memory=33554432        # 限制生产者用于发送消息的缓冲区大小,默认为 32M
# compression.type=none         # 发送消息时,batch 采用的压缩格式,默认不压缩
max.request.size=10485760       # 限制生产者向 broker 发送的每个请求的大小,这限制了每个请求包含的 batch (压缩之后)数量。默认为 1M ,这里设置为 10M
# max.block.ms=60000            # 生产者调用 send() 等方法时,如果 buffer.memory 不足或 metadata 获取不到,阻塞等待的超时时间

# linger.ms=0                   # 生产者创建每个 batch 时,等待多久才发送。调大该值,有助于让每个 batch 包含更多消息。特别是当新增消息的速度,比发送消息的速度更快时
# request.timeout.ms=30000      # 发送请求给 broker 时,等待响应的超时时间
# delivery.timeout.ms=120000    # 生产者调用 send() 方法发送消息的超时时间,该值应该不低于 linger.ms + request.timeout.ms
  • producer 向同一个 partition 生产多个消息时,会打包为一批(batch)再发送。

    • 如果出现的第一个消息就超过 batch.size 限制,则依然会打包成一个 batch ,但只包含该消息。
    • Kafka 生产消息、消费消息、同步 replica 时,都会将消息打包成 batch 再传输,从而提高传输效率。
      • 每个 batch 可能包含 1~N 个消息,每次网络请求可能传输 0~N 个 batch 。
  • batch.size 不能大于 producer 的 max.request.size ,否则过大的 batch 不能装载到请求中。

    • 不能大于 broker 的 message.max.bytes ,否则请求不能被接收。
    • 不能大于 consumer 的 fetch.max.bytes ,否则不能被消费。
  • Kafka 支持将生产的消息进行压缩。

    • 一般流程:
      1. producer 将消息 batch 以某种压缩格式发送。
        • 增加 batch.size 有利于增加重复的消息内容,提高压缩率。
      2. broker 接收消息,默认以同种压缩格式存储。
      3. consumer 拉取消息,自动识别其压缩格式,解压。
    • 优点:
      • 减少占用的磁盘空间。
      • 减少网络 IO 量,提高 Kafka 吞吐量。
      • 如果 CPU 足够、网速不足,则采用压缩会减少生产、消费的耗时。
    • 缺点:
      • 占用更多 CPU 。
    • 支持多种压缩格式:
      none      # 不压缩
      gzip      # 压缩率大概为 20% ,CPU 负载较高
      lz4       # 压缩率较低,CPU 负载较低,推荐使用
      snappy    # 性能比 lz4 较差
      zstd      # 压缩率比 gzip 更低
      

# consumer.properties

配置示例:

bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092

# group.id=null                     # 消费者组的名称
# group.instance.id=null            # 给该参数赋值为非空字符串时, consumer 会声明为 Static Member 类型,并采用该参数的值作为 client.id
# allow.auto.create.topics=false    # 订阅或主动分配 topic 时,如果该 topic 不存在,是否自动创建
# auto.offset.reset=latest          # 如果 Coordinator 没有记录 Consumer Committed Offset (可能是未创建、已过期删除),则从哪个 offset 处开始消费
    # 可选的取值:
    # earliest : 采用 partition 可见范围内最老的 offset
    # latest   : 采用 partition 最新的 offset ,即 High Watemark
    # none     :让 consumer 抛出异常
# enable.auto.commit=true           # 是否自动在后台提交 Consumer Committed Offset 。可以关闭该功能,由用户主动提交,更可靠
# auto.commit.interval.ms=5000      # 自动提交的间隔时长

# max.poll.records=500              # consumer 每次调用 poll() 方法,最多拉取多少个消息。这会影响、但不决定 fetch 请求的数量
# max.poll.interval.ms=300000       # consumer 每次调用 poll() 方法,最多间隔多久就应该再次调用 poll() 。如果超时,则 broker 认为 consumer 下线,触发 Rebalance
# max.partition.fetch.bytes=1048576 # 每次发出 fetch 请求时,从每个 partition 最多获取多少数据。默认为 1M 。如果获取的第一个消息就超过该限制,则只返回该消息
# fetch.min.bytes=1                 # 每次发出 fetch 请求时,broker 应该至少累积多少数据才返回响应
# fetch.max.wait.ms=500             # 每次发出 fetch 请求时,broker 如果没有满足 fetch.min.bytes 的数据,则最多等待指定时长就返回响应
# fetch.max.bytes=57671680          # 每次发出 fetch 请求时,预期 broker 最多返回的数据量。默认为 55M
  • 生产者、消费者都有 client.id ,允许重复。消费者还有 group.id 。
  • 用户调用 consumer 的 poll() 方法,即可消费消息。
    • consumer 会在后台向 broker 发送 fetch 请求,拉取数据,暂存到内存中的队列中,直到用户消费之后才删除。
    • consumer 占用的最大内存大概为 max.partition.fetch.bytes * partition_count ,或则 fetch.min.bytes * broker_count 。

# SASL

  • Kafka broker 支持通过 JAAS 框架启用 SASL 认证。

    • 默认不要求身份认证,可以被其它 broker、client 直接连接,因此不安全。
    • 可启用以下 SASL 认证机制:
      • PLAIN
      • GSSAPI (Kerberos)
      • OAUTHBEARER
      • SCRAM-SHA-256
  • 对于通过身份认证的用户,Kafka 支持配置 ACL 规则,控制用户的访问权限。

    • 默认的 ACL 规则为空,因此不允许用户访问任何资源,除非是 super 用户。
  • Kafka 的通信数据默认为 PLAINTEXT 形式,即未加密。

    • 可以启用 SSL 加密通信,但会增加通信延迟。
    • 是否启用 SSL ,与是否启用 SASL 认证无关。
  • 例:启用 PLAIN 认证

    1. 修改 server.properties ,将通信协议从默认的 PLAINTEXT:// 改为 SASL_PLAINTEXT:// ,即采用 SASL 认证 + 未加密通信。

      listeners=PLAINTEXT://0.0.0.0:9092,PUBLIC_IP://0.0.0.0:9093                 # 定义两个 listener 。当客户端通过某个 listener 连接时,就采用相应的 advertised.listeners
      advertised.listeners=PLAINTEXT://10.0.0.1:9092,PUBLIC_IP://1.1.1.1:9093
      listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PUBLIC_IP:SASL_PLAINTEXT # advertised.listeners 的解析规则
      inter.broker.listener.name=PLAINTEXT                                        # broker 之间通信时采用的 listener 。默认为 PLAINTEXT ,即不启用 SASL 认证 + 未加密传输
      sasl.mechanism.inter.broker.protocol=PLAIN                      # broker 之间连接时的 SASL 认证机制,默认为 GSSAPI
      sasl.enabled.mechanisms=PLAIN                                   # broker 启用的 SASL 认证机制列表
      authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer   # 开启 ACL
      super.users=User:broker;User:client                             # 将一些用户声明为超级用户
      
    2. 创建一个 jaas.conf 配置文件:

      KafkaServer {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          # 指定当前 broker 连接其它 broker 时使用的用户
          username="broker"
          password="******"
          # 按 user_<NAME>=<PASSWORD> 的格式定义用户。在当前 broker 被其它 broker 或 client 连接时,允许对方使用这些用户
          user_broker="******"
          user_client="******";   # 注意这是一条语句,末尾要加分号
      };
      
    3. 将 jaas.conf 拷贝到每个 broker 的配置目录下,并添加 java 启动参数来启用它:

      export KAFKA_OPTS='-Djava.security.auth.login.config=/kafka/config/jaas.conf'
      

      执行 kafka-server-start.sh、kafka-console-producer.sh 等脚本时会自动应用该配置。

    4. 客户端连接 broker 时,需要在 producer.properties、consumer.properties 中加入配置:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
          username="client" \
          password="******";
      

      其中的账号密码也可以配置在 jaas.conf 中:

      KafkaClient {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="client"
          password="******";
      };
      
  • 上述为 broker 被其它 broker、client 连接时的身份认证。而 broker 连接到 zk 时,也可启用 SASL 认证,配置方法见 zk 文档。

    • 此时建议在 server.properties 中加入:
      zookeeper.set.acl=true  # 将 Kafka 在 zk 中存储的数据设置 ACL 规则:允许被所有用户读取,但只允许 Kafka 编辑