# 工具
# shell 脚本
kafka 的 bin 目录下自带了多个 shell 脚本,可用于管理 Kafka 。
kafka-server-stop.sh
用于停止 broker 进程。- 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
- broker 进程收到终止信号后,会将所有数据保存到磁盘中,才退出,该过程需要几秒甚至几十秒。
- 如果强制杀死 broker 进程,可能导致数据丢失。重启时会发出警告:
WARN Found a corrupted index file, xxxx/0000000000000000xxxx.index, deleting and rebuilding index...
- 此时,broker 需要重建各分区的 index 文件,耗时几十分钟,然后才能加入 Kafka 集群。
- 如果正常终止,则 broker 重启耗时只有几秒。
- 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
kafka-topics.sh
用于管理 topic 。例:# 列出所有 topic 名称 bin/kafka-topics.sh --zookeeper localhost:2181 --list # 这里连接到 zookeeper 来获取数据 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 这里连接到 kafka 来获取数据 # 创建一个 topic ,并指定配置参数 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_1 --partitions 1 --replication-factor 1 # 删除一个 topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic_1 # 查看一个 topic 的状态、配置参数 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic_1
kafka-configs.sh
用于管理配置参数。例:# 查看 broker 的配置参数(只显示与配置文件不同的部分,即在运行时被修改的配置参数) bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0 # 查看 broker 的全部配置参数 bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --all --entity-type brokers --entity-name 0 # 给一个 topic 添加配置参数 bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic_1 --add-config retention.ms=xx,retention.bytes=xx # 给一个 topic 删除配置参数 bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic_1 --delete-config retention.ms,retention.bytes
运行生产者终端,从 stdin 读取消息并发送到 broker :
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_1 --producer.config config/producer.properties # 指定生产者的配置文件,否则采用默认配置
运行消费者终端,读取消息并输出到 stdout :
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --group group_1 # 指定 consumer group 的 ID ,不指定则随机生成 --from-beginning # 从第一条消息开始消费 --consumer.config config/consumer.properties
运行生产者的性能测试:
bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --topic topic_1 --num-records 10000 # 发送多少条消息 --record-size 1024 # 每条消息的大小 --throughput 10000 # 限制每秒种发送的消息数
kafka-consumer-groups.sh
用于管理消费组。用法:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # 列出所有消费组的名称 --group <name> # 指定消费组。可多次使用该参数,也可用 --all-groups 指定所有 --delete # 删除消费组,这会删除其所有 offset --describe --members # 查看所有成员 --offsets # 查看对各个 topic partition 的 Committed Offset --topic <name> # 指定一个 topic 。可多次使用该参数,也可用 --all-topics 指定所有 --delete-offsets # 删除 offset --execute --reset-offsets # 调整对某个 topic 的 offset --to-offset <n> --to-earliest --to-latest
Kafka 会自动决定将每个 partition 分配到哪个 broker 上存储。用户也可以主动创建 reassign 迁移任务,将某些 topic 的某些 partition 重新分配到指定 broker 。
- 创建 reassign 任务时,
- 需要先制定迁移计划,保存为 reassignment.json 文件,然后根据它创建迁移任务。
- 每个 reassign 任务可以迁移多个 topic 的多个 partition 。但建议同时只迁移一个 topic ,否则同时迁移大量数据可能占满磁盘读写速度。
- 例如给 Kafka 集群新增 broker 之后,可能被自动用于存储新创建的 topic ,但不会影响旧的 topic ,因此需要手动 reassign 旧的 topic 。
- 执行 reassign 任务时,
- 同时只能执行一个 reassign 任务。
- broker 会自动将 partition 的数据文件,从原来的 broker 拷贝到新 broker ,然后从原来的 broker 删除。
- 执行过程中,该 partition 不支持客户端生产、消费。
- 使用
kafka-reassign-partitions.sh
脚本的示例:bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list &> topics for topic in `cat topics` do # 指定需要迁移的 topic echo ' { "version": 1, "topics": [ { "topic": "<topic>"} ] } ' | sed "s#<topic>#$topic#g" > topics.json echo '----------' echo 'handling topic:' $topic # 生成迁移计划,将 topics 迁移到目标 broker-list bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --broker-list '1,2,3' --topics-to-move-json-file topics.json --generate | tail -n 1 > reassignment.json # 开始迁移 bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file reassignment.json --execute && break while true do # 等待迁移完成 bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file reassignment.json --verify | grep 'still in progress' || break echo 'Waiting for the current task to complete...' sleep 1 done done # 显示正在执行的迁移任务 bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --list
- 创建 reassign 任务时,
# Kafka Manager
:一个 Web 服务器,用于管理 Kafka 。
- GitHub (opens new window)
- 主要用于监控、管理 topic、partition ,不支持查看 Kafka 消息。
- 由 Yahoo 公司开源,采用 Java 语言开发。
- 2020 年,发布 v3 版本。为了避免与 Kafka 版权冲突而改名为 Cluster Manager for Apache Kafka ,简称为 CMAK 。
# 部署
- 用 docker-compose 部署:
version: '3' services: kafka-manager: container_name: kafka-manager image: kafkamanager/kafka-manager:3.0.0.4 restart: unless-stopped ports: - 9000:9000 environment: ZK_HOSTS: 10.0.0.1:2181 # JAVA_OPTS: -Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/jaas.conf KAFKA_MANAGER_AUTH_ENABLED: 'true' # 是否启用 Basic Auth ,默认为 false KAFKA_MANAGER_USERNAME: admin KAFKA_MANAGER_PASSWORD: ******
- Kafka Manager 需要连接到 Kafka broker 以进行监控,还需要一个 zk 集群来存储自己的数据。
# 用法
访问 Kafka Manager 的 Web 页面,即可创建一个 Cluster ,表示 Kafka 集群。
- 需要指定该 Kafka 集群对应的 zk 集群的地址。
- 可以勾选 "JMX Polling" ,连接到 Kafka 的 JMX 端口,监控消息的传输速度。
- 可以勾选 "Poll consumer information",监控消费者的 offset 。
支持查看、创建、配置 topic、partition 。
- topic 的统计信息示例:
Replication 3 # 每个分区的的副本数 Number of Partitions 3 # 该 topic 的分区数 Sum of partition offsets 0 Total number of Brokers 3 # Kafka 集群存在的 Broker 数 Number of Brokers for Topic 2 # 该 topic 存储时占用的 Broker 数 Preferred Replicas % 100 # Leader replica 为 Preferred replica 的分区,所占百分比 Brokers Skewed % 0 # 存储的副本过多的 Broker 所占百分比 Brokers Leader Skewed % 0 # 存储的 Leader replica 过多的 Broker 所占百分比 Brokers Spread % 66 # 该 topic 占用的 Kafka 集群的 Broker 百分比,这里等于 2/3 * 100% Under-replicated % 0 # 存在未同步副本的那些分区,所占百分比
- 上例中的 topic 总共有 3×3 个副本,占用 2 个 Broker 。为了负载均衡,每个 Broker 应该存储 3×3÷2 个副本,取整后可以为 4 或 5 。如果某个 Broker 实际存储的副本数超过该值,则视作 Skewed 。
- Broker 的统计信息示例:每行表示一个 Broker 的信息,各列分别表示该 Broker 的:
Broker # of Partitions # as Leader Partitions Skewed? Leader Skewed? 0 3 1 (0,1,2) false false 1 3 2 (0,1,2) false true 2 3 0 (0,1,2) false false
- ID
- 存储的分区副本数
- 存储的 Leader replica 数
- 存储的各个分区 ID
- 存储的副本是否过多
- 存储的 Leader replica 是否过多
- topic 的统计信息示例:
支持 preferred-replica-election 。
支持 Reassign Partitions 。用法如下:
- 在 topic 详情页面,点击
Generate Partition Assignments
,设置允许该 topic 分配到哪些 broker 上的策略。 - 点击
Run Partition Assignments
,执行自动分配的策略。
- 如果不满足策略,则自动迁移 replica 到指定的 broker 上,并重新选举 leader 。
- 迁移 replica 时会导致客户端短暂无法访问。
- 同时迁移多个 replica 时,可能负载过大,导致 Kafka broker 卡死。
- 如果已满足策略,则不会进行迁移。因此该操作具有幂等性。
- 也可以点击
Manual Partition Assignments
,进行手动分配。
- 到菜单栏的
Reassign Partitions
页面,查看正在执行的分配操作。
- 在 topic 详情页面,点击
# Kafka Eagle
:一个 Web 服务器,用于管理 Kafka 。改名为了 EFAK(Eagle For Apache Kafka)。
- GitHub (opens new window)
- 与 Kafka Manager 相比,多了查看消息内容、生产消息、配置告警的功能。
- 采用 Java 语言开发,占用大概 1G 内存。
# Redpanda
:一个 Web 服务器,用于管理 Kafka 。原名为 Kowl 。
- GitHub (opens new window)
- 与 Kafka Manager 相比,页面更美观,多了查看 topic 体积、查看消息内容、生产消息、修改 offset 的功能。但免费版不支持 Reassign Partitions 。
- 采用 Golang 语言开发,只占用几十 MB 内存。
- 用 docker-compose 部署:
version: "3" services: redpanda: container_name: redpanda image: vectorized/console:v2.4.5 restart: unless-stopped # command: # - -config.filepath=redpanda.yaml environment: KAFKA_BROKERS: 10.0.0.1:9092 ports: - 8080:8080 # volumes: # - ./redpanda.yaml:/app/redpanda.yaml
# Offset Explorer
:旧名为 Kafka Tool ,是一个 GUI 工具,可用作 Kafka 客户端。
- 官网 (opens new window)
- 支持查看、管理 topic ,支持查看消息、生产消息,缺少关于监控的功能。
# import kafka-python
:Python 的第三方库,提供了 Kafka 客户端的功能。
安装:
pip install kafka-python
例:生产消息
from kafka import KafkaProducer # 创建一个 Producer ,连接到 broker producer = KafkaProducer(bootstrap_servers='localhost:9092') # 生产一条消息到指定的 topic msg = 'Hello' future = producer.send(topic='topic_1', value=msg.encode(), # 消息的内容,必须为 bytes 类型 # partition=None, # 指定分区。默认为 None ,会自动分配一个分区 # key=None, # 指定 key ) try: metadata = future.get(timeout=5.0) # 等待服务器的回复 except Exception as e: print(str(e)) print('sent: ', msg) print('offset: ', metadata.offset) print('topic: ', metadata.topic) print('partition: ', metadata.partition)
例:消费消息
from kafka import KafkaConsumer # 创建一个 Consumer ,连接到 broker ,订阅指定的 topic consumer = KafkaConsumer('topic_1', bootstrap_servers='localhost:9092') # Consumer 对象支持迭代。默认从最新 offset 处开始迭代,没有收到消息时会一直阻塞等待,除非设置了 consumer_timeout_ms for msg in consumer: print(f'{msg.offset=} {msg.value.decode()=}')
from kafka import TopicPartition consumer.subscribe('topic_1') consumer.seek(TopicPartition(topic='topic_1', partition=1), 0) # 调整消费的 offset consumer.seek_to_beginning(TopicPartition(topic='topic_1', partition=0)) msg_set = consumer.poll(max_records=10)
KafkaConsumer 的定义:
class KafkaConsumer(six.Iterator): def __init__(self, *topics, **configs) """ 先输入要消费的 topics 。可用的 configs 参数如下: # 关于 broker bootstrap_servers='localhost', # 可以指定一个列表,例如 ['10.0.0.1:9092', '10.0.0.2:9092'] # 关于 SASL 认证 sasl_mechanism='PLAIN', security_protocol='SASL_PLAINTEXT', sasl_plain_username='admin', sasl_plain_password='******', # 关于客户端 client_id='client_1', # 客户端的名称,默认为 kafka-python-$version group_id='group_1', # 消费者组的名称,默认为 None ,即不加入消费者组 consumer_timeout_ms=5000, # 迭代消息时,持续一定时长未收到新消息则结束迭代。默认无限制 """ def assign(self, partitions) """ 主动分配当前 consumer 消费的 TopicPartition 。 此时不会被 Coordinator 处理,不会触发 rebalance 。但不能与 subscribe() 同时使用,否则报错。 例: consumer.assign([TopicPartition(topic='topic_1', partition=0), TopicPartition(topic='topic_1', partition=1)]) """ def assignment(self) """ 返回一个集合,包含当前 consumer 被分配的所有 TopicPartition """ def close(self, autocommit=True) """ 关闭 consumer """ def commit(self, offsets=None) """ 提交 offsets 。这会阻塞 consumer 直到成功提交 """ def committed(self, partition, metadata=False) """ 返回指定 TopicPartition 的 Committed Offset """ def commit_async(self, offsets=None, callback=None) """ 异步地提交 offsets """ def poll(self, timeout_ms=0, max_records=None, update_offsets=True) """ 从当前 consumer 被分配的 partition 拉取消息,组成一个集合并返回。 update_offsets :是否自动递增 offset 以便下一次拉取。 """ def seek(self, partition, offset) """ 调整当前消费的 offset ,用于下一次 poll """ def seek_to_beginning(self, *partitions) """ 将 offset 调整到可见的最老 offset """ def seek_to_end(self, *partitions) """ 将 offset 调整到可见的最新 offset """ def subscribe(self, topics=(), pattern=None, listener=None) """ 订阅一组 topics ,或者订阅与正则表达式匹配的 topics ,这会自动分配 TopicPartition 。 不支持增加订阅,新的订阅列表会覆盖旧的订阅列表。 可以订阅不存在的 topic ,此时不会被分配 partition ,调用 poll() 的结果为空集合 。 """ def subscription(self) """ 返回一个集合,包含当前 consumer 订阅的所有 topic 的名称 """ def unsubscribe(self) """ 取消订阅的所有 topics """ ...
# import confluent-kafka
:Python 的第三方库,提供了 Kafka 客户端的功能,比 kafka-python 的功能更多。
- GitHub (opens new window)
- 安装:
pip install confluent-kafka
- 例:生产消息
from confluent_kafka import Consumer, KafkaException def produce_messages(producer, msg_list): """ 生产消息,输入示例: msg_list = [ {'topic': 'test', 'value': 'hello'.encode('utf-8'), 'key': None, 'headers': None, } ] """ success_list = [] failed_list = [] def delivery_report(err, msg): """ 每次调用 poll() 或 flush() 生产消息时,会自动调用一次该函数,检查交付结果。 """ if err: failed_list.append(msg) print('[ERROR] Message delivery failed: {}'.format(err)) else: success_list.append(msg) for msg in msg_list: # 检查之前调用 produce() 生产消息的交付结果 producer.poll(0) # 生产消息。produce() 函数会异步执行,需要之后调用 poll() 或 flush() 来检查交付结果 producer.produce(topic=msg['topic'], value=msg['value'], key=msg['key'], headers=msg['headers'], on_delivery=delivery_report) # 等待剩下的消息发送完毕 producer.flush() print(f'已生产消息,成功数 {len(success_list)} ,失败数 {len(failed_list)}')
- 例:消费消息
from confluent_kafka import Consumer, KafkaException # 创建消费者,传入配置参数,兼容 Kafka 原生的 properties consumer = Consumer({ 'bootstrap.servers': '10.0.0.1,10.0.0.2,10.0.0.3', 'group.id': 'test_group_1', 'group.instance.id': 'static_member_1', 'auto.offset.reset': 'earliest' }) consumer.subscribe(['topic_1']) def consume_messages(consumer, max_count=1): msg_list = [] for i in range(max_count): # 消费一条消息。如果超过 timeout 依然未获取到消息,则返回 None 。如果 timeout 只有 1s ,则遇到 rebalance 就可能超时 msg = consumer.poll(timeout=5.0) if msg == None: print(f'未拉取到新消息') return if msg.error(): raise KafkaException(msg.error()) msg_list.append(msg) print(f'成功消费 1 条消息 {msg.offset()=} {msg.value().decode()=}') return msg_list consume_messages(consumer, max_count=10) consumer.close()