使用流模式将 MQTT 数据导入 Snowflake
注意
Snowflake Streaming 数据集成功能仅适用于 EMQX 6.1.2 及之后版本的专有版或弹性专有版部署。
Snowflake 是一个用于数据仓库、数据分析和安全数据共享的云数据平台。EMQX Cloud 可以通过 Snowflake Streaming Sink 将 MQTT 消息写入 Snowflake。该 Sink 使用 Snowflake Streaming 连接器调用 Snowpipe Streaming API,从而以低延迟方式将 MQTT 数据导入 Snowflake 表。
本页介绍如何在 EMQX Cloud 中创建 Snowflake Streaming 数据集成。示例将 MQTT 主题 t/# 中的消息通过管道 testdatabase.public.emqxstreaming 写入 Snowflake 表 testdatabase.public.emqx。
工作原理
Snowflake Streaming 集成使用 EMQX Broker 规则引擎选择和转换 MQTT 消息,然后通过 Snowflake Streaming Sink 将规则输出发送到 Snowflake。
数据流如下:
MQTT 客户端 -> EMQX Cloud -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表- MQTT 客户端向
t/1、t/device001或t/test等主题发布消息。 - 规则匹配来自
t/#的消息,并选择要写入 Snowflake 的字段。 - Snowflake Streaming Sink 将选中的字段发送到 Snowflake 管道。
- Snowflake 将流式记录加载到目标表中。
- 您可以在 Snowflake 中查询该表,验证导入的 MQTT 数据。
准备工作
前置准备
开始之前,请确保您已了解:
配置网络访问
Snowflake Streaming 连接器通过 HTTPS 连接到 Snowflake。请根据 Snowflake 账户的暴露方式配置网络:
- 如果使用 Snowflake 私有 URL,请在 EMQX Cloud 和 Snowflake 之间创建私有网络连接,例如 VPC 对等连接。
- 如果使用 Snowflake 公网 URL,请确保部署可以访问公网。您可能需要启用 NAT 网关。
准备 Snowflake 对象
在 Snowflake 中创建目标数据库、Schema、表和 Streaming pipe。以下 SQL 会创建本示例使用的对象:
CREATE DATABASE IF NOT EXISTS testdatabase;
CREATE SCHEMA IF NOT EXISTS testdatabase.public;
CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
clientid STRING,
topic STRING,
payload STRING,
publish_received_at TIMESTAMP_LTZ
);
CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
clientid,
topic,
payload,
publish_received_at
)
FROM (
SELECT
$1:clientid::STRING,
$1:topic::STRING,
$1:payload::STRING,
$1:publish_received_at::TIMESTAMP_LTZ
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);创建 Snowflake 用户并授予权限
连接器使用密钥对认证连接 Snowflake。请创建用于管道的用户,分配角色,并授予操作管道和写入表所需的权限。
生成 RSA 密钥对。请保留私钥用于 EMQX Cloud 连接器,并将公钥注册到 Snowflake。
bashopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem- EMQX Cloud 连接器使用 RSA 私钥签发 JWT 作为安全、可验证的身份凭证。
- Snowflake 使用预先上传的公钥验证该签名的合法性。
如需了解更多信息,请参考官方文档: Key-pair authentication and key-pair rotation。
创建管道用户使用的 Snowflake 角色。
sqlCREATE ROLE IF NOT EXISTS snowpipe;创建 Snowflake 用户并分配公钥。将
<PUBLIC_KEY_CONTENT>替换为公钥内容,不包含-----BEGIN PUBLIC KEY-----和-----END PUBLIC KEY-----行。sqlCREATE USER IF NOT EXISTS snowpipeuser RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';向角色授予权限,将角色授予用户,并将其设置为用户的默认角色。
sqlGRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe; GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe; GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe; GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe; GRANT ROLE snowpipe TO USER snowpipeuser; ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;
请确保对象名称与稍后在 EMQX Cloud 中配置的值一致:
| Snowflake 对象 | 值 |
|---|---|
| 数据库 | testdatabase |
| Schema | public |
| 表 | emqx |
| 管道 | emqxstreaming |
创建 Snowflake Streaming 连接器
创建规则前,请先创建 Snowflake Streaming 连接器,用于连接 EMQX Cloud 和您的 Snowflake 账户。
在 EMQX Cloud 控制台中进入您的部署。
在左侧导航菜单中点击数据集成。
如果这是您创建的第一个连接器,请在数据持久化分类下选择 Snowflake Streaming。如果已经存在连接器,请点击新建连接器,然后选择 Snowflake Streaming。
在新建连接器页面中,配置以下字段:
- 连接器名称:使用自动生成的名称。
- 服务器地址:输入 Snowflake 的端点 URL,通常格式为
<您的 Snowflake 组织 ID>-<您的 Snowflake 账户名>.snowflakecomputing.com。您需要用自己 Snowflake 实例的子域替换<您的 Snowflake 组织 ID>-<您的 Snowflake 账户名称>。 - 账户:输入您的 Snowflake 组织 ID 和账户名,用连字符(
-)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。 - 管道用户:输入操作管道的 Snowflake 用户,例如
snowpipeuser。该角色至少需要具备OPERATE和MONITOR权限。 - 私钥:粘贴用于密钥对认证的 PEM 格式 RSA 私钥。
- 私钥密码:如果私钥已加密,输入私钥密码。如果生成的是未加密私钥(即使用 OpenSSL 的
-nocrypt选项生成),请留空。 - 代理:除非部署必须通过 HTTP 代理访问 Snowflake,否则保留默认值。
- 启用 TLS:启用此选项。Snowflake Streaming 使用 HTTPS。
- TLS 验证、Middle Box 兼容模式、SNI、TLS 证书和 TLS 密钥:仅在网络或证书策略要求时配置这些字段。
点击测试连接。如果连接测试成功,点击新建。
现在,您可以在为规则添加 Snowflake Streaming Sink 时选择该连接器。
创建规则
创建一条规则,用于选择要写入 Snowflake 的 MQTT 消息字段。
在 EMQX Cloud 控制台中进入数据集成。
使用以下任一方式创建规则:
- 在连接器列表中,点击 Snowflake Streaming 连接器操作列下的新建规则图标。
- 在规则列表中,点击 + 新建规则。
在 SQL 编辑器中输入以下 SQL:
sqlSELECT clientid, unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at, topic, payload FROM "t/#"该规则监听主题匹配
t/#的消息。测试时,可以向t/1、t/device001或t/test等主题发布消息。TIP
对于 Snowflake 集成,选中的字段名和值应与目标 Snowflake 管道和表所需的列匹配。请避免选择不必要的字段。
创建规则后,点击页面底部的下一步,进入新建动作。
添加 Snowflake Streaming Sink
在新建动作页面中,配置 Snowflake Streaming Sink,将规则输出写入 Snowflake。
配置动作:
- 连接器:选择之前创建的 Snowflake Streaming 连接器。
- 动作类型:值为 Snowflake Streaming。
- 动作名称:使用自动生成的名称,或输入自定义名称。
- 数据库名字:输入
testdatabase。 - 模式:输入
public。 - 管道:输入
emqxstreaming。
除非需要调整连接或缓存行为,否则保持高级设置为默认值。
点击确认创建规则和动作。
测试规则
使用 MQTTX 或其他 MQTT 客户端向匹配 t/# 的主题发布测试消息。
向 EMQX Cloud 发布以下消息:
主题:
t/1Payload:
json{"msg":"hello snowflake"}
MQTTX CLI 示例命令:
bashmqttx pub -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'在 Snowflake 中查询目标表:
sqlSELECT clientid, topic, payload, publish_received_at FROM testdatabase.public.emqx ORDER BY publish_received_at DESC LIMIT 10;
如果查询返回测试消息,说明集成已正常工作:
MQTT -> 规则 -> Snowflake Streaming Sink -> 管道 -> 表