(window.webpackJsonp=window.webpackJsonp||[]).push([[160],{475:function(t,s,a){"use strict";a.r(s);var n=a(10),e=Object(n.a)({},(function(){var t=this,s=t._self._c;return s("ContentSlotsDistributor",{attrs:{"slot-key":t.$parent.slotKey}},[s("h1",{attrs:{id:"工具"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#工具"}},[t._v("#")]),t._v(" 工具")]),t._v(" "),s("h2",{attrs:{id:"shell-脚本"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#shell-脚本"}},[t._v("#")]),t._v(" shell 脚本")]),t._v(" "),s("p",[t._v("kafka 的 bin 目录下自带了多个 shell 脚本，可用于管理 Kafka 。")]),t._v(" "),s("ul",[s("li",[s("p",[s("code",[t._v("kafka-server-stop.sh")]),t._v(" 用于停止 broker 进程。")]),t._v(" "),s("ul",[s("li",[t._v("它会查找本机上的所有 broker 进程，发送 SIGTERM 信号。\n"),s("ul",[s("li",[t._v("broker 进程收到终止信号后，会将所有数据保存到磁盘中，才退出，该过程需要几秒甚至几十秒。")])])]),t._v(" "),s("li",[t._v("如果强制杀死 broker 进程，可能导致数据丢失。重启时会发出警告："),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("WARN  Found a corrupted index file, xxxx/0000000000000000xxxx.index, deleting and rebuilding index"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("..")]),t._v(".\n")])])]),s("ul",[s("li",[t._v("此时，broker 需要重建各分区的 index 文件，耗时几十分钟，然后才能加入 Kafka 集群。")]),t._v(" "),s("li",[t._v("如果正常终止，则 broker 重启耗时只有几秒。")])])])])]),t._v(" "),s("li",[s("p",[s("code",[t._v("kafka-topics.sh")]),t._v(" 用于管理 topic 。例：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 列出所有 topic 名称")]),t._v("\nbin/kafka-topics.sh  "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--zookeeper")]),t._v(" localhost:2181  "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--list")]),t._v("         "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 这里连接到 zookeeper 来获取数据")]),t._v("\nbin/kafka-topics.sh  --bootstrap-server localhost:9092  "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--list")]),t._v("  "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 这里连接到 kafka 来获取数据")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 创建一个 topic ，并指定配置参数")]),t._v("\nbin/kafka-topics.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--create")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--partitions")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v(" --replication-factor "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 删除一个 topic")]),t._v("\nbin/kafka-topics.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--delete")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 查看一个 topic 的状态、配置参数")]),t._v("\nbin/kafka-topics.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--describe")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1\n")])])])]),t._v(" "),s("li",[s("p",[s("code",[t._v("kafka-configs.sh")]),t._v(" 用于管理配置参数。例：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 查看 broker 的配置参数（只显示与配置文件不同的部分，即在运行时被修改的配置参数）")]),t._v("\nbin/kafka-configs.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--describe")]),t._v(" --entity-type brokers --entity-name "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 查看 broker 的全部配置参数")]),t._v("\nbin/kafka-configs.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--describe")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--all")]),t._v(" --entity-type brokers --entity-name "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 给一个 topic 添加配置参数")]),t._v("\nbin/kafka-configs.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--alter")]),t._v(" --entity-type topics --entity-name topic_1 --add-config "),s("span",{pre:!0,attrs:{class:"token assign-left variable"}},[t._v("retention.ms")]),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("xx,retention.bytes"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("xx\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 给一个 topic 删除配置参数")]),t._v("\nbin/kafka-configs.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--alter")]),t._v(" --entity-type topics --entity-name topic_1 --delete-config retention.ms,retention.bytes\n")])])])]),t._v(" "),s("li",[s("p",[t._v("运行生产者终端，从 stdin 读取消息并发送到 broker ：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("bin/kafka-console-producer.sh --broker-list localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--producer.config")]),t._v(" config/producer.properties    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 指定生产者的配置文件，否则采用默认配置")]),t._v("\n")])])])]),t._v(" "),s("li",[s("p",[t._v("运行消费者终端，读取消息并输出到 stdout ：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--group")]),t._v(" group_1     "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 指定 consumer group 的 ID ，不指定则随机生成")]),t._v("\n    --from-beginning    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 从第一条消息开始消费")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--consumer.config")]),t._v(" config/consumer.properties\n")])])])]),t._v(" "),s("li",[s("p",[t._v("运行生产者的性能测试：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("bin/kafka-producer-perf-test.sh\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--producer.config")]),t._v(" config/producer.properties\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" topic_1\n    --num-records "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("10000")]),t._v("   "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 发送多少条消息")]),t._v("\n    --record-size "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1024")]),t._v("    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 每条消息的大小")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--throughput")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("10000")]),t._v("    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 限制每秒种发送的消息数")]),t._v("\n")])])])]),t._v(" "),s("li",[s("p",[s("code",[t._v("kafka-consumer-groups.sh")]),t._v(" 用于管理消费组。用法：")]),t._v(" "),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--list")]),t._v("            "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 列出所有消费组的名称")]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--group")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("<")]),t._v("name"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v(">")]),t._v("    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 指定消费组。可多次使用该参数，也可用 --all-groups 指定所有")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--delete")]),t._v("        "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 删除消费组，这会删除其所有 offset")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--describe")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--members")]),t._v("     "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 查看所有成员")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--offsets")]),t._v("     "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 查看对各个 topic partition 的 Committed Offset")]),t._v("\n\n      "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--topic")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("<")]),t._v("name"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v(">")]),t._v("  "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 指定一个 topic 。可多次使用该参数，也可用 --all-topics 指定所有")]),t._v("\n        --delete-offsets            "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 删除 offset")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--execute")]),t._v(" --reset-offsets   "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 调整对某个 topic 的 offset")]),t._v("\n          --to-offset "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("<")]),t._v("n"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v(">")]),t._v("\n          --to-earliest\n          --to-latest\n")])])])]),t._v(" "),s("li",[s("p",[t._v("Kafka 会自动决定将每个 partition 分配到哪个 broker 上存储。用户也可以主动创建 reassign 迁移任务，将某些 topic 的某些 partition 重新分配到指定 broker 。")]),t._v(" "),s("ul",[s("li",[t._v("创建 reassign 任务时，\n"),s("ul",[s("li",[t._v("需要先制定迁移计划，保存为 reassignment.json 文件，然后根据它创建迁移任务。")]),t._v(" "),s("li",[t._v("每个 reassign 任务可以迁移多个 topic 的多个 partition 。但建议同时只迁移一个 topic ，否则同时迁移大量数据可能占满磁盘读写速度。")]),t._v(" "),s("li",[t._v("例如给 Kafka 集群新增 broker 之后，可能被自动用于存储新创建的 topic ，但不会影响旧的 topic ，因此需要手动 reassign 旧的 topic 。")])])]),t._v(" "),s("li",[t._v("执行 reassign 任务时，\n"),s("ul",[s("li",[t._v("同时只能执行一个 reassign 任务。")]),t._v(" "),s("li",[t._v("broker 会自动将 partition 的数据文件，从原来的 broker 拷贝到新 broker ，然后从原来的 broker 删除。")]),t._v(" "),s("li",[t._v("执行过程中，该 partition 不支持客户端生产、消费。")])])]),t._v(" "),s("li",[t._v("使用 "),s("code",[t._v("kafka-reassign-partitions.sh")]),t._v(" 脚本的示例："),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("bin/kafka-topics.sh --bootstrap-server "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("127.0")]),t._v(".0.1:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--list")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("&>")]),t._v(" topics\n\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("for")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token for-or-select variable"}},[t._v("topic")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("in")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token variable"}},[s("span",{pre:!0,attrs:{class:"token variable"}},[t._v("`")]),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("cat")]),t._v(" topics"),s("span",{pre:!0,attrs:{class:"token variable"}},[t._v("`")])]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("do")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 指定需要迁移的 topic")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("echo")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v('\'\n    {\n      "version": 1,\n      "topics": [\n        { "topic": "<topic>"}\n      ]\n    }\n    \'')]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("|")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("sed")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v('"s#<topic>#'),s("span",{pre:!0,attrs:{class:"token variable"}},[t._v("$topic")]),t._v('#g"')]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v(">")]),t._v(" topics.json\n    "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("echo")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'----------'")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("echo")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'handling topic:'")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token variable"}},[t._v("$topic")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 生成迁移计划，将 topics 迁移到目标 broker-list")]),t._v("\n    bin/kafka-reassign-partitions.sh --bootstrap-server "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("127.0")]),t._v(".0.1:9092 --broker-list "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'1,2,3'")]),t._v(" --topics-to-move-json-file topics.json "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--generate")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("|")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("tail")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("-n")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v(">")]),t._v(" reassignment.json\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 开始迁移")]),t._v("\n    bin/kafka-reassign-partitions.sh --bootstrap-server "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("127.0")]),t._v(".0.1:9092 --reassignment-json-file reassignment.json "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--execute")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("&&")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("break")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("while")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("true")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("do")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 等待迁移完成")]),t._v("\n        bin/kafka-reassign-partitions.sh --bootstrap-server "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("127.0")]),t._v(".0.1:9092 --reassignment-json-file reassignment.json "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--verify")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("|")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("grep")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'still in progress'")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("||")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("break")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token builtin class-name"}},[t._v("echo")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'Waiting for the current task to complete...'")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("sleep")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("done")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("done")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 显示正在执行的迁移任务")]),t._v("\nbin/kafka-reassign-partitions.sh --bootstrap-server "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("127.0")]),t._v(".0.1:9092 "),s("span",{pre:!0,attrs:{class:"token parameter variable"}},[t._v("--list")]),t._v("\n")])])])])])])]),t._v(" "),s("h2",{attrs:{id:"kafka-manager"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#kafka-manager"}},[t._v("#")]),t._v(" Kafka Manager")]),t._v(" "),s("p",[t._v("：一个 Web 服务器，用于管理 Kafka 。")]),t._v(" "),s("ul",[s("li",[s("a",{attrs:{href:"https://github.com/yahoo/CMAK",target:"_blank",rel:"noopener noreferrer"}},[t._v("GitHub"),s("OutboundLink")],1)]),t._v(" "),s("li",[t._v("主要用于监控、管理 topic、partition ，不支持查看 Kafka 消息。")]),t._v(" "),s("li",[t._v("由 Yahoo 公司开源，采用 Java 语言开发。")]),t._v(" "),s("li",[t._v("2020 年，发布 v3 版本。为了避免与 Kafka 版权冲突而改名为 Cluster Manager for Apache Kafka ，简称为 CMAK 。")])]),t._v(" "),s("h3",{attrs:{id:"部署"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#部署"}},[t._v("#")]),t._v(" 部署")]),t._v(" "),s("ul",[s("li",[t._v("用 docker-compose 部署："),s("div",{staticClass:"language-yml extra-class"},[s("pre",{pre:!0,attrs:{class:"language-yml"}},[s("code",[s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("version")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'3'")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("services")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n  "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("kafka-manager")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("container_name")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" kafka"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v("manager\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("image")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" kafkamanager/kafka"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v("manager"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("3.0.0.4\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("restart")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" unless"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v("stopped\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("ports")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v(" 9000"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("9000")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("environment")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("ZK_HOSTS")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" 10.0.0.1"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("2181")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# JAVA_OPTS: -Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/jaas.conf")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("KAFKA_MANAGER_AUTH_ENABLED")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'true'")]),t._v("    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 是否启用 Basic Auth ，默认为 false")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("KAFKA_MANAGER_USERNAME")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" admin\n      "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("KAFKA_MANAGER_PASSWORD")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token important"}},[t._v("******")]),t._v("\n")])])]),s("ul",[s("li",[t._v("Kafka Manager 需要连接到 Kafka broker 以进行监控，还需要一个 zk 集群来存储自己的数据。")])])])]),t._v(" "),s("h3",{attrs:{id:"用法"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#用法"}},[t._v("#")]),t._v(" 用法")]),t._v(" "),s("ul",[s("li",[s("p",[t._v("访问 Kafka Manager 的 Web 页面，即可创建一个 Cluster ，表示 Kafka 集群。")]),t._v(" "),s("ul",[s("li",[t._v("需要指定该 Kafka 集群对应的 zk 集群的地址。")]),t._v(" "),s("li",[t._v('可以勾选 "JMX Polling" ，连接到 Kafka 的 JMX 端口，监控消息的传输速度。')]),t._v(" "),s("li",[t._v('可以勾选 "Poll consumer information"，监控消费者的 offset 。')])])]),t._v(" "),s("li",[s("p",[t._v("支持查看、创建、配置 topic、partition 。")]),t._v(" "),s("ul",[s("li",[t._v("topic 的统计信息示例："),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("Replication                 "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 每个分区的的副本数")]),t._v("\nNumber of Partitions        "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 该 topic 的分区数")]),t._v("\nSum of partition offsets    "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("\nTotal number of Brokers     "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# Kafka 集群存在的 Broker 数")]),t._v("\nNumber of Brokers "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("for")]),t._v(" Topic "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("2")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 该 topic 存储时占用的 Broker 数")]),t._v("\nPreferred Replicas %        "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("100")]),t._v("     "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# Leader replica 为 Preferred replica 的分区，所占百分比")]),t._v("\nBrokers Skewed %            "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 存储的副本过多的 Broker 所占百分比")]),t._v("\nBrokers Leader Skewed %     "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 存储的 Leader replica 过多的 Broker 所占百分比")]),t._v("\nBrokers Spread %            "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("66")]),t._v("      "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 该 topic 占用的 Kafka 集群的 Broker 百分比，这里等于 2/3 * 100%")]),t._v("\nUnder-replicated %          "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 存在未同步副本的那些分区，所占百分比")]),t._v("\n")])])]),s("ul",[s("li",[t._v("上例中的 topic 总共有 3×3 个副本，占用 2 个 Broker 。为了负载均衡，每个 Broker 应该存储 3×3÷2 个副本，取整后可以为 4 或 5 。如果某个 Broker 实际存储的副本数超过该值，则视作 Skewed 。")])])]),t._v(" "),s("li",[t._v("Broker 的统计信息示例："),s("div",{staticClass:"language-sh extra-class"},[s("pre",{pre:!0,attrs:{class:"language-sh"}},[s("code",[t._v("Broker      "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# of Partitions     # as Leader     Partitions    Skewed?     Leader Skewed?")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("           "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("                   "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v("               "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0,1")]),t._v(",2"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("false")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("false")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),t._v("           "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("                   "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("2")]),t._v("               "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0,1")]),t._v(",2"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("false")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("true")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("2")]),t._v("           "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("3")]),t._v("                   "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),t._v("               "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0,1")]),t._v(",2"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("false")]),t._v("       "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("false")]),t._v("\n")])])]),t._v("每行表示一个 Broker 的信息，各列分别表示该 Broker 的：\n"),s("ul",[s("li",[t._v("ID")]),t._v(" "),s("li",[t._v("存储的分区副本数")]),t._v(" "),s("li",[t._v("存储的 Leader replica 数")]),t._v(" "),s("li",[t._v("存储的各个分区 ID")]),t._v(" "),s("li",[t._v("存储的副本是否过多")]),t._v(" "),s("li",[t._v("存储的 Leader replica 是否过多")])])])])]),t._v(" "),s("li",[s("p",[t._v("支持 preferred-replica-election 。")])]),t._v(" "),s("li",[s("p",[t._v("支持 Reassign Partitions 。用法如下：")]),t._v(" "),s("ol",[s("li",[t._v("在 topic 详情页面，点击 "),s("code",[t._v("Generate Partition Assignments")]),t._v(" ，设置允许该 topic 分配到哪些 broker 上的策略。")]),t._v(" "),s("li",[t._v("点击 "),s("code",[t._v("Run Partition Assignments")]),t._v(" ，执行自动分配的策略。")])]),t._v(" "),s("ul",[s("li",[t._v("如果不满足策略，则自动迁移 replica 到指定的 broker 上，并重新选举 leader 。\n"),s("ul",[s("li",[t._v("迁移 replica 时会导致客户端短暂无法访问。")]),t._v(" "),s("li",[t._v("同时迁移多个 replica 时，可能负载过大，导致 Kafka broker 卡死。")])])]),t._v(" "),s("li",[t._v("如果已满足策略，则不会进行迁移。因此该操作具有幂等性。")]),t._v(" "),s("li",[t._v("也可以点击 "),s("code",[t._v("Manual Partition Assignments")]),t._v(" ，进行手动分配。")])]),t._v(" "),s("ol",{attrs:{start:"3"}},[s("li",[t._v("到菜单栏的 "),s("code",[t._v("Reassign Partitions")]),t._v(" 页面，查看正在执行的分配操作。")])])])]),t._v(" "),s("h2",{attrs:{id:"kafka-eagle"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#kafka-eagle"}},[t._v("#")]),t._v(" Kafka Eagle")]),t._v(" "),s("p",[t._v("：一个 Web 服务器，用于管理 Kafka 。改名为了 EFAK（Eagle For Apache Kafka）。")]),t._v(" "),s("ul",[s("li",[s("a",{attrs:{href:"https://github.com/smartloli/EFAK",target:"_blank",rel:"noopener noreferrer"}},[t._v("GitHub"),s("OutboundLink")],1)]),t._v(" "),s("li",[t._v("与 Kafka Manager 相比，多了查看消息内容、生产消息、配置告警的功能。")]),t._v(" "),s("li",[t._v("采用 Java 语言开发，占用大概 1G 内存。")])]),t._v(" "),s("h2",{attrs:{id:"redpanda"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#redpanda"}},[t._v("#")]),t._v(" Redpanda")]),t._v(" "),s("p",[t._v("：一个 Web 服务器，用于管理 Kafka 。原名为 Kowl 。")]),t._v(" "),s("ul",[s("li",[s("a",{attrs:{href:"https://github.com/redpanda-data/console",target:"_blank",rel:"noopener noreferrer"}},[t._v("GitHub"),s("OutboundLink")],1)]),t._v(" "),s("li",[t._v("与 Kafka Manager 相比，页面更美观，多了查看 topic 体积、查看消息内容、生产消息、修改 offset 的功能。但免费版不支持 Reassign Partitions 。")]),t._v(" "),s("li",[t._v("采用 Golang 语言开发，只占用几十 MB 内存。")]),t._v(" "),s("li",[t._v("用 docker-compose 部署："),s("div",{staticClass:"language-yml extra-class"},[s("pre",{pre:!0,attrs:{class:"language-yml"}},[s("code",[s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("version")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v('"3"')]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("services")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n  "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("redpanda")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("container_name")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" redpanda\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("image")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" vectorized/console"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("v2.4.5\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("restart")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" unless"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v("stopped\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# command:")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("#   - -config.filepath=redpanda.yaml")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("environment")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("KAFKA_BROKERS")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" 10.0.0.1"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("9092")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token key atrule"}},[t._v("ports")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n      "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("-")]),t._v(" 8080"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("8080")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# volumes:")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("#   - ./redpanda.yaml:/app/redpanda.yaml")]),t._v("\n")])])])])]),t._v(" "),s("h2",{attrs:{id:"offset-explorer"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#offset-explorer"}},[t._v("#")]),t._v(" Offset Explorer")]),t._v(" "),s("p",[t._v("：旧名为 Kafka Tool ，是一个 GUI 工具，可用作 Kafka 客户端。")]),t._v(" "),s("ul",[s("li",[s("a",{attrs:{href:"https://www.kafkatool.com/",target:"_blank",rel:"noopener noreferrer"}},[t._v("官网"),s("OutboundLink")],1)]),t._v(" "),s("li",[t._v("支持查看、管理 topic ，支持查看消息、生产消息，缺少关于监控的功能。")])]),t._v(" "),s("h2",{attrs:{id:"import-kafka-python"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#import-kafka-python"}},[t._v("#")]),t._v(" import kafka-python")]),t._v(" "),s("p",[t._v("：Python 的第三方库，提供了 Kafka 客户端的功能。")]),t._v(" "),s("ul",[s("li",[s("p",[s("a",{attrs:{href:"https://kafka-python.readthedocs.io/en/master/index.html",target:"_blank",rel:"noopener noreferrer"}},[t._v("官方文档"),s("OutboundLink")],1)])]),t._v(" "),s("li",[s("p",[t._v("安装："),s("code",[t._v("pip install kafka-python")])])]),t._v(" "),s("li",[s("p",[t._v("例：生产消息")]),t._v(" "),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("from")]),t._v(" kafka "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("import")]),t._v(" KafkaProducer\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 创建一个 Producer ，连接到 broker")]),t._v("\nproducer "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" KafkaProducer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("bootstrap_servers"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'localhost:9092'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 生产一条消息到指定的 topic")]),t._v("\nmsg "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'Hello'")]),t._v("\nfuture "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" producer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("send"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("topic"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("\n                       value"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("encode"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("  "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 消息的内容，必须为 bytes 类型")]),t._v("\n                       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# partition=None,    # 指定分区。默认为 None ，会自动分配一个分区")]),t._v("\n                       "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# key=None,          # 指定 key")]),t._v("\n                      "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("try")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    metadata "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" future"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("get"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("timeout"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("5.0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("  "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 等待服务器的回复")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("except")]),t._v(" Exception "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("as")]),t._v(" e"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token builtin"}},[t._v("str")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("e"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'sent: '")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'offset: '")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" metadata"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("offset"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic: '")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("  metadata"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("topic"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'partition: '")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" metadata"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("partition"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n")])])])]),t._v(" "),s("li",[s("p",[t._v("例：消费消息")]),t._v(" "),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("from")]),t._v(" kafka "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("import")]),t._v(" KafkaConsumer\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 创建一个 Consumer ，连接到 broker ，订阅指定的 topic")]),t._v("\nconsumer "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" KafkaConsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" bootstrap_servers"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'localhost:9092'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# Consumer 对象支持迭代。默认从最新 offset 处开始迭代，没有收到消息时会一直阻塞等待，除非设置了 consumer_timeout_ms")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("for")]),t._v(" msg "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("in")]),t._v(" consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string-interpolation"}},[s("span",{pre:!0,attrs:{class:"token string"}},[t._v("f'")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("offset"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v(" ")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("value"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("decode"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'")])]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n")])])]),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("from")]),t._v(" kafka "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("import")]),t._v(" TopicPartition\n\nconsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("subscribe"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\nconsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("seek"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("TopicPartition"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("topic"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" partition"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 调整消费的 offset")]),t._v("\nconsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("seek_to_beginning"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("TopicPartition"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("topic"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" partition"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\nmsg_set "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("poll"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("max_records"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("10")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n")])])])]),t._v(" "),s("li",[s("p",[t._v("KafkaConsumer 的定义：")]),t._v(" "),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("class")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token class-name"}},[t._v("KafkaConsumer")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("six"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("Iterator"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("__init__")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("*")]),t._v("topics"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("**")]),t._v("configs"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v("\"\"\" 先输入要消费的 topics 。可用的 configs 参数如下：\n        # 关于 broker\n        bootstrap_servers='localhost',  # 可以指定一个列表，例如 ['10.0.0.1:9092', '10.0.0.2:9092']\n\n        # 关于 SASL 认证\n        sasl_mechanism='PLAIN',\n        security_protocol='SASL_PLAINTEXT',\n        sasl_plain_username='admin',\n        sasl_plain_password='******',\n\n        # 关于客户端\n        client_id='client_1',           # 客户端的名称，默认为 kafka-python-$version\n        group_id='group_1',             # 消费者组的名称，默认为 None ，即不加入消费者组\n        consumer_timeout_ms=5000,       # 迭代消息时，持续一定时长未收到新消息则结束迭代。默认无限制\n        \"\"\"")]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("assign")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" partitions"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 主动分配当前 consumer 消费的 TopicPartition 。\n        此时不会被 Coordinator 处理，不会触发 rebalance 。但不能与 subscribe() 同时使用，否则报错。\n        例： consumer.assign([TopicPartition(topic=\'topic_1\', partition=0), TopicPartition(topic=\'topic_1\', partition=1)])\n        """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("assignment")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 返回一个集合，包含当前 consumer 被分配的所有 TopicPartition """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("close")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" autocommit"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("True")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 关闭 consumer """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("commit")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" offsets"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 提交 offsets 。这会阻塞 consumer 直到成功提交 """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("committed")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" partition"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" metadata"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("False")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 返回指定 TopicPartition 的 Committed Offset """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("commit_async")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" offsets"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" callback"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 异步地提交 offsets """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("poll")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" timeout_ms"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" max_records"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" update_offsets"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("True")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 从当前 consumer 被分配的 partition 拉取消息，组成一个集合并返回。\n        update_offsets ：是否自动递增 offset 以便下一次拉取。\n        """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("seek")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" partition"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" offset"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 调整当前消费的 offset ，用于下一次 poll """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("seek_to_beginning")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("*")]),t._v("partitions"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 将 offset 调整到可见的最老 offset """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("seek_to_end")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("*")]),t._v("partitions"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 将 offset 调整到可见的最新 offset """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("subscribe")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" topics"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" pattern"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" listener"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 订阅一组 topics ，或者订阅与正则表达式匹配的 topics ，这会自动分配 TopicPartition 。\n        不支持增加订阅，新的订阅列表会覆盖旧的订阅列表。\n        可以订阅不存在的 topic ，此时不会被分配 partition ，调用 poll() 的结果为空集合 。\n        """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("subscription")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 返回一个集合，包含当前 consumer 订阅的所有 topic 的名称 """')]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("unsubscribe")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("self"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 取消订阅的所有 topics """')]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("\n")])])])])]),t._v(" "),s("h2",{attrs:{id:"import-confluent-kafka"}},[s("a",{staticClass:"header-anchor",attrs:{href:"#import-confluent-kafka"}},[t._v("#")]),t._v(" import confluent-kafka")]),t._v(" "),s("p",[t._v("：Python 的第三方库，提供了 Kafka 客户端的功能，比 kafka-python 的功能更多。")]),t._v(" "),s("ul",[s("li",[s("a",{attrs:{href:"https://github.com/confluentinc/confluent-kafka-python",target:"_blank",rel:"noopener noreferrer"}},[t._v("GitHub"),s("OutboundLink")],1)]),t._v(" "),s("li",[t._v("安装："),s("code",[t._v("pip install confluent-kafka")])]),t._v(" "),s("li",[t._v("例：生产消息"),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("from")]),t._v(" confluent_kafka "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("import")]),t._v(" Consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" KafkaException\n\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("produce_messages")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("producer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" msg_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v("\"\"\" 生产消息，输入示例：\n    msg_list = [\n        {'topic': 'test',\n        'value': 'hello'.encode('utf-8'),\n        'key': None,\n        'headers': None,\n        }\n    ]\n    \"\"\"")]),t._v("\n    success_list "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),t._v("\n    failed_list "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("delivery_report")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("err"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token triple-quoted-string string"}},[t._v('""" 每次调用 poll() 或 flush() 生产消息时，会自动调用一次该函数，检查交付结果。 """')]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("if")]),t._v(" err"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n            failed_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("append"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n            "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'[ERROR] Message delivery failed: {}'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),s("span",{pre:!0,attrs:{class:"token builtin"}},[t._v("format")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("err"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("else")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n            success_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("append"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("for")]),t._v(" msg "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("in")]),t._v(" msg_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 检查之前调用 produce() 生产消息的交付结果")]),t._v("\n        producer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("poll"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 生产消息。produce() 函数会异步执行，需要之后调用 poll() 或 flush() 来检查交付结果")]),t._v("\n        producer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("produce"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("topic"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" value"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'value'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" key"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'key'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" headers"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'headers'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" on_delivery"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v("delivery_report"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 等待剩下的消息发送完毕")]),t._v("\n    producer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("flush"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string-interpolation"}},[s("span",{pre:!0,attrs:{class:"token string"}},[t._v("f'已生产消息，成功数 ")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),s("span",{pre:!0,attrs:{class:"token builtin"}},[t._v("len")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("success_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v(" ，失败数 ")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),s("span",{pre:!0,attrs:{class:"token builtin"}},[t._v("len")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("failed_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'")])]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n")])])])]),t._v(" "),s("li",[t._v("例：消费消息"),s("div",{staticClass:"language-py extra-class"},[s("pre",{pre:!0,attrs:{class:"language-py"}},[s("code",[s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("from")]),t._v(" confluent_kafka "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("import")]),t._v(" Consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" KafkaException\n\n"),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 创建消费者，传入配置参数，兼容 Kafka 原生的 properties")]),t._v("\nconsumer "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" Consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'bootstrap.servers'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'10.0.0.1,10.0.0.2,10.0.0.3'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'group.id'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'test_group_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'group.instance.id'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'static_member_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'auto.offset.reset'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'earliest'")]),t._v("\n"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\nconsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("subscribe"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'topic_1'")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n\n"),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("def")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token function"}},[t._v("consume_messages")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" max_count"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("1")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n    msg_list "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("[")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("]")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("for")]),t._v(" i "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("in")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token builtin"}},[t._v("range")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("max_count"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token comment"}},[t._v("# 消费一条消息。如果超过 timeout 依然未获取到消息，则返回 None 。如果 timeout 只有 1s ，则遇到 rebalance 就可能超时")]),t._v("\n        msg "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),t._v(" consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("poll"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("timeout"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("5.0")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("if")]),t._v(" msg "),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("==")]),t._v(" "),s("span",{pre:!0,attrs:{class:"token boolean"}},[t._v("None")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n            "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string-interpolation"}},[s("span",{pre:!0,attrs:{class:"token string"}},[t._v("f'未拉取到新消息'")])]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n            "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("return")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("if")]),t._v(" msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("error"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(":")]),t._v("\n            "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("raise")]),t._v(" KafkaException"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("error"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        msg_list"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("append"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n        "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("print")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token string-interpolation"}},[s("span",{pre:!0,attrs:{class:"token string"}},[t._v("f'成功消费 1 条消息 ")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("offset"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v(" ")]),s("span",{pre:!0,attrs:{class:"token interpolation"}},[s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("{")]),t._v("msg"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("value"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("decode"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("}")])]),s("span",{pre:!0,attrs:{class:"token string"}},[t._v("'")])]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n    "),s("span",{pre:!0,attrs:{class:"token keyword"}},[t._v("return")]),t._v(" msg_list\n\nconsume_messages"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),t._v("consumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(",")]),t._v(" max_count"),s("span",{pre:!0,attrs:{class:"token operator"}},[t._v("=")]),s("span",{pre:!0,attrs:{class:"token number"}},[t._v("10")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\nconsumer"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(".")]),t._v("close"),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v("(")]),s("span",{pre:!0,attrs:{class:"token punctuation"}},[t._v(")")]),t._v("\n")])])])])])])}),[],!1,null,null,null);s.default=e.exports}}]);