# 部署
# 部署
下载二进制版:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
解压后运行:
bin/zookeeper-server-start.sh config/zookeeper.properties # 启动 zookeeper 服务器 bin/kafka-server-start.sh config/server.properties # 启动 kafka broker 服务器
- 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
- Kafka 发行版包含了 zk 的可执行文件,可以同时启动 kafka、zk 服务器,也可以在其它地方启动 zk 服务器。
- Kafka 会尽快将数据写入磁盘存储,因此占用的内存一般不超过 6G , CPU 平均负载为 2~4 核。
- kafka-server-start.sh 中默认配置了以下环境变量,限制 Kafka 占用的内存:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- 主机应该留出一些空闲内存,相当于 JVM 内存的 0.5~1 倍,用作 Page Cache ,提高 Kafka 读写磁盘的性能。
- kafka-server-start.sh 中默认配置了以下环境变量,限制 Kafka 占用的内存:
- 声明以下环境变量,即可让 broker 开启 JMX 监控,监听一个端口:
export JMX_PORT=9093 # export JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false"
- 部署 Kafka 集群时,需要先部署 zk 集群,然后让每个 broker 服务器连接到 zk ,即可相互发现,组成集群。
或者用 docker-compose 部署:
version: '3' services: kafka: container_name: kafka image: bitnami/kafka:2.8.0 restart: unless-stopped stop_grace_period: 1m environment: # KAFKA_HEAP_OPTS: -Xmx1G -Xms1G ALLOW_PLAINTEXT_LISTENER: 'yes' ports: - 9092:9092 volumes: - ./config:/bitnami/kafka/config - ./data:/bitnami/kafka/data
- Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
- 该镜像会根据环境变量配置 server.properties 文件,这里直接挂载配置目录,通过 CUSTOM_INIT_SCRIPT 执行命令还原配置文件。
- Kafka 官方没有提供 Docker 镜像,这里采用社区提供的一个镜像。
# 版本
- Kafka 的版本列表 (opens new window)
- 例如 kafka_2.13-2.6.0.tgz ,前面的 2.13 是指 Scala 编译器的版本,后面的 2.6.0 是指 Kafka 的版本。
- 使用 Kafka 时,应该尽量让客户端与服务器的版本一致,避免不兼容。
- v0.10.0.0
- :于 2016 年发布。新增了 Kafka Streams API ,用于流处理。
- v0.11.0.0
- :于 2017 年发布。改进了消息格式。支持事务、幂等性。
- v1.0.0
- :于 2017 年发布。
- v1.1.0
- :于 2018 年发布。
- 改进了 Controller ,提高更改 leader replica 的效率,将 broker 正常终止的耗时从几分钟减少到几秒。
- v2.0.0
- :于 2018 年发布。
- v2.8.0
- :于 2021 年发布。支持用内置的 KRaft 协议取代 zookeeper ,提高 Kafka 的稳定性、内部通信效率。
# 配置
- kafka 的配置目录示例:
config/ ├── consumer.properties # 消费者的配置文件 ├── log4j.properties # Java 日志的配置文件 ├── producer.properties # 生产者的配置文件 ├── server.properties # broker 的配置文件 └── zookeeper.properties # zk 的配置文件
- 启动 broker 时只需读取 server.properties、log4j.properties 文件。
# server.properties
配置示例:
# 关于 kafka 集群
broker.id=0 # 该 broker 在 Kafka 集群中的唯一标识符,默认为 -1 ,必须赋值为一个非负整数
listeners=PLAINTEXT://0.0.0.0:9092 # broker 监听的 Socket 地址
advertised.listeners=PLAINTEXT://10.0.0.1:9092 # 当前 broker 供其它 broker 和客户端访问的地址。它会在 zk 中公布,默认采用 listeners 的值
# 关于 zk
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181 # 要连接的 zk 节点,多个地址之间用逗号分隔
# zookeeper.connection.timeout.ms=6000
# 保存数据日志
log.dirs=/data/kafka-logs # broker 存放数据日志的目录,如果有多个目录则用逗号分隔
# log.roll.ms=null # 每个 LogSegment 的最长写入时间,超过该值则会创建一个新的 LogSegment 用于写入。默认未设置,采用 log.roll.hours 的值
# log.roll.hours=168 # 默认为 7*24h
# log.segment.bytes=1073741824 # 每个 LogSegment 的最大体积,超过该值则会创建一个新的 LogSegment 用于写入。默认为 1G
# log.flush.interval.messages=null # 每个日志分区,每接收多少个消息就 flush 一次,即将内存中的数据写入磁盘
# log.flush.interval.ms=null # 每个日志分区,每经过多少毫秒就 flush 一次
# log.flush.offset.checkpoint.interval.ms=60000 # 每隔多少毫秒,刷新一次 checkpoint
# 清理数据日志
# log.cleanup.policy=delete # LogSegment 的清理策略,可以是 delete、compact
# log.retention.check.interval.ms=300000 # 每隔多久检查一次各个 LogSegment 是否应该清理。默认为 5min
log.retention.bytes=10737418240 # 限制单个 partition 的大小,超过则删除其中最旧的 LogSegment 。默认为 -1 ,即不限制
# log.retention.ms=null # 限制单个 LogSegment 的保存时长,超过则删除。默认未设置,采用 log.retention.minutes 的值。如果设置为 -1 ,则不限制
# log.retention.minutes=null # 默认未设置,采用 log.retention.hours 的值
log.retention.hours=24 # 默认为 7*24h
# 关于 topic
# auto.create.topics.enable=true # 当客户端生产、消费一个不存在的 topic 时,是否自动创建该 topic
# delete.topic.enable=true # 是否允许删除 topic
# compression.type=producer # broker 对消息的压缩格式。取值为 producer 则采用与生产者相同的压缩格式
message.max.bytes=10485760 # 允许接收的最大 batch.size ,默认为 1M ,这里设置为 10M 。该参数作用于所有 topic ,也可以对每个 topic 分别设置 max.message.bytes
# 关于 partition
num.partitions=6 # 新建 topic 的默认 partition 数,默认为 3 。这里设置为 6 ,当 consumer group 包含 1、2、3 个成员时都可以平均分配
default.replication.factor=2 # 新建 partition 的默认副本数,默认为 1
# offsets.topic.num.partitions=50 # __consumer_offsets 主题的 partition 数
# offsets.topic.replication.factor=3 # __consumer_offsets 主题的每个 partition 的副本数。部署单节点时必须减至 1
# auto.leader.rebalance.enable=true # 是否自动进行 Partition Rebalance
# leader.imbalance.check.interval.seconds=300 # Controller 每隔多久检查一次是否执行 Partition Rebalance
# leader.imbalance.per.broker.percentage=10 # 如果该 broker 上的非 preferred leader 超过该百分比,则进行 Partition Rebalance
# 关于进程
# background.threads=10 # broker 处理后台任务的线程数
# num.network.threads=3 # broker 处理网络请求的线程数
# num.io.threads=8 # broker 处理磁盘 IO 的线程数,应该不小于磁盘数
# 关于网络
session.timeout.ms=60000 # 会话超时,默认为 10s 。如果在该时长内 broker 一直没有收到 consumer 的 heartbeat 请求,则认为 consumer 下线
# heartbeat.interval.ms=3000 # broker 等待 heartbeat 请求的超时时间。通常设置为 session.timeout.ms 的 1/3 ,以允许超时两次
# socket.send.buffer.bytes=102400 # socket 发送消息的缓冲区大小,默认为 100K
# socket.receive.buffer.bytes=102400 # socket 接收消息的缓冲区大小,默认为 100K 。网络延迟高于 1ms 时,增加 buffer 有利于提高传输效率,但会占用更多内存
# socket.request.max.bytes=104857600 # socket 允许接收的单个消息的最大大小,默认为 100M
- 假设每个 partition 的副本数为 n ,则 Kafka 集群最多允许 n-1 个 broker 故障。
- 增加 partition 数量有利于提高并发规模,但会增加 zk 的负载。
- Kafka v1.1.0 改进了 Controller ,每个 broker 建议最多存储 4k 个 partition(包括副本数),每个集群最多 20w 个。
- 如果用 KRaft 协议取代 zookeeper ,则可以支持数百万的 partition 。
- Kafka 会自动清理 topic 中的过期消息。但即使没有一条消息,也不会删除该 topic 。
# producer.properties
配置示例:
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092 # 初始连接的 broker 地址。先通过它获取所有 broker 的 advertised.listeners 地址,再实际连接
# client.id='' # 客户端的 ID
# acks=1 # 判断消息发送成功的策略
# 取值为 0 ,则不等待 broker 的回复,直接认为消息已发送成功
# 取值为 1 ,则等待 leader replica 确认接收消息
# 取值为 all ,则等待消息被同步到所有 replica
# retries=2147483647 # 发送消息失败时,如果不超过 delivery.timeout.ms 时长,则尝试重发多少次
# batch.size=16384 # 限制每个 batch 的大小,默认为 16K
# buffer.memory=33554432 # 限制生产者用于发送消息的缓冲区大小,默认为 32M
# compression.type=none # 发送消息时,batch 采用的压缩格式,默认不压缩
max.request.size=10485760 # 限制生产者向 broker 发送的每个请求的大小,这限制了每个请求包含的 batch (压缩之后)数量。默认为 1M ,这里设置为 10M
# max.block.ms=60000 # 生产者调用 send() 等方法时,如果 buffer.memory 不足或 metadata 获取不到,阻塞等待的超时时间
# linger.ms=0 # 生产者创建每个 batch 时,等待多久才发送。调大该值,有助于让每个 batch 包含更多消息。特别是当新增消息的速度,比发送消息的速度更快时
# request.timeout.ms=30000 # 发送请求给 broker 时,等待响应的超时时间
# delivery.timeout.ms=120000 # 生产者调用 send() 方法发送消息的超时时间,该值应该不低于 linger.ms + request.timeout.ms
producer 向同一个 partition 生产多个消息时,会打包为一批(batch)再发送。
- 如果出现的第一个消息就超过 batch.size 限制,则依然会打包成一个 batch ,但只包含该消息。
- Kafka 生产消息、消费消息、同步 replica 时,都会将消息打包成 batch 再传输,从而提高传输效率。
- 每个 batch 可能包含 1~N 个消息,每次网络请求可能传输 0~N 个 batch 。
batch.size 不能大于 producer 的 max.request.size ,否则过大的 batch 不能装载到请求中。
- 不能大于 broker 的 message.max.bytes ,否则请求不能被接收。
- 不能大于 consumer 的 fetch.max.bytes ,否则不能被消费。
Kafka 支持将生产的消息进行压缩。
- 一般流程:
- producer 将消息 batch 以某种压缩格式发送。
- 增加 batch.size 有利于增加重复的消息内容,提高压缩率。
- broker 接收消息,默认以同种压缩格式存储。
- consumer 拉取消息,自动识别其压缩格式,解压。
- producer 将消息 batch 以某种压缩格式发送。
- 优点:
- 减少占用的磁盘空间。
- 减少网络 IO 量,提高 Kafka 吞吐量。
- 如果 CPU 足够、网速不足,则采用压缩会减少生产、消费的耗时。
- 缺点:
- 占用更多 CPU 。
- 支持多种压缩格式:
none # 不压缩 gzip # 压缩率大概为 20% ,CPU 负载较高 lz4 # 压缩率较低,CPU 负载较低,推荐使用 snappy # 性能比 lz4 较差 zstd # 压缩率比 gzip 更低
- 一般流程:
# consumer.properties
配置示例:
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092
# group.id=null # 消费者组的名称
# group.instance.id=null # 给该参数赋值为非空字符串时, consumer 会声明为 Static Member 类型,并采用该参数的值作为 client.id
# allow.auto.create.topics=false # 订阅或主动分配 topic 时,如果该 topic 不存在,是否自动创建
# auto.offset.reset=latest # 如果 Coordinator 没有记录 Consumer Committed Offset (可能是未创建、已过期删除),则从哪个 offset 处开始消费
# 可选的取值:
# earliest : 采用 partition 可见范围内最老的 offset
# latest : 采用 partition 最新的 offset ,即 High Watemark
# none :让 consumer 抛出异常
# enable.auto.commit=true # 是否自动在后台提交 Consumer Committed Offset 。可以关闭该功能,由用户主动提交,更可靠
# auto.commit.interval.ms=5000 # 自动提交的间隔时长
# max.poll.records=500 # consumer 每次调用 poll() 方法,最多拉取多少个消息。这会影响,但不会决定 fetch 请求的数量
# max.partition.fetch.bytes=1048576 # 每次发出 fetch 请求时,从每个 partition 最多获取多少数据。默认为 1M 。如果获取的第一个消息就超过该限制,则只返回该消息
# fetch.min.bytes=1 # 每次发出 fetch 请求时,broker 应该至少累积多少数据才返回响应
# fetch.max.wait.ms=500 # 每次发出 fetch 请求时,broker 如果没有满足 fetch.min.bytes 的数据,则最多等待指定时长就返回响应
# fetch.max.bytes=57671680 # 每次发出 fetch 请求时,预期 broker 最多返回的数据量。默认为 55M
- 生产者、消费者都有 client.id ,允许重复。消费者还有 group.id 。
- 用户调用 consumer 的 poll() 方法,即可消费消息。
- consumer 会在后台向 broker 发送 fetch 请求,拉取数据,暂存到内存中的队列中,直到用户消费之后才删除。
- consumer 占用的最大内存大概为 max.partition.fetch.bytes * partition_count ,或则 fetch.min.bytes * broker_count 。
# SASL
Kafka broker 支持通过 JAAS 框架启用 SASL 认证。
- 默认不要求身份认证,可以被其它 broker、client 直接连接,因此不安全。
- 可启用以下 SASL 认证机制:
- PLAIN
- GSSAPI (Kerberos)
- OAUTHBEARER
- SCRAM-SHA-256
对于通过身份认证的用户,Kafka 支持配置 ACL 规则,控制用户的访问权限。
- 默认的 ACL 规则为空,因此不允许用户访问任何资源,除非是 super 用户。
Kafka 的通信数据默认为 PLAINTEXT 形式,即未加密。
- 可以启用 SSL 加密通信,但会增加通信延迟。
- 是否启用 SSL ,与是否启用 SASL 认证无关。
例:启用 PLAIN 认证
修改 server.properties ,将通信协议从默认的
PLAINTEXT://
改为SASL_PLAINTEXT://
,即采用 SASL 认证 + 未加密通信。listeners=PLAINTEXT://0.0.0.0:9092,PUBLIC_IP://0.0.0.0:9093 # 定义两个 listener 。当客户端通过某个 listener 连接时,就采用相应的 advertised.listeners advertised.listeners=PLAINTEXT://10.0.0.1:9092,PUBLIC_IP://1.1.1.1:9093 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PUBLIC_IP:SASL_PLAINTEXT # advertised.listeners 的解析规则 inter.broker.listener.name=PLAINTEXT # broker 之间通信时采用的 listener 。默认为 PLAINTEXT ,即不启用 SASL 认证 + 未加密传输 sasl.mechanism.inter.broker.protocol=PLAIN # broker 之间连接时的 SASL 认证机制,默认为 GSSAPI sasl.enabled.mechanisms=PLAIN # broker 启用的 SASL 认证机制列表 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 开启 ACL super.users=User:broker;User:client # 将一些用户声明为超级用户
创建一个 jaas.conf 配置文件:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required # 指定当前 broker 连接其它 broker 时使用的用户 username="broker" password="******" # 按 user_<NAME>=<PASSWORD> 的格式定义用户。在当前 broker 被其它 broker 或 client 连接时,允许对方使用这些用户 user_broker="******" user_client="******"; # 注意这是一条语句,末尾要加分号 };
将 jaas.conf 拷贝到每个 broker 的配置目录下,并添加 java 启动参数来启用它:
export KAFKA_OPTS='-Djava.security.auth.login.config=/kafka/config/jaas.conf'
执行 kafka-server-start.sh、kafka-console-producer.sh 等脚本时会自动应用该配置。
客户端连接 broker 时,需要在 producer.properties、consumer.properties 中加入配置:
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="client" \ password="******";
其中的账号密码也可以配置在 jaas.conf 中:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="******"; };
上述为 broker 被其它 broker、client 连接时的身份认证。而 broker 连接到 zk 时,也可启用 SASL 认证,配置方法见 zk 文档。
- 此时建议在 server.properties 中加入:
zookeeper.set.acl=true # 将 Kafka 在 zk 中存储的数据设置 ACL 规则:允许被所有用户读取,但只允许 Kafka 编辑
- 此时建议在 server.properties 中加入: