# 与其他 MQTT 服务桥接

MQTT 服务数据集成提供了一种连接到另一个 EMQX 集群或其它 MQTT 服务进行消息桥接的功能，能够用于实现跨网络、跨服务的数据交互和通信。本页提供了 EMQX Cloud 中 MQTT 消息桥接的工作原理介绍，以及如何创建和验证消息桥接的实际指导。

## 工作原理

在桥接时，EMQX Cloud 以客户端的身份与目标服务建立 MQTT 连接，通过发布订阅模式实现消息的传输：

- 消息流出：将当前部署的主题的消息发布到远程 MQTT 服务指定的主题。
- 消息流入：订阅远程 MQTT 服务的主题，并将其消息转发到当前部署。


EMQX Cloud 支持在同一个连接上配置多个桥接规则，每个规则都可以配置不同的主题映射和消息转换规则，实现类似消息路由的功能。在桥接过程中，您还可以通过规则引擎对消息进行处理，实现消息转发前的过滤、丰富和转换等操作。

下图展示了 EMQX Cloud 和 其他 MQTT 服务之间的数据集成的典型架构：

![EMQX Cloud-MQTT 集成](./_assets/data_integration_mqtt.jpg)

## 特性与优势

EMQX Cloud 的 MQTT 消息桥接具有以下特性和优势：

- **广泛的兼容性**：使用标准的 MQTT 协议，可以桥接到各类物联网平台，包括 AWS IoT Core、Azure IoT Hubs 等，同时也支持开源或其他行业 MQTT Broker 和物联网平台。这使得它可以与各种设备和平台进行无缝集成和通信。
- **灵活的主题映射**：基于 MQTT 发布订阅模式，实现了灵活的主题映射。桥接过程中支持为主题添加前缀，可以利用客户端的上下文信息（如客户端 ID、用户名等）动态构造主题。这种灵活性使得可以根据具体需求对消息进行定制化处理和路由。
- **高性能**：提供了性能优化选项，如连接池和共享订阅，以降低单个桥接客户端的负载压力，实现更低的桥接延迟和更高的桥接消息吞吐量。通过这些优化措施，可以提升整体系统的性能和可扩展性。
- **消息 payload 转换**：允许通过定义 SQL 规则对消息 payload 进行处理，在消息传输过程中，可以对 payload 进行数据提取、过滤、丰富和转换等操作。例如，可以从 payload 中提取实时指标，并在消息传递到其他 MQTT 服务之前进行数据转换和处理。
- **指标监控**：提供了详细的运行指标的监控，可以查看消息总数、成功/失败计数、当前速率等指标，帮助用户实时监控和评估 MQTT 消息桥接 的性能和健康状况。

## 准备 MQTT 服务连接信息

### 前置准备

- 了解[数据集成](./introduction.md)。
- 了解[规则](./rules.md)。

在创建 MQTT 消息桥接之前，您需要获取远程 MQTT 服务的连接信息，包括：

- **MQTT 服务地址**：目标 MQTT 服务的地址和端口，例如  `broker.emqx.io:1883`。
- **用户名**：进行连接所需的用户名，如果目标服务不需要认证，此项可留空。
- **密码**：进行连接所需的密码，如果目标服务不需要认证，此项也可留空。
- **协议类型**：需要明确目标服务是否启用了 TLS，以及是否使用的是 MQTT over TCP/TLS 协议。值得注意的是，EMQX Cloud MQTT 桥接暂时不支持 MQTT over WebSocket 和 MQTT over QUIC 类型的协议。
- **协议版本**：目标 MQTT 服务使用的协议版本，EMQX Cloud 支持 MQTT 3.1, 3.1.1 和 MQTT 5.0。

对于 EMQX Cloud 或者其他标准的 MQTT 服务器，桥接都能提供良好的兼容和支持。如果您需要连接的是其他类型的 MQTT 服务，你可以参考其相关文档来获取这些连接信息。一般来说，大多数的物联网平台都会提供标准的 MQTT 接入方式，你可以依据他们的指引将设备信息转化为以上所述的 MQTT 连接信息。

::: tip
EMQX Cloud 运行在集群模式下或启用连接池时，多个节点使用相同的客户端 ID 连接到同一 MQTT 服务通常会导致设备互踢，因此 EMQX Cloud MQTT 桥接暂不支持设置固定的客户端 ID。
:::

### 网络设置

<!--@include: ./network-setting.md-->

## 创建连接器（目标数据和源数据）

本节将以 EMQX 的[在线 MQTT 服务器](https://www.emqx.com/zh/mqtt/public-mqtt5-broker)作为远程 MQTT 服务器，指导您如何配置与远程 MQTT 服务器的连接。

在创建数据集成的规则之前，您需要先创建一个 MQTT 服务 连接器用于访问 MQTT 服务。

1. 在部署菜单中选择**数据集成**，在数据转发分类下选择 MQTT 服务。如果您已经创建了其他的连接器，点击**新建连接器**，然后在数据转发服务分类下选择 MQTT 服务。

2. **连接器名称**：系统将自动生成一个连接器的名称。

3. 填写连接相关配置：

   - **MQTT 服务地址**：仅支持 MQTT over TCP/TLS，例如：`broker.emqx.io:1883`。

   - **客户端 ID 前缀**：此处可以留空，实际使用中，指定客户端 ID 前缀可以便于客户端管理，EMQX Cloud 会根据客户端 ID 前缀和连接池大小自动生成客户端 ID。

   - **用户名**和**密码**：如果 MQTT 服务器要求身份认证，请输入与该 Client ID 关联的用户名和密码。如果不需要认证（例如公共 MQTT 服务器），则可以留空。

   - **Keepalive**：指定需要的 keepalive 间隔。

   - **MQTT 协议版本**：选择适合该 MQTT 服务器连接的 MQTT 协议版本。

   - **静态客户端 ID 映射表**：（高级配置）此部分允许为连接器配置静态 Client ID，以确保连接稳定，特别是在连接 Azure IoT Hubs 等服务时。更多配置方法参见[配置静态客户端 ID](https://docs.emqx.com/zh/emqx/latest/data-integration/data-bridge-mqtt.html#配置静态客户端-id)。

     ::: tip

     当配置了映射关系后，只有在映射中指定的 EMQX 节点才会创建对应的 MQTT 客户端连接。

     :::

4. 其他配置保持默认即可，点击**测试连接**按钮，如果 MQTT 服务能够正常访问，则会返回**连接器可用**提示。

5. 点击**新建**按钮完成连接器的创建。

接下来，您可以基于此连接器创建数据桥接规则。

### 连接池与客户端 ID 生成规则

EMQX Cloud 允许多个客户端同时连接到桥接的 MQTT 服务，在创建连接器时您可以设置一个 MQTT 客户端连接池，并配置连接池大小以表明连接池中的客户端连接数。连接池可以充分利用服务器资源，以实现更大的消息吞吐和更好的并发性能，这对于处理高负载、高并发的场景非常重要。

由于 MQTT 协议要求连接到一个 MQTT 服务器的客户端必须具有唯一的客户端 ID，且 EMQX Cloud 可能以集群方式部署，因此 MQTT 桥接的每个客户端都被分配了一个唯一的客户端 ID。EMQX Cloud 根据以下模式自动生成客户端 ID：

```bash
[客户端 ID 前缀]:{连接器名称}{8 位随机字符串}:{连接池中的连接序号}
```

例如当客户端 ID 前缀为 `myprefix`，连接器名称为 `foo`，实际的客户端 ID 可能是：

```bash
myprefix:foo2bd61c44:1
```

### 配置静态客户端 ID

在某些使用场景中，集成时可用的客户端 ID 数量是有限的。在这种情况下，可以在配置连接器时，为集群中的各个节点分配静态的客户端 ID 集合。对于每个客户端 ID，可以指定对应的用户名和密码。这在连接 Azure IoT Hubs 等场景中特别有用，因为每个设备（Client ID）都需要一组唯一的凭证。

配置静态 Client ID 的步骤如下：

1. 在**静态客户端 ID 映射表**部分，点击**添加**按钮添加一个新的节点与静态客户端 ID 的映射条目。根据需要，可以添加多个不同节点的条目。
2. 对于每个条目，填写以下字段：
   - **节点名称**：指定客户端 ID 所分配的节点，例如 `emqx@10.0.0.1`。您需要[提交工单](../feature/tickets.md)来获取当前部署的 EMQX 节点名称。
   - **客户端 ID**：输入静态 Client ID，例如 `device1`。如需为某个节点添加多个客户端 ID，可点击**添加**按钮继续添加。
     - **用户名**：（可选）输入与该客户端 ID 关联的用户名，用于身份认证。
     - **密码**：（可选）输入与该客户端 ID 关联的密码。这是用于设备或客户端认证的凭证，可能是设备专用密钥、密文或证书，具体取决于所连接的平台（例如，在 Azure IoT Hubs 中是认证密钥）。

**配置示例**:

| 节点            | 客户端 ID   | 用户名（可选） | 密码（可选） |
| :-------------- | :---------- | :------------- | :----------- |
| `emqx@10.0.0.1` | `clientid1` | `username1`    | `secret1`    |
|                 | `clientid3` |                |              |
| `emqx@10.0.0.2` | `clientid2` | `username2`    |              |
| `emqx@10.0.0.3` | `clientid4` |                |              |
|                 | `clientid5` |                |              |

您也可以在配置文件中为每个节点单独定义 `static_clientids` 参数。

如果配置了静态客户端 ID，则只有使用这些客户端 ID 的 MQTT 连接会被启动。任何动态客户端 ID 的配置（例如 `pool_size` 或 `clientid_prefix`）将不再生效。

## 创建 MQTT 服务（目标数据） 规则

本节演示了如何创建一条规则来指定需要转发至远程 MQTT 服务的数据。

1. 点击连接器列表**操作**列下的新建规则图标或在**规则列表**中点击**新建规则**进入**新建规则**步骤页。

2. 在 SQL 编辑器中输入规则，客户端将温湿度消息发送到 `temp_hum/emqx` 主题时，就会触发引擎。这里需要对 SQL 进行一定的处理：

   ```sql
    SELECT
      topic,
      payload
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   如果您初次使用 SQL，可以点击 **SQL 示例**和**启用调试**来学习和测试规则 SQL 的结果。

   :::

3. 点击**下一步**开始创建动作。

4. 从**使用连接器**下拉框中选择您之前创建的连接器。

5. 完成消息从 EMQX Cloud 到发布到外部 MQTT 服务的配置：

   - **主题**：发布到外部 MQTT 服务的主题，支持 `${var}` 格式的占位符。此处输入 `pub/${topic}`，表示在原主题的基础上添加 `pub/` 前缀进行转发。例如，当原始消息主题为 `t/1` 时，转发到外部 MQTT 服务的主题为 `pub/t/1`。
   - **QoS**：消息发布 QoS，下拉选择 `0` 、 `1` 、`2` 或 `${qos}`，也可以输入占位符从其他字段中设置 QoS，此处选择 `${qos}` 表示跟随原始消息的 QoS。
   - **Retain**：选择 `true` 、`false` 或 `${falgs.retain}`，确认是否以保留消息方式发布消息，也可以输入占位符从其他字段中设置保留消息标志位。此处选择 `${falgs.retain}` 表示跟随原始消息的保留消息标志位。
   - **消息模板**：用于生成转发消息 Payload 的模板，默认留空表示转发规则输出结果。此处可以输入 `${payload}` 表示仅转发 Payload。

6. 其他高级设置中的配置使用默认值，点击**确认**按钮完成规则的创建

7. 在弹出的**成功创建规则**提示框中点击**返回规则列表**，从而完成了整个数据集成的配置链路。

创建成功后将回到创建规则页面，在**规则列表**中可以查看到新创建的规则。在**动作列表**中，**输入动作列表**可以查看到数据导入的动作列表。

### 测试规则

推荐使用 [MQTTX CLI](https://mqttx.app/zh/cli) 模拟温湿度数据上报，同时您也可以使用其他任意客户端完成。

1. 订阅外部 MQTT 服务 `pub/#` 主题：

   ```bash
   mqttx sub -t pub/# -q 1 -h broker.emqx.io -v
   ```

1. 使用 MQTTX 连接到 EMQX Cloud 部署，并向以下 Topic 发送消息。

   - topic: `temp_hum/emqx`

   - client id: `test_client`

   - payload:

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

1. 您可以在 MQTTX 外部 MQTT 服务订阅到 `pub/temp_hum/emqx` 主题的消息，表示消息已经成功从 EMQX Cloud 转发到外部 MQTT 服务：

   ```bash
   [2024-3-21] [10:43:13] › topic: pub/temp_hum/emqx
   payload:
   { "temp": "27.5", "hum": "41.8"}
   ```

## 创建 MQTT 服务（源数据）规则

本节演示了如何创建一条规则将远程 MQTT 服务的数据转发当前部署。您需要同时创建一个 MQTT 服务（源数据） 和一个消息重发布动作，实现消息从远程 MQTT 服务到 EMQX 的订阅，以及订阅数据的转发。

MQTT 服务（源数据）连接器和 MQTT 服务（目标数据）连接器创建方式相同，请参考创建。

1. 点击连接器列表**操作**列下的新建规则图标，或在**规则列表**中点击**新建规则**进入**新建规则**步骤页。

2. 对于源数据规则，您需要先配置输入的动作。规则编辑页面会自动弹出输入动作配置，或者选择面板右侧的**输入动作** -> **添加输入动作**，选择 **MQTT 服务（源数据）**，点击**下一步**进入配置。
    - 选择 MQTT 服务（源数据）连接器。
    - **主题**：输入需要从远程 MQTT 订阅的主题，如从远端订阅的主题为`temp_hum/emqx`。
    - **QoS**：选择消息 QoS。
    - 点击**确认**按钮完成配置。

3. **SQL 编辑器**中将更新数据源字段：

    ```sql
    SELECT
      *
    FROM
      "$bridges/mqtt:source-d1f51e81"
    ```
4. 点击**下一步**开始创建输出动作。
5. 在新建输出动作中，选择**消息重新发布**。
6. 配置输出动作信息：
   - **主题**：转发的 MQTT 主题，支持 `${var}` 格式的占位符。此处输入 `sub/${topic}`，表示在原主题的基础上添加 `sub/` 前缀进行转发。例如，当原始消息主题为 `t/1` 时，转发的主题为 `sub/t/1`。
   - **QoS**：消息发布 QoS，下拉选择 `0` 、 `1` 、`2` 或 `${qos}`，也可以输入占位符从其他字段中设置 QoS，此处选择 `${qos}` 表示跟随原始消息的 QoS。
   - **Retain**：选择 `true` 、`false` 或 `${falgs.retain}`，确认是否以保留消息方式发布消息，也可以输入占位符从其他字段中设置保留消息标志位。此处选择 `${falgs.retain}` 表示跟随原始消息的保留消息标志位。
   - **消息模板**：用于生成转发消息 Payload 的模板，默认留空表示转发规则输出结果。此处可以输入 `${payload}` 表示仅转发 Payload。
7. 其他配置使用默认值，点击**创建**按钮完成输出动作的创建。

创建成功后将回到创建规则页面，在**规则列表**中可以查看到新创建的规则。消息重新发布动作目前不显示在**输出动作列表**，如需查看，点击规则编辑按钮，在规则设置的下方，可以查看到消息重新发布的输出动作。

### 测试规则
在之前创建的规则中配置了将外部 MQTT 服务中 `temp_hum/emqx` 主题的消息桥接到当前部署 `sub/${topic}` 主题，因此当您向外部 MQTT 服务 `temp_hum/emqx` 主题发布消息时，消息将被转发到部署的 `sub/temp_hum/emqx` 主题中。

以下步骤演示了如何使用 [MQTTX](https://mqttx.app/zh/) 向外部 MQTT 服务中发送消息，并订阅转发的主题获取消息。

1. 使用 MQTTX 订阅当前部署的主题 `sub/#`。

2. 使用 MQTTX 向外部 MQTT 服务的 `temp_hum/emqx` 主题发布消息：

   ```json
   {
      "temp": 55,
      "hum": 32
   }
   ```
3. MQTTX 收到当前部署 `sub/temp_hum/emqx` 主题内消息：

   ```json
   {
      "temp": 55,
      "hum": 32
   }
   ```

## 故障诊断

本节介绍在使用 MQTT 服务数据集成的 MQTT 连接器时，可能遇到的常见问题及其解决方案。

### MQTT 服务连接器界面显示 “Inconsistent for nodes in the cluster”

#### 问题描述

当 EMQX 集群节点中 MQTT 服务连接器状态不一致时，可能会在数据集成连接器界面看到此提示。

#### 可能原因

1. **远程 MQTT 服务器连接限制**
   - 当远程 MQTT 服务器达到连接上限或存在其他限制时，EMQX 集群节点可能无法全部成功连接至该服务器，导致状态不一致。
2. **VPC 对等连接限制**
   - 如果连接是通过 VPC 对等连接建立的，安全组规则可能未完全放通，导致部分 EMQX 节点无法访问远端 MQTT 服务器。

#### 解决方法

- 对于 **服务器连接限制**：

  在数据集成 MQTT 服务连接器配置中，将连接池大小减少为当前值的一半，然后重试。

- 对于 **VPC 限制**：

  检查安全组规则，确保 EMQX 部署所在的 VPC 网段（通常为 `10.x.x.0/24`）可以访问远程 MQTT 服务器。

- 若问题仍未解决，请[提交工单](../feature/tickets.md#创建工单)联系 EMQ 支持团队。

### 更新或保存 MQTT 服务连接器配置时出现 “Axios Error: timeout”

#### 问题描述

在更新或保存使用共享订阅的 MQTT 服务连接器配置时，可能出现 `Axios Error: timeout` 报错。

#### 可能原因

当源连接器包含多个连接池和订阅主题时，EMQX 在执行更新操作（如修改配置或保存）时，需要重新建立所有共享订阅连接。

如果配置较大（例如 16 个连接池、2 个节点，共 32 个共享订阅客户端，每个客户端包含多个订阅主题），远程 MQTT 服务器会在重新建立连接过程中会按顺序完成订阅确认（SUBACK）过程。

由于共享订阅性能存在延迟，初始化过程可能耗时较长，整个过程可能超过 30 秒超时限制，从而导致控制台或 API 请求超时。

#### 解决方法

1. 为不同数据源创建单独的连接器，例如每个连接器仅处理 2–4 个源。
2. 将每个连接器的连接池大小调整为 `2`，以缩短重新初始化时间并提升性能。

### MQTT 动作数据未成功转发且消息出现过期丢弃

#### 问题描述

部分 MQTT 动作（Action）相关数据未成功转发，监控中可看到消息因过期被丢弃。

#### 可能原因

当远程 MQTT 服务器处理性能有限，或连接池大小不足以支撑高吞吐的数据集成 TPS 时，部分待执行的消息会进入缓存队列。根据默认机制，缓存队列中的消息具有 45 秒的请求超期 TTL（Time-To-Live），若在此期间未被处理，则会被丢弃。

#### 解决方法

1. 检查远程 MQTT 服务器 的消息处理性能，确保其能够支撑当前吞吐量（TPS）。
2. 根据业务负载适当增大连接池大小，以提升并发处理能力。
3. 在动作（Action）配置中适当延长请求超期时间，避免消息过早被丢弃。