# Pulsar

:一个消息队列服务器。

  • 官方文档 (opens new window)
  • 发音为 /ˈpʌlsɑː(r)/
  • 采用 Java 语言开发。
  • 2016 年由 Yahoo 公司开源,后来交给 ASF 基金会管理。

# 原理

  • Pulsar 的特点:

    • 采用发布-订阅模式,支持队列模型、流模型。
    • Pulsar 使用 Zookeeper 存储集群元数据、配置,协调分布式系统。
    • 生产者(Producer)负责将消息发送到 topic 。而消费者可以订阅多个 topic 。
    • Pulsar 可以创建多个租户(tenant),每个 tenant 可以创建多个 namespace ,每个 namespace 下可以创建多个 topic 。
  • Kafka 的服务器只有 broker 一种角色,同时负责存储数据、处理客户端请求。而 Pulsar 将服务器分为两种角色:

    • broker :负责处理客户端请求、代理 BookKeeper 集群。是无状态服务,可以随时增减。
    • bookie :负责存储数据,又称为 BookKeeper 集群。
  • Pulsar 像 Kafka 一样以追加日志的形式存储数据,并将日志以 Segment 为单位分段,分散存储到各个 Bookie 服务器上。

  • topic 中的消息默认会持久保存,直到被所有订阅者确认消费(ACK)。

    • 如果 topic 不存在一个订阅,则消息不会保留,会在 segment rollover 时被删除。
    • 可选设置 TTL ,自动将超时的消息标记为确认消费,允许被删除。
    • 可选设置保留策略(Retention),将已确认消费的消息,保留一定大小或时长。
  • 对比 Kafka 与 Pulsar :

    • Kafka 增减 broker 数量时需要迁移 partition 、rebalance ,耗时久。而 Pulsar 以 Segment 为单位在服务器之间迁移数据。
    • Pulsar 软件还不成熟,架构较复杂,因此一般情况下还是推荐使用 Kafka 。

# 部署

  • 用 docker-compose 部署:
    version: '3'
    
    services:
      pulsar:
        container_name: pulsar
        image: apachepulsar/pulsar:2.8.2
        restart: unless-stopped
        entrypoint:
          - sh
          - -c
          - "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
        environment:
          PULSAR_MEM: -Xms2g -Xmx2g
        ports:
          - 6650:6650   # broker 监听的端口
          - 8080:8080   # HTTP 端口
        volumes:
          - ./conf:/pulsar/conf
          - ./data:/pulsar/data
    

# 命令

  • pulsar 自带了一个命令行工具,用法如下:
    bin/pulsar-admin
            tenants
                list          # 列出所有租户
    
            namespaces
                list   <tenant>               # 列出租户的所有命名空间
                create <tenant>/<namespace>   # 创建一个命名空间
                delete <tenant>/<namespace>   # 删除一个命名空间
    
                get-message-ttl <tenant>/<namespace>                      # 查询命名空间的消息 TTL
                set-message-ttl <tenant>/<namespace> --messageTTL 3600    # 设置 TTL ,单位为秒。如果设置为 0 ,则禁用
                get-retention <tenant>/<namespace>                        # 查询命名空间的消息过期策略
                set-retention <tenant>/<namespace> --size 100M --time 1h  # 设置消息过期策略
    
            topics
                list <tenant>/<namespace>   # 列出命名空间下的所有 topic
                delete persistent://<tenant>/<namespace>/<topic>  # 删除一个 topic
                    --force   # 强制删除。默认不能删除有活跃的 producer 或 subscription 的 topic
    
            broker-stats
                topics        # 查询所有 topic 的状态
    
    • 例:
      [root@CentOS ~]# bin/pulsar-admin tenants list
      "public"
      "pulsar"
      "sample"
      [root@CentOS ~]# bin/pulsar-admin topics list public/functions
      "persistent://public/functions/metadata"
      "persistent://public/functions/assignments"
      "persistent://public/functions/coordinate"