Skip to content

データ統合

EMQXは、MQTTプロトコルを通じてIoTデバイスを接続し、リアルタイムでメッセージを送受信するMQTTメッセージングプラットフォームです。これを基盤として、EMQXのデータ統合は外部データシステムとの接続を導入し、デバイスと他の業務システムをシームレスに統合できるようにします。

データ統合では、SinkおよびSourceコンポーネントを用いて外部データシステムと接続します。SinkはMySQL、Kafka、HTTPサービスなどの外部データシステムへメッセージを送信するために使用され、SourceはMQTT、Kafka、GCP PubSubなどの外部データシステムからメッセージを受信するために使用されます。

この仕組みにより、EMQXは単なるIoTデバイス間のメッセージ送信を超え、デバイスが生成するデータを業務エコシステム全体に有機的に統合します。これにより、IoTアプリケーションの適用シナリオが広がり、デバイスと業務システム間の連携が豊かで多様になります。

注意

  • EMQX v5.4.0以降、従来のデータブリッジはデータフローの方向に応じて分割され、それぞれSinkとSourceに名称変更されました。

  • 現時点でEMQXがSourceとしてサポートする外部データシステムは以下の通りです:

    • MQTTサービス
    • Kafka
    • GCP PubSub

本ページでは、SinkとSourceの概要、動作原理、対応する外部データシステム、主な機能、管理方法について包括的に解説します。

動作原理

EMQXのデータ統合は標準機能として提供されています。MQTTメッセージングプラットフォームとして、EMQXはMQTTプロトコルを介してIoTデバイスからデータを受信します。組み込みのルールエンジンの助けを借りて、受信したデータはルールエンジンに設定されたルールに従って処理されます。ルールは処理済みデータを設定されたSink/Sourceを通じて外部データシステムへ転送するアクションをトリガーします。Dashboard上のルールFlowデザイナーを使って、コーディング不要でルール作成、アクションの紐付け、Sink/Sourceの作成が簡単に行えます。

組み込みルールエンジン

様々なIoTデバイスやシステムからのデータソースは、多種多様なデータ型やフォーマットを持ちます。EMQXはSQLベースの強力な組み込みルールエンジンを備えており、データ処理と配信の中核コンポーネントです。ルールエンジンは条件判定、文字列操作、データ型変換、圧縮/解凍など多彩な機能を持ち、複雑なデータを柔軟に処理できます。

クライアントが特定のイベントをトリガーしたり、メッセージがEMQXに到達した際に、ルールエンジンは事前定義されたルールに従いリアルタイムでデータを処理します。データ抽出、フィルタリング、付加情報の付与、フォーマット変換などを行い、処理済みデータを指定されたSinkへ転送します。

ルールエンジンの詳細な動作についてはルールエンジン章をご参照ください。

Sink

Sinkはルールのアクションに追加されるデータ出力コンポーネントです。デバイスがイベントをトリガーしたりメッセージがEMQXに到着すると、システムは該当ルールをマッチングして実行し、データをフィルタリング・処理します。ルールエンジンで処理されたデータは指定されたSinkに転送されます。Sink内では${var}${.var}構文を用いてデータから変数を抽出し、SQL文やデータテンプレートを動的に生成するなどの処理を設定可能です。その後、対応するコネクターを通じて外部データシステムへデータを送信し、メッセージの保存、データ更新、イベント通知などの操作を実現します。

Sinkでサポートされる変数抽出構文は以下の通りです:

  • ${var}:ルールの出力結果から変数を抽出する構文です。例:${topic}。ネストした変数を抽出したい場合はドット.で区切り、${payload.temp}のように指定します。抽出対象の変数が出力結果に含まれない場合は文字列undefinedが返されます。
  • ${.}, ${.var}${.}はルールの全出力結果を含むJSON文字列を抽出し、${.var}${var}と同義です。

Source

Sourceはデータ入力コンポーネントであり、ルールのデータソースとして機能し、ルールのSQLで選択されます。

SourceはMQTTやKafkaなどの外部データシステムからメッセージをサブスクライブまたはコンシュームします。コネクターを通じて新しいメッセージが到着すると、ルールエンジンは該当ルールをマッチング・実行し、データをフィルタリング・処理します。処理済みデータは指定されたEMQXトピックにパブリッシュされ、クラウドコマンド配信などの操作を可能にします。

対応する統合先

EMQXは以下の種類のデータシステムとのデータ統合をサポートしています:

デフォルト

クラウド

TSDB

SQL

NoSQL

メッセージキュー

その他

Sinkの特徴

Sinkは以下の機能により使い勝手を向上させ、データ統合のパフォーマンスと信頼性をさらに高めます。すべてのSinkがこれらの機能を完全に実装しているわけではありません。各Sinkの対応状況はそれぞれのドキュメントをご確認ください。

非同期リクエストモード

非同期リクエストモードは、メッセージのパブリッシュ・サブスクライブ処理がSinkの実行速度に影響されないよう設計されています。ただし、非同期リクエストモードを有効にすると、サブスクライバーがメッセージを受信した時点で、外部データシステムへの書き込みがまだ完了していない場合があります。

EMQXはデータ処理効率を高めるため、非同期リクエストモードをデフォルトで有効にしています。メッセージの配信タイミングに厳密な要件がある場合は、非同期リクエストモードを無効にしてください。

max_inflightパラメータも非同期リクエスト時のメッセージ順序に影響します。一部のSinkにはこのパラメータがあり、非同期モードで同一MQTTクライアントからのメッセージを順序通りに処理する必要がある場合は、この値を1に設定する必要があります。

バッチモード

バッチモードは複数のデータエントリをまとめて外部データ統合システムに書き込む機能です。バッチモードを有効にすると、EMQXは各リクエストのデータ(単一エントリ)を一時的に蓄積し、一定時間経過または一定数のデータが蓄積されたタイミングでまとめて書き込みます(いずれも設定可能)。

利点:

  • 書き込み効率の向上:単一メッセージ書き込みに比べ、バッチモードではデータベースシステムがメッセージをキャッシュまたは事前処理できるため、書き込み効率が向上します。
  • ネットワークレイテンシの低減:バッチ書き込みはネットワーク送信回数を減らすため、レイテンシを低減します。

課題:

即時書き込みの遅延:設定された時間またはエントリ数に達するまで書き込みが保留されるため、遅延が発生します。これらの設定はパラメータで調整可能です。

バッファキュー

バッファキューはSinkのフォールトトレランス機能を提供し、データ安全性向上のために有効化が推奨されます。

各リソース接続(MQTT接続ではありません)にはバッファキュー長(容量サイズ)があり、これを超えたデータはFIFO原則に従い破棄されます。

バッファファイルの場所

Kafka Sinkの場合、ディスクキャッシュファイルはdata/kafkaに保存されます。その他のSinkはdata/bufsに保存されます。

実運用ではdataディレクトリを高性能ディスクにマウントすることでスループットを向上できます。

プリペアドステートメント

MySQLやPostgreSQLなどのSQLデータベースでは、SQLテンプレートはフィールド変数を明示的に指定せずに事前処理実行されます。

直接SQLを実行する場合は、topicとpayloadを文字列型、qosを整数型としてシングルクォートで囲む必要があります:

sql
INSERT INTO msg(topic, qos, payload) VALUES('${topic}', ${qos}, '${payload}');

しかし、プリペアドステートメントをサポートするSinkでは、SQLテンプレートは必ずクォートなしで記述します:

sql
INSERT INTO msg(topic, qos, payload) VALUES(${topic}, ${qos}, ${payload});

これによりフィールド型の自動推論が可能となり、SQLインジェクション防止によるセキュリティ強化も実現します。

フォールバックアクション

EMQX 5.9.0以降、任意のアクションに対してフォールバックアクションのセットを定義可能です。プライマリアクションがメッセージ処理に失敗した場合、これらのフォールバックアクションがトリガーされます。これにより、メッセージを別のSinkや再パブリッシュアクションにリダイレクトすることで、データ信頼性と可観測性を向上させます。

フォールバックアクションの用途例:

  • 失敗したメッセージをバックアップデータシステム(別のSinkなど)に転送
  • 監視用トピックへ失敗メッセージを再パブリッシュしトラブルシューティングやアラートに活用
  • プライマリアクションの一時的障害によるデータ損失を最小化

主な特徴

  • フォールバックアクションはプライマリアクションがメッセージ処理に失敗した場合のみトリガーされます。失敗には配信エラー、バッファオーバーフロー、リクエストTTL切れが含まれます。
  • フォールバックアクションは自身の設定に関わらず常に非同期リクエストモードで動作します。
  • 定義されたすべてのフォールバックアクションは同時にトリガーされ、EMQXは順次実行や最初の成功で停止しません。
  • フォールバックアクションは通常のアクションと同じバッファリング機構を共有し、リクエストTTLまたはバッファオーバーフローまでリトライされます。
  • フォールバックアクションはさらに別のフォールバックアクションをトリガーしません。フォールバックアクション自身が失敗しても、その設定されたフォールバックは起動しません。
  • フォールバックアクションによるメッセージ処理は、プライマリアクションや元のルールのメトリクスに影響を与えません。

フォールバックアクションの定義例

HTTPアクションmy_httpに対してフォールバックアクションを定義し、既存のMQTTアクションfallbackを利用する場合の設定例です。

hcl
actions {
  http {
    my_http {
      fallback_actions = [
        {kind = reference, type = mqtt, name = fallback},
        {
          kind = republish,
          args = {
            topic = "fallback/republish/topic"
            qos = 1
            payload = "${payload}"
          }
        }
      ]
      # その他の設定は省略
    }
  }
  mqtt {
    fallback {
      fallback_actions = [
        {kind = reference, type = mqtt, name = another_fallback}
      ]
      # その他の設定は省略
    }
  }
}

この例では:

  • HTTPアクションmy_httpが失敗した場合、メッセージは以下に処理されます:
    • MQTTアクションfallbackに転送
    • トピックfallback/republish/topicへ再パブリッシュ
  • fallbackが失敗しても、fallback内で定義されたフォールバックアクションanother_fallbackはトリガーされません。フォールバックアクションは再帰的な連鎖をサポートしません。
  • ただしfallbackが別ルールのプライマリアクションとしてトリガーされ失敗した場合は、そのフォールバックanother_fallbackが適用されます。

Sinkのステータスと統計情報

Dashboard上でSinkの稼働状況や統計情報を確認し、正常に動作しているかを把握できます。

稼働ステータス

Sinkは以下のステータスを持ちます:

  • connecting:ヘルスプローブ実施前の初期状態で、ブリッジが外部データシステムへの接続を試みている段階
  • connected:Sinkが正常に接続され稼働中。ヘルスプローブ失敗時は障害の程度に応じてconnectingまたはdisconnectedに遷移する場合あり
  • disconnected:ヘルスプローブに失敗し非正常状態。設定により自動的に再接続を試みることがある
  • stopped:手動で無効化された状態
  • inconsistent:クラスター内のノード間でSinkステータスに不一致がある状態

稼働統計

EMQXはデータ統合の稼働統計を以下のカテゴリで提供します:

  • Matched(カウンター)
  • Sent Successfully(カウンター)
  • Sent Failed(カウンター)
  • Dropped(カウンター)
  • Late Reply(カウンター)
  • Inflight(ゲージ)
  • Queuing(ゲージ)
データブリッジのメトリクス

Matched

matchedはSinkにルーティングされたリクエスト/メッセージ数をカウントします。状態に関わらずカウントされます。各メッセージは他のメトリクスで最終的に計上されるため、matched = success + failed + inflight + queuing + late_reply + droppedとなります。

Sent Successfully

successは外部データシステムに正常に受信されたメッセージ数をカウントします。retried.successsuccessのサブカウントで、少なくとも1回再配信されたメッセージ数を追跡します。したがってretried.success <= successです。

Sent Failed

failedは外部データシステムへの受信に失敗したメッセージ数をカウントします。retried.failedfailedのサブカウントで、少なくとも1回再配信されたメッセージ数を追跡します。したがってretried.failed <= failedです。

Dropped

droppedは配信試行なしで破棄されたメッセージ数をカウントします。複数の詳細カテゴリがあり、それぞれ破棄理由を示します。計算式はdropped = dropped.expired + dropped.queue_full + dropped.resource_stopped + dropped.resource_not_foundです。

  • expired:キューイング中にメッセージのTTLが切れた
  • queue_full:最大キューサイズに達しメモリオーバーフロー防止のため破棄
  • resource_stopped:Sinkが停止中に配信を試みたメッセージ
  • resource_not_found:Sinkが存在しない状態で配信を試みたメッセージ。稀に発生し、通常はSink削除時の競合状態による

Late Reply

late_replyはメッセージ送信を試みたが、基盤ドライバーからの応答がメッセージTTL切れ後に届いた場合にインクリメントされます。

TIP

late_replyはメッセージの送信成功・失敗を示すものではありません。外部データシステムへの挿入成功、挿入失敗、接続タイムアウトなど不明な状態を示します。

Inflight

inflightはバッファリング層内で現在送信中で外部データシステムからの応答待ちのメッセージ数を示すゲージです。

Queuing

queuingはバッファリング層に受信済みでまだ外部データシステムへ送信されていないメッセージ数を示すゲージです。