SnowflakeへのMQTTデータ取り込み
Snowflake は、クラウドベースのデータプラットフォームであり、高いスケーラビリティと柔軟性を備えたデータウェアハウジング、分析、セキュアなデータ共有のソリューションを提供します。構造化データおよび半構造化データの処理に優れ、大量のデータを高速なクエリ性能で保存し、さまざまなツールやサービスとのシームレスな統合を実現します。
本ページでは、EMQXとSnowflake間のデータ統合について詳しく紹介し、ルールおよびSinkの作成方法について実践的なガイダンスを提供します。
動作概要
EMQXにおけるSnowflakeデータ統合は、複雑なビジネス開発に容易に対応できる即利用可能な機能です。典型的なIoTアプリケーションでは、EMQXがデバイス接続およびメッセージ送受信を担うIoTプラットフォームとして機能し、Snowflakeはメッセージデータの取り込み、保存、分析を担当するデータストレージおよび処理プラットフォームとして利用されます。
EMQXはルールエンジンとSinkを利用してデバイスイベントやデータをSnowflakeに転送します。エンドユーザーやアプリケーションはSnowflakeのテーブルに格納されたデータにアクセス可能です。具体的なワークフローは以下の通りです。
- デバイスのEMQXへの接続:IoTデバイスはMQTTプロトコルで正常に接続されるとオンラインイベントをトリガーします。このイベントにはデバイスID、送信元IPアドレス、その他のプロパティ情報が含まれます。
- デバイスメッセージのパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信し、ルールエンジン内で比較処理を行います。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールにマッチしたメッセージやイベントに対し、データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を実施します。
- Snowflakeへの書き込み:ルールはSnowflakeのStageにメッセージを書き込み、その後Snowflakeテーブルにロードするアクションをトリガーします。
イベントおよびメッセージデータがSnowflakeに書き込まれた後は、以下のようなビジネスおよび技術用途に利用可能です。
- データアーカイブ:IoTデータをSnowflakeに安全に長期保存し、コンプライアンスや履歴データの利用を保証します。
- データ分析:Snowflakeのデータウェアハウジングおよび分析機能を活用し、リアルタイムまたはバッチ分析を実施。予知保全、運用インサイト、デバイス性能評価などを可能にします。
特長とメリット
EMQXのSnowflakeデータ統合を利用することで、以下の特長とメリットが得られます。
- メッセージ変換:Snowflakeに書き込む前に、EMQXルール内でメッセージの高度な処理や変換が可能で、後続の保存や利用を容易にします。
- 柔軟なデータ操作:Snowflake Sinkは書き込むフィールドを選択可能で、ビジネスニーズに応じた効率的かつ動的なストレージ構成を実現します。
- 統合されたビジネスプロセス:Snowflake Sinkにより、デバイスデータをSnowflakeの豊富なエコシステムアプリケーションと組み合わせ、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
- 低コストの長期保存:Snowflakeのスケーラブルなストレージ基盤は、従来のデータベースに比べ低コストでの長期データ保持に最適で、大量のIoTデータ保存に適しています。
これらの特長により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に貢献できます。
はじめる前に
ここでは、EMQXでSnowflake Sinkを作成する前に必要な準備について説明します。
前提条件
Snowflake ODBCドライバーの初期化
EMQXがSnowflakeと通信し効率的にデータ転送を行うために、Snowflake Open Database Connectivity(ODBC)ドライバーのインストールと設定が必要です。これは通信の橋渡し役となり、データの適切なフォーマット化、認証、転送を保証します。
詳細は公式のODBC Driverページおよびライセンス契約を参照してください。
Linux
以下のスクリプトを実行してSnowflake ODBCドライバーをインストールし、odbc.ini
ファイルを設定します。
scripts/install-snowflake-driver.sh
注意
このスクリプトはテスト用であり、本番環境でのODBCドライバー設定方法の推奨ではありません。公式のLinux向けインストール手順を参照してください。
macOS
macOSでSnowflake ODBCドライバーをインストールおよび設定する手順は以下の通りです。
unixODBCをインストール(例):
brew install unixodbc
詳細なインストールおよび設定手順はmacOS向けODBCドライバーのインストールと設定を参照。
インストール後、以下の設定ファイルを更新:
Snowflake ODBCドライバーの権限と設定を更新:
bashchown $(id -u):$(id -g) /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini echo 'ODBCInstLib=libiodbcinst.dylib' >> /opt/snowflake/snowflakeodbc/lib/universal/simba.snowflake.ini
~/.odbc.ini
ファイルを作成または更新し、ODBC接続を設定:cat << EOF > ~/.odbc.ini [ODBC] Trace=no TraceFile= [ODBC Drivers] Snowflake = Installed [ODBC Data Sources] snowflake = Snowflake [Snowflake] Driver = /opt/snowflake/snowflakeodbc/lib/universal/libSnowflake.dylib EOF
ユーザーアカウントとデータベースの作成
Snowflake ODBCドライバーのインストール後、データ取り込み用のユーザーアカウント、データベース、および関連リソースを設定する必要があります。以下の認証情報は後でEMQXのコネクターおよびSink設定時に使用します。
項目 | 値 |
---|---|
Data Source Name (DSN) | snowflake |
ユーザー名 | snowpipeuser |
パスワード | Snowpipeuser99 |
データベース名 | testdatabase |
スキーマ | public |
ステージ | emqx |
パイプ | emqx |
パイプユーザー | snowpipeuser |
プライベートキー | file://<path to snowflake_rsa_key.private.pem> |
RSA鍵ペアの生成
Snowflakeへの安全な接続のため、以下のコマンドでRSA鍵ペアを生成します。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
詳細は鍵ペア認証と鍵ペアローテーションを参照してください。
SQLを使ったSnowflakeリソースのセットアップ
ODBCドライバーの設定とRSA鍵ペアの生成後、Snowflakeのデータベース、テーブル、ステージ、パイプをSQLコマンドで作成します。
SnowflakeコンソールのSQLワークシートで以下のSQLを実行し、データベース、テーブル、ステージ、パイプを作成します。
sqlUSE ROLE accountadmin; CREATE DATABASE IF NOT EXISTS testdatabase; CREATE OR REPLACE TABLE testdatabase.public.emqx ( clientid STRING, topic STRING, payload STRING, publish_received_at TIMESTAMP_LTZ ); CREATE STAGE IF NOT EXISTS testdatabase.public.emqx FILE_FORMAT = (TYPE = CSV PARSE_HEADER = TRUE FIELD_OPTIONALLY_ENCLOSED_BY = '"') COPY_OPTIONS = (ON_ERROR = CONTINUE PURGE = TRUE); CREATE PIPE IF NOT EXISTS testdatabase.public.emqx AS COPY INTO testdatabase.public.emqx FROM @testdatabase.public.emqx MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
新規ユーザーを作成し、そのユーザーにRSA公開鍵を設定します。
sqlCREATE USER IF NOT EXISTS snowpipeuser PASSWORD = 'Snowpipeuser99' MUST_CHANGE_PASSWORD = FALSE; ALTER USER snowpipeuser SET RSA_PUBLIC_KEY = ' <YOUR_PUBLIC_KEY_CONTENTS_LINE_1> <YOUR_PUBLIC_KEY_CONTENTS_LINE_2> <YOUR_PUBLIC_KEY_CONTENTS_LINE_3> <YOUR_PUBLIC_KEY_CONTENTS_LINE_4> ';
TIP
PEMファイルの
-----BEGIN PUBLIC KEY-----
および-----END PUBLIC KEY-----
の行は削除し、残りの内容は改行を保持して含めてください。必要なロールを作成し、ユーザーに割り当てます。
sqlCREATE OR REPLACE ROLE snowpipe; GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe; GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe; GRANT INSERT, SELECT ON testdatabase.public.emqx TO ROLE snowpipe; GRANT READ, WRITE ON STAGE testdatabase.public.emqx TO ROLE snowpipe; GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqx TO ROLE snowpipe; GRANT ROLE snowpipe TO USER snowpipeuser; ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;
コネクターの作成
Snowflake Sinkを追加する前に、EMQXでSnowflakeとの接続を確立するためのコネクターを作成する必要があります。
ダッシュボードの Integration -> Connector ページに移動します。
右上の Create ボタンをクリックします。
コネクタータイプとして Snowflake を選択し、次へ進みます。
コネクター名を入力します。英数字の組み合わせで、ここでは
my-snowflake
と入力します。接続情報を入力します。
Account:Snowflakeの組織IDとアカウント名をハイフン(
-
)で区切って入力します。これはSnowflakeプラットフォームのURLの一部で、Snowflakeコンソールで確認可能です。Server Host:SnowflakeのエンドポイントURLで、通常は
<Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com
の形式です。<Your Snowflake Organization ID>-<Your Snowflake Account Name>
はご自身のSnowflakeインスタンス固有のサブドメインに置き換えてください。Data Source Name (DSN):ODBCドライバー設定時に
.odbc.ini
で設定したsnowflake
を入力します。Username:前述の設定で作成した
snowpipeuser
を入力します。Password:ODBC経由でユーザー名/パスワード認証を行う場合のパスワード。任意入力です。
- ここにパスワード(例:
Snowpipeuser99
)を入力するか、 /etc/odbc.ini
で設定するか、- キーペア認証を使用する場合は空欄のままにします。
TIP
認証にはパスワードかプライベートキーのいずれかを使用し、両方は使用しません。どちらも設定しない場合は、適切な認証情報が
/etc/odbc.ini
に設定されていることを確認してください。- ここにパスワード(例:
Private Key Path:ODBC経由でキーペア認証を行うためのプライベートRSA鍵ファイルの絶対パス。クラスタの全ノードで同一パスである必要があります。
file://
で始まる形式で指定します(例:file:///etc/emqx/certs/snowflake_rsa_key.private.pem
)。Private Key Password:プライベートRSA鍵ファイルが暗号化されている場合の復号パスワード。OpenSSLの
-nocrypt
オプションで生成した鍵は空欄のままにします。Proxy:HTTPプロキシサーバー経由でSnowflakeに接続する場合の設定。HTTPSプロキシはサポートしていません。デフォルトはプロキシなし。プロキシを有効にする場合は
Enable Proxy
を選択し、以下を入力します。- Proxy Host:プロキシサーバーのホスト名またはIPアドレス。
- Proxy Port:プロキシサーバーのポート番号。
暗号化接続を確立する場合は、Enable TLS トグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
詳細設定(任意):Advanced Settingsを参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてコネクターがSnowflakeに接続できるかテスト可能です。
最後に、Createボタンをクリックしてコネクター作成を完了します。
これでコネクターの作成が完了し、次にルールとSinkを作成してSnowflakeへのデータ書き込み方法を指定できます。
Snowflake Sinkを使ったルールの作成
このセクションでは、EMQXでソースMQTTトピック t/#
からのメッセージを処理し、処理結果を設定済みのSnowflake Sink経由でSnowflakeに書き込むルールの作成方法を示します。
ダッシュボードの Integration -> Rules ページに移動します。
右上の Create ボタンをクリックします。
ルールIDに
my_rule
を入力し、SQLエディターに以下のルールSQLを入力します。sqlSELECT clientid, unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at, topic, payload FROM "t/#"
TIP
SQLに不慣れな場合は、SQL ExamplesやEnable DebugをクリックしてルールSQLの学習やテストが可能です。
TIP
Snowflake連携では、選択するフィールドはSnowflakeで定義したテーブルのカラム数および名前と完全に一致させる必要があります。余分なフィールドを追加したり、
*
で全選択することは避けてください。アクションを追加し、Action Typeドロップダウンから
Snowflake
を選択します。アクションドロップダウンはデフォルトのcreate action
のままにするか、既存のSnowflakeアクションを選択します。ここでは新規Sinkを作成し、ルールに追加します。Sinkの名前(例:
snowflake_sink
)と簡単な説明を入力します。先ほど作成した
my-snowflake
コネクターをコネクタードロップダウンから選択します。ドロップダウン横の作成ボタンをクリックして新規コネクターをポップアップで素早く作成することも可能です。必要な設定パラメータはコネクターの作成を参照してください。以下の設定を行います。
- Database Name:
testdatabase
。EMQXデータ保存用に作成したSnowflakeデータベース名。 - Schema:
public
。testdatabase
内のデータテーブルが存在するスキーマ名。 - Stage:
emqx
。Snowflakeで作成した、テーブルにロードする前のデータを保持するステージ名。 - Pipe:
emqx
。ステージからテーブルへのロード処理を自動化するパイプ名。 - Pipe User:
snowpipeuser
。パイプ管理権限を持つSnowflakeユーザー名。 - Private Key:プライベートRSA鍵のパス(例:
file://<path to snowflake_rsa_key.private.pem>
)またはRSAプライベート鍵ファイルの内容。安全な認証に必要で、パイプへの安全なアクセスに使用します。ファイルパス指定の場合はクラスター全ノードで同一かつEMQXアプリケーションユーザーがアクセス可能である必要があります。
- Database Name:
Upload Modeを選択します。現在は
Aggregated Upload
のみサポート。複数のルールトリガー結果を1つのファイル(例:CSV)にまとめてSnowflakeにアップロードし、ファイル数を減らして書き込み効率を向上させます。Aggregation Typeを選択します。現在は
csv
のみサポート。データはカンマ区切りCSV形式でSnowflakeにステージングされます。Column Order:ドロップダウンから列の並び順を選択します。生成されるCSVファイルは選択した列の順に並び、未選択列はアルファベット順に並びます。
Max Records:集約をトリガーする最大レコード数を設定します。例えば
1000
に設定すると、1000件のレコード収集後にアップロードが行われ、時間間隔がリセットされます。Time Interval:集約を行う時間間隔(秒)を設定します。例えば
60
に設定すると、最大レコード数に達していなくても60秒ごとにデータがアップロードされ、最大レコード数がリセットされます。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。プライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細はAdvanced Settingsを参照してください。
残りの設定はデフォルト値のままにし、CreateボタンをクリックしてSink作成を完了します。作成成功後はルール作成画面に戻り、新規Sinkがルールアクションに追加されます。
ルール作成画面でCreateをクリックし、ルール作成全体を完了します。
これでルールの作成が完了しました。Rulesページで新規作成したルールを確認でき、**Actions (Sink)**タブで新しいSnowflake Sinkも確認可能です。
また、Integration -> Flow Designerをクリックするとトポロジーを視覚的に確認できます。トポロジーは、トピックt/#
配下のメッセージがルールmy_rule
で解析され、Snowflakeに書き込まれる流れを示します。
ルールのテスト
ここでは設定したルールのテスト方法を説明します。
テストメッセージのパブリッシュ
MQTTXを使い、トピックt/1
にメッセージをパブリッシュします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Snowflake" }'
この操作を数回繰り返し、複数のテストメッセージを生成してください。
Snowflake内のデータ確認
テストメッセージ送信後、Snowflakeにデータが正常に書き込まれているかを確認します。
SnowflakeのWebインターフェースにアクセスし、認証情報でSnowflakeコンソールにログインします。
コンソールで以下のSQLクエリを実行し、ルールで書き込まれた
emqx
テーブルのデータを表示します。SELECT * FROM testdatabase.public.emqx;
これにより、
emqx
テーブルにアップロードされたすべてのレコード(clientid
、topic
、payload
、publish_received_at
フィールドを含む)が表示されます。送信したテストメッセージ(例:
{ "msg": "Hello Snowflake" }
)や、トピック、タイムスタンプなどのメタデータが確認できるはずです。
詳細設定
このセクションでは、Snowflake Sinkの詳細設定オプションについて説明します。ダッシュボードのSink設定画面でAdvanced Settingsを展開し、用途に応じて以下のパラメータを調整可能です。
項目名 | 説明 | デフォルト値 |
---|---|---|
Max Retries | アップロード失敗時の最大リトライ回数を設定します。例:3 で3回までリトライ可能。 | 3 |
Buffer Pool Size | EMQXとSnowflake間のデータフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータを一時的に保持・処理し、性能最適化とスムーズなデータ送信を支えます。 | 16 |
Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。TTLを超えてバッファ内に滞留、または送信後にSnowflakeからの応答やアックが得られない場合、リクエストは期限切れと判断されます。 | |
Health Check Interval | Snowflakeとの接続状態をSinkが自動的にチェックする間隔(秒)を指定します。 | 15 |
Max Buffer Queue Size | Snowflake Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。ワーカーはデータを一時保持し、効率的なデータストリーム処理を実現します。システム性能やデータ転送要件に応じて調整してください。 | 256 |
Query Mode | リクエストモードをsynchronous またはasynchronous から選択し、メッセージ送信を最適化します。非同期モードではSnowflakeへの書き込みがMQTTメッセージパブリッシュをブロックしませんが、クライアントがSnowflake到達前にメッセージを受信する可能性があります。 | Asynchronous |
Batch Size | EMQXからSnowflakeへ一度に転送するデータバッチの最大サイズを指定します。サイズ調整により転送効率や性能を最適化可能です。1 に設定すると、データはバッチ化せず個別に送信されます。 | 1 |
Inflight Window | 送信済みだが応答やアックをまだ受け取っていない「インフライト」キューリクエストの最大数を制御します。Request Mode がasynchronous の場合、この設定は特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は1 に設定してください。 | 100 |
Connect Timeout | Snowflakeへの接続確立を待つ最大時間(秒)を指定します。例:30 秒。接続できない場合はリトライ(Max Retriesに基づく)またはエラーを返します。ネットワーク遅延や接続信頼性管理に有用です。 | 15 |
HTTP Pipelining | 応答待ち前に送信可能なHTTPリクエストの最大数を指定します。 | 100 |
Connection Pool Size | EMQXがSnowflakeに同時に維持可能な接続数を定義します。大きいほど高負荷時の同時リクエスト数が増えますが、システムリソース消費も増加します。 | 8 |