QuasarDBへのMQTTデータ取り込み
QuasarDBは、大量のタイムスタンプ付きデータの保存とクエリに特化した高性能なカラム指向時系列データベースです。EMQXはQuasarDBとの連携をサポートしており、MQTTメッセージやクライアントイベントをQuasarDBに保存できます。これにより、IoTテレメトリの管理や分析のためのデータパイプラインや分析プロセスの構築が容易になります。
本ページでは、EMQXとQuasarDB間のデータ統合について詳細に解説し、データ統合の作成と検証方法を実践的に説明します。
動作概要
QuasarDBデータ統合はEMQXに標準搭載された機能であり、EMQXのデバイス接続およびメッセージ送信機能とQuasarDBの高性能時系列ストレージを組み合わせています。組み込みのルールエンジンコンポーネントとSinkを利用して、MQTTメッセージやクライアントイベントをQuasarDBに保存できます。この統合により、EMQXからQuasarDBへのデータ取り込みが簡素化され、複雑なコーディングを必要としません。
以下の図は、EMQXとQuasarDB間の典型的なデータ統合アーキテクチャを示しています:

MQTTデータをQuasarDBに取り込む流れは以下の通りです:
- メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、リアルタイムのMQTTデータをEMQXにパブリッシュします。EMQXがメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過しEMQXで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、QuasarDBにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの処理が適用されます。
- QuasarDBへのデータ取り込み:ルールがトリガーされると、メッセージのQuasarDBへの書き込みが開始されます。SQLテンプレートを用いてルール処理結果からデータを抽出し、QuasarDBに送信するSQLを構築して実行します。これにより、メッセージの特定フィールドを対応するテーブルに書き込めます。
- データの保存と活用:データがQuasarDBに保存されることで、企業はQuasarDBの時系列クエリ機能を活用し、分析、監視、運用用途に利用できます。
特長と利点
QuasarDBとのデータ統合は以下のような特長と利点を提供します:
- リアルタイムデータストリーミング:EMQXはリアルタイムデータストリーム処理に最適化されており、ソースシステムからQuasarDBへの効率的かつ信頼性の高いデータ伝送を実現します。即時のインサイトやアクションが必要なユースケースに最適です。
- 高性能な時系列ストレージ:QuasarDBのカラム型エンジンは時系列ワークロードに最適化されており、大量のタイムスタンプ付きデータに対して高速な取り込みスループットと効率的な範囲クエリを提供します。
- 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを備えており、QuasarDBに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、拡充など多様な変換機能をサポートします。
- バッチ処理対応:QuasarDB Sinkはバッチ書き込みをサポートしており、ラウンドトリップ回数を削減し、全体の取り込みスループットを向上させます。
はじめる前に
このセクションでは、QuasarDBデータ統合を作成する前に必要な準備について説明します。ODBCドライバーの設定やQuasarDBのインストール方法も含みます。
前提条件
ODBCドライバーのインストールと設定
QuasarDBコネクターはODBCを使用してデータベースに接続します。EMQXが動作するホストにQuasarDB ODBCドライバーをインストールし、設定する必要があります。
完全なインストール手順はQuasarDB ODBCドキュメントを参照してください。以下はDebian系システムでドライバー3.14.1を使用した典型的なセットアップ例です。
QuasarDB C APIパッケージとODBCドライバーをダウンロードしインストールします:
bashcurl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/c/qdb-api_3.14.1.deb curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/odbc/qdb-3.14.1-linux-64bit-odbc-driver.tar.gz apt-get install -yqq ./qdb-api_3.14.1.deb tar -C /tmp/qdb_odbc_driver -xf qdb-3.14.1-linux-64bit-odbc-driver.tar.gz/etc/odbcinst.iniにドライバーを登録します:ini[qdb_odbc_driver] Description=Quasardb ODBC Driver Driver=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so Setup=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so/etc/odbc.iniにデータソース名(DSN)を作成します:ini[qdb] Driver = qdb_odbc_driver Description = QuasarDB ODBC Data Source #URI = qdb://172.100.239.30:2836 #UID = user_name #PWD = user_key #KEY = cluster_public_key
ここで設定したDSN名(例:qdb)は、コネクター作成時のODBC Data Source Nameフィールドに入力します。
QuasarDBのインストールと接続
このセクションでは、Dockerを使ってQuasarDBインスタンスを起動する方法を説明します。
QuasarDBのDockerイメージをプルして起動します:
bashdocker run -d --name qdb \ -p 2836:2836 \ bureau14/qdb:3.14.1TIP
QuasarDBはホスト名ではなくIPアドレスでの接続を要求します。URIには
127.0.0.1(または実際のホストIP)を使用してください。ホスト名ベースの接続はサポートされていません。QuasarDBシェルで接続し、インスタンスが起動していることを確認します:
bashdocker run -it --rm bureau14/qdbsh --cluster qdb://127.0.0.1:2836
ユーザー認証やクラスターキー認証を有効にする場合は、QuasarDBセキュリティドキュメントを参照してください。
テーブルの作成
取り込んだデータを格納するため、QuasarDBにテーブルを作成します。以下は温度と湿度のデータを保存するテーブルの例です:
CREATE TABLE temp_hum (temp DOUBLE, hum DOUBLE);TIP
QuasarDBのテーブルには暗黙の$timestampインデックス列が常に含まれます。テーブル作成時に宣言する必要はありませんが、INSERT文で参照可能です。
コネクターの作成
このセクションでは、EMQXとQuasarDBを接続するコネクターの作成方法を説明します。
EMQXダッシュボードで、Integration -> Connectorsをクリックします。
画面右上のCreateをクリックします。
Create ConnectorページでQuasarDBを選択し、Nextをクリックします。
コネクター名を入力します。英数字の組み合わせで、例として
my_quasardbなどを指定します。接続情報を設定します:
- Server URI:QuasarDBクラスターのURIをIPアドレスで入力します。例:
qdb://127.0.0.1:2836 - ODBC Data Source Name:
/etc/odbc.iniで定義したDSN名を入力します。例:qdb - Username:ユーザー名があれば入力します。
- Password:ユーザーの秘密鍵があれば入力します。
- Cluster Public Key:クラスター公開鍵があれば入力します。
- Server URI:QuasarDBクラスターのURIをIPアドレスで入力します。例:
詳細設定(任意):詳細は高度な設定を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてEMQXがQuasarDBに接続できるか確認できます。
Createをクリックしてコネクターの作成を完了します。作成成功ダイアログが表示され、ルールを今すぐ作成するか尋ねられます。Create Ruleをクリックすると、コネクターが事前選択された状態でルール作成画面に進みます。Back To Connector Listをクリックすると戻って後でルールを作成できます。
QuasarDB Sinkを使ったルールの作成
このセクションでは、ソースMQTTトピックt/#からのメッセージを処理し、QuasarDBのtemp_humテーブルに保存するルールの作成方法を説明します。
もし前のステップでCreate Ruleをクリックしていれば、Add Actionパネルが自動で開き、Type of Actionが
QuasarDB、コネクターが事前選択されています。ステップ5へ進んでください。そうでなければ、EMQXダッシュボードでIntegration -> Rulesをクリックし、右上のCreateをクリック、続いて**+ Add Action**をクリックします。
左側のSQL EditorにルールIDと以下のSQLを入力し、トピック
t/#のメッセージにマッチさせます:※独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドが
SELECT句に含まれていることを確認してください。sqlSELECT * FROM "t/#"TIP
初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことを推奨します。
右側のAdd Actionパネルで、Type of Actionドロップダウンから
QuasarDBを選択します。ActionはデフォルトのCreate Actionのままにします。Connectorsドロップダウンから、先ほど作成した
my_quasardbコネクターを選択します。新しいコネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメーターはコネクターの作成を参照してください。Sinkの名前と任意の説明を入力します。
QuasarDBへの書き込み方法を定義するSQL Templateを設定します。
注意
SQLテンプレートはINSERT文のみ受け付けます。UPDATEやDELETEなど他の文はサポートされていません。
SQLテンプレートは
${clientid}などのプレースホルダー変数をサポートします。QuasarDBは暗黙のタイムスタンプインデックス列として$timestampを使用し、now()で現在のサーバー時刻を挿入できます。注意
QuasarDB ODBCドライバーはプリペアドステートメントをサポートしていません。
STRINGまたはBLOB型に解決される値は、SQLテンプレート内で手動でシングルクォート(')で囲む必要があります。sqlinsert into temp_hum($timestamp, temp, hum) values (now(), ${.temp}, ${.hum})フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):詳細はSinkの高度な設定を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてSinkがQuasarDBに接続できるかテストできます。
CreateをクリックしてSinkの設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認してSaveをクリックしルールを生成します。
これでルールの作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいQuasarDB Sinkが表示されます。
また、Integration -> Flow Designerをクリックするとトポロジーを確認でき、トピックt/#のメッセージがルールmy_ruleで処理された後にQuasarDBに転送されていることを検証できます。
ルールのテスト
MQTTクライアントMQTTXを使ってトピックt/1にメッセージを送信し、ルールをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "temp": "27.5", "hum": "41.8" }'QuasarDB Sinkの稼働統計を確認してください。1件の新規マッチと1件の新規送信メッセージがあるはずです。QuasarDBのtemp_humテーブルにデータが書き込まれていることを検証してください。
高度な設定
このセクションでは、QuasarDBコネクターおよびSinkの高度な設定オプションについて説明します。ダッシュボードで設定する際は、Advanced Settingsを展開し、用途に応じて以下のパラメーターを調整できます。
コネクターの高度な設定
| フィールド名 | 説明 | デフォルト値 |
|---|---|---|
| Connection Pool Size | プール内で維持される同時接続数。大きすぎるとシステムリソースを枯渇させ、小さすぎるとスループットが制限されます。 | 8 |
| Connect Timeout | QuasarDBへの接続確立時に待機する最大時間。 | 5秒 |
| Start Timeout | 自動起動したリソースが正常になるまでコネクターが待機する最大時間。 | 5秒 |
| Health Check Interval | QuasarDB接続の自動ヘルスチェックを実行する間隔。 | 15秒 |
| Health Check Timeout | 各ヘルスチェックが完了するまでの最大許容時間。 | 60秒 |
Sinkの高度な設定
| フィールド名 | 説明 | デフォルト値 |
|---|---|---|
| Buffer Pool Size | EMQXとQuasarDB間のデータフローを処理するバッファワーカープロセス数。高負荷時のスループット向上に有効。 | 16 |
| Request TTL | バッファ内でリクエストが有効な最大時間。キュー内または未アックのままTTLを超えたリクエストは破棄される。 | 45秒 |
| Health Check Interval | QuasarDB接続の自動ヘルスチェックを実行する間隔。 | 15秒 |
| Health Check Interval Jitter | 複数ノードが同時にヘルスチェックを実行しないよう、チェック間隔にランダム遅延を追加。複数のActionやSourceが同一コネクターを共有する場合に有効。 | 0ミリ秒 |
| Health Check Timeout | 各Sinkヘルスチェックが完了するまでの最大許容時間。 | 60秒 |
| Max Buffer Queue Size | バッファワーカーが保持できる最大バイト数。負荷のバーストがデフォルト容量を超える場合は増加を検討。 | 256MB |
| Batch Size | 1回の操作でQuasarDBに送信する最大レコード数。1に設定するとバッチ処理を無効化し、個別送信になる。 | 100 |
| Query Mode | asyncはQuasarDBの書き込み確認を待たずにパブリッシュを継続。syncは確認を待つ。Asyncはスループットが高いが順序が乱れる可能性あり。 | Async |
| Inflight Window | 同時に未アックのリクエスト数の最大値。Query Modeがasyncの場合、クライアントごとのメッセージ順序保証には1に設定。 | 100 |