# 使用流模式将 MQTT 数据导入 Snowflake

::: tip 注意

Snowflake Streaming 数据集成功能仅适用于 EMQX 6.1.2 及之后版本的专有版或弹性专有版部署。

:::

[Snowflake](https://www.snowflake.com/) 是一个用于数据仓库、数据分析和安全数据共享的云数据平台。EMQX Cloud 可以通过 Snowflake Streaming Sink 将 MQTT 消息写入 Snowflake。该 Sink 使用 Snowflake Streaming 连接器调用 Snowpipe Streaming API，从而以低延迟方式将 MQTT 数据导入 Snowflake 表。

本页介绍如何在 EMQX Cloud 中创建 Snowflake Streaming 数据集成。示例将 MQTT 主题 `t/#` 中的消息通过管道 `testdatabase.public.emqxstreaming` 写入 Snowflake 表 `testdatabase.public.emqx`。

## 工作原理

Snowflake Streaming 集成使用 EMQX Broker 规则引擎选择和转换 MQTT 消息，然后通过 Snowflake Streaming Sink 将规则输出发送到 Snowflake。

数据流如下：

```text
MQTT 客户端 -> EMQX Cloud -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表
```

1. MQTT 客户端向 `t/1`、`t/device001` 或 `t/test` 等主题发布消息。
2. 规则匹配来自 `t/#` 的消息，并选择要写入 Snowflake 的字段。
3. Snowflake Streaming Sink 将选中的字段发送到 Snowflake 管道。
4. Snowflake 将流式记录加载到目标表中。
5. 您可以在 Snowflake 中查询该表，验证导入的 MQTT 数据。

## 准备工作

### 前置准备

开始之前，请确保您已了解：

- [数据集成](./introduction.md)
- [规则](./rules.md)
- Snowflake 数据库、Schema、表、管道、用户、角色和密钥对认证

### 配置网络访问

Snowflake Streaming 连接器通过 HTTPS 连接到 Snowflake。请根据 Snowflake 账户的暴露方式配置网络：

- 如果使用 Snowflake 私有 URL，请在 EMQX Cloud 和 Snowflake 之间创建私有网络连接，例如 VPC 对等连接。
- 如果使用 Snowflake 公网 URL，请确保部署可以访问公网。您可能需要启用 [NAT 网关](../vas/nat-gateway.md)。

### 准备 Snowflake 对象

在 Snowflake 中创建目标数据库、Schema、表和 Streaming pipe。以下 SQL 会创建本示例使用的对象：

```sql
CREATE DATABASE IF NOT EXISTS testdatabase;

CREATE SCHEMA IF NOT EXISTS testdatabase.public;

CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
  clientid STRING,
  topic STRING,
  payload STRING,
  publish_received_at TIMESTAMP_LTZ
);

CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
  clientid,
  topic,
  payload,
  publish_received_at
)
FROM (
  SELECT
    $1:clientid::STRING,
    $1:topic::STRING,
    $1:payload::STRING,
    $1:publish_received_at::TIMESTAMP_LTZ
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
```

### 创建 Snowflake 用户并授予权限

连接器使用密钥对认证连接 Snowflake。请创建用于管道的用户，分配角色，并授予操作管道和写入表所需的权限。

1. 生成 RSA 密钥对。请保留私钥用于 EMQX Cloud 连接器，并将公钥注册到 Snowflake。

   ```bash
   openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
   openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
   ```

   - EMQX Cloud 连接器使用 RSA 私钥签发 JWT 作为安全、可验证的身份凭证。
   - Snowflake 使用预先上传的公钥验证该签名的合法性。

   如需了解更多信息，请参考官方文档： [Key-pair authentication and key-pair rotation](https://docs.snowflake.com/en/user-guide/key-pair-auth)。

2. 创建管道用户使用的 Snowflake 角色。

   ```sql
   CREATE ROLE IF NOT EXISTS snowpipe;
   ```

3. 创建 Snowflake 用户并分配公钥。将 `<PUBLIC_KEY_CONTENT>` 替换为公钥内容，不包含 `-----BEGIN PUBLIC KEY-----` 和 `-----END PUBLIC KEY-----` 行。

   ```sql
   CREATE USER IF NOT EXISTS snowpipeuser
     RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';
   ```

4. 向角色授予权限，将角色授予用户，并将其设置为用户的默认角色。

   ```sql
   GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
   GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
   GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe;
   GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
   GRANT ROLE snowpipe TO USER snowpipeuser;
   ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;
   ```

请确保对象名称与稍后在 EMQX Cloud 中配置的值一致：

| Snowflake 对象 | 值 |
| -------------- | --- |
| 数据库 | `testdatabase` |
| Schema | `public` |
| 表 | `emqx` |
| 管道 | `emqxstreaming` |

## 创建 Snowflake Streaming 连接器

创建规则前，请先创建 Snowflake Streaming 连接器，用于连接 EMQX Cloud 和您的 Snowflake 账户。

1. 在 EMQX Cloud 控制台中进入您的部署。
2. 在左侧导航菜单中点击**数据集成**。
3. 如果这是您创建的第一个连接器，请在**数据持久化**分类下选择 **Snowflake Streaming**。如果已经存在连接器，请点击**新建连接器**，然后选择 **Snowflake Streaming**。
4. 在**新建连接器**页面中，配置以下字段：

   - **连接器名称**：使用自动生成的名称。
   - **服务器地址**：输入 Snowflake 的端点 URL，通常格式为 `<您的 Snowflake 组织 ID>-<您的 Snowflake 账户名>.snowflakecomputing.com`。您需要用自己 Snowflake 实例的子域替换 `<您的 Snowflake 组织 ID>-<您的 Snowflake 账户名称>`。
   - **账户**：输入您的 Snowflake 组织 ID 和账户名，用连字符（`-`）分隔，可以在 Snowflake 控制台中找到该信息，通常也是您访问 Snowflake 平台的 URL 中的一部分。
   - **管道用户**：输入操作管道的 Snowflake 用户，例如 `snowpipeuser`。该角色至少需要具备 `OPERATE` 和 `MONITOR` 权限。
   - **私钥**：粘贴用于密钥对认证的 PEM 格式 RSA 私钥。
   - **私钥密码**：如果私钥已加密，输入私钥密码。如果生成的是未加密私钥（即使用 OpenSSL 的 `-nocrypt` 选项生成），请留空。
   - **代理**：除非部署必须通过 HTTP 代理访问 Snowflake，否则保留默认值。
   - **启用 TLS**：启用此选项。Snowflake Streaming 使用 HTTPS。
   - **TLS 验证**、**Middle Box 兼容模式**、**SNI**、**TLS 证书**和 **TLS 密钥**：仅在网络或证书策略要求时配置这些字段。

5. 点击**测试连接**。如果连接测试成功，点击**新建**。

现在，您可以在为规则添加 Snowflake Streaming Sink 时选择该连接器。

## 创建规则

创建一条规则，用于选择要写入 Snowflake 的 MQTT 消息字段。

1. 在 EMQX Cloud 控制台中进入**数据集成**。
2. 使用以下任一方式创建规则：

   - 在**连接器列表**中，点击 Snowflake Streaming 连接器**操作**列下的新建规则图标。
   - 在**规则列表**中，点击 **+ 新建规则**。

3. 在 **SQL 编辑器**中输入以下 SQL：

   ```sql
   SELECT
     clientid,
     unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at,
     topic,
     payload
   FROM
     "t/#"
   ```

   该规则监听主题匹配 `t/#` 的消息。测试时，可以向 `t/1`、`t/device001` 或 `t/test` 等主题发布消息。

   ::: tip

   对于 Snowflake 集成，选中的字段名和值应与目标 Snowflake 管道和表所需的列匹配。请避免选择不必要的字段。

   :::

4. 创建规则后，点击页面底部的**下一步**，进入**新建动作**。

## 添加 Snowflake Streaming Sink

在**新建动作**页面中，配置 Snowflake Streaming Sink，将规则输出写入 Snowflake。

1. 配置动作：

   - **连接器**：选择之前创建的 Snowflake Streaming 连接器。
   - **动作类型**：值为 **Snowflake Streaming**。
   - **动作名称**：使用自动生成的名称，或输入自定义名称。
   - **数据库名字**：输入 `testdatabase`。
   - **模式**：输入 `public`。
   - **管道**：输入 `emqxstreaming`。

2. 除非需要调整连接或缓存行为，否则保持**高级设置**为默认值。
3. 点击**确认**创建规则和动作。

## 测试规则

使用 MQTTX 或其他 MQTT 客户端向匹配 `t/#` 的主题发布测试消息。

1. 向 EMQX Cloud 发布以下消息：

   - 主题：`t/1`
   - Payload：

     ```json
     {"msg":"hello snowflake"}
     ```

   MQTTX CLI 示例命令：

   ```bash
   mqttx pub -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'
   ```

2. 在 Snowflake 中查询目标表：

   ```sql
   SELECT
     clientid,
     topic,
     payload,
     publish_received_at
   FROM testdatabase.public.emqx
   ORDER BY publish_received_at DESC
   LIMIT 10;
   ```

如果查询返回测试消息，说明集成已正常工作：

```text
MQTT -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表
```
