Skip to content

CockroachDBへのMQTTデータ取り込み

CockroachDBは、分散型でPostgreSQL互換のデータベースであり、フルマネージドクラウドサービス(CockroachDB Cloud)またはセルフホスト型のデプロイメントとして利用可能です。高いレジリエンス、水平スケーラビリティ、完全なSQL互換性を必要とするグローバルアプリケーション向けに設計されています。EMQXはCockroachDBとスムーズに統合し、IoTデバイスからのMQTTデータをリアルタイムでキャプチャして保存します。これにより、グローバル展開における高速かつ信頼性の高い取り込み、Raftベースのレプリケーションによる一貫性のあるデータ保持、そして運用および分析向けの低レイテンシな読み取りを実現します。

本ページでは、EMQXとCockroachDB間のデータ統合について包括的に紹介し、データ統合の作成および検証の実践的な手順を説明します。

動作概要

EMQXにおけるCockroachDBデータ統合は、MQTTベースのIoTデータストリームをCockroachDBの分散型PostgreSQL互換データベースに直接取り込む組み込み機能です。EMQXの組み込みルールエンジンを利用することで、複雑なカスタムコードを書くことなく、グローバルに一貫した保存とリアルタイムクエリが可能なCockroachDBへの直接取り込みが実現します。

CockroachDBの共有なし(shared-nothing)分散アーキテクチャは、複数のノードやリージョンにデータを自動的にレプリケートし、Raftベースのコンセンサスにより障害時でも強い一貫性を維持します。これにより、IoTデータは常に安全かつ同期され、利用可能な状態が保証されます。

以下の図は、EMQXとCockroachDB間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration CockroachDB

MQTTデータをCockroachDBに取り込む流れは以下の通りです。

  1. IoTデバイスがEMQXに接続:IoTデバイスがMQTTプロトコルで正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスIDや送信元IPアドレスなどの情報が含まれます。
  2. メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  3. ルールエンジンによるメッセージ処理:EMQXのルールエンジンは、トピックやメッセージ内容に基づいて定義されたルールにマッチングし、イベントやメッセージを処理します。処理内容には、データ変換(例:JSONからSQL用フォーマットへの変換)、フィルタリング、コンテキスト情報によるデータ強化などが含まれ、データベース挿入前に行われます。
  4. CockroachDBへの書き込み:マッチしたルールはCockroachDBに対するSQL実行をトリガーします。SQLテンプレートを使い、処理済みデータのフィールドをCockroachDBのテーブルやカラムにマッピングできます。CockroachDBの分散SQL実行およびベクトル化クエリエンジンにより、高スループットな書き込みと低レイテンシな分析クエリが可能です。マルチリージョン展開においては、ジオパーティションによる最適化もサポートされます。

イベントやメッセージデータがCockroachDBに書き込まれた後は、以下のような活用が可能です。

  • CockroachDBをGrafanaなどのツールに接続し、ライブのIoTメトリクスを表示するダッシュボードやチャートを作成。
  • デバイス管理プラットフォームやAI/MLモデルと連携し、ヘルスチェック、異常検知、アラートトリガーを実現。
  • CockroachDBの分散クエリエンジンを活用し、ライブのIoTデータに対する集計、結合、時系列分析を行いながら、新たなテレメトリ処理を並行して実施。

特徴と利点

CockroachDBとのデータ統合は、以下の特徴とメリットをもたらします。

  • 柔軟なイベント処理:EMQXルールエンジンを用いて、CockroachDBにデバイスのライフサイクルイベント(接続、切断、ステータス変化)を低レイテンシで保存・処理可能です。CockroachDBの分散実行と自動リバランシングと組み合わせることで、イベントデータの高可用性を確保し、障害や異常、トレンドのリアルタイム検出が可能です。
  • メッセージ変換:メッセージはEMQXルールを通じて広範な処理・変換を受けてからCockroachDBに書き込まれるため、保存されるデータは最初から分析に適した形となります。これによりクエリの複雑さが軽減され、下流での活用が最適化されます。
  • SQLテンプレートによる柔軟なデータ操作:EMQXのSQLテンプレートマッピングを通じて、構造化されたIoTデータをCockroachDBのテーブルやカラムに挿入・更新できます。PostgreSQL互換のCockroachDBは標準SQL、JSONBストレージ、インデックスをサポートし、ベクトル化実行エンジンによる高速分析やフォロワーリードによる低レイテンシなリージョンローカルアクセスが可能です。
  • 業務プロセスの統合:CockroachDBのPostgreSQL互換性により、ERP、CRM、GISなどの業務システムとの統合が可能です。EMQXと組み合わせることで、複雑なETLパイプラインを構築せずにイベント駆動型の自動化やクロスシステムのオーケストレーションを実現できます。
  • 高度な地理空間機能:PostGISなどのPostgreSQL拡張を通じて、CockroachDBは地理空間データの保存、インデックス作成、クエリをサポートします。これにより、ジオフェンシング、位置ベースのアラート、ルート追跡、リアルタイム資産監視がEMQXの信頼性の高いIoTデータ取り込みと連携して可能になります。
  • 組み込みのメトリクスと監視:EMQXは各CockroachDBシンクのランタイムメトリクス(メッセージ数、成功/失敗率、スループット)を提供し、CockroachDBは組み込みの可観測性ツールを備え、PrometheusやGrafanaと統合して詳細なパフォーマンスとヘルス監視を実現します。

はじめる前に

このセクションでは、CockroachDB統合を作成する前に必要な準備、CockroachDBのデプロイやデータベース・テーブルの作成方法について説明します。

前提条件

CockroachDBでのデータベースとテーブルの作成

EMQXでCockroachDBコネクターを作成する前に、CockroachDBクラスターが稼働していること、およびIoTデータを保存するためのデータベースとテーブルが準備されていることを確認してください。

  1. CockroachDBクラスターを作成します。

  2. EMQX用の専用SQLユーザーを作成します。詳細はCockroachDBユーザー管理ガイドを参照してください。本例ではSQLユーザー名をemqx_userとし、後でCockroachDBコネクター設定時に使用します。このユーザーには以下の権限が必要です。

    • 対象データベースへの接続権限
    • テーブル作成権限
    • EMQXデータテーブルへの読み書き権限
  3. データベースの作成に従い、データベースを作成します。本例ではデータベース名をemqx_dataとします。

  4. emqx_dataデータベースに接続し、MQTTメッセージとクライアントイベントデータを保存するための2つのテーブルを作成します。テーブルの作成の手順に従ってください。

    • 以下のSQL文で、クライアントID、トピック、QoS、ペイロード、到着時間などのメタデータを含むMQTTメッセージを保存するt_mqtt_msgテーブルを作成します。

      sql
      CREATE TABLE t_mqtt_msg (
        id SERIAL primary key,
        msgid character varying(64),
        sender character varying(64),
        topic character varying(255),
        qos integer,
        retain integer,
        payload text,
        arrived timestamp without time zone
      );
    • 以下のSQL文で、クライアントのオンライン/オフラインイベントをタイムスタンプ付きで保存するemqx_client_eventsテーブルを作成します。

      sql
      CREATE TABLE emqx_client_events (
        id SERIAL primary key,
        clientid VARCHAR(255),
        event VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

CockroachDBコネクターの作成

CockroachDBシンクを追加する前に、EMQXでCockroachDBコネクターを作成する必要があります。コネクターは、EMQXがセルフホスト型またはCockroachDB Cloudのクラスターにどのように接続するかを定義します。

  1. EMQXダッシュボードで、Integration -> Connector に移動します。

  2. ページ右上のCreateをクリックします。

  3. Create Connectorページで、CockroachDBを選択し、Nextをクリックします。

  4. コネクター名を入力します。名前は英数字で始まり、英数字、ハイフン、アンダースコアを含めることができます。例:my_cockroachdb

  5. 接続情報を入力します。

    • Server Host:CockroachDBクラスターのホスト名またはIPアドレス
      • CockroachDB Cloud:CockroachDB Cloudコンソールで提供される接続文字列のホスト値を使用(例:free-tier.gcp-us-central1.cockroachlabs.cloud
      • セルフホスト型:CockroachDBが稼働しているアドレス(例:ローカルなら127.0.0.1、サーバーのパブリック/プライベートIPなど)
    • Database Name:EMQXがデータを保存する対象データベース名。本例ではemqx_data
    • Username:認証および識別に使用するCockroachDBのSQLユーザー名。本例ではemqx_user
    • Passwordemqx_userのパスワード
    • Enable TLS:暗号化接続を確立する場合はトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
  6. 詳細設定(任意):接続プールサイズ、アイドルタイムアウト、リクエストタイムアウトなどの追加接続プロパティを設定できます。詳細はシンクの機能を参照してください。

  7. Test Connectivityをクリックし、EMQXが指定された設定でCockroachDBクラスターに正常に接続できることを確認します。

  8. Createをクリックしてコネクターを保存します。

  9. 作成後は以下のいずれかを選択できます。

    • Back to Connector Listをクリックして全コネクター一覧に戻る
    • Create Ruleをクリックして、このコネクターを使ったルールをすぐに作成し、CockroachDBへのデータ転送を設定する

    詳細な例は以下を参照してください。

メッセージ保存用CockroachDBシンクのルール作成

このセクションでは、ダッシュボードでソースMQTTトピックt/#からのメッセージを処理し、処理済みデータを設定済みシンク経由でCockroachDBのt_mqtt_msgテーブルに保存するルールの作成方法を示します。

  1. ダッシュボードのIntegration -> Rulesページに移動します。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力し、SQLエディターにルールを入力します。ここでは、t/#トピックのMQTTメッセージをCockroachDBに保存するため、ルールのSELECT部分でSQLテンプレートで使用するすべての変数を含むフィールドを選択してください。ルールSQLは以下の通りです。

    sql
    SELECT
    *
    FROM
    "t/#"

    TIP

    初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。

    • Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理されたデータをCockroachDBに送信します。
  4. Type of ActionドロップダウンからCockroachDBを選択し、ActionドロップダウンはデフォルトのCreate Actionのままにするか、既存のCockroachDBアクションを選択します。本例では新規シンクを作成してルールに追加します。

  5. シンクの名前と説明を入力します。

  6. Connectorドロップダウンから、先ほど作成したmy_cockroachdbを選択します。新しいコネクターを作成するにはドロップダウン横のボタンをクリックします。設定パラメーターはCockroachDBコネクターの作成を参照してください。

  7. SQLテンプレートを設定します。以下のSQL文を使ってデータを挿入します。

    注意:これはプリプロセス済みSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
    INSERT INTO t_mqtt_msg(msgid, sender, topic, qos, payload, arrived) VALUES(
      ${id},
      ${clientid},
      ${topic},
      ${qos},
      ${payload},
      TO_TIMESTAMP((${timestamp} :: bigint)/1000)
    )
  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  9. 詳細設定(任意):詳細はシンクの機能を参照してください。

  10. Createをクリックする前に、Test ConnectivityでシンクがCockroachDBクラスターに接続できるかテストできます。

  11. Createボタンをクリックしてシンク設定を完了します。新しいシンクがAction Outputsに追加されます。

  12. Create Ruleページで設定内容を確認し、Saveボタンをクリックしてルールを生成します。

ルール作成が成功すると、Integration -> Rulesページで新規ルールを確認でき、**Action (Sink)**タブで新規CockroachDBシンクも確認できます。

また、Integration -> Flow Designerでトポロジーを確認でき、t/#トピックのメッセージがルールmy_ruleで解析されてCockroachDBに書き込まれている様子を可視化できます。

イベント記録用CockroachDBシンクのルール作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みシンク経由でCockroachDBのemqx_client_eventsテーブルに保存するルール作成方法を示します。

手順はメッセージ保存用CockroachDBシンクのルール作成とほぼ同様ですが、SQLテンプレートとSQLルールが異なります。

オンライン/オフライン状態記録用のSQLルールは以下の通りです。

sql
SELECT
  *
FROM
  "$events/client_connected", "$events/client_disconnected"

イベント記録用のSQLテンプレートは以下の通りです。

注意:これはプリプロセス済みSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

sql
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  ${clientid},
  ${event},
  TO_TIMESTAMP((${timestamp} :: bigint)/1000)
)

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello CockroachDB" }'

2つのシンクの稼働状況を確認します。メッセージ保存用シンクでは新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。イベント記録用シンクでは2件のイベントレコードがあります。

t_mqtt_msgデータテーブルにデータが書き込まれているか確認します。

bash
emqx_data=# select * from t_mqtt_msg;
 id |              msgid               | sender | topic | qos | retain |            payload
        |       arrived
----+----------------------------------+--------+-------+-----+--------+-------------------------------+---------------------
  1 | 0005F298A0F0AEE2F443000012DC0002 | emqx_c | t/1   |   0 |        | { "msg": "hello CockroachDB" } | 2023-01-19 07:10:32
(1 row)

emqx_client_eventsテーブルにデータが書き込まれているか確認します。

bash
emqx_data=# select * from emqx_client_events;
 id | clientid |        event        |     created_at
----+----------+---------------------+---------------------
  3 | emqx_c   | client.connected    | 2023-01-19 07:10:32
  4 | emqx_c   | client.disconnected | 2023-01-19 07:10:32
(2 rows)