Skip to content

在 EMQX 旗舰版中使用 EMQX Streaming

本页面为您提供了如何在旗舰版部署中使用 EMQX Streaming 功能的详细指导。

前置准备

在开始之前,请确保您已完成以下步骤:

  • 已创建了一个旗舰版部署。有关详细说明,请参考创建旗舰版部署
  • 已为您的部署启用 VPC 对等连接。有关说明,请参考 VPC 对等连接设置
  • 已设置用于发布消息的 MQTT 客户端。
  • 已准备好用于消费消息的 Kafka 客户端。

在 EMQX Platform 控制台中启用 EMQX Streaming

EMQX Streaming 功能只能通过提交工单来激活。

  1. 登录 EMQX Platform,进入您的部署,导航到 Streaming (beta)
  2. 点击页面上的开通 Streaming (beta) 按钮提交工单。

一旦 EMQX Streaming 被激活,Kafka 端点信息将显示在 Streaming (beta) -> 概览页面的中。

添加用户

Kafka 客户端需经过认证才能访问 EMQX Streaming,因此您需要创建一个用户进行身份认证。

  1. 导航到 Streaming (beta) -> 访问控制 -> 客户端认证
  2. 在客户端认证页面点击添加。在弹出的添加认证窗口中,完成以下设置:
    • 认证方式:选择 SCRAM-SHA-256(推荐作为默认机制)。
    • 用户名:输入一个用户名,例如 admin
    • 密码:为所提供的用户名设置一个密码。
  3. 点击确认保存新用户。

添加 ACL 规则

创建 Kafka 用户后,下一步是为用户分配必要的权限,以便访问 EMQX Streaming。这些权限通过访问控制列表(ACL)规则进行配置。ACL 规则定义了哪些 Kafka 用户被允许对特定资源(如 MQTT 主题、消费组和集群)执行特定操作。

一个 ACL 规则遵循以下格式: 用户被 [允许|拒绝] 从 Host 对匹配资源类型资源进行操作

EMQX 流式数据的授权机制默认采用白名单模型:只有在 ACL 规则中明确列出的 Kafka 用户才能访问指定资源,而未列出的用户默认被拒绝访问。

添加 ACL 规则的步骤:

  1. 导航到 Streaming (beta) -> 访问控制 -> 客户端授权
  2. 在客户端授权页面点击添加。在弹出的添加授权信息窗口中,完成以下设置:
    • 用户名:指定此 ACL 规则适用的用户。您可以选择两种用户名模式:
      • 匹配所有:规则适用于所有用户。
      • 精确匹配:规则仅适用于您输入的特定用户。
    • Host:输入用户连接到 EMQX Streaming 资源的 Host IP 地址。您可以选择以下模式:
      • 匹配所有:规则适用于从任何 Host 连接的用户。
      • 精确匹配:规则仅适用于从您提供的特定 Host IP 地址连接的用户。
    • 资源类型:选择 ACL 规则适用的资源类型。这决定了目标值中的模式将作用于何种类型的资源。可选的资源类型有:
      • 主题:适用于 MQTT 主题,您可以将 ACL 规则应用于特定主题或主题模式。
      • 消费者组:适用于 Kafka 消费者组,您可以设置与 Kafka 消费者组操作相关的规则。
      • 集群:适用于 EMQX 集群,您可以在集群级别应用规则,进行更广泛的访问控制。
    • 目标值:定义一个模式,匹配所选资源类型的资源名称。此字段决定了 ACL 规则将应用于哪些特定资源,具体取决于您指定的模式。您可以选择以下选项:
      • 匹配所有:规则适用于所选资源类型的所有资源。
      • 精确匹配:规则仅适用于特定的资源名称(例如,特定的主题 t1/+ 或特定的消费组)。
      • 前缀匹配:规则适用于所有以指定前缀开始的资源名称(例如,t1/ 将匹配以 t1/ 开头的所有主题)。
    • 操作:选择规则管理的操作类型。全部操作表示适用于所有可用的操作。
    • 是否允许:选择是否允许或拒绝指定用户、Host 和资源上的操作。
  3. 点击确认保存 ACL 规则。

authorization

创建 Stream

用户可以创建两种类型的 stream:Default 和 Free。Default stream 与 MQTT 主题过滤器关联,默认创建 16 个分区,用于存储所有匹配的 MQTT 消息。Free stream 独立于 MQTT 主题运作,具有可自定义的分区数量,为非 MQTT 使用场景提供了更大的灵活性。

两种类型的 stream 的保留时间都固定为 1 天,且无法修改。

  1. 导航到 Streaming (beta) -> Streams
  2. 在 Streams 页面点击新建。在新建 Streams 弹窗中,完成以下设置:
    • Stream 名称:为 stream 提供一个名称,例如 demo。Kafka 客户端的 Kafka 主题必须与 stream 名称匹配。
    • Stream 类型:选择 stream 类型。
      • Default:Default stream 与 MQTT 主题过滤器关联,匹配该过滤器的 MQTT 消息将保存到该 stream 中。
      • Free:Free stream 不与 MQTT 主题过滤器关联,通常用于其他数据处理用途。
    • MQTT 主题过滤器:如果选择了 Default 类型,输入一个 MQTT 消息的主题过滤器。匹配该过滤器的 MQTT 消息将保存到相应的 default stream 中。本示例中,输入 t1/+
    • 分区数量:如果选择了 Free 类型,请指定分区数量,以便 stream 在扩展性和并行处理方面进行划分。
  3. 点击确认

新的 stream 创建完成后,它将出现在 Streams 列表中。点击 Stream 名称列中的名称,可以查看分区偏移量及其他 stream 的详细信息。

stream_details

使用 MQTT 客户端发布消息

您可以使用 MQTTX 模拟一个 MQTT 客户端,并将消息发布到 t1/at1/b 主题。

publish_messages

使用 Kafka CLI 消费消息

请按照以下步骤下载官方 Kafka CLI 工具:

  1. Kafka 下载页面下载并安装官方 Kafka CLI 工具。
  2. 配置 Kafka CLI 工具以连接 Streaming (beta) -> 概览页面中提供的 Kafka 端点。

创建配置文件以提供凭证

  1. 为 Kafka CLI 工具创建一个配置文件 config/client.properties
  2. 将以下内容添加到配置文件中:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<your username>" \
  password="<your password>";

检索主题信息

使用 Kafka CLI,可以通过 describe 命令检索主题信息。确保 bootstrap-server 选项设置为 Streaming (beta) -> 概览页面上显示的 Kafka 端点。

示例命令:

bash
bin/kafka-topics.sh --describe --bootstrap-server <your streaming endpoint> --command-config config/client.properties

retrieve_topic

从主题中消费消息

要使用 Kafka CLI 消费消息,请执行以下命令:

bash
bin/kafka-console-consumer.sh --bootstrap-server <your streaming endpoint> --topic demo --from-beginning --consumer.config config/client.properties

此命令将从指定 stream 的开头开始消费消息。您应看到先前发布的消息被成功消费。

consume_messages

在 EMQX Platform 控制台中查看已消费的消息

要验证已消费的消息,导航到 EMQX Platform 控制台的 Streaming (beta) -> 消费者群组

consumer_groups

点击组 ID 以查看有关消费者组的详细信息,例如消费者列表及其消费进度。

consumer_group_detail

查看 EMQX Streaming 指标

Streaming (beta) -> 概览页面提供了 Kafka 端点和 EMQX Streaming 的全面指标视图。它展示了包括当前 Streaming 数、分区、消费者组和消息速率等 Streaming 操作的统计信息。

streaming_overview

删除 Stream

要删除一个 stream:

  1. 进入 Streaming (beta) -> Streams
  2. 点击要删除的 Stream 操作列下的删除图标。
  3. 点击确认删除。