Skip to content

データ統合

EMQXは、MQTTプロトコルを通じてIoTデバイスを接続し、リアルタイムでメッセージを送信するMQTTメッセージングプラットフォームです。これを基盤として、EMQXのデータ統合は外部データシステムとの接続を導入し、デバイスと他のビジネスシステムのシームレスな統合を可能にします。

データ統合はSinkおよびSourceコンポーネントを使用して外部データシステムと接続します。SinkはMySQL、Kafka、HTTPサービスなどの外部データシステムへメッセージを送信するために使用され、SourceはMQTT、Kafka、GCP PubSubなどの外部データシステムからメッセージを受信するために使用されます。

この仕組みにより、EMQXは単にIoTデバイス間のメッセージ送信を超えて、デバイスが生成するデータをビジネス全体のエコシステムに有機的に統合します。これにより、IoTアプリケーションの適用シナリオが広がり、デバイスとビジネスシステム間のやり取りが豊かで多様になります。

注意

  • EMQX v5.4.0以降、従来のデータブリッジはデータフローの方向に応じて分割され、SinkおよびSourceに名称変更されました。

  • 現時点でEMQXがSourceとしてサポートする外部データシステムは以下の通りです:

    • MQTTサービス
    • Kafka
    • GCP PubSub

本ページでは、SinkおよびSourceの動作原理、対応する外部データシステム、主な機能、管理方法などについて包括的に解説します。

動作原理

EMQXのデータ統合は標準機能として提供されています。MQTTメッセージングプラットフォームとして、EMQXはMQTTプロトコル経由でIoTデバイスからデータを受信します。内蔵のルールエンジンの助けを借りて、受信したデータはルールエンジンに設定されたルールによって処理されます。ルールは処理済みデータを設定されたSink/Sourceを通じて外部データシステムに転送するアクションをトリガーします。Dashboard上のルールFlowデザイナーを使えば、コーディング不要で簡単にルール作成、アクションの紐付け、Sink/Sourceの作成が可能です。

内蔵ルールエンジン

様々なIoTデバイスやシステムからのデータソースは多種多様なデータ型やフォーマットを持ちます。EMQXはSQLルールに基づく強力な内蔵ルールエンジンを備えており、これはデータ処理および配信の中核コンポーネントです。ルールエンジンは条件判定、文字列操作、データ型変換、圧縮/解凍など多彩な機能を持ち、複雑なデータを柔軟に処理できます。

クライアントが特定のイベントをトリガーしたり、メッセージがEMQXに到達すると、ルールエンジンは事前定義されたルールに従いリアルタイムでデータを処理します。データ抽出、フィルタリング、付加情報付与、フォーマット変換などを行い、処理済みデータを指定されたSinkへ転送します。

ルールエンジンの詳細な動作についてはルールエンジン章をご参照ください。

Sink

Sinkはルールのアクションに追加されるデータ出力コンポーネントです。デバイスがイベントをトリガーしたりメッセージがEMQXに到着すると、システムは該当するルールをマッチング・実行し、データをフィルタリング・処理します。ルールエンジンで処理されたデータは指定されたSinkに転送されます。Sink内では${var}${.var}構文を使ってデータから変数を抽出し、動的にSQL文やデータテンプレートを生成するなどの処理を設定できます。その後、対応するコネクターを通じて外部データシステムへデータを送信し、メッセージの保存、データ更新、イベント通知などの操作を実現します。

Sinkでサポートされる変数抽出構文は以下の通りです:

  • ${var}:ルールの出力結果から変数を抽出する構文です。例:${topic}。ネストした変数を抽出したい場合はドット.で区切ります。例:${payload.temp}。抽出対象の変数が出力結果に含まれない場合は文字列undefinedが返されます。
  • ${.}, ${.var}${.}はルールの全出力結果を含むJSON文字列を抽出し、${.var}${var}と同じ意味です。

Source

Sourceはデータ入力コンポーネントであり、ルールのデータソースとして機能し、ルールのSQLで選択されます。

SourceはMQTTやKafkaなどの外部データシステムからメッセージをサブスクライブまたはコンシュームします。コネクター経由で新しいメッセージが到着すると、ルールエンジンは該当するルールをマッチング・実行し、データをフィルタリング・処理します。処理済みデータは指定されたEMQXトピックにパブリッシュされ、クラウドコマンド配信などの操作が可能となります。

対応する統合先

EMQXは以下の種類のデータシステムとのデータ統合をサポートしています:

デフォルト

クラウド

TSDB

SQL

NoSQL

メッセージキュー

その他

Sinkの特徴

Sinkは以下の機能により使い勝手を向上させ、データ統合のパフォーマンスと信頼性をさらに強化します。すべてのSinkがこれらの機能を完全に実装しているわけではありません。各Sinkのドキュメントで対応状況をご確認ください。

非同期リクエストモード

非同期リクエストモードは、メッセージのパブリッシュ・サブスクライブ処理がSinkの実行速度に影響されるのを防ぐために設計されています。ただし、非同期リクエストモードを有効にすると、サブスクライバーがメッセージを受信したものの、外部データシステムへの書き込みがまだ完了していないケースが発生する可能性があります。

データ処理効率を高めるため、EMQXはデフォルトで非同期リクエストモードを有効にしています。メッセージの配信タイミングに厳密な要件がある場合は、非同期リクエストモードを無効にしてください。

max_inflightパラメータも非同期リクエスト時のメッセージ順序に影響します。一部のSinkに存在するこのパラメータは、非同期モードで同一MQTTクライアントからのメッセージを厳密に順序通りに処理する必要がある場合、必ず1に設定する必要があります。

バッチモード

バッチモードは複数のデータをまとめて外部データ統合システムに書き込むことを可能にします。バッチモードを有効にすると、EMQXは各リクエストのデータ(単一エントリ)を一時的に蓄積し、一定時間経過または一定数のデータが蓄積された後にまとめてターゲットデータシステムに書き込みます(いずれも設定可能)。

利点:

  • 書き込み効率の向上:単一メッセージの書き込みと比べ、バッチモードではデータベースシステムがメッセージをキャッシュまたは事前処理できるため、書き込み効率が向上します。
  • ネットワークレイテンシの低減:バッチ書き込みによりネットワーク送信回数が減るため、レイテンシが低減します。

課題:

書き込み遅延:設定された時間またはエントリ数に達するまでデータの書き込みが遅延します。これらの設定はパラメータで調整可能です。

バッファキュー

バッファキューはSinkの一定のフォールトトレランスを提供し、データ安全性向上のために有効化が推奨されます。

各リソース接続(MQTT接続ではありません)にはバッファキューの長さ(容量サイズ)があり、これを超えたデータはFIFO原則に従い破棄されます。

バッファファイルの場所

Kafka Sinkの場合、ディスクキャッシュファイルはdata/kafkaにあります。その他のSinkではdata/bufsにあります。

実運用ではdataディレクトリを高性能ディスクにマウントしてスループットを向上させることが可能です。

プリペアドステートメント

MySQLやPostgreSQLなどのSQLデータベースでは、SQLテンプレートがフィールド変数を明示的に指定せずに事前処理実行されます。

直接SQLを実行する場合、トピックとペイロードは文字列型としてシングルクォートで囲み、QoSは整数型として指定する必要があります:

sql
INSERT INTO msg(topic, qos, payload) VALUES('${topic}', ${qos}, '${payload}');

しかし、プリペアドステートメントをサポートするSinkでは、SQLテンプレートは必ずクォートなしで記述します:

sql
INSERT INTO msg(topic, qos, payload) VALUES(${topic}, ${qos}, ${payload});

これによりフィールド型の自動推論が可能となり、SQLインジェクション防止にも寄与してセキュリティが向上します。

フォールバックアクション

EMQX 5.9.0以降、任意のアクションに対してフォールバックアクション群を定義できます。プライマリアクションがメッセージ処理に失敗した場合、これらのフォールバックアクションがトリガーされます。この仕組みにより、メッセージを別のSinkや再パブリッシュアクションにリダイレクトしてデータ信頼性と可観測性を向上できます。

フォールバックアクションの用途例:

  • 失敗したメッセージをバックアップデータシステム(別のSinkなど)に転送
  • 失敗したメッセージを監視トピックに再パブリッシュしてトラブルシューティングやアラートに活用
  • プライマリアクションの一時的障害によるデータ損失を最小化

主な特徴

  • フォールバックアクションはプライマリアクションがメッセージ処理に失敗した場合のみトリガーされます。失敗には配信エラー、バッファオーバーフロー、リクエストTTL切れが含まれます。
  • フォールバックアクションは自身の設定に関わらず常に非同期リクエストモードで動作します。
  • 定義されたすべてのフォールバックアクションは同時にトリガーされます。EMQXは順次試行や最初の成功で停止しません。
  • フォールバックアクションは通常のアクションと同じバッファリング機構を共有し、メッセージはリクエストTTLまたはバッファオーバーフローまでリトライされます。
  • フォールバックアクションはさらにフォールバックアクションをトリガーしません。フォールバックアクション自身が失敗しても、そのフォールバックは発動しません。
  • フォールバックアクションによるメッセージ処理はプライマリアクションや元のルールのメトリクスに影響を与えません。

フォールバックアクションの定義例

HTTPアクションmy_httpに対してフォールバックアクションを定義し、既存のMQTTアクションfallbackを利用する場合の設定例です:

hcl
actions {
  http {
    my_http {
      fallback_actions = [
        {kind = reference, type = mqtt, name = fallback},
        {
          kind = republish,
          args = {
            topic = "fallback/republish/topic"
            qos = 1
            payload = "${payload}"
          }
        }
      ]
      # その他の設定は省略
    }
  }
  mqtt {
    fallback {
      fallback_actions = [
        {kind = reference, type = mqtt, name = another_fallback}
      ]
      # その他の設定は省略
    }
  }
}

この例では:

  • HTTPアクションmy_httpが失敗した場合、メッセージは
    • MQTTアクションfallbackに転送され
    • トピックfallback/republish/topicに再パブリッシュされます
  • fallbackも失敗した場合、fallbackに定義されたフォールバックアクションanother_fallbackはトリガーされません。フォールバックアクションの再帰的連鎖はサポートされていません。
  • ただし、fallbackが別のルールでプライマリアクションとしてトリガーされ失敗した場合は、そのフォールバックanother_fallbackが適用されます。

Sinkの状態と統計情報

Dashboard上でSinkの稼働状況や統計情報を確認し、Sinkが正常に動作しているか把握できます。

稼働状態

Sinkは以下の状態を持ちます:

  • connecting:ヘルスプローブが行われる前の初期状態で、外部データシステムへの接続を試みている段階です。
  • connected:Sinkが正常に接続され、正常稼働中です。ヘルスプローブが失敗すると、失敗の程度に応じてconnectingまたはdisconnectedに遷移する場合があります。
  • disconnected:ヘルスプローブに失敗し、非正常状態です。設定により自動的に再接続を試みることがあります。
  • stopped:手動で無効化された状態です。
  • inconsistent:クラスター内のノード間でSinkの状態に不整合が生じています。

稼働統計

EMQXはデータ統合の稼働統計を以下のカテゴリで提供します:

  • Matched(カウンター)
  • Sent Successfully(カウンター)
  • Sent Failed(カウンター)
  • Dropped(カウンター)
  • Late Reply(カウンター)
  • Inflight(ゲージ)
  • Queuing(ゲージ)
データブリッジのメトリクス

Matched

matchedはSinkにルーティングされたリクエスト/メッセージの数をカウントします。状態に関わらずカウントされます。各メッセージは最終的に他のメトリクスで計上されるため、matched = success + failed + inflight + queuing + late_reply + droppedとなります。

Sent Successfully

successは外部データシステムに正常に受信されたメッセージ数をカウントします。retried.successsuccessのサブカウントで、少なくとも1回リトライされたメッセージ数を追跡します。したがって、retried.success <= successです。

Sent Failed

failedは外部データシステムへの受信に失敗したメッセージ数をカウントします。retried.failedfailedのサブカウントで、少なくとも1回リトライされたメッセージ数を追跡します。したがって、retried.failed <= failedです。

Dropped

droppedは配信試行なしに破棄されたメッセージ数をカウントします。複数の詳細カテゴリがあり、それぞれ破棄理由を示します。計算式はdropped = dropped.expired + dropped.queue_full + dropped.resource_stopped + dropped.resource_not_foundです。

  • expired:キューイング中にメッセージのTTLが切れた場合
  • queue_full:キューの最大サイズに達し、メモリオーバーフロー防止のため破棄された場合
  • resource_stopped:Sinkが停止中に配信を試みた場合
  • resource_not_found:Sinkが存在しない状態で配信を試みた場合。稀に発生し、Sink削除時の競合状態が原因です。

Late Reply

late_replyはメッセージ送信を試みたものの、基盤ドライバーからの応答がメッセージTTL切れ後に届いた場合にインクリメントされます。

TIP

late_replyはメッセージが成功したか失敗したかを示すものではありません。不明な状態です。外部データシステムへの挿入に成功した可能性もあれば、失敗した可能性もあり、接続タイムアウトの可能性もあります。

Inflight

inflightはバッファリング層に存在し、外部データシステムからの応答を待っているメッセージ数を示すゲージです。

Queuing

queuingはバッファリング層に受信済みで、まだ外部データシステムに送信されていないメッセージ数を示すゲージです。