# 将 MQTT 数据写入到 Lindorm

[阿里云 Lindorm](https://cn.aliyun.com/product/apsaradb/lindorm?from_alibabacloud=) 是一款云原生多模数据库，具备高吞吐、高压缩、高可扩展能力，支持时序（TSDB）、宽表、向量等数据模型，广泛应用于物联网遥测、工业监控、车联网等场景。

EMQX 虽未提供专用的 Lindorm Sink，但 Lindorm 提供了兼容 MySQL 协议的访问方式，用户可直接使用数据集成中的 MySQL Sink 组件将设备数据写入 Lindorm。本页将介绍如何通过 EMQX 与 Lindorm 的数据集成实现 MQTT 数据的提取、转换和写入，构建稳定、高效的物联网数据通道。

## 工作原理

Lindorm 的后端支持多种数据引擎，其中 TSDB 节点为时序数据设计，具备高压缩、高并发、高查询效率的特点。EMQX 作为设备接入平台，结合其规则引擎和数据集成功能，无需复杂编码便可将 MQTT 消息高效写入 Lindorm 后端（通常为 TSDB 节点），实现对设备遥测数据的结构化采集、处理与存储。

![lindorm_architecture](./assets/lindorm_architecture.png)

其具体的工作流程如下：

- **设备连接到 EMQX**：物联网设备通过 MQTT 协议与 EMQX 建立连接。
- **设备消息发布和接收**：设备通过特定的主题发布遥测和状态数据，EMQX 接收到消息后将在规则引擎中进行比对。
- **规则引擎处理消息**：通过内置的规则引擎，可以根据主题匹配处理特定来源的消息。规则引擎会匹配对应的规则，并对消息进行处理，例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
- **写入到 Lindorm**：规则触发后，EMQX 使用 MySQL Sink 调用 Lindorm 提供的兼容接口。
- **Lindorm 后端存储与优化**：根据表结构和字段定义，Lindorm 自动将数据组织为时间序列或宽表结构，进行压缩、索引和聚合优化。
- **外部应用读取分析**：业务系统或可视化工具（如 QuickBI、DataV）可基于 SQL 查询进行设备状态感知、指标监控与趋势分析。

## 特性与优势

在 EMQX 中使用 Lindorm 数据集成能够为您的业务带来以下特性与优势：

- **高并发写入能力**：Lindorm TSDB 节点专为高并发写入场景设计，支持海量设备的遥测数据采集，适用于工业监测、城市感知等场景。
- **消息转换**：消息可以写入 Lindorm 之前，通过 EMQX 规则中进行丰富的处理和转换，方便后续的存储和使用。
- **灵活字段映射与规则处理**：EMQX 规则引擎可动态提取、转换消息字段；支持自定义 SQL 模板，精细控制数据入库结构。
- **高效压缩与持久化存储**：Lindorm 针对时序和结构化数据提供存储优化能力，可在高频写入场景下有效降低存储占用。
   具备自动压缩与持久化能力，适用于长期数据保留、日志归档、指标记录等物联网场景。
- **运行时指标**：支持查看每个 Sink 的运行时指标，例如消息总数、成功/失败计数、当前速率等。

通过丰富的消息转换结合灵活的字段映射机制与 Lindorm 提供的存储和查询能力，可用于构建稳定、可扩展的物联网数据通道，满足多种业务采集和分析需求。

## 准备工作

本节介绍了在 EMQX 中创建 Lindorm 数据集成之前需要做的准备工作，包括创建和连接 Lindorm 实例和创建数据表。

### 前置准备

- 了解[规则](./rules.md)。
- 了解[数据集成](./data-bridges.md)。

### 创建和连接 Lindorm 实例

在开始集成前，请确保已在阿里云控制台完成 Lindorm 实例的创建，并配置网络访问权限。具体步骤如下：

1. 登录阿里云控制台并[创建 Lindorm 实例](https://help.aliyun.com/zh/lindorm/getting-started/create-an-instance)。

2. [设置访问白名单](https://help.aliyun.com/zh/lindorm/getting-started/configure-a-whitelist)，允许 EMQX 所在主机 IP 访问。

3. 根据 EMQX 的实际部署位置选择对应的 Lindorm 连接方式：

   - 如果 EMQX 部署在阿里云的 ECS 主机或 VPC 网络中，建议使用 Lindorm 的内网访问地址（VPC 地址），以获得更稳定和低延迟的通信性能。

   - 如果 EMQX 部署在本地机房、其他云服务商，或不在阿里云网络内，则需要：

     - 在 Lindorm 控制台开启公网访问功能。
     - 使用公网 SQL 接入地址（端口通常为 `33060`）。
     - 将 EMQX 所在服务器的公网 IP 添加至 Lindorm 实例的白名单中。

     具体操作可参考[官方接入文档](https://help.aliyun.com/zh/lindorm/getting-started/connect-to-an-instance)中 “[通过 JDBC Driver 连接并使用时序引擎](https://help.aliyun.com/zh/lindorm/user-guide/use-the-jdbc-driver-for-lindorm-to-connect-to-and-use-lindormtsdb?spm=a2c4g.11186623.0.0.73395a0fPp3qp7#task-2079050)”。

### 创建数据库与数据表

```sql
CREATE DATABASE emqx_data;

CREATE TABLE demo_sensor (
  device_id VARCHAR(255) COMMENT 'TAG',
  time BIGINT,
  msg VARCHAR(255),
  PRIMARY KEY (device_id, time)
);
```

该表结构示例适用于时序数据，使用 `device_id` 作为标签字段，`time` 作为时间戳，`msg` 字段用于存储业务数据。

## 创建连接器

在创建 Lindorm Sink（基于 MySQL 协议）之前，您需要创建一个 MySQL 类型的连接器，以便 EMQX 与 Lindorm 实例建立连接。以下步骤将指导您在 Dashboard 中完成连接器配置。

1. 转到 Dashboard **集成** -> **连接器** 页面。点击页面右上角的**创建**。

2. 在连接器类型中选择 **MySQL**，点击**下一步**。

3. 在**配置**步骤，配置以下信息：

   - **连接器名称**：应为大写和小写字母及数字的组合，例如：`my_lindorm`。

   - **服务器地址**：

     - 如果 EMQX 部署在阿里云 VPC 网络内（例如 ECS 实例），请填写 Lindorm 实例的内网 SQL 地址，格式通常为：填写 Lindorm 提供的内网域名地址，如：`ld-xxxx-proxy-sql-lindorm.lindorm.rds.aliyuncs.com:33060`。
     - 如果 EMQX 部署在本地机房或其他非阿里云环境，请确保已在 Lindorm 控制台启用公网访问功能，并填写分配的公网 SQL 地址，格式通常为：`ld-xxxx-proxy-sql-public.lindorm.rds.aliyuncs.com:33060`。

     请确保 EMQX 所在主机的 IP 地址已加入 Lindorm 的访问白名单。

   - **数据库名字**：填写 `emqx_data`。

   - **用户名**：填写 `root`。

   - **密码**：填写 `public`。

4. 根据需要配置高级设置选项（可选），详细请参考[高级设置](#高级设置)。

5. 点击**创建**按钮完成连接器创建。

6. 在弹出的**创建成功**对话框中您可以点击**创建规则**，继续创建规则以指定需要写入 Lindorm 的数据。

## 创建 Lindorm Sink 规则

本节演示了如何在 Dashboard 中创建一条规则，用于处理来自源 MQTT 主题 `#` 的消息，并通过配置的 Sink 将处理后的结果写入到 Lindorm 的数据表 `demo_sensor` 中。

1. 转到 Dashboard **集成** -> **规则**页面。

2. 点击页面右上角的**创建**。

3. 输入规则 ID `my_rule`，在 SQL 编辑器中输入规则，此处选择将 `#` 主题的 MQTT 消息存储至 Lindorm，请确保规则选择出来的字段（SELECT 部分）包含所有 SQL 模板中用到的变量，此处规则 SQL 如下：

   ```sql
   SELECT
     clientid AS device_id,
     timestamp AS time,
     payload.msg AS msg
   FROM
     "#"
   ```

   ::: tip

   如果您初次使用 SQL，可以点击 **SQL 示例**和**启用调试**来学习和测试规则 SQL 的结果。

   :::

4. 点击右侧的**添加动作**按钮，为规则在被触发的情况下指定一个动作。通过这个动作，EMQX 会将经规则处理的数据发送到 Lindorm。

5. 在**动作类型**下拉框中选择 MySQL，保持**动作**下拉框为默认的“创建动作”选项，您也可以选择一个之前已经创建好的 MySQL Sink。此处我们创建一个全新的 Sink 并添加到规则中。

6. 输入 Sink 名称，要求是大小写英文字母或数字组合。

7. 从**连接器**下拉框中选择刚刚创建的 `my_lindorm`。您也可以通过点击下拉框旁边的按钮创建一个新的连接器。有关配置参数，请参见[创建连接器](#创建连接器)。

8. 配置 **SQL 模板**，使用如下 SQL 完成数据插入，此处为[预处理 SQL](./data-bridges.md#sql-预处理)，字段不应当包含引号，SQL 末尾不要带分号 `;`:

   ```sql
   INSERT INTO demo_sensor(device_id, time, msg) VALUES (
     ${device_id},
     ${time},
     ${msg}
   )
   ```
   
   如果在模板中使用未定义的占位符变量，您可以切换**未定义变量作为 NULL** 开关（位于 **SQL 模板** 上方）来定义规则引擎的行为：
   
   - **关闭**（默认）：规则引擎可以将字符串 `undefined` 插入数据库。
   
   - **启用**：允许规则引擎在变量未定义时将 `NULL` 插入数据库。
   
     ::: tip
   
     如果可能，应该始终启用此选项；禁用该选项仅用于确保向后兼容性。
   
     :::
   
9. **备选动作（可选）**：如果您希望在消息投递失败时提升系统的可靠性，可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时，这些备选动作将被触发。更多信息请参见：[备选动作](./data-bridges.md#备选动作)。

10. 展开**高级设置**，根据需要配置高级设置选项（可选），详细请参考[高级设置](#高级设置)。

11. 点击**创建**按钮完成 Sink 创建，新建的 Sink 将被添加到规则动作中。

12. 回到创建规则页面，对配置的信息进行确认，点击**创建**。一条规则应该出现在规则列表中。

至此您已经完成整个创建过程，可以前往 **集成** -> **Flow 设计器** 页面查看拓扑图，此时应当看到 `#` 主题的消息经过名为 `my_rule` 的规则处理，处理结果交由 MySQL 进行存储。

## 测试规则

使用 MQTTX 向 `sensor/1` 主题发布消息：

```bash
mqttx pub -i emqx_test -t sensor/1 -m '{ "msg": "hello lindorm" }'
```

查看 Sink 运行统计，命中、发送成功次数均 +1。

通过 API 查询数据是否成功写入：

```bash
curl -X POST http://${LINDORM_SERVER}:8242/api/v2/sql?database=emqx_data \
  -H "Content-Type: text/plain" \
  -d 'SELECT * FROM demo_sensor'
```

## 高级设置

本节将深入介绍可用于 MySQL Sink 的高级配置选项。在 Dashboard 中配置 Sink 时，您可以根据您的特定需求展开**高级设置**，调整以下参数。

| 字段名称             | 描述                                                         | 默认值   |
| -------------------- | ------------------------------------------------------------ | -------- |
| **连接池大小**       | 指定在与 MySQL 服务进行接口时，可以维护在连接池中的并发连接数。此选项有助于通过限制或增加 EMQX 与 MySQL 之间的活动连接数量来管理应用程序的可扩展性和性能。<br /> 注意：设置适当的连接池大小取决于诸多因素，如系统资源、网络延迟以及应用程序的具体工作负载等。过大的连接池大小可能导致资源耗尽，而过小的大小可能会限制吞吐量。 | `8`      |
| **启动超时时间**     | 确定连接器在回应资源创建请求之前等待自动启动的资源达到健康状态的最长时间间隔（以秒为单位）。此设置有助于确保连接器在验证连接的资源（例如 Lindorm 数据库实例）完全运行并准备好处理数据事务之前不会执行操作。 | `5` 秒   |
| **缓存池大小**       | 指定缓冲区工作进程数量，这些工作进程将被分配用于管理 EMQX 与 MySQL 的出口 （egress）类型 Sink 中的数据流，它们负责在将数据发送到目标服务之前临时存储和处理数据。此设置对于优化性能并确保出口（egress）场景中的数据传输顺利进行尤为重要。对于仅处理入口 （ingress）数据流的桥接，此选项可设置为 `0`，因为不适用。 | `16`     |
| **请求超期**         | “请求 TTL”（生存时间）设置指定了请求在进入缓冲区后被视为有效的最长持续时间（以秒为单位）。此计时器从请求进入缓冲区时开始计时。如果请求在缓冲区内停留的时间超过了此 TTL 设置或者如果请求已发送但未能在 MySQL 中及时收到响应或确认，则将视为请求已过期。 | `45` 秒  |
| **健康检查间隔**     | 指定 Sink 对与 Lindorm 的连接执行自动健康检查的时间间隔（以秒为单位）。 | `15` 秒  |
| **缓存队列最大长度** | 指定可以由 MySQL Sink 中的每个缓冲器工作进程缓冲的最大字节数。缓冲器工作进程在将数据发送到 Lindorm 之前会临时存储数据，充当处理数据流的中介以更高效地处理数据流。根据系统性能和数据传输要求调整该值。 | `256` MB |
| **最大批量请求大小** | 指定可以在单个传输操作中从 EMQX 发送到 Lindorm 的数据批处理的最大大小。通过调整此大小，您可以微调 EMQX 与 Lindorm 之间数据传输的效率和性能。<br />如果将“最大批处理大小”设置为“1”，则数据记录将单独发送，而不会分组成批处理。 | `1`      |
| **请求模式**         | 允许您选择`同步`或`异步`请求模式，以根据不同要求优化消息传输。在异步模式下，写入到 Lindorm 不会阻塞 MQTT 消息发布过程。但是，这可能导致客户在它们到达 Lindorm 之前就收到了消息。 | `异步`   |
| **请求飞行队列窗口** | “飞行队列请求”是指已启动但尚未收到响应或确认的请求。此设置控制 Sink 与 Lindorm 通信时可以同时存在的最大飞行队列请求数。<br/>当 **请求模式** 设置为 `异步` 时，“请求飞行队列窗口”参数变得特别重要。如果对于来自同一 MQTT 客户端的消息严格按顺序处理很重要，则应将此值设置为 `1`。 | `100`    |

