Skip to content

SnowflakeへのMQTTデータ取り込み(ストリーミングモード)

注意

Snowflakeストリーミングデータ統合は、EMQXバージョン6.1.2以降の専用版または専用Flex版で利用可能です。

Snowflakeは、データウェアハウジング、分析、セキュアなデータ共有のためのクラウドデータプラットフォームです。EMQX Cloudは、Snowflakeストリーミングシンクを通じてMQTTメッセージをSnowflakeに書き込むことができます。このシンクはSnowflakeストリーミングコネクターを使用してSnowpipeストリーミングAPIを呼び出すため、MQTTデータを低レイテンシでSnowflakeテーブルに取り込むことが可能です。

本ページでは、EMQX CloudでSnowflakeストリーミングデータ統合を作成する方法を説明します。例として、MQTTトピック t/# からのメッセージを、パイプ testdatabase.public.emqxstreaming を介してSnowflakeテーブル testdatabase.public.emqx に書き込みます。

動作概要

Snowflakeストリーミング統合は、EMQXブローカーのルールエンジンを使ってMQTTメッセージを選択・変換し、そのルールの出力をSnowflakeストリーミングシンク経由でSnowflakeに送信します。

データフローは以下の通りです:

text
MQTTクライアント -> EMQX Cloud -> ルール -> Snowflakeストリーミングシンク -> パイプ -> テーブル
  1. MQTTクライアントが t/1t/device001t/test などのトピックにメッセージをパブリッシュします。
  2. ルールが t/# のメッセージにマッチし、Snowflakeに書き込むフィールドを選択します。
  3. Snowflakeストリーミングシンクが選択されたフィールドをSnowflakeのパイプに送信します。
  4. Snowflakeがストリームされたレコードをターゲットテーブルにロードします。
  5. Snowflakeでテーブルをクエリし、取り込まれたMQTTデータを確認できます。

はじめる前に

前提条件

以下を理解していることを確認してください:

  • データ統合
  • ルール
  • Snowflakeのデータベース、スキーマ、テーブル、パイプ、ユーザー、ロール、およびキーペア認証

ネットワークアクセスの設定

SnowflakeストリーミングコネクターはHTTPS経由でSnowflakeに接続します。Snowflakeアカウントの公開方法に応じてネットワークを設定してください:

  • プライベートSnowflake URLを使用する場合は、EMQX CloudとSnowflake間でVPCピアリングなどのプライベートネットワーク接続を作成してください。
  • パブリックSnowflake URLを使用する場合は、デプロイメントがパブリックネットワークにアクセスできることを確認してください。必要に応じてNATゲートウェイを有効にしてください。

Snowflakeオブジェクトの準備

Snowflakeでターゲットのデータベース、スキーマ、テーブル、ストリーミングパイプを作成します。以下のSQLは本例で使用するオブジェクトを作成します:

sql
CREATE DATABASE IF NOT EXISTS testdatabase;

CREATE SCHEMA IF NOT EXISTS testdatabase.public;

CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
  clientid STRING,
  topic STRING,
  payload STRING,
  publish_received_at TIMESTAMP_LTZ
);

CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
  clientid,
  topic,
  payload,
  publish_received_at
)
FROM (
  SELECT
    $1:clientid::STRING,
    $1:topic::STRING,
    $1:payload::STRING,
    $1:publish_received_at::TIMESTAMP_LTZ
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Snowflakeユーザーの作成と権限付与

コネクターはキーペア認証でSnowflakeに認証します。パイプ用のユーザーを作成し、ロールを割り当て、パイプの操作とテーブルへの書き込みに必要な権限を付与してください。

  1. RSAキーペアを生成します。秘密鍵はEMQX Cloudコネクター用に保持し、公開鍵はSnowflakeに登録します。

    bash
    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
    openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
    • EMQX Cloudコネクターは秘密鍵でJWTに署名し、安全かつ検証可能なIDトークンとして使用します。
    • Snowflakeは公開鍵でトークンの署名を検証します。

    詳細はキーペア認証とキーペアローテーションを参照してください。

  2. パイプユーザーが使用するSnowflakeロールを作成します。

    sql
    CREATE ROLE IF NOT EXISTS snowpipe;
  3. 公開鍵を割り当てたSnowflakeユーザーを作成します。<PUBLIC_KEY_CONTENT>は公開鍵の内容で、-----BEGIN PUBLIC KEY----------END PUBLIC KEY-----の行は除いてください。

    sql
    CREATE USER IF NOT EXISTS snowpipeuser
      RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';
  4. ロールに権限を付与し、ユーザーにロールを割り当て、デフォルトロールに設定します。

    sql
    GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
    GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
    GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe;
    GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
    GRANT ROLE snowpipe TO USER snowpipeuser;
    ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;

EMQX Cloudで設定する値とオブジェクト名が一致していることを確認してください:

Snowflakeオブジェクト
データベースtestdatabase
スキーマpublic
テーブルemqx
パイプemqxstreaming

Snowflakeストリーミングコネクターの作成

ルールを作成する前に、EMQX CloudとSnowflakeアカウントを接続するSnowflakeストリーミングコネクターを作成します。

  1. EMQX Cloudコンソールで、対象のデプロイメントにアクセスします。

  2. 左側のナビゲーションメニューからデータ統合をクリックします。

  3. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からSnowflake Streamingを選択します。既にコネクターがある場合は新規コネクターをクリックし、Snowflake Streamingを選択します。

  4. 新規コネクターページで以下の項目を設定します:

    • コネクター名:自動生成された名前を使用します。
    • サーバーホスト:SnowflakeのエンドポイントURLを入力します。通常は <Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com の形式です。<Your Snowflake Organization ID>-<Your Snowflake Account Name> はSnowflakeインスタンス固有のサブドメインに置き換えてください。
    • アカウント:Snowflakeの組織IDとアカウント名をハイフン(-)で区切って入力します。これはSnowflakeプラットフォームにアクセスするURLの一部で、Snowflakeコンソールで確認できます。
    • パイプユーザー:パイプを操作するSnowflakeユーザー名を入力します。例:snowpipeuser。ロールには少なくともOPERATEMONITOR権限が必要です。
    • 秘密鍵:キーペア認証に使用するPEM形式のRSA秘密鍵を貼り付けます。
    • 秘密鍵パスワード:秘密鍵が暗号化されている場合はパスワードを入力します。OpenSSLの-nocryptオプションで生成した場合は空欄のままにしてください。
    • プロキシ:デフォルト値のままで問題ありません。SnowflakeにHTTPプロキシ経由でアクセスする必要がある場合のみ設定してください。
    • TLSを有効化:必ず有効にしてください。SnowflakeストリーミングはHTTPSを使用します。
    • TLS検証ミドルボックス互換モードSNITLS証明書TLSキー:ネットワークや証明書ポリシーで必要な場合のみ設定してください。
  5. テストをクリックし、接続テストが成功したら保存をクリックします。

これでルールにSnowflakeストリーミングシンクを追加するときに、このコネクターを選択できます。

ルールの作成

Snowflakeに書き込むMQTTメッセージのフィールドを選択するルールを作成します。

  1. EMQX Cloudコンソールでデータ統合にアクセスします。

  2. 以下のいずれかの方法でルールを作成します:

    • コネクターセクションで、Snowflakeストリーミングコネクターのアクション列にある新規ルールアイコンをクリック。
    • ルールセクションで**+ 新規ルール**をクリック。
  3. SQLエディターに以下のSQLを入力します:

    sql
    SELECT
      clientid,
      unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at,
      topic,
      payload
    FROM
      "t/#"

    このルールはトピックが t/# にマッチするメッセージを監視します。テスト用に t/1t/device001t/test などのトピックにメッセージをパブリッシュしてください。

    TIP

    Snowflake統合では、選択するフィールド名と値はターゲットのSnowflakeパイプおよびテーブルが期待するカラムと一致させてください。不必要なフィールドは選択しないようにしてください。

  4. ルール作成後、ページ下部の次へをクリックして新規アクションに進みます。

Snowflakeストリーミングシンクの追加

新規アクションページで、ルールの出力をSnowflakeに書き込むSnowflakeストリーミングシンクを設定します。

  1. アクションを設定します:

    • コネクター:作成したSnowflakeストリーミングコネクターを選択。
    • アクションタイプSnowflake Streaming が選択されています。
    • アクション名:自動生成された名前を使用するか任意の名前を入力。
    • データベース名testdatabase を入力。
    • スキーマpublic を入力。
    • パイプemqxstreaming を入力。
  2. 詳細設定は接続やバッファリングの調整が必要な場合を除き、デフォルトのままにします。

  3. 確定をクリックしてルールとアクションを作成します。

ルールのテスト

MQTTXなどのMQTTクライアントを使って、t/# にマッチするトピックにテストメッセージをパブリッシュします。

  1. EMQX Cloudに以下のメッセージをパブリッシュします:

    • トピック:t/1

    • ペイロード:

      json
      {"msg":"hello snowflake"}

    MQTTX CLIの例:

    bash
    mqttx pub -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'
  2. Snowflakeでターゲットテーブルをクエリします:

    sql
    SELECT
      clientid,
      topic,
      payload,
      publish_received_at
    FROM testdatabase.public.emqx
    ORDER BY publish_received_at DESC
    LIMIT 10;

クエリでテストメッセージが返されれば、統合は正常に動作しています:

text
MQTT -> ルール -> Snowflakeストリーミングシンク -> パイプ -> テーブル