Skip to content

SnowflakeへのMQTTデータ取り込み

Snowflake は、クラウドベースのデータプラットフォームであり、高いスケーラビリティと柔軟性を備えたデータウェアハウジング、分析、セキュアなデータ共有のソリューションを提供します。構造化データおよび半構造化データの処理に優れ、大量のデータを高速なクエリ性能で保存し、さまざまなツールやサービスとのシームレスな統合を実現します。

本ページでは、EMQXとSnowflake間のデータ統合について詳しく紹介し、ルールおよびSinkの作成方法について実践的なガイダンスを提供します。

動作概要

EMQXにおけるSnowflakeデータ統合は、複雑なビジネス開発に容易に対応できる即利用可能な機能です。典型的なIoTアプリケーションでは、EMQXがデバイス接続およびメッセージ送受信を担うIoTプラットフォームとして機能し、Snowflakeはメッセージデータの取り込み、保存、分析を担当するデータストレージおよび処理プラットフォームとして利用されます。

snowflake-architecture

EMQXはルールエンジンとSinkを利用してデバイスイベントやデータをSnowflakeに転送します。エンドユーザーやアプリケーションはSnowflakeのテーブルに格納されたデータにアクセス可能です。具体的なワークフローは以下の通りです。

  1. デバイスのEMQXへの接続:IoTデバイスはMQTTプロトコルで正常に接続されるとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレス、その他のプロパティ情報が含まれます。
  2. デバイスメッセージのパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信し、ルールエンジン内で比較処理を行います。
  3. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールにマッチしたメッセージやイベントに対し、データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を実施します。
  4. Snowflakeへの書き込み:ルールはSnowflakeのStageにメッセージを書き込み、その後Snowflakeテーブルにロードするアクションをトリガーします。

イベントおよびメッセージデータがSnowflakeに書き込まれた後は、以下のようなビジネスおよび技術用途に利用可能です。

  • データアーカイブ:IoTデータをSnowflakeに安全に長期保存し、コンプライアンスや履歴データの利用を保証します。
  • データ分析:Snowflakeのデータウェアハウジングおよび分析機能を活用し、リアルタイムまたはバッチ分析を実施。予知保全、運用インサイト、デバイス性能評価などを可能にします。

特長とメリット

EMQXのSnowflakeデータ統合を利用することで、以下の特長とメリットが得られます。

  • メッセージ変換:Snowflakeに書き込む前に、EMQXルール内でメッセージの高度な処理や変換が可能で、後続の保存や利用を容易にします。
  • 柔軟なデータ操作:Snowflake Sinkは書き込むフィールドを選択可能で、ビジネスニーズに応じた効率的かつ動的なストレージ構成を実現します。
  • 統合されたビジネスプロセス:Snowflake Sinkにより、デバイスデータをSnowflakeの豊富なエコシステムアプリケーションと組み合わせ、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
  • 低コストの長期保存:Snowflakeのスケーラブルなストレージ基盤は、従来のデータベースに比べ低コストでの長期データ保持に最適で、大量のIoTデータ保存に適しています。

これらの特長により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に貢献できます。

はじめる前に

ここでは、EMQXでSnowflake Sinkを作成する前に必要な準備について説明します。

前提条件

Snowflake ODBCドライバーの初期化

EMQXがSnowflakeと通信し効率的にデータ転送を行うために、Snowflake Open Database Connectivity(ODBC)ドライバーのインストールと設定が必要です。これは通信の橋渡し役となり、データの適切なフォーマット化、認証、転送を保証します。

詳細は公式のODBC Driverページおよびライセンス契約を参照してください。

Linux

以下のスクリプトを実行してSnowflake ODBCドライバーをインストールし、odbc.iniファイルを設定します。

scripts/install-snowflake-driver.sh

注意

このスクリプトはテスト用であり、本番環境でのODBCドライバー設定方法の推奨ではありません。公式のLinux向けインストール手順を参照してください。

macOS

macOSでSnowflake ODBCドライバーをインストールおよび設定する手順は以下の通りです。

  1. unixODBCをインストール(例):

    brew install unixodbc
  2. iODBCのダウンロードとインストール

  3. Snowflake ODBCドライバーのダウンロードとインストール

  4. 詳細なインストールおよび設定手順はmacOS向けODBCドライバーのインストールと設定を参照。

  5. インストール後、以下の設定ファイルを更新:

    • Snowflake ODBCドライバーの権限と設定を更新:

      bash
      chown $(id -u):$(id -g) /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
      echo 'ODBCInstLib=libiodbcinst.dylib' >> /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
    • ~/.odbc.iniファイルを作成または更新し、ODBC接続を設定:

      cat << EOF > ~/.odbc.ini
      [ODBC]
      Trace=no
      TraceFile=
      
      [ODBC Drivers]
      Snowflake = Installed
      
      [ODBC Data Sources]
      snowflake = Snowflake
      
      [Snowflake]
      Driver = /opt/snowflake/snowflakeodbc/lib/universal/libSnowflake.dylib
      EOF

ユーザーアカウントとデータベースの作成

Snowflake ODBCドライバーのインストール後、データ取り込み用のユーザーアカウント、データベース、および関連リソースを設定する必要があります。以下の認証情報は後でEMQXのコネクターおよびSink設定時に使用します。

項目
Data Source Name (DSN)snowflake
ユーザー名snowpipeuser
パスワードSnowpipeuser99
データベース名testdatabase
スキーマpublic
ステージemqx
パイプemqx
パイプユーザーsnowpipeuser
プライベートキーfile://<path to snowflake_rsa_key.private.pem>

RSA鍵ペアの生成

Snowflakeへの安全な接続のため、以下のコマンドでRSA鍵ペアを生成します。

bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem

詳細は鍵ペア認証と鍵ペアローテーションを参照してください。

SQLを使ったSnowflakeリソースのセットアップ

ODBCドライバーの設定とRSA鍵ペアの生成後、Snowflakeのデータベース、テーブル、ステージ、パイプをSQLコマンドで作成します。

  1. SnowflakeコンソールのSQLワークシートで以下のSQLを実行し、データベース、テーブル、ステージ、パイプを作成します。

    sql
    USE ROLE accountadmin;
    
    CREATE DATABASE IF NOT EXISTS testdatabase;
    
    CREATE OR REPLACE TABLE testdatabase.public.emqx (
        clientid STRING,
        topic STRING,
        payload STRING,
        publish_received_at TIMESTAMP_LTZ
    );
    
    CREATE STAGE IF NOT EXISTS testdatabase.public.emqx
    FILE_FORMAT = (TYPE = CSV PARSE_HEADER = TRUE FIELD_OPTIONALLY_ENCLOSED_BY = '"')
    COPY_OPTIONS = (ON_ERROR = CONTINUE PURGE = TRUE);
    
    CREATE PIPE IF NOT EXISTS testdatabase.public.emqx AS
    COPY INTO testdatabase.public.emqx
    FROM @testdatabase.public.emqx
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
  2. 新規ユーザーを作成し、そのユーザーにRSA公開鍵を設定します。

    sql
    CREATE USER IF NOT EXISTS snowpipeuser
        PASSWORD = 'Snowpipeuser99'
        MUST_CHANGE_PASSWORD = FALSE;
    
    ALTER USER snowpipeuser SET RSA_PUBLIC_KEY = '
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_1>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_2>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_3>
    <YOUR_PUBLIC_KEY_CONTENTS_LINE_4>
    ';

    TIP

    PEMファイルの-----BEGIN PUBLIC KEY-----および-----END PUBLIC KEY-----の行は削除し、残りの内容は改行を保持して含めてください。

  3. 必要なロールを作成し、ユーザーに割り当てます。

    sql
    CREATE OR REPLACE ROLE snowpipe;
    
    GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
    GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
    GRANT INSERT, SELECT ON testdatabase.public.emqx TO ROLE snowpipe;
    GRANT READ, WRITE ON STAGE testdatabase.public.emqx TO ROLE snowpipe;
    GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqx TO ROLE snowpipe;
    GRANT ROLE snowpipe TO USER snowpipeuser;
    ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;

コネクターの作成

Snowflake Sinkを追加する前に、EMQXでSnowflakeとの接続を確立するためのコネクターを作成する必要があります。

  1. ダッシュボードの Integration -> Connector ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. コネクタータイプとして Snowflake を選択し、次へ進みます。

  4. コネクター名を入力します。英数字の組み合わせで、ここでは my-snowflake と入力します。

  5. 接続情報を入力します。

    • Account:Snowflakeの組織IDとアカウント名をハイフン(-)で区切って入力します。これはSnowflakeプラットフォームのURLの一部で、Snowflakeコンソールで確認可能です。

    • Server Host:SnowflakeのエンドポイントURLで、通常は <Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com の形式です。<Your Snowflake Organization ID>-<Your Snowflake Account Name> はご自身のSnowflakeインスタンス固有のサブドメインに置き換えてください。

    • Data Source Name (DSN):ODBCドライバー設定時に.odbc.iniで設定したsnowflakeを入力します。

    • Username:前述の設定で作成したsnowpipeuserを入力します。

    • Password:ODBC経由でユーザー名/パスワード認証を行う場合のパスワード。任意入力です。

      • ここにパスワード(例:Snowpipeuser99)を入力するか、
      • /etc/odbc.iniで設定するか、
      • キーペア認証を使用する場合は空欄のままにします。

      TIP

      認証にはパスワードかプライベートキーのいずれかを使用し、両方は使用しません。どちらも設定しない場合は、適切な認証情報が/etc/odbc.iniに設定されていることを確認してください。

    • Private Key Path:ODBC経由でキーペア認証を行うためのプライベートRSA鍵ファイルの絶対パス。クラスタの全ノードで同一パスである必要があります。file://で始まる形式で指定します(例:file:///etc/emqx/certs/snowflake_rsa_key.private.pem)。

    • Private Key Password:プライベートRSA鍵ファイルが暗号化されている場合の復号パスワード。OpenSSLの-nocryptオプションで生成した鍵は空欄のままにします。

    • Proxy:HTTPプロキシサーバー経由でSnowflakeに接続する場合の設定。HTTPSプロキシはサポートしていません。デフォルトはプロキシなし。プロキシを有効にする場合はEnable Proxyを選択し、以下を入力します。

      • Proxy Host:プロキシサーバーのホスト名またはIPアドレス。
      • Proxy Port:プロキシサーバーのポート番号。
  6. 暗号化接続を確立する場合は、Enable TLS トグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。

  7. 詳細設定(任意):Advanced Settingsを参照してください。

  8. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがSnowflakeに接続できるかテスト可能です。

  9. 最後に、Createボタンをクリックしてコネクター作成を完了します。

これでコネクターの作成が完了し、次にルールとSinkを作成してSnowflakeへのデータ書き込み方法を指定できます。

Snowflake Sinkを使ったルールの作成

このセクションでは、EMQXでソースMQTTトピック t/# からのメッセージを処理し、処理結果を設定済みのSnowflake Sink経由でSnowflakeに書き込むルールの作成方法を示します。

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. ルールIDに my_rule を入力し、SQLエディターに以下のルールSQLを入力します。

    sql
    SELECT
      clientid,
      unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at,
      topic,
      payload
    FROM
        "t/#"

    TIP

    SQLに不慣れな場合は、SQL ExamplesEnable DebugをクリックしてルールSQLの学習やテストが可能です。

    TIP

    Snowflake連携では、選択するフィールドはSnowflakeで定義したテーブルのカラム数および名前と完全に一致させる必要があります。余分なフィールドを追加したり、*で全選択することは避けてください。

  4. アクションを追加し、Action TypeドロップダウンからSnowflakeを選択します。アクションドロップダウンはデフォルトのcreate actionのままにするか、既存のSnowflakeアクションを選択します。ここでは新規Sinkを作成し、ルールに追加します。

  5. Sinkの名前(例:snowflake_sink)と簡単な説明を入力します。

  6. 先ほど作成したmy-snowflakeコネクターをコネクタードロップダウンから選択します。ドロップダウン横の作成ボタンをクリックして新規コネクターをポップアップで素早く作成することも可能です。必要な設定パラメータはコネクターの作成を参照してください。

  7. 以下の設定を行います。

    • Database Nametestdatabase。EMQXデータ保存用に作成したSnowflakeデータベース名。
    • Schemapublictestdatabase内のデータテーブルが存在するスキーマ名。
    • Stageemqx。Snowflakeで作成した、テーブルにロードする前のデータを保持するステージ名。
    • Pipeemqx。ステージからテーブルへのロード処理を自動化するパイプ名。
    • Pipe Usersnowpipeuser。パイプ管理権限を持つSnowflakeユーザー名。
    • Private Key:プライベートRSA鍵のパス(例:file://<path to snowflake_rsa_key.private.pem>)またはRSAプライベート鍵ファイルの内容。安全な認証に必要で、パイプへの安全なアクセスに使用します。ファイルパス指定の場合はクラスター全ノードで同一かつEMQXアプリケーションユーザーがアクセス可能である必要があります。
  8. Upload Modeを選択します。現在はAggregated Uploadのみサポート。複数のルールトリガー結果を1つのファイル(例:CSV)にまとめてSnowflakeにアップロードし、ファイル数を減らして書き込み効率を向上させます。

  9. Aggregation Typeを選択します。現在はcsvのみサポート。データはカンマ区切りCSV形式でSnowflakeにステージングされます。

    • Column Order:ドロップダウンから列の並び順を選択します。生成されるCSVファイルは選択した列の順に並び、未選択列はアルファベット順に並びます。

    • Max Records:集約をトリガーする最大レコード数を設定します。例えば1000に設定すると、1000件のレコード収集後にアップロードが行われ、時間間隔がリセットされます。

    • Time Interval:集約を行う時間間隔(秒)を設定します。例えば60に設定すると、最大レコード数に達していなくても60秒ごとにデータがアップロードされ、最大レコード数がリセットされます。

  10. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。プライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。

  11. 詳細設定を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細はAdvanced Settingsを参照してください。

  12. 残りの設定はデフォルト値のままにし、CreateボタンをクリックしてSink作成を完了します。作成成功後はルール作成画面に戻り、新規Sinkがルールアクションに追加されます。

  13. ルール作成画面でCreateをクリックし、ルール作成全体を完了します。

これでルールの作成が完了しました。Rulesページで新規作成したルールを確認でき、**Actions (Sink)**タブで新しいSnowflake Sinkも確認可能です。

また、Integration -> Flow Designerをクリックするとトポロジーを視覚的に確認できます。トポロジーは、トピックt/#配下のメッセージがルールmy_ruleで解析され、Snowflakeに書き込まれる流れを示します。

ルールのテスト

ここでは設定したルールのテスト方法を説明します。

テストメッセージのパブリッシュ

MQTTXを使い、トピックt/1にメッセージをパブリッシュします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Snowflake" }'

この操作を数回繰り返し、複数のテストメッセージを生成してください。

Snowflake内のデータ確認

テストメッセージ送信後、Snowflakeにデータが正常に書き込まれているかを確認します。

  1. SnowflakeのWebインターフェースにアクセスし、認証情報でSnowflakeコンソールにログインします。

  2. コンソールで以下のSQLクエリを実行し、ルールで書き込まれたemqxテーブルのデータを表示します。

    SELECT * FROM testdatabase.public.emqx;

    これにより、emqxテーブルにアップロードされたすべてのレコード(clientidtopicpayloadpublish_received_atフィールドを含む)が表示されます。

  3. 送信したテストメッセージ(例:{ "msg": "Hello Snowflake" })や、トピック、タイムスタンプなどのメタデータが確認できるはずです。

詳細設定

このセクションでは、Snowflake Sinkの詳細設定オプションについて説明します。ダッシュボードのSink設定画面でAdvanced Settingsを展開し、用途に応じて以下のパラメータを調整可能です。

項目名説明デフォルト値
Max Retriesアップロード失敗時の最大リトライ回数を設定します。例:3で3回までリトライ可能。3
Buffer Pool SizeEMQXとSnowflake間のデータフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータを一時的に保持・処理し、性能最適化とスムーズなデータ送信を支えます。16
Request TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。TTLを超えてバッファ内に滞留、または送信後にSnowflakeからの応答やアックが得られない場合、リクエストは期限切れと判断されます。
Health Check IntervalSnowflakeとの接続状態をSinkが自動的にチェックする間隔(秒)を指定します。15
Max Buffer Queue SizeSnowflake Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。ワーカーはデータを一時保持し、効率的なデータストリーム処理を実現します。システム性能やデータ転送要件に応じて調整してください。256
Query Modeリクエストモードをsynchronousまたはasynchronousから選択し、メッセージ送信を最適化します。非同期モードではSnowflakeへの書き込みがMQTTメッセージパブリッシュをブロックしませんが、クライアントがSnowflake到達前にメッセージを受信する可能性があります。Asynchronous
Batch SizeEMQXからSnowflakeへ一度に転送するデータバッチの最大サイズを指定します。サイズ調整により転送効率や性能を最適化可能です。
1に設定すると、データはバッチ化せず個別に送信されます。
1
Inflight Window送信済みだが応答やアックをまだ受け取っていない「インフライト」キューリクエストの最大数を制御します。
Request Modeasynchronousの場合、この設定は特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は1に設定してください。
100
Connect TimeoutSnowflakeへの接続確立を待つ最大時間(秒)を指定します。例:30秒。接続できない場合はリトライ(Max Retriesに基づく)またはエラーを返します。ネットワーク遅延や接続信頼性管理に有用です。15
HTTP Pipelining応答待ち前に送信可能なHTTPリクエストの最大数を指定します。100
Connection Pool SizeEMQXがSnowflakeに同時に維持可能な接続数を定義します。大きいほど高負荷時の同時リクエスト数が増えますが、システムリソース消費も増加します。8