在 EMQX 旗舰版中使用 EMQX Streaming
本页面为您提供了如何在旗舰版部署中使用 EMQX Streaming 功能的详细指导。
前置准备
在开始之前,请确保您已完成以下步骤:
- 已创建了一个旗舰版部署。有关详细说明,请参考创建旗舰版部署。
- 已为您的部署启用 VPC 对等连接。有关说明,请参考 VPC 对等连接设置。
- 已设置用于发布消息的 MQTT 客户端。
- 已准备好用于消费消息的 Kafka 客户端。
在 EMQX Platform 控制台中启用 EMQX Streaming
EMQX Streaming 功能只能通过提交工单来激活。
- 登录 EMQX Platform,进入您的部署,导航到 Streaming (beta)。
- 点击页面上的开通 Streaming (beta) 按钮提交工单。
一旦 EMQX Streaming 被激活,Kafka 端点信息将显示在 Streaming (beta) -> 概览页面的中。
添加用户
Kafka 客户端需经过认证才能访问 EMQX Streaming,因此您需要创建一个用户进行身份认证。
- 导航到 Streaming (beta) -> 访问控制 -> 客户端认证。
- 在客户端认证页面点击添加。在弹出的添加认证窗口中,完成以下设置:
- 认证方式:选择
SCRAM-SHA-256
(推荐作为默认机制)。 - 用户名:输入一个用户名,例如
admin
。 - 密码:为所提供的用户名设置一个密码。
- 认证方式:选择
- 点击确认保存新用户。
添加 ACL 规则
创建 Kafka 用户后,下一步是为用户分配必要的权限,以便访问 EMQX Streaming。这些权限通过访问控制列表(ACL)规则进行配置。ACL 规则定义了哪些 Kafka 用户被允许对特定资源(如 MQTT 主题、消费组和集群)执行特定操作。
一个 ACL 规则遵循以下格式: 用户被 [允许|拒绝] 从 Host 对匹配资源类型的资源进行操作。
EMQX 流式数据的授权机制默认采用白名单模型:只有在 ACL 规则中明确列出的 Kafka 用户才能访问指定资源,而未列出的用户默认被拒绝访问。
添加 ACL 规则的步骤:
- 导航到 Streaming (beta) -> 访问控制 -> 客户端授权。
- 在客户端授权页面点击添加。在弹出的添加授权信息窗口中,完成以下设置:
- 用户名:指定此 ACL 规则适用的用户。您可以选择两种用户名模式:
- 匹配所有:规则适用于所有用户。
- 精确匹配:规则仅适用于您输入的特定用户。
- Host:输入用户连接到 EMQX Streaming 资源的 Host IP 地址。您可以选择以下模式:
- 匹配所有:规则适用于从任何 Host 连接的用户。
- 精确匹配:规则仅适用于从您提供的特定 Host IP 地址连接的用户。
- 资源类型:选择 ACL 规则适用的资源类型。这决定了目标值中的模式将作用于何种类型的资源。可选的资源类型有:
- 主题:适用于 MQTT 主题,您可以将 ACL 规则应用于特定主题或主题模式。
- 消费者组:适用于 Kafka 消费者组,您可以设置与 Kafka 消费者组操作相关的规则。
- 集群:适用于 EMQX 集群,您可以在集群级别应用规则,进行更广泛的访问控制。
- 目标值:定义一个模式,匹配所选资源类型的资源名称。此字段决定了 ACL 规则将应用于哪些特定资源,具体取决于您指定的模式。您可以选择以下选项:
- 匹配所有:规则适用于所选资源类型的所有资源。
- 精确匹配:规则仅适用于特定的资源名称(例如,特定的主题
t1/+
或特定的消费组)。 - 前缀匹配:规则适用于所有以指定前缀开始的资源名称(例如,
t1/
将匹配以t1/
开头的所有主题)。
- 操作:选择规则管理的操作类型。
全部操作
表示适用于所有可用的操作。 - 是否允许:选择是否允许或拒绝指定用户、Host 和资源上的操作。
- 用户名:指定此 ACL 规则适用的用户。您可以选择两种用户名模式:
- 点击确认保存 ACL 规则。
创建 Stream
用户可以创建两种类型的 stream:Default 和 Free。Default stream 与 MQTT 主题过滤器关联,默认创建 16 个分区,用于存储所有匹配的 MQTT 消息。Free stream 独立于 MQTT 主题运作,具有可自定义的分区数量,为非 MQTT 使用场景提供了更大的灵活性。
两种类型的 stream 的保留时间都固定为 1 天,且无法修改。
- 导航到 Streaming (beta) -> Streams。
- 在 Streams 页面点击新建。在新建 Streams 弹窗中,完成以下设置:
- Stream 名称:为 stream 提供一个名称,例如
demo
。Kafka 客户端的 Kafka 主题必须与 stream 名称匹配。 - Stream 类型:选择 stream 类型。
- Default:Default stream 与 MQTT 主题过滤器关联,匹配该过滤器的 MQTT 消息将保存到该 stream 中。
- Free:Free stream 不与 MQTT 主题过滤器关联,通常用于其他数据处理用途。
- MQTT 主题过滤器:如果选择了
Default
类型,输入一个 MQTT 消息的主题过滤器。匹配该过滤器的 MQTT 消息将保存到相应的 default stream 中。本示例中,输入t1/+
。 - 分区数量:如果选择了
Free
类型,请指定分区数量,以便 stream 在扩展性和并行处理方面进行划分。
- Stream 名称:为 stream 提供一个名称,例如
- 点击确认。
新的 stream 创建完成后,它将出现在 Streams 列表中。点击 Stream 名称列中的名称,可以查看分区偏移量及其他 stream 的详细信息。
使用 MQTT 客户端发布消息
您可以使用 MQTTX 模拟一个 MQTT 客户端,并将消息发布到 t1/a
和 t1/b
主题。
使用 Kafka CLI 消费消息
请按照以下步骤下载官方 Kafka CLI 工具:
- 从 Kafka 下载页面下载并安装官方 Kafka CLI 工具。
- 配置 Kafka CLI 工具以连接 Streaming (beta) -> 概览页面中提供的 Kafka 端点。
创建配置文件以提供凭证
- 为 Kafka CLI 工具创建一个配置文件
config/client.properties
。 - 将以下内容添加到配置文件中:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<your username>" \
password="<your password>";
检索主题信息
使用 Kafka CLI,可以通过 describe
命令检索主题信息。确保 bootstrap-server
选项设置为 Streaming (beta) -> 概览页面上显示的 Kafka 端点。
示例命令:
bin/kafka-topics.sh --describe --bootstrap-server <your streaming endpoint> --command-config config/client.properties
从主题中消费消息
要使用 Kafka CLI 消费消息,请执行以下命令:
bin/kafka-console-consumer.sh --bootstrap-server <your streaming endpoint> --topic demo --from-beginning --consumer.config config/client.properties
此命令将从指定 stream 的开头开始消费消息。您应看到先前发布的消息被成功消费。
在 EMQX Platform 控制台中查看已消费的消息
要验证已消费的消息,导航到 EMQX Platform 控制台的 Streaming (beta) -> 消费者群组。
点击组 ID 以查看有关消费者组的详细信息,例如消费者列表及其消费进度。
查看 EMQX Streaming 指标
Streaming (beta) -> 概览页面提供了 Kafka 端点和 EMQX Streaming 的全面指标视图。它展示了包括当前 Streaming 数、分区、消费者组和消息速率等 Streaming 操作的统计信息。
删除 Stream
要删除一个 stream:
- 进入 Streaming (beta) -> Streams。
- 点击要删除的 Stream 操作列下的删除图标。
- 点击确认删除。