QuasarDBへのMQTTデータ取り込み
QuasarDBは、大量のタイムスタンプ付きデータの保存とクエリに特化した高性能なカラム指向時系列データベースです。EMQXはQuasarDBとの統合をサポートしており、MQTTメッセージやクライアントイベントをQuasarDBに保存できます。これにより、IoTテレメトリの管理や分析のためのデータパイプラインや分析プロセスの構築が容易になります。
本ページでは、EMQXとQuasarDB間のデータ統合について詳細に解説し、実際の作成および検証手順を紹介します。
動作概要
QuasarDBデータ統合は、EMQXに標準搭載された機能であり、EMQXのデバイス接続およびメッセージ伝送機能とQuasarDBの高性能時系列ストレージを組み合わせています。組み込みのルールエンジンコンポーネントとSinkを通じて、MQTTメッセージやクライアントイベントをQuasarDBに保存可能です。この統合により、EMQXからQuasarDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。
以下の図は、EMQXとQuasarDB間の典型的なデータ統合アーキテクチャを示しています。

MQTTデータのQuasarDBへの取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、リアルタイムMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに従って処理されます。ルールは事前定義された条件に基づき、QuasarDBへルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
- QuasarDBへのデータ取り込み:ルールがトリガーされると、メッセージのQuasarDBへの書き込みが開始されます。SQLテンプレートを利用して、ルール処理結果からデータを抽出しSQLを構築、QuasarDBに送信して実行することで、メッセージの特定フィールドを対応するテーブルに書き込みます。
- データの保存と活用:データがQuasarDBに保存されることで、企業はその時系列クエリ機能を活用し、分析、監視、運用用途に利用できます。
特長と利点
QuasarDBとのデータ統合は以下の特長と利点を提供します:
- リアルタイムデータストリーミング:EMQXはリアルタイムデータストリームの処理に最適化されており、ソースシステムからQuasarDBへの効率的かつ信頼性の高いデータ伝送を実現します。即時の洞察やアクションが必要なユースケースに最適です。
- 高性能な時系列ストレージ:QuasarDBのカラム型エンジンは時系列ワークロードに最適化されており、高速な取り込みスループットと大量のタイムスタンプデータに対する効率的な範囲クエリを提供します。
- 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを提供し、QuasarDBに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、強化など多様なデータ変換がサポートされています。
- バッチ処理対応:QuasarDB Sinkはバッチ書き込みをサポートし、往復回数を削減して全体の取り込みスループットを向上させます。
はじめる前に
このセクションでは、QuasarDBデータ統合を作成する前に必要な準備について説明します。ODBCドライバーの設定やQuasarDBのインストール方法を含みます。
前提条件
ODBCドライバーのインストールと設定
QuasarDBコネクターはODBCを使ってデータベースに接続します。EMQXが稼働するホストにQuasarDB ODBCドライバーをインストールおよび設定してからコネクターを作成してください。
詳細なインストール手順はQuasarDB ODBCドキュメントを参照してください。以下はDebian系システムでドライバー3.14.1を使用した典型的なセットアップ例です。
QuasarDB C APIパッケージとODBCドライバーをダウンロード・インストール:
bashcurl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/c/qdb-api_3.14.1.deb curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/odbc/qdb-3.14.1-linux-64bit-odbc-driver.tar.gz apt-get install -yqq ./qdb-api_3.14.1.deb tar -C /tmp/qdb_odbc_driver -xf qdb-3.14.1-linux-64bit-odbc-driver.tar.gz/etc/odbcinst.iniにドライバーを登録:ini[qdb_odbc_driver] Description=Quasardb ODBC Driver Driver=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so Setup=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so/etc/odbc.iniにデータソース名(DSN)を作成:ini[qdb] Driver = qdb_odbc_driver Description = QuasarDB ODBC Data Source #URI = qdb://172.100.239.30:2836 #UID = user_name #PWD = user_key #KEY = cluster_public_key
ここで設定したDSN名(例:qdb)は、コネクター作成時のODBC Data Source Name欄に入力します。
QuasarDBのインストールと接続
Dockerを使ってQuasarDBインスタンスを起動する手順を示します。
QuasarDB Dockerイメージをプルして起動:
bashdocker run -d --name qdb \ -p 2836:2836 \ bureau14/qdb:3.14.1TIP
QuasarDBはホスト名ではなくIPアドレスでの接続が必要です。URIには
127.0.0.1(または実際のホストIP)を使用してください。ホスト名ベースの接続はサポートされていません。QuasarDBシェルで接続確認:
bashdocker run -it --rm bureau14/qdbsh --cluster qdb://127.0.0.1:2836
ユーザー認証やクラスターキー認証を有効にする場合は、QuasarDBセキュリティドキュメントを参照してください。
テーブルの作成
取り込んだデータを格納するためのQuasarDBテーブルを作成します。以下は温度と湿度の読み取り値を保存するテーブル作成例です。
CREATE TABLE temp_hum (temp DOUBLE, hum DOUBLE);TIP
QuasarDBのテーブルには暗黙の$timestampインデックス列が常に含まれます。テーブル作成時に宣言する必要はありませんが、INSERT文で参照可能です。
コネクターの作成
EMQXとQuasarDBを接続するコネクターの作成手順を示します。
EMQXダッシュボードで Integration -> Connectors をクリック。
画面右上の Create をクリック。
Create Connector ページで QuasarDB を選択し、Next をクリック。
コネクター名を入力します。英数字の組み合わせで、例:
my_quasardb。接続情報を設定:
- Server URI:QuasarDBクラスターのURIをIPアドレス形式で入力(例:
qdb://127.0.0.1:2836)。 - ODBC Data Source Name:
/etc/odbc.iniで定義したDSN名を入力(例:qdb)。 - Username:ユーザー名(あれば)。
- Password:ユーザーのシークレットキー(あれば)。
- Cluster Public Key:クラスター公開鍵(あれば)。
- Server URI:QuasarDBクラスターのURIをIPアドレス形式で入力(例:
詳細設定(任意):詳細設定を参照。
Createをクリックする前に、Test ConnectivityでEMQXからQuasarDBへの接続確認が可能。
Createをクリックしてコネクター作成完了。作成成功ダイアログが表示され、ルール作成を促されます。Create Ruleをクリックするとコネクターが事前選択された状態でルール作成画面に進みます。Back To Connector Listをクリックすると戻って後でルールを作成できます。
QuasarDB Sinkを使ったルールの作成
このセクションでは、ソースMQTTトピックt/#からのメッセージを処理し、QuasarDBのtemp_humテーブルに保存するルール作成手順を示します。
前のステップでCreate Ruleをクリックした場合、Add Actionパネルが自動で開き、Type of Actionが
QuasarDB、コネクターが事前選択されています。ステップ5へ進んでください。それ以外の場合は、EMQXダッシュボードで Integration -> Rules をクリックし、右上の Create をクリック、続いて + Add Action をクリック。
左側のSQL EditorにルールIDと以下のSQLを入力し、トピック
t/#のメッセージにマッチさせます。※独自SQLを指定する場合は、Sinkが必要とするフィールドが
SELECT句に含まれていることを確認してください。sqlSELECT * FROM "t/#"TIP
初心者の方はSQL Examplesをクリックし、Enable TestでSQLルールの学習とテストが可能です。
右側のAdd Actionパネルで、Type of Actionドロップダウンから
QuasarDBを選択。ActionはデフォルトのCreate Actionのまま。Connectorsドロップダウンから先ほど作成した
my_quasardbコネクターを選択。新規コネクター作成はドロップダウン横のボタンから可能。設定パラメータはコネクター作成を参照。Sinkの名前と任意の説明を入力。
SQL Templateを設定し、QuasarDBへの書き込み方法を定義。
注意
SQLテンプレートはINSERT文のみ受け付けます。UPDATEやDELETEなどの文はサポートされていません。
SQLテンプレートは
${clientid}などのプレースホルダー変数をサポート。QuasarDBは暗黙のタイムスタンプインデックス列として$timestampを使用し、now()で現在サーバー時刻を挿入可能です。注意
QuasarDB ODBCドライバーはプリペアドステートメントをサポートしていません。
STRINGまたはBLOB型に解決される値は、SQLテンプレート内で手動でシングルクォート(')で囲む必要があります。sqlinsert into temp_hum($timestamp, temp, hum) values (now(), ${.temp}, ${.hum})フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照。
詳細設定(任意):Sink詳細設定を参照。
Createをクリックする前に、Test ConnectivityでSinkからQuasarDBへの接続確認が可能。
CreateをクリックしてSink設定を完了。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認後、Saveをクリックしてルールを生成。
これでルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいQuasarDB Sinkが表示されます。
また、Integration -> Flow Designerを開くとトポロジーを確認でき、トピックt/#のメッセージがルールmy_ruleで処理された後にQuasarDBへ転送されていることを検証できます。
ルールのテスト
MQTTXを使ってトピックt/1にメッセージを送信し、ルールをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "temp": "27.5", "hum": "41.8" }'QuasarDB Sinkの稼働統計を確認してください。1件の新規マッチと1件の新規送信メッセージがあるはずです。QuasarDBのtemp_humテーブルにデータが書き込まれていることを検証してください。
詳細設定
このセクションでは、QuasarDBコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードで設定する際、Advanced Settingsを展開し、ニーズに応じて以下のパラメータを調整できます。
コネクター詳細設定
| フィールド名 | 説明 | デフォルト値 |
|---|---|---|
| Connection Pool Size | プール内で維持される同時接続数。大きすぎるとシステムリソース枯渇、小さすぎるとスループット制限に。 | 8 |
| Connect Timeout | QuasarDBへの接続確立時の最大待機時間。 | 5秒 |
| Start Timeout | 自動起動リソースが正常になるまでの最大待機時間。 | 5秒 |
| Health Check Interval | QuasarDB接続の自動ヘルスチェック実行間隔。 | 15秒 |
| Health Check Timeout | 各ヘルスチェック完了までの最大許容時間。 | 60秒 |
Sink詳細設定
| フィールド名 | 説明 | デフォルト値 |
|---|---|---|
| Buffer Pool Size | EMQXとQuasarDB間のデータフローを処理するバッファワーカー数。高負荷時のスループット向上に増加推奨。 | 16 |
| Request TTL | バッファ内でリクエストが有効な最大時間。期限切れのリクエストは破棄される。 | 45秒 |
| Health Check Interval | SinkがQuasarDB接続の自動ヘルスチェックを行う間隔。 | 15秒 |
| Health Check Interval Jitter | 複数ノードが同時にチェックしないように間隔に加えるランダム遅延。複数のActionやSourceが同一コネクターを共有する場合に有効。 | 0ミリ秒 |
| Health Check Timeout | 各Sinkヘルスチェック完了までの最大許容時間。 | 60秒 |
| Max Buffer Queue Size | 各バッファワーカーが保持可能な最大バイト数。バースト処理が多い場合は増加推奨。 | 256MB |
| Batch Size | QuasarDBに一度に送信する最大レコード数。1に設定するとバッチ処理を無効化し、個別送信に。 | 100 |
| Query Mode | asyncはQuasarDBの書き込み確認を待たずにパブリッシュを継続。syncは確認後に進行。Asyncは高スループットだが順序が乱れる可能性あり。 | Async |
| Inflight Window | 同時に未アックのリクエスト最大数。Query Modeがasync時、クライアント単位のメッセージ順序保証には1推奨。 | 100 |