# 工具

# shell 脚本

kafka 的 bin 目录下自带了多个 shell 脚本,可用于管理 Kafka 。

  • kafka-server-stop.sh 用于停止 broker 进程。

    • 它会查找本机上的所有 broker 进程,发送 SIGTERM 信号。
      • broker 进程收到终止信号后,会将所有数据保存到磁盘中,才退出,该过程需要几秒甚至几十秒。
    • 如果强制杀死 broker 进程,可能导致数据丢失。重启时会发出警告:
      WARN  Found a corrupted index file, xxxx/0000000000000000xxxx.index, deleting and rebuilding index...
      
      • 此时,broker 需要重建各分区的 index 文件,耗时几十分钟,然后才能加入 Kafka 集群。
      • 如果正常终止,则 broker 重启耗时只有几秒。
  • kafka-topics.sh 用于管理 topic 。例:

    # 列出所有 topic 名称
    bin/kafka-topics.sh  --zookeeper localhost:2181  --list         # 这里连接到 zookeeper 来获取数据
    bin/kafka-topics.sh  --bootstrap-server localhost:9092  --list  # 这里连接到 kafka 来获取数据
    
    # 创建一个 topic ,并指定配置参数
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_1 --partitions 1 --replication-factor 1
    
    # 删除一个 topic
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic_1
    
    # 查看一个 topic 的状态、配置参数
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic_1
    
  • kafka-configs.sh 用于管理配置参数。例:

    # 查看 broker 的配置参数(只显示与配置文件不同的部分,即在运行时被修改的配置参数)
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0
    
    # 查看 broker 的全部配置参数
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --all --entity-type brokers --entity-name 0
    
    # 给一个 topic 添加配置参数
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic_1 --add-config retention.ms=xx,retention.bytes=xx
    
    # 给一个 topic 删除配置参数
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic_1 --delete-config retention.ms,retention.bytes
    
  • 运行生产者终端,从 stdin 读取消息并发送到 broker :

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_1
        --producer.config config/producer.properties    # 指定生产者的配置文件,否则采用默认配置
    
  • 运行消费者终端,读取消息并输出到 stdout :

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
        --group group_1     # 指定 consumer group 的 ID ,不指定则随机生成
        --from-beginning    # 从第一条消息开始消费
        --consumer.config config/consumer.properties
    
  • 运行生产者的性能测试:

    bin/kafka-producer-perf-test.sh
        --producer.config config/producer.properties
        --topic topic_1
        --num-records 10000   # 发送多少条消息
        --record-size 1024    # 每条消息的大小
        --throughput 10000    # 限制每秒种发送的消息数
    
  • kafka-consumer-groups.sh 用于管理消费组。用法:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092
        --list            # 列出所有消费组的名称
    
        --group <name>    # 指定消费组。可多次使用该参数,也可用 --all-groups 指定所有
          --delete        # 删除消费组,这会删除其所有 offset
          --describe
            --members     # 查看所有成员
            --offsets     # 查看对各个 topic partition 的 Committed Offset
    
          --topic <name>  # 指定一个 topic 。可多次使用该参数,也可用 --all-topics 指定所有
            --delete-offsets            # 删除 offset
            --execute --reset-offsets   # 调整对某个 topic 的 offset
              --to-offset <n>
              --to-earliest
              --to-latest
    
  • kafka-mirror-maker.sh 用于同步数据。从一个 kafka 中消费消息,写入另一个 kafka 。

    bin/kafka-mirror-maker.sh
        --consumer.config consumer.properties    # 连接到 src kafka
        --producer.config producer.properties    # 连接到 dst kafka
        --num.streams 1     # consumer 线程数
        --num.producers 1   # producer 线程数
        --whitelist ".*"    # 在 src kafka 中,找出匹配正则表达式的所有 topic ,消费它们
    
  • Kafka 会自动决定将每个 partition 分配到哪个 broker 上存储。用户也可以主动创建 reassign 迁移任务,将某些 topic 的某些 partition 重新分配到指定 broker 。

    • 创建 reassign 任务时,
      • 需要先制定迁移计划,保存为 reassignment.json 文件,然后根据它创建迁移任务。
      • 每个 reassign 任务可以迁移多个 topic 的多个 partition 。但建议同时只迁移一个 topic ,否则同时迁移大量数据可能占满磁盘读写速度。
      • 例如给 Kafka 集群新增 broker 之后,可能被自动用于存储新创建的 topic ,但不会影响旧的 topic ,因此需要手动 reassign 旧的 topic 。
    • 执行 reassign 任务时,
      • 同时只能执行一个 reassign 任务。
      • broker 会自动将 partition 的数据文件,从原来的 broker 拷贝到新 broker ,然后从原来的 broker 删除。
      • 执行过程中,该 partition 不支持客户端生产、消费。
    • 使用 kafka-reassign-partitions.sh 脚本的示例:
      bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list &> topics
      
      for topic in `cat topics`
      do
          # 指定需要迁移的 topic
          echo '
          {
            "version": 1,
            "topics": [
              { "topic": "<topic>"}
            ]
          }
          ' | sed "s#<topic>#$topic#g" > topics.json
          echo '----------'
          echo 'handling topic:' $topic
          # 生成迁移计划,将 topics 迁移到目标 broker-list
          bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --broker-list '1,2,3' --topics-to-move-json-file topics.json --generate | tail -n 1 > reassignment.json
          # 开始迁移
          bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file reassignment.json --execute && break
          while true
          do
              # 等待迁移完成
              bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file reassignment.json --verify | grep 'still in progress' || break
              echo 'Waiting for the current task to complete...'
              sleep 1
          done
      done
      
      # 显示正在执行的迁移任务
      bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --list
      

# Kafka Manager

:一个 Web 服务器,用于管理 Kafka 。

  • GitHub (opens new window)
  • 主要用于监控、管理 topic、partition ,不支持查看 Kafka 消息。
  • 由 Yahoo 公司开源,采用 Java 语言开发。
  • 2020 年,发布 v3 版本。为了避免与 Kafka 版权冲突而改名为 Cluster Manager for Apache Kafka ,简称为 CMAK 。

# 部署

  • 用 docker-compose 部署:
    version: '3'
    
    services:
      kafka-manager:
        container_name: kafka-manager
        image: kafkamanager/kafka-manager:3.0.0.4
        restart: unless-stopped
        ports:
          - 9000:9000
        environment:
          ZK_HOSTS: 10.0.0.1:2181
          # JAVA_OPTS: -Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/jaas.conf
          KAFKA_MANAGER_AUTH_ENABLED: 'true'    # 是否启用 Basic Auth ,默认为 false
          KAFKA_MANAGER_USERNAME: admin
          KAFKA_MANAGER_PASSWORD: ******
    
    • Kafka Manager 需要连接到 Kafka broker 以进行监控,还需要一个 zk 集群来存储自己的数据。

# 用法

  • 访问 Kafka Manager 的 Web 页面,即可创建一个 Cluster ,表示 Kafka 集群。

    • 需要指定该 Kafka 集群对应的 zk 集群的地址。
    • 可以勾选 "JMX Polling" ,连接到 Kafka 的 JMX 端口,监控消息的传输速度。
    • 可以勾选 "Poll consumer information",监控消费者的 offset 。
  • 支持查看、创建、配置 topic、partition 。

    • topic 的统计信息示例:
      Replication                 3       # 每个分区的的副本数
      Number of Partitions        3       # 该 topic 的分区数
      Sum of partition offsets    0
      Total number of Brokers     3       # Kafka 集群存在的 Broker 数
      Number of Brokers for Topic 2       # 该 topic 存储时占用的 Broker 数
      Preferred Replicas %        100     # Leader replica 为 Preferred replica 的分区,所占百分比
      Brokers Skewed %            0       # 存储的副本过多的 Broker 所占百分比
      Brokers Leader Skewed %     0       # 存储的 Leader replica 过多的 Broker 所占百分比
      Brokers Spread %            66      # 该 topic 占用的 Kafka 集群的 Broker 百分比,这里等于 2/3 * 100%
      Under-replicated %          0       # 存在未同步副本的那些分区,所占百分比
      
      • 上例中的 topic 总共有 3×3 个副本,占用 2 个 Broker 。为了负载均衡,每个 Broker 应该存储 3×3÷2 个副本,取整后可以为 4 或 5 。如果某个 Broker 实际存储的副本数超过该值,则视作 Skewed 。
    • Broker 的统计信息示例:
      Broker      # of Partitions     # as Leader     Partitions    Skewed?     Leader Skewed?
      0           3                   1               (0,1,2)       false       false
      1           3                   2               (0,1,2)       false       true
      2           3                   0               (0,1,2)       false       false
      
      每行表示一个 Broker 的信息,各列分别表示该 Broker 的:
      • ID
      • 存储的分区副本数
      • 存储的 Leader replica 数
      • 存储的各个分区 ID
      • 存储的副本是否过多
      • 存储的 Leader replica 是否过多
  • 支持 preferred-replica-election 。

  • 支持 Reassign Partitions 。用法如下:

    1. 在 topic 详情页面,点击 Generate Partition Assignments ,设置允许该 topic 分配到哪些 broker 上的策略。
    2. 点击 Run Partition Assignments ,执行自动分配的策略。
    • 如果不满足策略,则自动迁移 replica 到指定的 broker 上,并重新选举 leader 。
      • 迁移 replica 时会导致客户端短暂无法访问。
      • 同时迁移多个 replica 时,可能负载过大,导致 Kafka broker 卡死。
    • 如果已满足策略,则不会进行迁移。因此该操作具有幂等性。
    • 也可以点击 Manual Partition Assignments ,进行手动分配。
    1. 到菜单栏的 Reassign Partitions 页面,查看正在执行的分配操作。

# Kafka Eagle

:一个 Web 服务器,用于管理 Kafka 。改名为了 EFAK(Eagle For Apache Kafka)。

  • GitHub (opens new window)
  • 与 Kafka Manager 相比,多了查看消息内容、生产消息、配置告警的功能。
  • 采用 Java 语言开发,占用大概 1G 内存。

# Redpanda

:一个 Web 服务器,用于管理 Kafka 。原名为 Kowl 。

  • GitHub (opens new window)
  • 与 Kafka Manager 相比,页面更美观,多了查看 topic 体积、查看消息内容、生产消息、修改 offset 的功能。但免费版不支持 Reassign Partitions 。
  • 采用 Golang 语言开发,只占用几十 MB 内存。
  • 用 docker-compose 部署:
    version: "3"
    
    services:
      redpanda:
        container_name: redpanda
        image: vectorized/console:v2.4.5
        restart: unless-stopped
        # command:
        #   - -config.filepath=redpanda.yaml
        environment:
          KAFKA_BROKERS: 10.0.0.1:9092
        ports:
          - 8080:8080
        # volumes:
        #   - ./redpanda.yaml:/app/redpanda.yaml
    

# Offset Explorer

:旧名为 Kafka Tool ,是一个 GUI 工具,可用作 Kafka 客户端。

# import kafka-python

:Python 的第三方库,提供了 Kafka 客户端的功能。

  • 官方文档 (opens new window)

  • 安装:pip install kafka-python

  • 例:生产消息

    from kafka import KafkaProducer
    
    # 创建一个 Producer ,连接到 broker
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    
    # 生产一条消息到指定的 topic
    msg = 'Hello'
    future = producer.send(topic='topic_1',
                           value=msg.encode(),  # 消息的内容,必须为 bytes 类型
                           # partition=None,    # 指定分区。默认为 None ,会自动分配一个分区
                           # key=None,          # 指定 key
                          )
    try:
        metadata = future.get(timeout=5.0)  # 等待服务器的回复
    except Exception as e:
        print(str(e))
    
    print('sent: ', msg)
    print('offset: ', metadata.offset)
    print('topic: ',  metadata.topic)
    print('partition: ', metadata.partition)
    
  • 例:消费消息

    from kafka import KafkaConsumer
    
    # 创建一个 Consumer ,连接到 broker ,订阅指定的 topic
    consumer = KafkaConsumer('topic_1', bootstrap_servers='localhost:9092')
    
    # Consumer 对象支持迭代。默认从最新 offset 处开始迭代,没有收到消息时会一直阻塞等待,除非设置了 consumer_timeout_ms
    for msg in consumer:
        print(f'{msg.offset=} {msg.value.decode()=}')
    
    from kafka import TopicPartition
    
    consumer.subscribe('topic_1')
    consumer.seek(TopicPartition(topic='topic_1', partition=1), 0)    # 调整消费的 offset
    consumer.seek_to_beginning(TopicPartition(topic='topic_1', partition=0))
    msg_set = consumer.poll(max_records=10)
    
  • KafkaConsumer 的定义:

    class KafkaConsumer(six.Iterator):
        def __init__(self, *topics, **configs)
            """ 先输入要消费的 topics 。可用的 configs 参数如下:
            # 关于 broker
            bootstrap_servers='localhost',  # 可以指定一个列表,例如 ['10.0.0.1:9092', '10.0.0.2:9092']
    
            # 关于 SASL 认证
            sasl_mechanism='PLAIN',
            security_protocol='SASL_PLAINTEXT',
            sasl_plain_username='admin',
            sasl_plain_password='******',
    
            # 关于客户端
            client_id='client_1',           # 客户端的名称,默认为 kafka-python-$version
            group_id='group_1',             # 消费者组的名称,默认为 None ,即不加入消费者组
            consumer_timeout_ms=5000,       # 迭代消息时,持续一定时长未收到新消息则结束迭代。默认无限制
            """
    
        def assign(self, partitions)
            """ 主动分配当前 consumer 消费的 TopicPartition 。
            此时不会被 Coordinator 处理,不会触发 rebalance 。但不能与 subscribe() 同时使用,否则报错。
            例: consumer.assign([TopicPartition(topic='topic_1', partition=0), TopicPartition(topic='topic_1', partition=1)])
            """
    
        def assignment(self)
            """ 返回一个集合,包含当前 consumer 被分配的所有 TopicPartition """
    
        def close(self, autocommit=True)
            """ 关闭 consumer """
    
        def commit(self, offsets=None)
            """ 提交 offsets 。这会阻塞 consumer 直到成功提交 """
    
        def committed(self, partition, metadata=False)
            """ 返回指定 TopicPartition 的 Committed Offset """
    
        def commit_async(self, offsets=None, callback=None)
            """ 异步地提交 offsets """
    
        def poll(self, timeout_ms=0, max_records=None, update_offsets=True)
            """ 从当前 consumer 被分配的 partition 拉取消息,组成一个集合并返回。
            update_offsets :是否自动递增 offset 以便下一次拉取。
            """
    
        def seek(self, partition, offset)
            """ 调整当前消费的 offset ,用于下一次 poll """
    
        def seek_to_beginning(self, *partitions)
            """ 将 offset 调整到可见的最老 offset """
    
        def seek_to_end(self, *partitions)
            """ 将 offset 调整到可见的最新 offset """
    
        def subscribe(self, topics=(), pattern=None, listener=None)
            """ 订阅一组 topics ,或者订阅与正则表达式匹配的 topics ,这会自动分配 TopicPartition 。
            不支持增加订阅,新的订阅列表会覆盖旧的订阅列表。
            可以订阅不存在的 topic ,此时不会被分配 partition ,调用 poll() 的结果为空集合 。
            """
    
        def subscription(self)
            """ 返回一个集合,包含当前 consumer 订阅的所有 topic 的名称 """
    
        def unsubscribe(self)
            """ 取消订阅的所有 topics """
        ...
    

# import confluent-kafka

:Python 的第三方库,提供了 Kafka 客户端的功能,比 kafka-python 的功能更多。

  • GitHub (opens new window)
  • 安装:pip install confluent-kafka
  • 例:生产消息
    from confluent_kafka import Producer
    
    producer = Producer({'bootstrap.servers': '10.0.0.1,10.0.0.2,10.0.0.3'})
    
    def produce_messages(producer, msg_list):
        """ 生产消息,输入示例:
        msg_list = [
            {'topic': 'test',
            'value': 'hello'.encode('utf-8'),
            'key': None,
            'headers': None,
            }
        ]
        """
        success_list = []
        failed_list = []
        def delivery_report(err, msg):
            """ 每次调用 poll() 或 flush() 生产消息时,会自动调用一次该函数,检查交付结果。 """
            if err:
                failed_list.append(msg)
                print('Failed to send message {} , error: {}'.format(msg.key(), err))
            else:
                success_list.append(msg)
    
        for msg in msg_list:
            # 检查之前调用 produce() 生产消息的交付结果
            producer.poll(0)
            # 生产消息。produce() 函数会异步执行,需要之后调用 poll() 或 flush() 来检查交付结果
            producer.produce(topic=msg['topic'], value=msg['value'], key=msg['key'], headers=msg['headers'], on_delivery=delivery_report)
    
        # 等待剩下的消息发送完毕
        producer.flush()
        print(f'已生产消息,成功数 {len(success_list)} ,失败数 {len(failed_list)}')
    
  • 例:消费消息
    from confluent_kafka import Consumer, KafkaException
    
    # 创建消费者,传入配置参数,兼容 Kafka 原生的 properties
    consumer = Consumer({
        'bootstrap.servers': '10.0.0.1,10.0.0.2,10.0.0.3',
        'group.id': 'test_group_1',
        'group.instance.id': 'static_member_1',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe(['topic_1'])
    
    def consume_messages(consumer, max_count=1):
        msg_list = []
        for i in range(max_count):
            # 消费一条消息。如果超过 timeout 依然未获取到消息,则返回 None 。如果 timeout 只有 1s ,则遇到 rebalance 就可能超时
            msg = consumer.poll(timeout=5.0)
            if msg == None:
                print(f'未拉取到新消息')
                return
            if msg.error():
                raise KafkaException(msg.error())
            msg_list.append(msg)
            print(f'成功消费 1 条消息 {msg.offset()=} {msg.value().decode()=}')
        return msg_list
    
    consume_messages(consumer, max_count=10)
    consumer.close()