# 工具
# shell 脚本
kafka 的 bin 目录下自带了多个 shell 脚本,可用于管理 Kafka 。
kafka-server-stop.sh
用于停止 broker 进程。- 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
- broker 进程收到终止信号后,会将所有数据保存到磁盘中,才退出,该过程需要几秒甚至几十秒。
- 如果强制杀死 broker 进程,可能导致数据丢失。重启时会发出警告:
WARN Found a corrupted index file, xxxx/0000000000000000xxxx.index, deleting and rebuilding index... (kafka.log.Log)
- 此时,broker 会花费几十分钟来重建各分区的 index 文件,然后才能加入 Kafka 集群,开始工作。
- 如果正常终止,则 broker 重启耗时只有几秒。
- 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
kafka-topics.sh
用于管理 topic 。- 例:连接到 zk ,查询 topic 列表。
bin/kafka-topics.sh --zookeeper localhost:2181 --list
- 例:连接到 zk ,请求创建 topic ,并指定分区数、每个分区的副本数。
bin/kafka-topics.sh \ --zookeeper localhost:2181 \ --create \ --topic topic_1 \ --partitions 1 \ --replication-factor 1
- 例:连接到 zk ,请求删除 topic 。
bin/kafka-topics.sh \ --zookeeper localhost:2181 \ --delete \ --topic topic_1
- 这里将 --delete 选项改为 --describe ,就是查询 topic 的状态。
- 例:连接到 zk ,查询 topic 列表。
运行生产者终端,从 stdin 读取消息并发送到 broker :
bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic topic_1 # --producer.config config/producer.properties
运行消费者终端,读取消息并输出到 stdout :
bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic topic_1 # --group group_1 # 指定 consumer group 的 ID ,不指定则随机生成 # --from-beginning # 从第一条消息开始消费 # --consumer.config config/consumer.properties
运行生产者的性能测试:
bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --topic topic_1 --num-records 10000 # 发送多少条消息 --record-size 1024 # 每条消息的大小 --throughput 10000 # 限制每秒种发送的消息数
kafka-consumer-groups.sh
用于管理消费组。用法:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # 列出所有消费组的名称 --group <name> # 指定消费组。可多次使用该参数,也可用 --all-groups 指定所有 --delete # 删除消费组,这会删除其所有 offset --describe --members # 查看所有成员 --offsets # 查看对各个 topic partition 的 Committed Offset --topic <name> # 指定一个 topic 。可多次使用该参数,也可用 --all-topics 指定所有 --delete-offsets # 删除 offset --execute --reset-offsets # 调整对某个 topic 的 offset --to-offset <n> --to-earliest --to-latest
- 当一个消费组的所有成员都停止运行时,才支持修改 offset 。
- 如果一个消费组停止运行,则超过 log.retention.hours 时间之后 Kafka 会从 __consumer_offsets 中删除其 offset 记录。
- 如果一个消费组取消订阅某个 topic ,则 Kafka 依然会在 __consumer_offsets 中记录旧订阅的 offset 。除非主动删除 group 或 offset 。
给 Kafka 集群新增 broker 之后,可能被自动用于存储新创建的 topic ,但不会影响已有的 topic 。可以采取以下两种措施:
- 使用
kafka-reassign-partitions.sh
脚本,将指定 topic 的所有分区迁移到指定 broker 上。 - 使用 Kafka Manager ,在网页上迁移 topic ,更方便。
- 使用
# Kafka Manager
:一个 Web 服务器,用于管理 Kafka 。
- GitHub (opens new window)
- 主要用于监控、管理 topic、partition ,不支持查看 Kafka 消息。
- 由 Yahoo 公司开源,采用 Java 开发。
- 2020 年,发布 v3 版本。为了避免与 Kafka 版权冲突而改名为 Cluster Manager for Apache Kafka ,简称为 CMAK 。
# 部署
- 用 docker-compose 部署:
version: '3' services: kafka-manager: container_name: kafka-manager image: kafkamanager/kafka-manager:3.0.0.4 restart: unless-stopped ports: - 9000:9000 environment: ZK_HOSTS: 10.0.0.1:2181 # JAVA_OPTS: -Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/jaas.conf KAFKA_MANAGER_AUTH_ENABLED: 'true' # 是否启用 Basic Auth ,默认为 false KAFKA_MANAGER_USERNAME: admin KAFKA_MANAGER_PASSWORD: ******
- Kafka Manager 需要连接到 Kafka broker 以进行监控,还需要一个 zk 集群来存储自己的数据。
# 用法
访问 Kafka Manager 的 Web 页面,即可创建一个 Cluster ,表示 Kafka 集群。
- 需要指定该 Kafka 集群对应的 zk 集群的地址。
- 可以勾选 "JMX Polling" ,连接到 Kafka 的 JMX 端口,监控消息的传输速度。
- 可以勾选 "Poll consumer information",监控消费者的 offset 。
支持查看、创建、配置 topic、partition 。
- topic 的统计信息示例:
Replication 3 # 每个分区的的副本数 Number of Partitions 3 # 该 topic 的分区数 Sum of partition offsets 0 Total number of Brokers 3 # Kafka 集群存在的 Broker 数 Number of Brokers for Topic 2 # 该 topic 存储时占用的 Broker 数 Preferred Replicas % 100 # Leader replica 为 Preferred replica 的分区,所占百分比 Brokers Skewed % 0 # 存储的副本过多的 Broker 所占百分比 Brokers Leader Skewed % 0 # 存储的 Leader replica 过多的 Broker 所占百分比 Brokers Spread % 66 # 该 topic 占用的 Kafka 集群的 Broker 百分比,这里等于 2/3 * 100% Under-replicated % 0 # 存在未同步副本的那些分区,所占百分比
- 上例中的 topic 总共有 3×3 个副本,占用 2 个 Broker 。为了负载均衡,每个 Broker 应该存储 3×3÷2 个副本,取整后可以为 4 或 5 。如果某个 Broker 实际存储的副本数超过该值,则视作 Skewed 。
- Broker 的统计信息示例:每行表示一个 Broker 的信息,各列分别表示该 Broker 的:
Broker # of Partitions # as Leader Partitions Skewed? Leader Skewed? 0 3 1 (0,1,2) false false 1 3 2 (0,1,2) false true 2 3 0 (0,1,2) false false
- ID
- 存储的分区副本数
- 存储的 Leader replica 数
- 存储的各个分区 ID
- 存储的副本是否过多
- 存储的 Leader replica 是否过多
- topic 的统计信息示例:
支持 preferred-replica-election 。
支持 Reassign Partitions 。用法如下:
- 在 topic 详情页面,点击
Generate Partition Assignments
,设置允许该 topic 分配到哪些 broker 上的策略。 - 点击
Run Partition Assignments
,执行自动分配的策略。
- 如果不满足策略,则自动迁移 replica 到指定的 broker 上,并重新选举 leader 。
- 迁移 replica 时会导致客户端短暂无法访问。
- 同时迁移多个 replica 时,可能负载过大,导致 Kafka broker 卡死。
- 如果已满足策略,则不会进行迁移。因此该操作具有幂等性。
- 也可以点击
Manual Partition Assignments
,进行手动分配。
- 到菜单栏的
Reassign Partitions
页面,查看正在执行的分配操作。
- 在 topic 详情页面,点击
# Kafka Eagle
:一个 Web 服务器,用于管理 Kafka 。改名为了 EFAK(Eagle For Apache Kafka)。
- GitHub (opens new window)
- 与 Kafka Manager 相比,多了查询消息、生产消息、配置告警的功能,但缺少关于 Preferred Replica 的功能。
- 采用 Java 开发,占用大概 1G 内存。
# Kowl
:一个 Web 服务器,用于管理 Kafka 。改名为了 Redpanda Console 。
- GitHub (opens new window)
- 与 Kafka Manager 相比,页面更美观,多了查看 topic 体积、查看消息内容、查看 consumer 详情的功能,但不支持搜索 topic 。
- 采用 Golang 开发,只占用几十 MB 内存。
- 用 docker-compose 部署:
version: "3" services: kowl: container_name: kowl image: docker.redpanda.com/vectorized/console:v2.0.3 restart: unless-stopped # command: # - -config.filepath=kowl.yaml environment: KAFKA_BROKERS: 10.0.0.1:9092 ports: - 8080:8080 # volumes: # - ./kowl.yaml:/app/kowl.yaml
# Offset Explorer
:旧名为 Kafka Tool ,是一个 GUI 工具,可用作 Kafka 客户端。
- 官网 (opens new window)
- 支持查看、管理 topic ,支持查看消息、生产消息,缺少关于监控的功能。
# ♢ kafka-python
:Python 的第三方库,提供了 Kafka 客户端的功能。
安装:
pip install kafka-python
例:生产消息
from kafka import KafkaProducer # 创建一个 Producer ,连接到 broker producer = KafkaProducer(bootstrap_servers='localhost:9092') # 生产一个消息到指定的 topic msg = 'Hello' future = producer.send(topic='topic_1', value=msg.encode(), # 消息的内容,必须为 bytes 类型 # partition=None, # 指定分区。默认为 None ,会自动分配一个分区 # key=None, # 指定 key ,bytes 类型。用于根据 key 的哈希值分配一个分区,比如相同 key 的消息总是分配到相同分区 ) try: metadata = future.get(timeout=1) # 等待服务器的回复 except Exception as e: print(str(e)) print('sent: ', msg) print('offset: ', metadata.offset) print('topic: ', metadata.topic) print('partition: ', metadata.partition)
例:消费消息
from kafka import KafkaConsumer # 创建一个 Consumer ,连接到 broker ,订阅指定的 topic consumer = KafkaConsumer('topic_1', bootstrap_servers='localhost:9092') # Consumer 对象支持迭代。默认从最新 offset 处开始迭代,没有收到消息时会一直阻塞等待,除非设置了 consumer_timeout_ms for msg in consumer: print('got: ', msg.value.decode()) print('offset: ', msg.offset)
from kafka import TopicPartition consumer.subscribe('topic_1') consumer.seek(TopicPartition(topic='topic_1', partition=1), 0) # 调整消费的 offset consumer.seek_to_beginning(TopicPartition(topic='topic_1', partition=0)) msg_set = consumer.poll(max_records=10)
KafkaConsumer 的定义:
class KafkaConsumer(six.Iterator): def __init__(self, *topics, **configs) """ 先输入要消费的 topics 。可用的 configs 参数如下: # 关于 broker bootstrap_servers='localhost', # 可以指定一个列表,例如 ['10.0.0.1:9092', '10.0.0.2:9092'] # 关于 SASL 认证 sasl_mechanism='PLAIN', security_protocol='SASL_PLAINTEXT', sasl_plain_username='admin', sasl_plain_password='******', # 关于客户端 client_id='client_1', # 客户端的名称,默认为 kafka-python-$version group_id='group_1', # 消费者组的名称,默认为 None ,即不加入消费者组 consumer_timeout_ms=5000, # 迭代消息时,持续一定时长未收到新消息则结束迭代。默认无限制 """ def assign(self, partitions) """ 主动分配当前 consumer 消费的 TopicPartition 。 此时不会被 Coordinator 处理,不会触发 rebalance 。但不能与 subscribe() 同时使用,否则报错。 例: consumer.assign([TopicPartition(topic='topic_1', partition=0), TopicPartition(topic='topic_1', partition=1)]) """ def assignment(self) """ 返回一个集合,包含当前 consumer 被分配的所有 TopicPartition """ def close(self, autocommit=True) """ 关闭 consumer """ def commit(self, offsets=None) """ 提交 offsets 。这会阻塞 consumer 直到成功提交 """ def committed(self, partition, metadata=False) """ 返回指定 TopicPartition 的 Committed Offset """ def commit_async(self, offsets=None, callback=None) """ 异步地提交 offsets """ def poll(self, timeout_ms=0, max_records=None, update_offsets=True) """ 从当前 consumer 被分配的 partition 拉取消息,组成一个集合并返回。 update_offsets :是否自动递增 offset 以便下一次拉取。 """ def seek(self, partition, offset) """ 调整当前消费的 offset ,用于下一次 poll """ def seek_to_beginning(self, *partitions) """ 将 offset 调整到可见的最老 offset """ def seek_to_end(self, *partitions) """ 将 offset 调整到可见的最新 offset """ def subscribe(self, topics=(), pattern=None, listener=None) """ 订阅一组 topics ,或者订阅与正则表达式匹配的 topics ,这会自动分配 TopicPartition 。 不支持增加订阅,新的订阅列表会覆盖旧的订阅列表。 可以订阅不存在的 topic ,此时不会被分配 partition ,调用 poll() 的结果为空集合 。 """ def subscription(self) """ 返回一个集合,包含当前 consumer 订阅的所有 topic 的名称 """ def unsubscribe(self) """ 取消订阅的所有 topics """ ...
# ♢ confluent-kafka
:Python 的第三方库,提供了 Kafka 客户端的功能,比 kafka-python 的功能更多。
- GitHub (opens new window)
- 安装:
pip install confluent-kafka
- 例:消费消息
from confluent_kafka import Consumer # 创建消费者,传入配置参数,兼容 Kafka 原生的 properties consumer = Consumer({ 'bootstrap.servers': '10.0.0.1,10.0.0.2,10.0.0.3', 'group.id': 'test_group_1', 'group.instance.id': 'static_member_1', 'auto.offset.reset': 'earliest' }) consumer.subscribe(['topic_1']) msg = consumer.poll(timeout=1) # 消费一条消息。如果超过 timeout 依然未获取到消息,则返回 None consumer.close()