Ingest MQTT Data into Snowflake with Streaming Mode
Note
The Snowflake Streaming data integration is available for EMQX version 6.1.2 and later in the Dedicated or Dedicated Flex edition.
Snowflake is a cloud data platform for data warehousing, analytics, and secure data sharing. EMQX Cloud can write MQTT messages to Snowflake through a Snowflake Streaming Sink. The Sink uses a Snowflake Streaming Connector to call the Snowpipe Streaming API, so MQTT data can be ingested into a Snowflake table with low latency.
This page describes how to create a Snowflake Streaming data integration in EMQX Cloud. The example writes messages from the MQTT topic t/# into the Snowflake table testdatabase.public.emqx through the pipe testdatabase.public.emqxstreaming.
How It Works
The Snowflake Streaming integration uses the EMQX rule engine to select and transform MQTT messages, and then sends the rule output to Snowflake through a Snowflake Streaming Sink.
The data flow is:
MQTT client -> EMQX Cloud -> Rule -> Snowflake Streaming Sink -> Pipe -> Table

- MQTT clients publish messages to topics such as `t/1`, `t/device001`, or `t/test`.
- A rule matches messages from `t/#` and selects the fields to write to Snowflake.
- The Snowflake Streaming Sink sends the selected fields to the Snowflake pipe.
- Snowflake loads the streamed records into the target table.
- You can query the table in Snowflake to verify the ingested MQTT data.
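Each streamed record can be pictured as one object per rule output row. The sketch below assumes that the sink forwards each row as a JSON object keyed by the field names selected in the rule, which is what the `$1:clientid`, `$1:topic`, `$1:payload`, and `$1:publish_received_at` references in the example pipe read. The helpers `json_escape` and `emqx_row` are hypothetical; they only print an illustrative record and do not contact EMQX Cloud or Snowflake.

```bash
#!/usr/bin/env bash
# Illustrative only: print one rule output row as the JSON object that the
# example pipe reads with $1:<field>. The sink's actual wire format is an assumption.

# json_escape: escape backslashes and double quotes so a value can sit inside
# a JSON string. Control characters are not handled in this sketch.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# emqx_row CLIENTID TOPIC PAYLOAD PUBLISH_RECEIVED_AT
emqx_row() {
  printf '{"clientid":"%s","topic":"%s","payload":"%s","publish_received_at":"%s"}\n' \
    "$(json_escape "$1")" "$(json_escape "$2")" \
    "$(json_escape "$3")" "$(json_escape "$4")"
}

# Usage (example values):
#   emqx_row emqx_c t/1 '{"msg":"hello snowflake"}' '2023-11-14T22:13:20.123+00:00'
```

The MQTT payload is kept as a JSON string because the example table stores `payload` as a `STRING` column.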
Before You Start
Prerequisites
Before you begin, make sure you are familiar with:
- Data integration
- Rules
- Snowflake databases, schemas, tables, pipes, users, roles, and key-pair authentication
Configure Network Access
The Snowflake Streaming Connector connects to Snowflake over HTTPS. Configure the network according to how your Snowflake account is exposed:
- If you use a private Snowflake URL, create a private network connection, such as VPC peering, between EMQX Cloud and Snowflake.
- If you use a public Snowflake URL, make sure your deployment can access the public network. You may need to enable the NAT Gateway.
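Which of the two cases applies can usually be read from the Snowflake hostname. The helper below is a hypothetical sketch that assumes a private URL is a Snowflake PrivateLink hostname ending in `.privatelink.snowflakecomputing.com` and that any other `*.snowflakecomputing.com` host is public; confirm the actual endpoint in your Snowflake account before choosing a network setup.

```bash
#!/usr/bin/env bash
# Hypothetical helper: classify a Snowflake host as private or public.
# Assumption: private (PrivateLink) hosts end in .privatelink.snowflakecomputing.com.
network_mode() {
  local host=${1#https://}   # tolerate a pasted URL
  host=${host%%/*}           # drop any path
  case "$host" in
    *.privatelink.snowflakecomputing.com)
      echo "private" ;;      # needs a private connection such as VPC peering
    *.snowflakecomputing.com)
      echo "public" ;;       # needs public network access, e.g. the NAT Gateway
    *)
      echo "not a snowflakecomputing.com host: $1" >&2
      return 1 ;;
  esac
}

# Usage (hypothetical host):
#   network_mode myorg-myaccount.snowflakecomputing.com   # public
```

The PrivateLink pattern is checked first because it would also match the broader `*.snowflakecomputing.com` pattern.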
Prepare Snowflake Objects
In Snowflake, create the target database, schema, table, and streaming pipe. The following SQL creates the objects used in this example:
```sql
CREATE DATABASE IF NOT EXISTS testdatabase;

CREATE SCHEMA IF NOT EXISTS testdatabase.public;

CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
  clientid STRING,
  topic STRING,
  payload STRING,
  publish_received_at TIMESTAMP_LTZ
);

CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
  clientid,
  topic,
  payload,
  publish_received_at
)
FROM (
  SELECT
    $1:clientid::STRING,
    $1:topic::STRING,
    $1:payload::STRING,
    $1:publish_received_at::TIMESTAMP_LTZ
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
```

Create a Snowflake User and Grant Permissions
The connector authenticates to Snowflake with key-pair authentication. Create a user for the pipe, assign a role, and grant the permissions required to operate the pipe and write to the table.
Generate an RSA key pair. Keep the private key for the EMQX Cloud connector and register the public key in Snowflake.
```bash
# Generate an unencrypted PKCS#8 private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
# Derive the matching public key
openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
```

- The EMQX Cloud connector uses the private RSA key to sign a JWT, which serves as a secure, verifiable identity token.
- Snowflake verifies the token's signature using the public key.
For more information, refer to Key-pair authentication and key-pair rotation.
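The `CREATE USER` step expects the public key as a single base64 string, without the `-----BEGIN PUBLIC KEY-----` and `-----END PUBLIC KEY-----` lines. One way to produce that string from `snowflake_rsa_key.public.pem` is sketched below; `pem_body` is a hypothetical helper name, and the sketch assumes a standard PEM file in which only the header and footer lines contain `-----`.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the base64 body of a PEM file on one line,
# dropping the -----BEGIN ...----- / -----END ...----- lines and all line breaks.
pem_body() {
  grep -v -- '-----' "$1" | tr -d '\r\n'
  echo   # terminate the output with a single newline
}

# Usage (file generated by the openssl commands above):
#   pem_body snowflake_rsa_key.public.pem
```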
Create the Snowflake role used by the pipe user.
```sql
CREATE ROLE IF NOT EXISTS snowpipe;
```

Create a Snowflake user and assign the public key. Replace `<PUBLIC_KEY_CONTENT>` with the content of the public key without the `-----BEGIN PUBLIC KEY-----` and `-----END PUBLIC KEY-----` lines.

```sql
CREATE USER IF NOT EXISTS snowpipeuser RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';
```

Grant permissions to the role, grant the role to the user, and set it as the user's default role.

```sql
GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe;
GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
GRANT ROLE snowpipe TO USER snowpipeuser;
ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;
```
Make sure the object names match the values you will configure in EMQX Cloud:
| Snowflake Object | Value |
|---|---|
| Database | testdatabase |
| Schema | public |
| Table | emqx |
| Pipe | emqxstreaming |
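The pipe's fully qualified name, `testdatabase.public.emqxstreaming`, maps onto three separate fields of the Snowflake Streaming Sink. The hypothetical helper below splits such a name into those values; it assumes unquoted identifiers that contain no dots.

```bash
#!/usr/bin/env bash
# Hypothetical helper: split <database>.<schema>.<pipe> into the values for the
# Database Name, Schema, and Pipe fields of the Snowflake Streaming Sink.
# Assumes unquoted identifiers without embedded dots.
split_pipe_name() {
  local db schema pipe extra
  IFS=. read -r db schema pipe extra <<EOF
$1
EOF
  # Reject names with fewer or more than three parts.
  if [ -z "$db" ] || [ -z "$schema" ] || [ -z "$pipe" ] || [ -n "$extra" ]; then
    echo "expected <database>.<schema>.<pipe>, got: $1" >&2
    return 1
  fi
  printf 'Database Name: %s\nSchema: %s\nPipe: %s\n' "$db" "$schema" "$pipe"
}

# Usage:
#   split_pipe_name testdatabase.public.emqxstreaming
```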
Create a Snowflake Streaming Connector
Before creating a rule, create a Snowflake Streaming Connector to connect EMQX Cloud to your Snowflake account.
In the EMQX Cloud Console, go to your deployment.
From the left navigation menu, click Data Integration.
If this is your first connector, select Snowflake Streaming under the Data Persistence category. If connectors already exist, click New Connector, and then select Snowflake Streaming.
On the New Connector page, configure the following fields:
- Connector Name: Use the automatically generated name.
- Server Host: Enter your Snowflake account hostname, typically in the format `<Your Snowflake Organization Name>-<Your Snowflake Account Name>.snowflakecomputing.com`. Replace `<Your Snowflake Organization Name>-<Your Snowflake Account Name>` with the subdomain specific to your Snowflake instance.
- Account: Enter your Snowflake organization name and account name separated by a dash (`-`). This value is the subdomain of the URL you use to access Snowflake and can be found in the Snowflake console.
- Pipe User: Enter the Snowflake user that operates the pipe, for example, `snowpipeuser`. The user's default role (`snowpipe` in this example) must have at least the `OPERATE` and `MONITOR` privileges on the pipe.
- Private Key: Paste the PEM-formatted RSA private key used for key-pair authentication, for example, the content of `snowflake_rsa_key.private.pem`.
- Private Key Password: Enter the password if the private key is encrypted. Leave it empty if the key was generated without encryption (that is, with the `-nocrypt` option in OpenSSL).
- Proxy: Keep the default value unless your deployment must access Snowflake through an HTTP proxy.
- Enable TLS: Enable this option. Snowflake Streaming uses HTTPS.
- TLS Verify, Middle Box Compatibility Mode, SNI, TLS Cert, and TLS Key: Configure these fields only if required by your network or certificate policy.
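Two of these fields can be derived from material you already have. The sketch below assumes the public hostname format `<organization name>-<account name>.snowflakecomputing.com`, so the Account value is the host without the `.snowflakecomputing.com` suffix. It also relies on the standard PKCS#8 PEM headers: `-----BEGIN ENCRYPTED PRIVATE KEY-----` for an encrypted key and `-----BEGIN PRIVATE KEY-----` for an unencrypted key such as one generated with `-nocrypt`. Both function names are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: derive the Account value from the Server Host.
# Assumes the public <org>-<account>.snowflakecomputing.com format.
account_from_host() {
  local host=${1#https://}
  host=${host%%/*}
  case "$host" in
    *.snowflakecomputing.com) printf '%s\n' "${host%.snowflakecomputing.com}" ;;
    *) echo "unexpected host: $1" >&2; return 1 ;;
  esac
}

# Hypothetical helper: report whether Private Key Password must be filled in,
# based on the PKCS#8 PEM header of the private key file.
key_needs_password() {
  if grep -q -- '-----BEGIN ENCRYPTED PRIVATE KEY-----' "$1"; then
    echo yes
  elif grep -q -- '-----BEGIN PRIVATE KEY-----' "$1"; then
    echo no
  else
    echo "unrecognized key format in $1" >&2
    return 1
  fi
}

# Usage:
#   account_from_host myorg-myaccount.snowflakecomputing.com   # myorg-myaccount
#   key_needs_password snowflake_rsa_key.private.pem           # no (generated with -nocrypt)
```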
Click Test. If the connectivity test succeeds, click Save.
You can now select this connector when adding a Snowflake Streaming Sink to a rule.
Create a Rule
Create a rule to select the MQTT message fields that will be written to Snowflake.
In the EMQX Cloud Console, go to Data Integration.
Create a rule by using either of the following methods:
- In the Connectors section, click the New Rule icon in the Actions column of the Snowflake Streaming Connector.
- In the Rules section, click + New Rule.
In the SQL Editor, enter the following SQL:
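```sql
SELECT
  clientid,
  unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at,
  topic,
  payload
FROM
  "t/#"
```

This rule listens for messages whose topics match `t/#`. It converts `publish_received_at` from a Unix timestamp in milliseconds to an RFC 3339 string, which the pipe then casts to `TIMESTAMP_LTZ`. For testing, publish messages to topics such as `t/1`, `t/device001`, or `t/test`.

TIP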
For Snowflake integration, the selected field names and values should match the columns expected by the target Snowflake pipe and table. Avoid selecting unnecessary fields.
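To see roughly what `unix_ts_to_rfc3339(publish_received_at, 'millisecond')` produces, the sketch below performs a comparable conversion in the shell: a Unix timestamp in milliseconds becomes an RFC 3339 string. It is an approximation, not EMQX's implementation: it always formats in UTC with a `+00:00` offset, whereas the rule function may render a different UTC offset. It assumes GNU `date` (the `-d @SECONDS` form), and `ms_to_rfc3339` is a hypothetical name.

```bash
#!/usr/bin/env bash
# Approximation of unix_ts_to_rfc3339(ts, 'millisecond') for illustration:
# converts a non-negative Unix timestamp in milliseconds to RFC 3339 in UTC.
# Assumes GNU date; the EMQX function may use a different UTC offset.
ms_to_rfc3339() {
  local ms=$1
  local secs=$(( ms / 1000 ))   # whole seconds
  local frac=$(( ms % 1000 ))   # remaining milliseconds
  printf '%s.%03d+00:00\n' "$(date -u -d "@$secs" '+%Y-%m-%dT%H:%M:%S')" "$frac"
}

# Usage:
#   ms_to_rfc3339 1700000000123   # 2023-11-14T22:13:20.123+00:00
```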
After you create the rule, click Next at the bottom of the page to proceed to New Action.
Add a Snowflake Streaming Sink
On the New Action page, configure a Snowflake Streaming Sink to write the rule output to Snowflake.
Configure the action:
- Connector: Select the Snowflake Streaming Connector you created.
- Action Type: The value is Snowflake Streaming.
- Action Name: Use the automatically generated name, or enter a name.
- Database Name: Enter `testdatabase`.
- Schema: Enter `public`.
- Pipe: Enter `emqxstreaming`.
Keep Advanced Settings at the default values unless you need to tune connection or buffering behavior.
Click Confirm to create the rule and action.
Test the Rule
Use MQTTX or another MQTT client to publish a test message to a topic that matches t/#.
Publish the following message to EMQX Cloud:
Topic: `t/1`

Payload:

```json
{"msg":"hello snowflake"}
```
Example MQTTX CLI command:
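```bash
mqttx pub -h <Deployment Address> -p 1883 -u <username> -P <password> -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'
```

Replace `<Deployment Address>`, `<username>`, and `<password>` with your deployment's connection address and an authentication credential configured for the deployment.

In Snowflake, query the target table:

```sql
SELECT clientid, topic, payload, publish_received_at
FROM testdatabase.public.emqx
ORDER BY publish_received_at DESC
LIMIT 10;
```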
If the query returns the test message, the integration is working correctly:
MQTT -> Rule -> Snowflake Streaming Sink -> Pipe -> Table
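To check that the `t/#` filter picks up more than one topic, you can publish the same payload to each of the example topics. The loop below is a sketch: the host, port, and credentials are placeholders you must replace with your deployment's connection details, and `publish_examples`, `run`, and `DRY_RUN` are hypothetical names. With `DRY_RUN=1` it only prints the `mqttx pub` commands it would run.

```bash
#!/usr/bin/env bash
# Placeholders (assumptions): replace with your deployment's connection details.
MQTT_HOST=${MQTT_HOST:-your-deployment-address}
MQTT_PORT=${MQTT_PORT:-1883}
MQTT_USER=${MQTT_USER:-your-username}
MQTT_PASS=${MQTT_PASS:-your-password}

# run: print the command when DRY_RUN is set, otherwise execute it.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# publish_examples: send one test message to each example topic under t/#.
publish_examples() {
  local topic
  for topic in t/1 t/device001 t/test; do
    run mqttx pub -h "$MQTT_HOST" -p "$MQTT_PORT" -u "$MQTT_USER" -P "$MQTT_PASS" \
      -i emqx_c -t "$topic" -m '{"msg":"hello snowflake"}'
  done
}

# Usage:
#   DRY_RUN=1 publish_examples   # print the three commands only
#   publish_examples             # publish for real (requires the MQTTX CLI)
```

If the integration is working, the Snowflake query above should return one row per topic.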