# 集成 Confluent

Confluent Cloud 基于 Apache Kafka,是一项弹性、可扩展、并完全托管的流式数据服务。EMQX 支持通过数据桥接实现与 Confluent 的数据集成,使您能够轻松将 MQTT 数据流式传输到 Confluent。本页面主要为您介绍了Confluent 集成的特性和优势,并指导您如何配置 Confluent Cloud 以及如何在 EMQX 中创建 Confluent 数据桥接。

# 特性与优势

EMQX 与 Confluent 的集成具备以下特性与优势:

  • 大规模消息传输的可靠性:EMQX 和 Confluent Cloud 均采用了高可靠的集群机制,可以建立稳定可靠的消息传输通道,确保大规模物联网设备消息的零丢失。两者都可以通过添加节点实现水平扩展、动态调整资源来应对突发的大规模消息,保证消息传输的可用性。

  • 强大的数据处理能力:EMQX 的本地规则引擎与 Confluent Cloud 都提供了可靠的流式数据处理能力,作用在物联网数据从设备到应用的不同阶段,可以根据场景选择性地进行实时的数据过滤、格式转换、聚合分析等处理,实现更复杂的物联网消息处理流程,满足数据分析应用的需求。

  • 强大的集成能力:通过 Confluent Cloud 提供的各种 Connector,可以帮助 EMQX 轻松集成其他数据库、数据仓库、数据流处理系统等,构建完整的物联网数据流程,实现数据的敏捷分析应用。

  • 高吞吐量场景下的处理能力:支持同步和异步两种写入模式,您可以区分实时优先和性能优先的数据写入策略,并根据不同场景实现延迟和吞吐量之间的灵活平衡。

  • 有效的主题映射:通过桥接配置,可以将众多 IoT 业务主题映射到 Kakfa 主题。EMQX 支持 MQTT 用户属性映射到 Kafka Header,并采用多种灵活的主题映射方式,包括一对一、一对多、多对多,还支持 MQTT 主题过滤器(通配符)。

# 配置 Confluent Cloud

在创建 Confluent 数据桥接之前,您必须在 Confluent Cloud 控制台中创建 Confluent 集群,并使用 Confluent Cloud CLI 创建主题和 API 密钥。

# 创建集群

  1. 登录到 Confluent Cloud 控制台并创建一个集群。选择 Standard 集群作为示例,然后点击 Begin configuration

    EMQX Confluent Create Cluster
  2. 选择 Region/zones。确保部署区域与 Confluent Cloud 的区域匹配。然后点击 Continue

    EMQX Confluent Select Cluster Region
  3. 输入您的集群名称,然后点击 Launch cluster

    image-20231013105736218

# 使用 Confluent Cloud CLI 创建主题和 API 密钥

现在,您已在 Confluent Cloud 中运行集群,并且可以在 Cluster Overview -> Cluster Settings 页面中获取 Bootstrap server URL。

image-20231013111959327

您可以使用 Confluent Cloud CLI 来管理集群。以下是使用 Confluent Cloud CLI 的基本命令。

# 安装 Confluent Cloud CLI

curl -sL --http1.1 https://cnfl.io/cli | sh -s -- -b /usr/local/bin
Copied!
1

如果您已经安装了,您可以使用以下命令来进行更新:

confluent update
Copied!
1

# 登录到您的帐户

confluent login --save
Copied!
1

# 选择环境

# list env
confluent environment list
# use env
confluent environment use <environment_id>
Copied!
1
2
3
4

# 选择集群

# list kafka cluster
confluent kafka cluster list
# use kafka
confluent kafka cluster use <kafka_cluster_id>
Copied!
1
2
3
4

# 使用 API 密钥和 Secret

如果您想使用现有的 API 密钥,请使用以下命令将其添加到 CLI:

confluent api-key store --resource <kafka_cluster_id>
Key: <API_KEY>
Secret: <API_SECRET>
Copied!
1
2
3

如果您没有 API 密钥和 Secret,可以使用以下命令创建:

$ confluent api-key create --resource <kafka_cluster_id>

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+------------+------------------------------------------------------------------+
| API Key    | YZ6R7YO6Q2WK35X7                                                 |
| API Secret | ****************************************                         |
+------------+------------------------------------------------------------------+
Copied!
1
2
3
4
5
6
7
8

将它们添加到 CLI 后,您可以通过执行以下命令使用 API 密钥和 Secret:

confluent api-key use <API_Key> --resource <kafka_cluster_id>

Copied!
1
2

# 创建主题

您可以使用以下命令创建一个主题:

confluent kafka topic create <topic_name>
Copied!
1

您可以使用以下命令检查主题列表:

confluent kafka topic list
Copied!
1

# 向主题生成消息

您可以使用以下命令创建生产者。启动生产者后,输入一条消息并按 Enter 键。消息将被生成到相应的主题中。

confluent kafka topic produce <topic_name>
Copied!
1

# 从主题消费消息

您可以使用以下命令创建消费者。它将输出相应主题中的所有消息。

confluent kafka topic consume -b <topic_name>
Copied!
1

# 创建 Confluent 数据桥接

本节介绍了在 EMQX Dashboard 中如何通过创建规则并添加规则动作以将数据转发到 Confluent 来创建一个 Confluent 数据桥接。

  1. 在 Dashboard 左侧导航菜单中点击规则引擎 -> 规则

  2. 在规则创建页面上,点击创建。在 SQL 文本框中输入以下 SQL:

    SELECT
        *
    FROM
        "t/#"
    
    Copied!
    1
    2
    3
    4
  3. 点击响应动作区域的添加动作按钮。在新增动作对话框中,从动作类型下拉框中选择数据转发 -> 数据桥接到 Confluent

    confluent_action_type

  4. 点击使用资源下拉框边上的新建,为动作绑定一个资源。

  5. 创建资源对话中,将 Confluent 集群设置页面的 Endpoints 中的信息填入Bootstrap 服务器 文本框。将您之前用 Confluent Cloud CLI 创建的 API 密钥和 Secret 填入 KeySecret 文本框。其余选项均使用默认值。

  6. 点击测试按钮以确保可以成功创建连接,然后点击确定按钮。

    confluent_create_resource

  7. 返回到添加动作对话框,配置规则动作的选项:

    • Kafka 主题:输入您在配置 Confluent 时创建的主题,例如 t.1

    • Kafka Headers:(可选)此字段用于直接将规则输出的变量作为 Kafka Headers 发送。例如,如果您输入${pub_props},则动作将经规则处理的 MQTT 消息的所有 PUBLISH 属性作为 Kafka Header 发送。

    • 更多 Kafka Header:(可选)此字段允许您以键值对形式添加一个或多个 Kafka Header。键和值可以使用${var}的格式作为占位符。例如,您可以添加一个键为clientid,值为${clientid}的键值对。当具有客户端 ID foo 的 MQTT 客户端触发动作时,它将发送一个 Kafka Header clientid: foo

    • Kafka Headers 值的编码类型:此字段允许您选择指定在 Kafka Header更多 Kafka Header 字段中的键的编码模式。当指定的键不是字符串格式,或者指定的值不是二进制或字符串格式时,值将在发送之前进行编码。

      ::: 提示

      根据 规范 (opens new window) ,Kafka Header 的键必须是字符串格式(utf8),其值必须是二进制或字符串格式。

      :::

      该字段有两个可选值:NONEJSON

      • NONE:此选项仅发送具有字符串键和二进制或字符串值的 Header。它丢弃所有具有其他键和值格式的 Header。
      • JSON:使用此选项,系统将尝试将 value 编码为 JSON 字符串,然后发送任何 Header。如果编码成功,则发送 Header;否则,将被丢弃。此外,key 必须是字符串格式;否则,它也将被丢弃。
    • 将其他选项保留为默认设置。

  8. 点击确定按钮。

    confluent_add_action

  9. 回到规则创建页面,您可以看到一个处理命中规则消息的动作已被添加到响应动作区域。点击确定按钮,可以看到规则列表中显示了您创建的规则。

    confluent_rule_list

# 测试数据桥接

  1. 通过发送一条 MQTT 消息到 EMQX,您可以测试 Confluent 数据桥接是否正常运行:
Topic: "t/1"

QoS: 0

Retained: false

Payload: "hello"
Copied!
1
2
3
4
5
6
7
  1. 您可以使用 Confluent 命令从主题 t.1 中消费消息。
confluent kafka topic consume -b t.1
Copied!
1
  1. 点击规则监控列中的图标,规则统计中的命中次数应增加 1。