Skip to content

QuasarDBへのMQTTデータ取り込み

QuasarDBは、大量のタイムスタンプ付きデータの保存とクエリに特化した高性能なカラム指向時系列データベースです。EMQXはQuasarDBとの統合をサポートしており、MQTTメッセージやクライアントイベントをQuasarDBに保存できます。これにより、IoTテレメトリの管理や分析のためのデータパイプラインや分析プロセスの構築が容易になります。

本ページでは、EMQXとQuasarDB間のデータ統合について詳細に解説し、実際の作成および検証手順を紹介します。

動作概要

QuasarDBデータ統合は、EMQXに標準搭載された機能であり、EMQXのデバイス接続およびメッセージ伝送機能とQuasarDBの高性能時系列ストレージを組み合わせています。組み込みのルールエンジンコンポーネントとSinkを通じて、MQTTメッセージやクライアントイベントをQuasarDBに保存可能です。この統合により、EMQXからQuasarDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとQuasarDB間の典型的なデータ統合アーキテクチャを示しています。

quasardb_integration

MQTTデータのQuasarDBへの取り込みは以下のように動作します:

  1. メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、リアルタイムMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに従って処理されます。ルールは事前定義された条件に基づき、QuasarDBへルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. QuasarDBへのデータ取り込み:ルールがトリガーされると、メッセージのQuasarDBへの書き込みが開始されます。SQLテンプレートを利用して、ルール処理結果からデータを抽出しSQLを構築、QuasarDBに送信して実行することで、メッセージの特定フィールドを対応するテーブルに書き込みます。
  4. データの保存と活用:データがQuasarDBに保存されることで、企業はその時系列クエリ機能を活用し、分析、監視、運用用途に利用できます。

特長と利点

QuasarDBとのデータ統合は以下の特長と利点を提供します:

  • リアルタイムデータストリーミング:EMQXはリアルタイムデータストリームの処理に最適化されており、ソースシステムからQuasarDBへの効率的かつ信頼性の高いデータ伝送を実現します。即時の洞察やアクションが必要なユースケースに最適です。
  • 高性能な時系列ストレージ:QuasarDBのカラム型エンジンは時系列ワークロードに最適化されており、高速な取り込みスループットと大量のタイムスタンプデータに対する効率的な範囲クエリを提供します。
  • 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを提供し、QuasarDBに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、強化など多様なデータ変換がサポートされています。
  • バッチ処理対応:QuasarDB Sinkはバッチ書き込みをサポートし、往復回数を削減して全体の取り込みスループットを向上させます。

はじめる前に

このセクションでは、QuasarDBデータ統合を作成する前に必要な準備について説明します。ODBCドライバーの設定やQuasarDBのインストール方法を含みます。

前提条件

ODBCドライバーのインストールと設定

QuasarDBコネクターはODBCを使ってデータベースに接続します。EMQXが稼働するホストにQuasarDB ODBCドライバーをインストールおよび設定してからコネクターを作成してください。

詳細なインストール手順はQuasarDB ODBCドキュメントを参照してください。以下はDebian系システムでドライバー3.14.1を使用した典型的なセットアップ例です。

  1. QuasarDB C APIパッケージとODBCドライバーをダウンロード・インストール:

    bash
    curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/c/qdb-api_3.14.1.deb
    curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/odbc/qdb-3.14.1-linux-64bit-odbc-driver.tar.gz
    apt-get install -yqq ./qdb-api_3.14.1.deb
    tar -C /tmp/qdb_odbc_driver -xf qdb-3.14.1-linux-64bit-odbc-driver.tar.gz
  2. /etc/odbcinst.iniにドライバーを登録:

    ini
    [qdb_odbc_driver]
    Description=Quasardb ODBC Driver
    Driver=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so
    Setup=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so
  3. /etc/odbc.iniにデータソース名(DSN)を作成:

    ini
    [qdb]
    Driver = qdb_odbc_driver
    Description = QuasarDB ODBC Data Source
    #URI = qdb://172.100.239.30:2836
    #UID = user_name
    #PWD = user_key
    #KEY = cluster_public_key

ここで設定したDSN名(例:qdb)は、コネクター作成時のODBC Data Source Name欄に入力します。

QuasarDBのインストールと接続

Dockerを使ってQuasarDBインスタンスを起動する手順を示します。

  1. QuasarDB Dockerイメージをプルして起動:

    bash
    docker run -d --name qdb \
      -p 2836:2836 \
      bureau14/qdb:3.14.1

    TIP

    QuasarDBはホスト名ではなくIPアドレスでの接続が必要です。URIには127.0.0.1(または実際のホストIP)を使用してください。ホスト名ベースの接続はサポートされていません。

  2. QuasarDBシェルで接続確認:

    bash
    docker run -it --rm bureau14/qdbsh --cluster qdb://127.0.0.1:2836

ユーザー認証やクラスターキー認証を有効にする場合は、QuasarDBセキュリティドキュメントを参照してください。

テーブルの作成

取り込んだデータを格納するためのQuasarDBテーブルを作成します。以下は温度と湿度の読み取り値を保存するテーブル作成例です。

sql
CREATE TABLE temp_hum (temp DOUBLE, hum DOUBLE);

TIP

QuasarDBのテーブルには暗黙の$timestampインデックス列が常に含まれます。テーブル作成時に宣言する必要はありませんが、INSERT文で参照可能です。

コネクターの作成

EMQXとQuasarDBを接続するコネクターの作成手順を示します。

  1. EMQXダッシュボードで Integration -> Connectors をクリック。

  2. 画面右上の Create をクリック。

  3. Create Connector ページで QuasarDB を選択し、Next をクリック。

  4. コネクター名を入力します。英数字の組み合わせで、例:my_quasardb

  5. 接続情報を設定:

    • Server URI:QuasarDBクラスターのURIをIPアドレス形式で入力(例:qdb://127.0.0.1:2836)。
    • ODBC Data Source Name/etc/odbc.iniで定義したDSN名を入力(例:qdb)。
    • Username:ユーザー名(あれば)。
    • Password:ユーザーのシークレットキー(あれば)。
    • Cluster Public Key:クラスター公開鍵(あれば)。
  6. 詳細設定(任意):詳細設定を参照。

  7. Createをクリックする前に、Test ConnectivityでEMQXからQuasarDBへの接続確認が可能。

  8. Createをクリックしてコネクター作成完了。作成成功ダイアログが表示され、ルール作成を促されます。Create Ruleをクリックするとコネクターが事前選択された状態でルール作成画面に進みます。Back To Connector Listをクリックすると戻って後でルールを作成できます。

QuasarDB Sinkを使ったルールの作成

このセクションでは、ソースMQTTトピックt/#からのメッセージを処理し、QuasarDBのtemp_humテーブルに保存するルール作成手順を示します。

  1. 前のステップでCreate Ruleをクリックした場合、Add Actionパネルが自動で開き、Type of ActionQuasarDB、コネクターが事前選択されています。ステップ5へ進んでください。

    それ以外の場合は、EMQXダッシュボードで Integration -> Rules をクリックし、右上の Create をクリック、続いて + Add Action をクリック。

  2. 左側のSQL EditorにルールIDと以下のSQLを入力し、トピックt/#のメッセージにマッチさせます。

    ※独自SQLを指定する場合は、Sinkが必要とするフィールドがSELECT句に含まれていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方はSQL Examplesをクリックし、Enable TestでSQLルールの学習とテストが可能です。

  3. 右側のAdd Actionパネルで、Type of ActionドロップダウンからQuasarDBを選択。ActionはデフォルトのCreate Actionのまま。

  4. Connectorsドロップダウンから先ほど作成したmy_quasardbコネクターを選択。新規コネクター作成はドロップダウン横のボタンから可能。設定パラメータはコネクター作成を参照。

  5. Sinkの名前と任意の説明を入力。

  6. SQL Templateを設定し、QuasarDBへの書き込み方法を定義。

    注意

    SQLテンプレートはINSERT文のみ受け付けます。UPDATEやDELETEなどの文はサポートされていません。

    SQLテンプレートは${clientid}などのプレースホルダー変数をサポート。QuasarDBは暗黙のタイムスタンプインデックス列として$timestampを使用し、now()で現在サーバー時刻を挿入可能です。

    注意

    QuasarDB ODBCドライバーはプリペアドステートメントをサポートしていません。STRINGまたはBLOB型に解決される値は、SQLテンプレート内で手動でシングルクォート(')で囲む必要があります。

    sql
    insert into temp_hum($timestamp, temp, hum)
    values (now(), ${.temp}, ${.hum})
  7. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照。

  8. 詳細設定(任意)Sink詳細設定を参照。

  9. Createをクリックする前に、Test ConnectivityでSinkからQuasarDBへの接続確認が可能。

  10. CreateをクリックしてSink設定を完了。新しいSinkがAction Outputsに追加されます。

  11. Create Ruleページに戻り、設定内容を確認後、Saveをクリックしてルールを生成。

これでルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいQuasarDB Sinkが表示されます。

また、Integration -> Flow Designerを開くとトポロジーを確認でき、トピックt/#のメッセージがルールmy_ruleで処理された後にQuasarDBへ転送されていることを検証できます。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、ルールをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "temp": "27.5", "hum": "41.8" }'

QuasarDB Sinkの稼働統計を確認してください。1件の新規マッチと1件の新規送信メッセージがあるはずです。QuasarDBのtemp_humテーブルにデータが書き込まれていることを検証してください。

詳細設定

このセクションでは、QuasarDBコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードで設定する際、Advanced Settingsを展開し、ニーズに応じて以下のパラメータを調整できます。

コネクター詳細設定

フィールド名説明デフォルト値
Connection Pool Sizeプール内で維持される同時接続数。大きすぎるとシステムリソース枯渇、小さすぎるとスループット制限に。8
Connect TimeoutQuasarDBへの接続確立時の最大待機時間。5
Start Timeout自動起動リソースが正常になるまでの最大待機時間。5
Health Check IntervalQuasarDB接続の自動ヘルスチェック実行間隔。15
Health Check Timeout各ヘルスチェック完了までの最大許容時間。60

Sink詳細設定

フィールド名説明デフォルト値
Buffer Pool SizeEMQXとQuasarDB間のデータフローを処理するバッファワーカー数。高負荷時のスループット向上に増加推奨。16
Request TTLバッファ内でリクエストが有効な最大時間。期限切れのリクエストは破棄される。45
Health Check IntervalSinkがQuasarDB接続の自動ヘルスチェックを行う間隔。15
Health Check Interval Jitter複数ノードが同時にチェックしないように間隔に加えるランダム遅延。複数のActionやSourceが同一コネクターを共有する場合に有効。0ミリ秒
Health Check Timeout各Sinkヘルスチェック完了までの最大許容時間。60
Max Buffer Queue Size各バッファワーカーが保持可能な最大バイト数。バースト処理が多い場合は増加推奨。256MB
Batch SizeQuasarDBに一度に送信する最大レコード数。1に設定するとバッチ処理を無効化し、個別送信に。100
Query ModeasyncはQuasarDBの書き込み確認を待たずにパブリッシュを継続。syncは確認後に進行。Asyncは高スループットだが順序が乱れる可能性あり。Async
Inflight Window同時に未アックのリクエスト最大数。Query Modeasync時、クライアント単位のメッセージ順序保証には1推奨。100