# 将 MQTT 数据写入到 Apache Doris

::: tip 注意

Apache Doris 数据集成功能仅适用于 EMQX 5.91 版本以及之后的专有版部署。

:::

[Apache Doris](https://doris.apache.org/) 是一款现代化的大规模并行处理（MPP）分析型数据库系统，具有高并发、高性能、易于使用等特点。它特别适用于实时分析和数据仓库等场景。您可以将 MQTT 数据集成至 Apache Doris，实现高效存储、实时分析和强大的数据可视化能力。

本页详细介绍了 EMQX Cloud 与 Apache Doris 的数据集成并提供了实用的规则创建指导。

::: tip 提示

EMQX Cloud 中的 Apahe Doris 数据集成支持 Apache Doris 2.1.7 及之后版本。

:::

## 工作原理

Apache Doris 数据集成是 EMQX Cloud 中开箱即用的功能，通过简单的配置即可实现复杂的业务开发。在一个典型的物联网应用中，EMQX Cloud 作为物联网平台，负责接入设备，进行消息传输，Apache Doris 作为数据存储平台，负责设备状态与元数据的存储，以及消息数据存储和数据分析等。

<img src="./_assets/doris-integration.png" alt="doris-integration" style="zoom:67%;" />

EMQX Cloud 通过规则引擎与 Sink 将设备事件和数据转发至 Apache Doris，应用读取 Apache Doris 中数据即可感知设备状态，获取设备上下线记录，以及分析设备数据。其具体的工作流程如下：

- **设备连接到 EMQX Cloud**：物联网设备通过 MQTT 协议连接成功后将触发上线事件，事件包含设备 ID、来源 IP 地址以及其他属性等信息。
- **设备消息发布和接收**：设备通过特定的主题发布遥测和状态数据，EMQX Cloud 接收到消息后将在规则引擎中进行比对。
- **规则引擎处理消息**：通过内置的规则引擎，可以根据主题匹配处理特定来源的消息和事件。规则引擎会匹配对应的规则，并对消息和事件进行处理，例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
- **写入到 Apache Doris**：规则触发将消息写入到 Apache Doris 的操作。借助 SQL 模板，用户可以从规则处理结果中提取数据构造 SQL 发送给 Apache Doris 执行，实现将消息特定字段写入或更新到数据库对应表和列中。

事件和消息数据写入到 Apache Doris 后，您可以连接到 Apache Doris 读取数据，进行灵活的应用开发，例如：

- 连接到可视化工具，例如 Grafana，根据数据生成图表，展示数据变化。
- 连接到设备管理系统，查看设备列表与状态，并检测设备异常行为，及时排除潜在的问题。

## 特性与优势

在 EMQX Cloud 中使用 Apache Doris 数据集成能够为您的业务带来以下特性与优势：

- **灵活的事件处理**：通过 EMQX Cloud 规则引擎，Apache Doris 可以处理设备全生命周期事件，极大的方便开发实现物联网应用所需的各类管理与监控业务。您可以通过分析事件数据，及时发现设备故障、异常行为或趋势变化，以便采取适当的措施。
- **消息转换**：消息可以写入 Apache Doris 之前，通过 EMQX Cloud 规则中进行丰富的处理和转换，方便后续的存储和使用。
- **实时数据写入**：Apache Doris 支持通过 HTTP 和 JDBC 接口进行实时数据写入。结合 EMQX Cloud 使用时，MQTT 数据可以低延迟地直接写入 Doris 表，非常适合对实时查询和分析有高要求的场景。
- **流式同步**：Apache Doris 还支持从 Flink、Kafka 以及各类事务型数据库等数据源接入实时数据流。这使得用户可以构建统一的数据处理流水线，将 EMQX Cloud 中的 MQTT 数据与其他流式数据源结合，实现全面的实时分析能力。
- **标准 SQL 与生态兼容性**：Doris 完全兼容 MySQL 语法并支持标准 SQL，使用户无需学习新语言即可执行强大的分析查询。同时，它能够轻松集成各类商业智能（BI）工具和客户端应用，用于构建仪表盘、生成报表以及实现自动化工作流程。
- **运行时指标**：支持查看每个 Sink 的运行时指标，例如消息总数、成功/失败计数、当前速率等。

通过灵活的事件处理、丰富的消息转换、灵活的数据操作以及实时监控与分析能力，您可以构建高效、可靠和可扩展的物联网应用，并在业务决策和优化方面受益。

## 准备工作

本节介绍了在 EMQX Cloud 中创建 Apache Doris 数据集成之前需要做的准备工作，包括安装 Apache Doris 和创建数据表。

### 前置准备

- 了解[规则](./rules.md)。
- 了解[数据集成](./introduction.md)。

### 网络设置

<!--@include: ./network-setting.md-->

### 安装 Apache Doris 服务器

请按照[官方指南](https://doris.apache.org/zh-CN/docs/dev/gettingStarted/quick-start#use-docker-for-quick-deployment)使用 Docker Compose 在本地运行 Doris。

### 创建数据表

您可以使用 MySQL 客户端连接到 Doris 的 Frontend 并执行命令。参见[官方文档](https://doris.apache.org/zh-CN/docs/dev/gettingStarted/quick-start#%E8%BF%90%E8%A1%8C%E6%9F%A5%E8%AF%A2)。

例如：

```
mysql -uroot -P9030 -h127.0.0.1
```

您需要在 Apache Doris 中创建数据库和两张数据表：

- 数据表 `emqx_messages` 存储每条消息的发布者客户端 ID、主题、Payload 以及发布时间。
- 数据表 `emqx_client_events` 存储上下线的客户端 ID、事件类型以及事件发生时间。

  ```sql
  create database mqtt;
  use mqtt;
  
  create table if not exists
    emqx_messages(
      clientid varchar,
      topic string,
      payload string,
      created_at datetime
    )
    properties (replication_num = 1);
  
  create table if not exists
    emqx_client_events(
      clientid varchar,
      event varchar,
      created_at datetime)
    properties (replication_num = 1);

## 创建连接器

您需要创建一个 Apache Doris 连接器，以便 EMQX Cloud 与 Apache Doris 服务建立连接。

1. 在部署菜单中选择**数据集成**，在**数据持久化**分类下选择 **Doris**。如果您已经创建了其他的连接器，点击**新建连接器**，然后在**数据持久化**分类下选择 **Doris**。
2. 在**新建连接器** 页面中配置以下信息：
   - **连接器名称**：系统将自动生成一个连接器的名称。
   - **服务器地址**：填写 `127.0.0.1:3306`。
   - **数据库名字**：填写 `mqtt`。
   - **用户名**：填写 `root`。
   - **密码**：填写 `public`。
3. 根据需要配置高级设置选项（可选）。
4. 点击**测试连接**按钮，如果 Doris 服务能够正常访问，则会返回成功提示。
5. 点击**新建**按钮完成连接器创建。

## 创建消息存储 Sink 规则

本节演示了如何创建一条规则，用于处理来自源 MQTT 主题 `t/#` 的消息，并通过配置的动作将处理后的结果写入到 Apache Doris 的数据表 `emqx_messages` 中。

1. 点击连接器列表**操作**列下的新建规则图标或在**规则列表**中点击**新建规则**进入**新建规则**步骤页。

2. 在 **SQL 编辑器**中输入规则，此处选择将 `t/#` 主题的 MQTT 消息存储至 Apache Doris，请确保规则选择出来的字段（SELECT 部分）包含所有 SQL 模板中用到的变量，此处规则 SQL 如下：

   ```sql
   SELECT 
     *
   FROM
     "t/#"
   ```

   ::: tip

   如果您初次使用 SQL，可以点击 **SQL 示例** 和**启用调试**来学习和测试规则 SQL 的结果。

   :::

3. 点击**下一步**开始创建动作。

4. 从**使用连接器**下拉框中选择您之前创建的连接器。

5. 配置 **SQL 模板**，使用如下 SQL 完成数据插入，此处为[预处理 SQL](https://docs.emqx.com/zh/emqx/latest/data-integration/data-bridges.html#sql-%E9%A2%84%E5%A4%84%E7%90%86)，字段不应当包含引号，SQL 末尾不要带分号 （`;`）:

   ```sql
   INSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES(
     ${clientid},
     ${topic},
     ${payload},
     FROM_UNIXTIME(${timestamp}/1000)
   )
   ```

   如果在模板中使用未定义的占位符变量，您可以切换**未定义变量作为 NULL** 开关（位于 **SQL 模板** 上方）来定义规则引擎的行为：

   - **关闭**（默认）：规则引擎可以将字符串 `undefined` 插入数据库。

   - **启用**：允许规则引擎在变量未定义时将 `NULL` 插入数据库。

     ::: tip

     如果可能，应该始终启用此选项；禁用该选项仅用于确保向后兼容性。

     :::

6. **备选动作（可选）**：如果您希望在消息投递失败时提升系统的可靠性，可以为动作配置一个或多个备选动作。当动作无法成功处理消息时，将触发这些备选动作。更多信息请参见：[备选动作](./actions.md#备选动作)。

7. 展开**高级设置**，根据需要配置高级设置选项（可选），详细请参考[高级设置](#高级设置)。

8. 点击**确认**按钮完成动作的配置。

9. 在弹出的**成功创建规则**提示框中点击**返回规则列表**，从而完成整个数据集成的配置。

## 创建事件记录 Sink 规则

本节展示如何创建用于记录客户端上/下线状态的规则，并通过配置的 Sink 将记录写入 Apache Doris 的数据表 `emqx_client_events` 中。除 SQL 模板与规则外，其他操作步骤与[创建消息存储 Sink 规则](#创建消息存储-sink-规则)章节完全相同。

您可以使用以下规则 SQL 创建规则：

```sql
SELECT
  *
FROM 
  "$events/client/connected", "$events/client/disconnected"
```

您可以使用以下 SQL 模板创建实现设备上下线记录的 Sink，请注意字段不应当包含引号，SQL 末尾不要带分号 `;`:

```sql
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  ${clientid},
  ${event},
  FROM_UNIXTIME(${timestamp}/1000)
)
```

## 测试规则

使用 MQTTX 向 `t/1` 主题发布消息，此操作同时会触发上下线事件：

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Apache Doris" }'
```

分别查看两个 Sink 运行统计，命中数和发送成功次数应各增加 1 次，上下线记录 Sink 命中和成功次数应增加 2 次。

查看数据是否已经写入表中，`emqx_messages` 表：

```bash
mysql> select * from emqx_messages;
+----+----------+-------+--------------------------+---------------------+
| id | clientid | topic | payload                  | created_at          |
+----+----------+-------+--------------------------+---------------------+
|  1 | emqx_c   | t/1   | { "msg": "hello Apache Doris" } | 2022-12-09 08:44:07 |
+----+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)
```

`emqx_client_events` 表：

```bash
mysql> select * from emqx_client_events;
+----+----------+---------------------+---------------------+
| id | clientid | event               | created_at          |
+----+----------+---------------------+---------------------+
|  1 | emqx_c   | client.connected    | 2022-12-09 08:44:07 |
|  2 | emqx_c   | client.disconnected | 2022-12-09 08:44:07 |
+----+----------+---------------------+---------------------+
2 rows in set (0.00 sec)
```

