Skip to content

使用流模式将 MQTT 数据导入 Snowflake

注意

Snowflake Streaming 数据集成功能仅适用于 EMQX 6.1.2 及之后版本的专有版或弹性专有版部署。

Snowflake 是一个用于数据仓库、数据分析和安全数据共享的云数据平台。EMQX Cloud 可以通过 Snowflake Streaming Sink 将 MQTT 消息写入 Snowflake。该 Sink 使用 Snowflake Streaming 连接器调用 Snowpipe Streaming API,从而以低延迟方式将 MQTT 数据导入 Snowflake 表。

本页介绍如何在 EMQX Cloud 中创建 Snowflake Streaming 数据集成。示例将 MQTT 主题 t/# 中的消息通过管道 testdatabase.public.emqxstreaming 写入 Snowflake 表 testdatabase.public.emqx

工作原理

Snowflake Streaming 集成使用 EMQX Broker 规则引擎选择和转换 MQTT 消息,然后通过 Snowflake Streaming Sink 将规则输出发送到 Snowflake。

数据流如下:

text
MQTT 客户端 -> EMQX Cloud -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表
  1. MQTT 客户端向 t/1t/device001t/test 等主题发布消息。
  2. 规则匹配来自 t/# 的消息,并选择要写入 Snowflake 的字段。
  3. Snowflake Streaming Sink 将选中的字段发送到 Snowflake 管道。
  4. Snowflake 将流式记录加载到目标表中。
  5. 您可以在 Snowflake 中查询该表,验证导入的 MQTT 数据。

准备工作

前置准备

开始之前,请确保您已了解:

  • 数据集成
  • 规则
  • Snowflake 数据库、Schema、表、管道、用户、角色和密钥对认证

配置网络访问

Snowflake Streaming 连接器通过 HTTPS 连接到 Snowflake。请根据 Snowflake 账户的暴露方式配置网络:

  • 如果使用 Snowflake 私有 URL,请在 EMQX Cloud 和 Snowflake 之间创建私有网络连接,例如 VPC 对等连接。
  • 如果使用 Snowflake 公网 URL,请确保部署可以访问公网。您可能需要启用 NAT 网关

准备 Snowflake 对象

在 Snowflake 中创建目标数据库、Schema、表和 Streaming pipe。以下 SQL 会创建本示例使用的对象:

sql
CREATE DATABASE IF NOT EXISTS testdatabase;

CREATE SCHEMA IF NOT EXISTS testdatabase.public;

CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
  clientid STRING,
  topic STRING,
  payload STRING,
  publish_received_at TIMESTAMP_LTZ
);

CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
  clientid,
  topic,
  payload,
  publish_received_at
)
FROM (
  SELECT
    $1:clientid::STRING,
    $1:topic::STRING,
    $1:payload::STRING,
    $1:publish_received_at::TIMESTAMP_LTZ
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

创建 Snowflake 用户并授予权限

连接器使用密钥对认证连接 Snowflake。请创建用于管道的用户,分配角色,并授予操作管道和写入表所需的权限。

  1. 生成 RSA 密钥对。请保留私钥用于 EMQX Cloud 连接器,并将公钥注册到 Snowflake。

    bash
    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
    openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
    • EMQX Cloud 连接器使用 RSA 私钥签发 JWT 作为安全、可验证的身份凭证。
    • Snowflake 使用预先上传的公钥验证该签名的合法性。

    如需了解更多信息,请参考官方文档: Key-pair authentication and key-pair rotation

  2. 创建管道用户使用的 Snowflake 角色。

    sql
    CREATE ROLE IF NOT EXISTS snowpipe;
  3. 创建 Snowflake 用户并分配公钥。将 <PUBLIC_KEY_CONTENT> 替换为公钥内容,不包含 -----BEGIN PUBLIC KEY----------END PUBLIC KEY----- 行。

    sql
    CREATE USER IF NOT EXISTS snowpipeuser
      RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';
  4. 向角色授予权限,将角色授予用户,并将其设置为用户的默认角色。

    sql
    GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
    GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
    GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe;
    GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
    GRANT ROLE snowpipe TO USER snowpipeuser;
    ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;

请确保对象名称与稍后在 EMQX Cloud 中配置的值一致:

Snowflake 对象
数据库testdatabase
Schemapublic
emqx
管道emqxstreaming

创建 Snowflake Streaming 连接器

创建规则前,请先创建 Snowflake Streaming 连接器,用于连接 EMQX Cloud 和您的 Snowflake 账户。

  1. 在 EMQX Cloud 控制台中进入您的部署。

  2. 在左侧导航菜单中点击数据集成

  3. 如果这是您创建的第一个连接器,请在数据持久化分类下选择 Snowflake Streaming。如果已经存在连接器,请点击新建连接器,然后选择 Snowflake Streaming

  4. 新建连接器页面中,配置以下字段:

    • 连接器名称:使用自动生成的名称。
    • 服务器地址:输入 Snowflake 的端点 URL,通常格式为 <您的 Snowflake 组织 ID>-<您的 Snowflake 账户名>.snowflakecomputing.com。您需要用自己 Snowflake 实例的子域替换 <您的 Snowflake 组织 ID>-<您的 Snowflake 账户名称>
    • 账户:输入您的 Snowflake 组织 ID 和账户名,用连字符(-)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。
    • 管道用户:输入操作管道的 Snowflake 用户,例如 snowpipeuser。该角色至少需要具备 OPERATEMONITOR 权限。
    • 私钥:粘贴用于密钥对认证的 PEM 格式 RSA 私钥。
    • 私钥密码:如果私钥已加密,输入私钥密码。如果生成的是未加密私钥(即使用 OpenSSL 的 -nocrypt 选项生成),请留空。
    • 代理:除非部署必须通过 HTTP 代理访问 Snowflake,否则保留默认值。
    • 启用 TLS:启用此选项。Snowflake Streaming 使用 HTTPS。
    • TLS 验证Middle Box 兼容模式SNITLS 证书TLS 密钥:仅在网络或证书策略要求时配置这些字段。
  5. 点击测试连接。如果连接测试成功,点击新建

现在,您可以在为规则添加 Snowflake Streaming Sink 时选择该连接器。

创建规则

创建一条规则,用于选择要写入 Snowflake 的 MQTT 消息字段。

  1. 在 EMQX Cloud 控制台中进入数据集成

  2. 使用以下任一方式创建规则:

    • 连接器列表中,点击 Snowflake Streaming 连接器操作列下的新建规则图标。
    • 规则列表中,点击 + 新建规则
  3. SQL 编辑器中输入以下 SQL:

    sql
    SELECT
      clientid,
      unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at,
      topic,
      payload
    FROM
      "t/#"

    该规则监听主题匹配 t/# 的消息。测试时,可以向 t/1t/device001t/test 等主题发布消息。

    TIP

    对于 Snowflake 集成,选中的字段名和值应与目标 Snowflake 管道和表所需的列匹配。请避免选择不必要的字段。

  4. 创建规则后,点击页面底部的下一步,进入新建动作

添加 Snowflake Streaming Sink

新建动作页面中,配置 Snowflake Streaming Sink,将规则输出写入 Snowflake。

  1. 配置动作:

    • 连接器:选择之前创建的 Snowflake Streaming 连接器。
    • 动作类型:值为 Snowflake Streaming
    • 动作名称:使用自动生成的名称,或输入自定义名称。
    • 数据库名字:输入 testdatabase
    • 模式:输入 public
    • 管道:输入 emqxstreaming
  2. 除非需要调整连接或缓存行为,否则保持高级设置为默认值。

  3. 点击确认创建规则和动作。

测试规则

使用 MQTTX 或其他 MQTT 客户端向匹配 t/# 的主题发布测试消息。

  1. 向 EMQX Cloud 发布以下消息:

    • 主题:t/1

    • Payload:

      json
      {"msg":"hello snowflake"}

    MQTTX CLI 示例命令:

    bash
    mqttx pub -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'
  2. 在 Snowflake 中查询目标表:

    sql
    SELECT
      clientid,
      topic,
      payload,
      publish_received_at
    FROM testdatabase.public.emqx
    ORDER BY publish_received_at DESC
    LIMIT 10;

如果查询返回测试消息,说明集成已正常工作:

text
MQTT -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表