Skip to content

SnowflakeへのMQTTデータ取り込み

Snowflakeは、クラウドベースのデータプラットフォームであり、高いスケーラビリティと柔軟性を備えたデータウェアハウジング、分析、セキュアなデータ共有のソリューションを提供します。構造化データおよび半構造化データの処理に優れ、大量のデータを高速なクエリ性能で保存し、さまざまなツールやサービスとシームレスに統合できるよう設計されています。

本ページでは、EMQXとSnowflake間のデータ統合について詳細に紹介し、ルールおよびSinkの作成方法について実践的なガイドを提供します。

動作の仕組み

EMQXにおけるSnowflakeデータ統合はすぐに使える機能であり、複雑なIoTビジネスワークフローを簡単にサポートできるよう構成可能です。典型的なIoTアプリケーションでは、EMQXがデバイス接続およびメッセージ送受信を担うIoTプラットフォームとして機能し、Snowflakeはメッセージデータの取り込み、保存、分析を行うデータストレージおよび処理プラットフォームとして役割を果たします。

snowflake-architecture

EMQXはルールエンジンとSinkを利用してデバイスのイベントやデータをSnowflakeに転送します。エンドユーザーやアプリケーションはSnowflakeのテーブル内のデータにアクセス可能です。具体的なワークフローは以下の通りです:

  1. デバイスのEMQXへの接続:IoTデバイスはMQTTプロトコルで正常に接続されるとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレスなどの識別情報が含まれます。

  2. デバイスからのメッセージパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリおよびステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信し、ルールエンジン内で比較処理を行います。

  3. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールをマッチングし、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などの処理を行います。

  4. Snowflakeへの書き込み:ルールはメッセージデータをSnowflakeに書き込むアクションをトリガーします。メッセージをファイルにバッチングしてStageおよびPipe経由でロードする(集約モード)、またはSnowpipe Streaming APIを用いて直接ストリーミングする(ストリーミングモード)方法があります。

イベントやメッセージデータがSnowflakeに書き込まれた後は、以下のようなビジネスや技術的な目的で活用可能です:

  • データアーカイブ:IoTデータをSnowflakeに安全に長期保存し、コンプライアンスや履歴データの利用を保証します。
  • データ分析:Snowflakeのデータウェアハウジングおよび分析機能を活用し、リアルタイムまたはバッチ分析を行い、予知保全、運用インサイト、デバイス性能評価を実現します。

特長と利点

EMQXのSnowflakeデータ統合を利用することで、以下の特長と利点をビジネスにもたらします:

  • メッセージ変換:メッセージはEMQXのルール内で高度に処理・変換されてからSnowflakeに書き込まれるため、後続の保存や活用が容易になります。
  • 柔軟なデータ操作:Snowflake Sinkは書き込むフィールドを選択可能であり、ビジネスニーズに応じた効率的かつ動的なストレージ構成を実現します。
  • 統合されたビジネスプロセス:Snowflake SinkによりデバイスデータをSnowflakeの豊富なエコシステムアプリケーションと組み合わせ、多様なビジネスシナリオ(データ分析やアーカイブなど)を可能にします。
  • 低コストの長期保存:Snowflakeのスケーラブルなストレージ基盤は従来のデータベースより低コストで長期データ保持に最適であり、大量のIoTデータ保存に適しています。

これらの特長により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

はじめる前に

このセクションでは、EMQXでSnowflake Sinkを作成する前に必要な準備について説明します。

前提条件

  • EMQXのルールおよびデータ統合の基本概念の理解
  • 管理者権限を持つ動作中のSnowflakeアカウント

アップロードモードの選択

TIP

最初にモードを選択してください。これによりEMQXとSnowflake環境の両方の設定方法が決まります。

EMQXはSnowflakeへのデータ送信に以下の2つのモードをサポートしています:

モード説明ODBC 必須か
集約モードEMQXはMQTTメッセージをローカルファイルにバッファリングし、Snowflakeのステージにアップロードします。COPY INTO文で設定されたパイプが自動的にこれらのファイルをターゲットテーブルにロードします。詳細はSnowflake Snowpipeドキュメントを参照してください。必須
ストリーミングモードSnowpipe Streaming APIを介してリアルタイムにデータを送信し、行を直接Snowflakeテーブルに書き込みます。必須

Snowflake ODBCドライバーの初期化

EMQXがSnowflakeと通信し効率的にデータを転送するために、SnowflakeのOpen Database Connectivity(ODBC)ドライバーをインストールおよび設定する必要があります。このドライバーはEMQXがSnowflakeのステージにデータを書き込むための通信ブリッジとして機能し、データの適切なフォーマット、認証、転送を保証します。

詳細は公式のODBCドライバーページおよびライセンス契約を参照してください。

LinuxでのSnowflake ODBCドライバー初期化

EMQXはDebian系(Ubuntuなど)向けにSnowflake ODBCドライバーの迅速な展開と必要なシステム設定を行うインストールスクリプトを提供しています。

注意

このスクリプトはテスト用であり、本番環境でのODBCドライバー設定方法の推奨ではありません。公式のLinux向けインストール手順を参照してください。

インストールスクリプトの実行

scripts/install-snowflake-driver.shスクリプトをローカルにコピーし、chmod a+xで実行権限を付与してからsudoで実行します:

bash
chmod a+x scripts/install-snowflake-driver.sh
sudo ./scripts/install-snowflake-driver.sh

スクリプトはSnowflake ODBCの.debインストールパッケージ(例:snowflake-odbc-3.4.1.x86_64.deb)をカレントディレクトリに自動ダウンロードし、ドライバーをインストールして以下のシステム設定ファイルを更新します:

  • /etc/odbc.ini:Snowflakeデータソース設定を追加
  • /etc/odbcinst.ini:Snowflakeドライバーパスを登録

設定例

/etc/odbc.iniの内容を確認するコマンド:

emqx@emqx-0:~$ cat /etc/odbc.ini

[snowflake]
Description=SnowflakeDB
Driver=SnowflakeDSIIDriver
Locale=en-US
PORT=443
SSL=on

[ODBC Data Sources]
snowflake = SnowflakeDSIIDriver

/etc/odbcinst.iniの内容を確認するコマンド:

emqx@emqx-0:~$ cat /etc/odbcinst.ini

[ODBC Driver 18 for SQL Server]
Description=Microsoft ODBC Driver 18 for SQL Server
Driver=/opt/microsoft/msodbcsql18/lib64/libmsodbcsql-18.5.so.1.1
UsageCount=1

[ODBC Driver 17 for SQL Server]
Description=Microsoft ODBC Driver 17 for SQL Server
Driver=/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.6.1
UsageCount=1

[SnowflakeDSIIDriver]
APILevel=1
ConnectFunctions=YYY
Description=Snowflake DSII
Driver=/usr/lib/snowflake/odbc/lib/libSnowflake.so
DriverODBCVer=03.52
SQLLevel=1
UsageCount=1

macOSでのSnowflake ODBCドライバー初期化

macOSでSnowflake ODBCドライバーをインストールおよび設定する手順は以下の通りです:

  1. unixODBCをインストール(例):

    brew install unixodbc
  2. iODBCをダウンロードしてインストール

  3. Snowflake ODBCドライバーをダウンロードしてインストール

  4. 詳細なインストールおよび設定手順はmacOS向けODBCドライバーのインストールと設定を参照。

  5. インストール後、以下の設定ファイルを更新:

    • Snowflake ODBCドライバーの権限と設定を更新:

      bash
      chown $(id -u):$(id -g) /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
      echo 'ODBCInstLib=libiodbcinst.dylib' >> /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
    • ODBC接続設定のため~/.odbc.iniファイルを作成または更新:

      cat << EOF > ~/.odbc.ini
      [ODBC]
      Trace=no
      TraceFile=
      
      [ODBC Drivers]
      Snowflake = Installed
      
      [ODBC Data Sources]
      snowflake = Snowflake
      
      [Snowflake]
      Driver = /opt/snowflake/snowflakeodbc/lib/universal/libSnowflake.dylib
      EOF

ユーザーアカウント作成とSnowflakeリソースのセットアップ

アップロードモードに関わらず、Snowflake環境でユーザーアカウント、データベース、関連リソースを設定し、データ取り込み用の準備を行う必要があります。以下の認証情報は後でEMQXのコネクターおよびSink設定時に必要となります:

項目名説明
データソース名 (DSN)snowflake(集約モードのみ)/etc/odbc.iniで設定したODBC DSN。集約アップロード時に使用。
ユーザー名snowpipeuserSnowflake接続認証に使用するユーザー。適切な権限を持つ必要あり。
パスワードSnowpipeuser99キーペア認証使用時は省略可能。
データベース名testdatabase対象テーブルが存在するSnowflakeデータベース名。
スキーマpublicデータベース内のスキーマ名。テーブルやパイプが存在する場所。
ステージ(集約モード)emqxデータ取り込み前にファイルを保持するSnowflakeステージ名。
パイプ(集約モード)emqxステージからテーブルへデータをロードするパイプ名。
パイプ(ストリーミング)emqxstreamingSnowpipe Streaming API経由でデータを取り込むためのパイプ名。
プライベートキーfile://<path to snowflake_rsa_key.private.pem>API認証用JWT署名に使用するRSAプライベートキーのパス。

RSAキーペアの生成(集約モードは任意)

Snowflakeは複数の認証方式をサポートしており、EMQXではアップロードモードや接続設定に応じて選択します:

アップロードモード認証オプションキーペア必須か
ストリーミング(HTTPS)RSAキーペア + JWT(唯一のサポート方式)必須
集約(ODBC)ユーザー名/パスワード(DSNまたはEMQX経由)
RSAキーペア + JWT(任意、EMQX設定のみ)
任意

キーペア認証はストリーミングモードで必須であり、EMQXはJWTを署名してSnowflakeのStreaming APIに安全に認証します。

集約モードではユーザー名/パスワードまたはRSAキーペアのいずれかで認証可能です。認証情報は以下のいずれかの方法で提供できます:

  • ダッシュボードのEMQXコネクター設定でユーザー名とパスワードを直接入力
  • キーペア認証を使う場合はプライベートRSAキーのパスを指定
  • EMQXでいずれも指定しない場合は、システムのODBC DSN(Linuxの/etc/odbc.iniやmacOSの~/.odbc.ini)に正しく設定されていることを確認

TIP

認証にはパスワードかプライベートキーのいずれかを使用し、両方を同時に使わないでください。

EMQXでどちらも設定されていない場合、コネクターは/etc/odbc.iniの認証情報を参照します。

例:ユーザー名/パスワードを含む/etc/odbc.ini

ini
[snowflake]
Driver=SnowflakeDSIIDriver
Server=<account>.snowflakecomputing.com
UID=snowpipeuser
PWD=Snowpipeuser99
Database=testdatabase
Schema=public
Warehouse=compute_wh
Role=snowpipe

この方法により、EMQXは設定で直接認証情報を含めずにDSNsnowflake)を参照できます。

キーペア認証を利用する場合

RSAキーペア認証を使用する場合(例:ストリーミングモード)、以下のコマンドで鍵を生成し設定します:

bash
# 秘密鍵の生成
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt

# 公開鍵の生成
openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem

EMQXがキーペア認証を使用する場合(集約・ストリーミング両モード対応):

  • EMQXはプライベートRSAキーでJWTに署名し、安全で検証可能なIDトークンとして利用
  • Snowflakeは公開鍵で署名を検証

詳細はキーペア認証とキーペアローテーションを参照してください。

SQLでSnowflakeリソースをセットアップ

RSAキーペア生成後、aggregatedまたはstreamingの取り込み用に必要なSnowflakeオブジェクトをSQLで作成します。

対象は以下を含みます:

  • データベースとテーブルの作成
  • ステージとパイプの作成(集約モード)
  • ストリーミングパイプの作成(ストリーミングモード)
  • ユーザーとロールの作成および権限付与
  1. SnowflakeコンソールのSQLワークシートで以下のSQLを実行し、データベース、テーブル、ステージ、パイプを作成します:

    sql
    USE ROLE accountadmin;
    
    -- データ保存用データベース作成(存在しない場合)
    CREATE DATABASE IF NOT EXISTS testdatabase;
    
    -- MQTTデータ受け取り用テーブル作成
    CREATE OR REPLACE TABLE testdatabase.public.emqx (
        clientid STRING,
        topic STRING,
        payload STRING,
        publish_received_at TIMESTAMP_LTZ
    );
    
    -- ファイルアップロード用Snowflakeステージ作成(集約モードのみ)
    CREATE STAGE IF NOT EXISTS testdatabase.public.emqx
    FILE_FORMAT = (TYPE = CSV PARSE_HEADER = TRUE FIELD_OPTIONALLY_ENCLOSED_BY = '"')
    COPY_OPTIONS = (ON_ERROR = CONTINUE PURGE = TRUE);
    
    -- ステージからコピーする集約モード用パイプ作成
    CREATE PIPE IF NOT EXISTS testdatabase.public.emqx AS
    COPY INTO testdatabase.public.emqx
    FROM @testdatabase.public.emqx
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
    
    -- ストリーミングモード用パイプ作成(直接取り込み)
    CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
    COPY INTO testdatabase.public.emqx (
        clientid,
        topic,
        payload,
        publish_received_at
    )
    FROM (
        SELECT
            $1:clientid::STRING,
            $1:topic::STRING,
            $1:payload::STRING,
            $1:publish_received_at::TIMESTAMP_LTZ
        FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
    );
    • パイプ内のCOPY INTO文により、Snowflakeはステージまたはストリーミングされたデータを自動的にテーブルにロードします。
    • ストリーミングパイプの$1:field構文は、EMQX経由で取り込まれたJSONペイロードからフィールドを抽出します。
  2. EMQXが認証に使用する専用ユーザー(例:snowpipeuser)を作成し、RSA公開鍵をバインドします:

    sql
    -- ユーザーアカウント作成
    CREATE USER IF NOT EXISTS snowpipeuser
        PASSWORD = 'Snowpipeuser99'
        MUST_CHANGE_PASSWORD = FALSE;
    
    -- RSA公開鍵をユーザーに設定
    ALTER USER snowpipeuser SET RSA_PUBLIC_KEY = '
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_1>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_2>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_3>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_4>
    ';

    TIP

    PEMファイルの-----BEGIN PUBLIC KEY-----および-----END PUBLIC KEY-----行は削除し、残りの内容を改行を保持したまま含めてください。

    この鍵はSnowflakeユーザーにアップロードされ、Snowflake内に保存されます。

  3. 必要なロールを作成し、ユーザーに割り当ててSnowflakeリソースを管理できるようにします:

    sql
    CREATE OR REPLACE ROLE snowpipe;
    
    -- データベースとスキーマの使用権限付与
    GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
    GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
    GRANT INSERT, SELECT ON testdatabase.public.emqx TO ROLE snowpipe;
    
    -- 集約モード用にステージとパイプの権限付与
    GRANT READ, WRITE ON STAGE testdatabase.public.emqx TO ROLE snowpipe;
    GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqx TO ROLE snowpipe;
    
    -- ストリーミングモード用にストリーミングパイプの権限付与
    GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
    
    -- ユーザーにロールを割り当て、デフォルトに設定
    GRANT ROLE snowpipe TO USER snowpipeuser;
    ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;

集約モード用Snowflakeコネクターの作成

Snowflake Sinkで集約アップロードモードを使用する場合、Snowflake環境との接続を確立するためにSnowflakeコネクターを作成する必要があります。このコネクターはODBC(DSN経由)を使ってステージに接続します。

  1. ダッシュボードの Integration -> Connector ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. コネクタータイプで Snowflake を選択し、次へ進みます。

  4. コネクター名を入力します。英数字の組み合わせで、ここでは my-snowflake とします。

  5. 接続情報を入力します。

    • Server Host:SnowflakeのエンドポイントURL。通常は <Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com の形式で、実際にはSnowflakeインスタンス固有のサブドメインに置き換えます。

    • Account:Snowflakeの組織IDとアカウント名をハイフン(-)で区切って入力します。SnowflakeコンソールのURLに含まれています。

    • Data Source Name (DSN):ODBCドライバー設定時に.odbc.iniで設定したsnowflakeを入力します。

    • Username:前述の設定で作成したsnowpipeuserを入力します。

    • Password:SnowflakeへのODBC認証用パスワード。任意入力です。

      • ここにSnowpipeuser99などを入力してもよいです。

      • または/etc/odbc.iniに設定している場合は空欄でも構いません。

      • キーペア認証を使う場合は空欄にします。

        TIP

        認証にはパスワードかプライベートキーのいずれかを使用し、両方を同時に使わないでください。ここで設定しない場合は/etc/odbc.iniの認証情報を参照します。

    • Private Key Path:ODBC経由でSnowflake認証に使用するRSAプライベートキーの絶対パス。クラスター内の全ノードで共通のパスである必要があります。例:/etc/emqx/certs/snowflake_rsa_key.private.pem

    • Private Key Password:プライベートキーが暗号化されている場合の復号パスワード。暗号化なし(OpenSSLの-nocryptオプション)で生成した場合は空欄。

    • Proxy:HTTPプロキシ経由でSnowflakeに接続する設定。HTTPSプロキシはサポートされていません。デフォルトはプロキシなし。プロキシを使う場合はEnable Proxyをオンにし、以下を入力:

      • Proxy Host:プロキシサーバーのホスト名またはIPアドレス

      • Proxy Port:プロキシサーバーのポート番号

  6. 暗号化接続を確立したい場合は、Enable TLSのトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。ストリーミングモードではHTTPS通信のためTLS必須です。

  7. 高度な設定(任意):Advanced Settingsを参照してください。

  8. Createをクリックする前に、Test ConnectivityでSnowflakeへの接続テストが可能です。

  9. Createボタンをクリックしてコネクター作成を完了します。

これでコネクターの作成が完了し、ルールとSinkを作成してSnowflakeへのデータ書き込みを指定できます。

ストリーミングモード用Snowflakeコネクターの作成

Snowflake Sinkでストリーミングアップロードモードを使用する場合、Snowflake環境との接続を確立するためにSnowflakeストリーミングコネクターを作成します。このコネクターはHTTPSおよびSnowpipe Streaming REST APIを使用します。

  1. ダッシュボードの Integration -> Connector ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. コネクタータイプで Snowflake Streaming を選択し、次へ進みます。

  4. コネクター名を入力します。英数字の組み合わせで、ここでは my-snowflake-streaming とします。

  5. 接続情報を入力します。

    • Server Host:SnowflakeのエンドポイントURL。通常は <Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com の形式で、実際にはSnowflakeインスタンス固有のサブドメインに置き換えます。

    • Account:Snowflakeの組織IDとアカウント名をハイフン(-)で区切って入力します。SnowflakeコンソールのURLに含まれています。

    • Pipe User:対象パイプの操作権限を持つSnowflakeユーザー名(例:snowpipeuser)。OPERATEおよびMONITOR権限が必要です。

    • Private Key Path:EMQXがJWT署名に使用するRSAプライベートキー。PEM形式のキー全文を文字列として貼り付けるか、file://で始まるファイルパスを指定可能。例:file:///etc/emqx/certs/snowflake_rsa_key.private.pem

    • Private Key Password:プライベートキーが暗号化されている場合の復号パスワード。暗号化なしで生成した場合は空欄。

    • Proxy:HTTPプロキシ経由でSnowflakeに接続する設定。HTTPSプロキシはサポートされていません。デフォルトはプロキシなし。プロキシを使う場合はEnable Proxyをオンにし、以下を入力:

      • Proxy Host:プロキシサーバーのホスト名またはIPアドレス

      • Proxy Port:プロキシサーバーのポート番号

  6. 暗号化接続を確立したい場合は、Enable TLSのトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。ストリーミングモードではHTTPS通信のためTLS必須です。

  7. 高度な設定(任意):Advanced Settingsを参照してください。

  8. Createをクリックする前に、Test ConnectivityでSnowflakeへの接続テストが可能です。

  9. Createボタンをクリックしてコネクター作成を完了します。

これでコネクターの作成が完了し、ルールとSinkを作成してSnowflakeへのデータ書き込みを指定できます。

Snowflake Sinkを使ったルールの作成

このセクションでは、EMQXでルールを作成し、メッセージ(例:ソースMQTTトピックt/#)を処理して、処理結果を設定済みのSnowflake Sinkを通じて書き込む方法を示します。

SQLを定義したルールの作成

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. ルールIDに my_rule を入力し、SQLエディターに以下のルールSQLを入力します:

    sql
    SELECT
      clientid,
      unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at,
      topic,
      payload
    FROM
        "t/#"

    TIP

    SQLに不慣れな場合は、SQL ExamplesEnable DebugをクリックしてルールSQLの結果を学習・テストできます。

    TIP

    Snowflake連携では、選択するフィールドがSnowflakeのテーブルの列数および名前と完全に一致することが重要です。余分なフィールドを追加したり*で全選択するのは避けてください。

  4. ルールにアクションを追加してSinkを設定します。

  5. アクション追加後、Action Outputsセクションに新規追加されたSinkが表示されます。Create RuleページのSaveボタンをクリックしてルール作成を完了します。

これでルールが正常に作成され、Rulesページで新規ルールを確認でき、**Actions (Sink)**タブに新しいSnowflake Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#のメッセージがルールmy_ruleで解析され、Snowflakeに書き込まれる流れを視覚的に確認できます。

集約アップロードモードでSnowflake Sinkを追加

このセクションでは、ルールにSinkを追加し、集約アップロードモードで処理結果をSnowflakeに書き込む方法を示します。このモードは複数のルールトリガー結果を単一ファイル(例:CSVファイル)にまとめてSnowflakeにアップロードし、ファイル数を削減して書き込み効率を向上させます。

  1. Create RuleページのAction OutputsセクションでAdd Actionをクリックし、ルールにアクションを追加します。

  2. Action TypeドロップダウンからSnowflakeを選択し、ActionはデフォルトのCreate Actionのままか、既存のSnowflakeアクションを選択します。ここでは新規Sinkを作成してルールに追加します。

  3. Sinkの名前(例:snowflake_sink)と簡単な説明を入力します。

  4. Connectorsドロップダウンから先に作成したmy-snowflakeコネクターを選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成可能です。必要な設定パラメーターは集約モード用Snowflakeコネクターの作成を参照してください。

  5. 集約アップロードモードの設定を行います。

    • Database Nametestdatabaseを入力。EMQXデータ保存用に作成したSnowflakeデータベースです。

    • Schemapublicを入力。testdatabase内のデータテーブルが存在するスキーマ名です。

    • Stageemqxを入力。Snowflakeで作成したデータアップロード前のステージ名です。

    • Pipeemqxを入力。ステージからテーブルへのロードを自動化するパイプ名です。

    • Pipe Usersnowpipeuserを入力。パイプ管理権限を持つSnowflakeユーザーです。

    • Private Key:パイプユーザーがSnowflakeパイプに安全にアクセスするためのRSAプライベートキー。以下のいずれかの形式で指定可能:

      • プレーンテキスト:PEM形式のプライベートキー全文を文字列として直接貼り付け
      • ファイルパスfile://で始まるプライベートキーファイルのパス。クラスター内の全ノードで共通かつEMQXアプリケーションユーザーがアクセス可能である必要があります。例:file:///etc/emqx/certs/snowflake_rsa_key.private.pem
    • Private Key Password:プライベートキーが暗号化されている場合の復号パスワード。OpenSSLの-nocryptオプションで生成した場合は空欄にします。

    • Aggregation Upload Format:現状はcsvのみサポート。データはカンマ区切りCSV形式でSnowflakeにステージされます。

    • Column Order:ドロップダウンから列の並び順を選択。生成されるCSVファイルは選択した列順にソートされ、未選択列はアルファベット順にソートされます。

    • Max Records:集約がトリガーされる最大レコード数。例:1000に設定すると1000件収集後にアップロードされ、時間間隔はリセットされます。

    • Time Interval:集約が発生する時間間隔(秒)。例:60に設定すると最大レコード数に達していなくても60秒ごとにアップロードされ、最大レコード数はリセットされます。

    • Proxy:HTTPプロキシ経由でSnowflakeに接続する設定。HTTPSプロキシはサポートされていません。デフォルトはプロキシなし。プロキシを使う場合はEnable Proxyをオンにし、以下を入力:

      • Proxy Host:プロキシサーバーのホスト名またはIPアドレス
      • Proxy Port:プロキシサーバーのポート番号
  6. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照してください。

  7. Advanced Settingsを展開し、必要に応じて高度な設定を行います(任意)。詳細はAdvanced Settingsを参照してください。

  8. Createをクリックする前に、Test ConnectivityでSinkがSnowflakeサーバーに接続できるかテスト可能です。

  9. CreateボタンをクリックしてSink作成を完了します。作成成功後、ページはルール作成画面に戻り、新規Sinkがルールアクションに追加されます。

ストリーミングアップロードモードでSnowflake Sinkを追加

このセクションでは、ルールにSinkを追加し、ストリーミングアップロードモードで処理結果をSnowflakeに書き込む方法を示します。このモードはSnowpipe Streaming APIを使ったリアルタイム取り込みを可能にします。

  1. Create RuleページのAction OutputsセクションでAdd Actionをクリックし、ルールにアクションを追加します。

  2. Action TypeドロップダウンからSnowflake Streamingを選択し、ActionはデフォルトのCreate Actionのままか、既存のSnowflakeアクションを選択します。ここでは新規Sinkを作成してルールに追加します。

  3. Sinkの名前(例:snowflake_sink_streaming)と簡単な説明を入力します。

  4. コネクタードロップダウンから先に作成したmy-snowflake-streamingコネクターを選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成可能です。必要な設定パラメーターはストリーミングモード用Snowflakeコネクターの作成を参照してください。

  5. ストリーミングアップロードモードの設定を行います。

    • Database Nametestdatabaseを入力。EMQXデータ保存用に作成したSnowflakeデータベースです。
    • Schemapublicを入力。testdatabase内のデータテーブルが存在するスキーマ名です。
    • Pipeemqxstreamingを入力。SQL文で作成したSnowflakeストリーミングパイプの名前で、Snowflakeで定義した名前と完全に一致する必要があります。
    • HTTP Pipelining:レスポンスを待たずに送信可能な最大HTTPリクエスト数。デフォルトは100
    • Connect Timeout:Snowflakeへの接続確立タイムアウト秒数。デフォルトは15秒。
    • Connection Pool Size:このSinkがSnowflakeに対して維持可能な最大同時接続数。デフォルトは8
    • Max Inactive:アイドル状態の接続が閉じられるまでの最大時間(秒)。デフォルトは10秒。
  6. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照してください。

  7. Advanced Settingsを展開し、必要に応じて高度な設定を行います(任意)。詳細はAdvanced Settingsを参照してください。

  8. Createをクリックする前に、Test ConnectivityでSinkがSnowflakeサーバーに接続できるかテスト可能です。

  9. CreateボタンをクリックしてSink作成を完了します。作成成功後、ページはルール作成画面に戻り、新規Sinkがルールアクションに追加されます。

ルールのテスト

このセクションでは、設定したルールのテスト方法を示します。

テストメッセージのパブリッシュ

MQTTXを使ってトピックt/1にメッセージをパブリッシュします:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Snowflake" }'

複数回繰り返してテストメッセージを生成してください。

Snowflake内のデータ確認

テストメッセージ送信後、Snowflakeに正常にデータが書き込まれたかをSnowflakeインスタンスにアクセスしてターゲットテーブルをクエリし確認します。

  1. SnowflakeのWebインターフェースを開き、認証情報でSnowflakeコンソールにログインします。

  2. Snowflakeコンソールで以下のSQLを実行し、ルールで書き込まれたemqxテーブルのデータを表示します:

    SELECT * FROM testdatabase.public.emqx;

    これにより、emqxテーブルにアップロードされたすべてのレコード(clientidtopicpayloadpublish_received_atフィールドを含む)が表示されます。

  3. 送信したテストメッセージ(例:{ "msg": "Hello Snowflake" })やトピック、タイムスタンプなどのメタデータが確認できるはずです。

高度な設定

このセクションでは、Snowflake Sinkの高度な設定オプションについて説明します。ダッシュボードでSinkを設定する際、Advanced Settingsを展開して以下のパラメーターをニーズに応じて調整できます。

項目名説明デフォルト値
Buffer Pool SizeEMQXとSnowflake間のデータフローを管理するバッファワーカーの数を指定します。これらのワーカーはデータを一時的に保持・処理し、ターゲットサービスへの送信を最適化しスムーズなデータ伝送を保証します。16
Request TTLバッファに入ったリクエストが有効とみなされる最大秒数です。リクエストがこのTTLを超えてバッファに滞留するか、送信後にSnowflakeから適時の応答やアックを受け取れなかった場合、リクエストは期限切れとみなされます。45
Health Check IntervalSinkがSnowflakeとの接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
Health Check Interval Jitterヘルスチェック間隔に加える一様ランダム遅延(ミリ秒)です。複数のノードが同時にヘルスチェックを開始する確率を減らします。複数のアクションやソースが同一コネクターを共有する場合に有効です。0
Health Check TimeoutSinkがSnowflakeとの接続状態をヘルスチェックする際のタイムアウト秒数を指定します。60
Max Buffer Queue SizeSnowflake Sinkの各バッファワーカーがバッファリング可能な最大バイト数です。バッファワーカーはデータを一時的に保持し、Snowflakeへの送信を効率化します。システム性能やデータ伝送要件に応じて調整可能です。256 MB
Query Mode同期(synchronous)または非同期(asynchronous)のリクエストモードを選択し、メッセージ伝送を最適化します。非同期モードではSnowflakeへの書き込みがMQTTメッセージパブリッシュをブロックしませんが、クライアントがSnowflake到達前にメッセージを受信する可能性があります。Asynchronous
Batch SizeEMQXからSnowflakeへ一度に転送するデータバッチの最大サイズを指定します。サイズ調整によりデータ転送の効率と性能を微調整可能です。
Batch Size1に設定すると、データはバッチ化されず個別に送信されます。
100
Inflight Window送信済みだが応答やアックをまだ受け取っていない「インフライト」キューリクエストの最大数を制御します。
Request Modeasynchronousの場合、このパラメーターが重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合は1に設定してください。
100