# 工具

# shell 脚本

kafka 的 bin 目录下自带了多个 shell 脚本,可用于管理 Kafka 。

  • kafka-server-stop.sh 用于停止 broker 进程。

    • 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
      • broker 进程收到终止信号后,会将所有数据保存到磁盘中,才退出,该过程需要几秒甚至几十秒。
    • 如果强制杀死 broker 进程,可能导致数据丢失。重启时会发出警告:
      WARN  Found a corrupted index file, xxxx/0000000000000000xxxx.index, deleting and rebuilding index... (kafka.log.Log)
      
      • 此时,broker 会花费几十分钟来重建各分区的 index 文件,然后才能加入 Kafka 集群,开始工作。
      • 如果正常终止,则 broker 重启耗时只有几秒。
  • kafka-topics.sh 用于管理 topic 。

    • 例:连接到 zk ,查询 topic 列表。
      bin/kafka-topics.sh  --zookeeper localhost:2181  --list
      
    • 例:连接到 zk ,请求创建 topic ,并指定分区数、每个分区的副本数。
      bin/kafka-topics.sh \
          --zookeeper localhost:2181 \
          --create \
          --topic topic_1 \
          --partitions 1 \
          --replication-factor 1
      
    • 例:连接到 zk ,请求删除 topic 。
      bin/kafka-topics.sh \
          --zookeeper localhost:2181 \
          --delete \
          --topic topic_1
      
      • 这里将 --delete 选项改为 --describe ,就是查询 topic 的状态。
  • 运行生产者终端,从 stdin 读取消息并发送到 broker :

    bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_1
        # --producer.config config/producer.properties
    
  • 运行消费者终端,读取消息并输出到 stdout :

    bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic topic_1
        # --group group_1     # 指定 consumer group 的 ID ,不指定则随机生成
        # --from-beginning    # 从第一条消息开始消费
        # --consumer.config config/consumer.properties
    
  • 运行生产者的性能测试:

    bin/kafka-producer-perf-test.sh
        --producer.config config/producer.properties
        --topic topic_1
        --num-records 10000   # 发送多少条消息
        --record-size 1024    # 每条消息的大小
        --throughput 10000    # 限制每秒种发送的消息数
    
  • kafka-consumer-groups.sh 用于管理消费组。用法:

    bin/kafka-consumer-groups.sh --bootstrap-server 10.1.6.151:9092
        --list            # 列出所有消费组
    
        --group <name>    # 指定消费组。可多次使用该参数,也可用 --all-groups 指定所有
          --delete        # 删除消费组,这会删除其所有 offset
          --describe
            --members     # 查看所有成员
            --offsets     # 查看对各个 topic partition 的 Committed Offset
    
          --topic <name>  # 指定一个 topic 。可多次使用该参数,也可用 --all-topics 指定所有
            --delete-offsets            # 删除 offset
            --execute --reset-offsets   # 调整对某个 topic 的 offset
              --to-offset <n>
              --to-earliest
              --to-latest
    
    • 当一个消费组的所有成员都停止运行时,才支持修改 offset 。
    • 如果一个消费组停止运行,则超过 log.retention.hours 时间之后 Kafka 会从 __consumer_offsets 中删除其 offset 记录。
    • 如果一个消费组取消订阅某个 topic ,则 Kafka 依然会在 __consumer_offsets 中记录旧订阅的 offset 。除非主动删除 group 或 offset 。
  • 给 Kafka 集群新增 broker 之后,可能被自动用于存储新创建的 topic ,但不会影响已有的 topic 。可以采取以下两种措施:

    • 使用 kafka-reassign-partitions.sh 脚本,将指定 topic 的所有分区迁移到指定 broker 上。
    • 使用 Kafka Manager ,在网页上迁移 topic ,更方便。

# Kafka Manager

:一个 Web 服务器,用于管理 Kafka 。

  • GitHub (opens new window)
  • 主要用于监控、管理 topic、partition ,不支持查看 Kafka 消息。
  • 由 Yahoo 公司开源,采用 Java 开发。
  • 2020 年,发布 v3 版本。为了避免与 Kafka 版权冲突而改名为 Cluster Manager for Apache Kafka ,简称为 CMAK 。

# 部署

  • 用 docker-compose 部署:
    version: '3'
    
    services:
      kafka-manager:
        container_name: kafka-manager
        image: kafkamanager/kafka-manager:3.0.0.4
        restart: unless-stopped
        ports:
          - 9000:9000
        environment:
          ZK_HOSTS: 10.0.0.1:2181
          # JAVA_OPTS: -Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/jaas.conf
          KAFKA_MANAGER_AUTH_ENABLED: 'true'    # 是否启用 Basic Auth ,默认为 false
          KAFKA_MANAGER_USERNAME: admin
          KAFKA_MANAGER_PASSWORD: ******
    
    • Kafka Manager 需要连接到 Kafka broker 以进行监控,还需要一个 zk 集群来存储自己的数据。

# 用法

  • 访问 Kafka Manager 的 Web 页面,即可创建一个 Cluster ,表示 Kafka 集群。

    • 需要指定该 Kafka 集群对应的 zk 集群的地址。
    • 可以勾选 "JMX Polling" ,连接到 Kafka 的 JMX 端口,监控消息的传输速度。
    • 可以勾选 "Poll consumer information",监控消费者的 offset 。
  • 支持查看、创建、配置 topic、partition 。

    • topic 的统计信息示例:
      Replication                 3       # 每个分区的的副本数
      Number of Partitions        3       # 该 topic 的分区数
      Sum of partition offsets    0
      Total number of Brokers     3       # Kafka 集群存在的 Broker 数
      Number of Brokers for Topic 2       # 该 topic 存储时占用的 Broker 数
      Preferred Replicas %        100     # Leader replica 为 Preferred replica 的分区,所占百分比
      Brokers Skewed %            0       # 存储的副本过多的 Broker 所占百分比
      Brokers Leader Skewed %     0       # 存储的 Leader replica 过多的 Broker 所占百分比
      Brokers Spread %            66      # 该 topic 占用的 Kafka 集群的 Broker 百分比,这里等于 2/3 * 100%
      Under-replicated %          0       # 存在未同步副本的那些分区,所占百分比
      
      • 上例中的 topic 总共有 3×3 个副本,占用 2 个 Broker 。为了负载均衡,每个 Broker 应该存储 3×3÷2 个副本,取整后可以为 4 或 5 。如果某个 Broker 实际存储的副本数超过该值,则视作 Skewed 。
    • Broker 的统计信息示例:
      Broker      # of Partitions     # as Leader     Partitions    Skewed?     Leader Skewed?
      0           3                   1               (0,1,2)       false       false
      1           3                   2               (0,1,2)       false       true
      2           3                   0               (0,1,2)       false       false
      
      每行表示一个 Broker 的信息,各列分别表示该 Broker 的:
      • ID
      • 存储的分区副本数
      • 存储的 Leader replica 数
      • 存储的各个分区 ID
      • 存储的副本是否过多
      • 存储的 Leader replica 是否过多
  • 支持 preferred-replica-election 。

  • 支持 Reassign Partitions 。用法如下:

    1. 在 topic 详情页面,点击 Generate Partition Assignments ,设置允许该 topic 分配到哪些 broker 上的策略。
    2. 点击 Run Partition Assignments ,执行自动分配的策略。
    • 如果不满足策略,则自动迁移 replica 到指定的 broker 上,并重新选举 leader 。
      • 迁移 replica 时会导致客户端短暂无法访问。
      • 同时迁移多个 replica 时,可能负载过大,导致 Kafka broker 卡死。
    • 如果已满足策略,则不会进行迁移。因此该操作具有幂等性。
    • 也可以点击 Manual Partition Assignments ,进行手动分配。
    1. 到菜单栏的 Reassign Partitions 页面,查看正在执行的分配操作。

# Kafka Eagle

:一个 Web 服务器,用于 管理 Kafka 。

  • GitHub (opens new window)
  • 与 Kafka Manager 相比,多了查询消息、生产消息、配置告警的功能,但缺少关于 Preferred Replica 的功能。
  • 采用 Java 开发,占用大概 1G 内存。

# Kowl

:一个 Web 服务器,用于 管理 Kafka 。

  • GitHub (opens new window)
  • 与 Kafka Manager 相比,页面更美观,多了查看 topic 体积、查看消息内容、查看 consumer 详情的功能。
  • 采用 Golang 开发,只占用几十 MB 内存。
  • 用 docker-compose 部署:
    version: "3"
    
    services:
      kowl:
        container_name: kowl
        image: quay.io/cloudhut/kowl:v1.5.0
        restart: unless-stopped
        # command:
        #   - -config.filepath=kowl.yaml
        environment:
          KAFKA_BROKERS: 10.0.0.1:9092
        ports:
          - 8080:8080
        # volumes:
        #   - ./kowl.yaml:/app/kowl.yaml
    

# Offset Explorer

:旧名为 Kafka Tool ,是一个 GUI 工具,可用作 Kafka 客户端。

# ♢ kafka-python

:Python 的第三方库,提供了 Kafka 客户端的功能。

  • 官方文档 (opens new window)

  • 安装:pip install kafka-python

  • 例:生产消息

    from kafka import KafkaProducer
    
    # 创建一个 Producer ,连接到 broker
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    
    # 生产一个消息到指定的 topic
    msg = 'Hello'
    future = producer.send(topic='topic_1',
                           value=msg.encode(),  # 消息的内容,必须为 bytes 类型
                           # partition=None,    # 指定分区。默认为 None ,会自动分配一个分区
                           # key=None,          # 指定 key ,bytes 类型。用于根据 key 的哈希值分配一个分区,比如相同 key 的消息总是分配到相同分区
                          )
    try:
        metadata = future.get(timeout=1)  # 等待服务器的回复
    except Exception as e:
        print(str(e))
    
    print('sent: ', msg)
    print('offset: ', metadata.offset)
    print('topic: ',  metadata.topic)
    print('partition: ', metadata.partition)
    
  • 例:消费消息

    from kafka import KafkaConsumer
    
    # 创建一个 Consumer ,连接到 broker ,订阅指定的 topic
    consumer = KafkaConsumer('topic_1', bootstrap_servers='localhost:9092')
    
    # Consumer 对象支持迭代。默认从最新 offset 处开始迭代,没有收到消息时会一直阻塞等待,除非设置了 consumer_timeout_ms
    for msg in consumer:
        print('got: ', msg.value.decode())
        print('offset: ', msg.offset)
    
    from kafka import TopicPartition
    
    consumer.subscribe('topic_1')
    consumer.seek(TopicPartition(topic='topic_1', partition=1), 0)    # 调整消费的 offset
    consumer.seek_to_beginning(TopicPartition(topic='topic_1', partition=0))
    msg_set = consumer.poll(max_records=10)
    
  • KafkaConsumer 的定义:

    class KafkaConsumer(six.Iterator):
        def __init__(self, *topics, **configs)
            """ 先输入要消费的 topics 。可用的 configs 参数如下:
            # 关于 broker
            bootstrap_servers='localhost',  # 可以指定一个列表,例如 ['10.0.0.1:9092', '10.0.0.2:9092']
    
            # 关于 SASL 认证
            sasl_mechanism='PLAIN',
            security_protocol='SASL_PLAINTEXT',
            sasl_plain_username='admin',
            sasl_plain_password='******',
    
            # 关于客户端
            client_id='client_1',           # 客户端的名称,默认为 kafka-python-$version
            group_id='group_1',             # 消费者组的名称,默认为 None ,即不加入消费者组
            consumer_timeout_ms=5000,       # 迭代消息时,持续一定时长未收到新消息则结束迭代。默认无限制
            """
    
        def assign(self, partitions)
            """ 主动分配当前 consumer 消费的 TopicPartition 。
            此时不会被 Coordinator 处理,不会触发 Rebalance 。但不能与 subscribe() 同时使用,否则报错。
            例: consumer.assign([TopicPartition(topic='topic_1', partition=0), TopicPartition(topic='topic_1', partition=1)])
            """
    
        def assignment(self)
            """ 返回一个集合,包含当前 consumer 被分配的所有 TopicPartition """
    
        def close(self, autocommit=True)
            """ 关闭 consumer """
    
        def commit(self, offsets=None)
            """ 提交 offsets 。这会阻塞 consumer 直到成功提交 """
    
        def committed(self, partition, metadata=False)
            """ 返回指定 TopicPartition 的 Committed Offset """
    
        def commit_async(self, offsets=None, callback=None)
            """ 异步地提交 offsets """
    
        def poll(self, timeout_ms=0, max_records=None, update_offsets=True)
            """ 从当前 consumer 被分配的 partition 拉取消息,组成一个集合并返回。
            update_offsets :是否自动递增 offset 以便下一次拉取。
            """
    
        def seek(self, partition, offset)
            """ 调整当前消费的 offset ,用于下一次 poll """
    
        def seek_to_beginning(self, *partitions)
            """ 将 offset 调整到可见的最老 offset """
    
        def seek_to_end(self, *partitions)
            """ 将 offset 调整到可见的最新 offset """
    
        def subscribe(self, topics=(), pattern=None, listener=None)
            """ 订阅一组 topics ,或者订阅与正则表达式匹配的 topics ,这会自动分配 TopicPartition 。
            不支持增加订阅,新的订阅列表会覆盖旧的订阅列表。
            可以订阅不存在的 topic ,此时不会被分配 partition ,调用 poll() 的结果为空集合 。
            """
    
        def subscription(self)
            """ 返回一个集合,包含当前 consumer 订阅的所有 topic 的名称 """
    
        def unsubscribe(self)
            """ 取消订阅的所有 topics """
        ...
    

# ♢ confluent-kafka

:Python 的第三方库,提供了 Kafka 客户端的功能,比 kafka-python 的功能更多。

  • GitHub (opens new window)
  • 安装:pip install confluent-kafka
  • 例:消费消息
    from confluent_kafka import Consumer
    
    # 创建消费者,传入配置参数,兼容 Kafka 原生的 properties
    consumer = Consumer({
        'bootstrap.servers': '10.0.0.1,10.0.0.2,10.0.0.3',
        'group.id': 'test_group_1',
        'group.instance.id': 'static_member_1',
        'auto.offset.reset': 'earliest'
    })
    
    consumer.subscribe(['topic_1'])
    msg = consumer.poll(timeout=1)    # 消费一条消息。如果超过 timeout 依然未获取到消息,则返回 None
    consumer.close()