Skip to content

MQTT Streams

EMQX 6.1では、MQTTのリアルタイムなパブリッシュ/サブスクライブモデルを拡張し、永続的でリプレイ可能なメッセージストリームを実現するストリーミングおよびリプレイ機能であるMQTT Streamsを導入しました。これにより、Kafkaのようなストリーミング機能を提供しつつ、MQTTのセマンティクスを維持します。

本ページでは、EMQXにおけるMQTT Streams機能の設計動機、主要概念、内部アーキテクチャ、メッセージフロー、および実際の適用シナリオについて包括的に解説します。

MQTT Streamとは何か?

MQTT Streamは、名前付きの論理リソースであり、設定されたトピックフィルターにマッチするMQTTメッセージを継続的に収集します。メッセージはストリームの保持ポリシーに従って永続的に保存され、後からサブスクライブしたクライアントによってリプレイ可能です。

ストリームは一意の名前で識別されます。トピックフィルターはストリームの設定の一部であり、識別子ではありません。

各ストリームは以下を持ちます:

  • 一意の名前
  • 設定されたトピックフィルター
  • 保持ポリシー(時間ベースおよび/またはサイズベース)
  • オプションのラストバリューセマンティクス
  • 明示的なライフサイクル(作成、更新、削除)

なぜMQTT Streamsを使うのか?

MQTTはリアルタイムメッセージングに最適化されていますが、以下のような制約があります:

  • メッセージは通常オンラインのサブスクライバーにのみ配信される。
  • 過去データのリプレイはネイティブにサポートされていない。
  • 過去データの再処理には外部システムが必要。
  • 順序付けられたリプレイ可能なメッセージ履歴の維持が困難。

MQTT Streamsは、永続的なメッセージ保存とリプレイ機能でMQTTを拡張します。これにより、MQTTクライアントのパブリッシュやサブスクライブの方法を変えずに、過去のメッセージを読み取ったりデバイスの最新状態を取得したりできます。

MQTT Streamsの主要概念

  • MQTT Stream

    名前で識別・アドレス指定され、明示的なライフサイクルで管理される論理リソースです。アクティブな間は、設定された時間またはサイズの制限内でマッチするメッセージを継続的に保存します。保存されたメッセージは、パブリッシャー側の変更なしにサブスクライバーによってリプレイ可能です。

    ストリーム名に使用できる文字は以下のみです:

    • 英数字(A–Za–z0–9
    • アンダースコア(_
    • ハイフン(-
    • ドット(.

    2種類のストリームタイプをサポートしています:

    • レギュラーストリーム:履歴データを上書きせずにすべてのマッチするメッセージを保存します。stream-offsetサブスクリプションプロパティを使って、指定したタイムスタンプやオフセットからリプレイ可能です。
    • ラストバリューストリームラストバリューセマンティクスを有効にします。同じストリームキーを持つメッセージは新しいものが古いものを上書きし、各キーに対応する最新のメッセージのみを保持します。詳細はストリームキー式をご覧ください。
  • トピックフィルター

    sensors/+/dataのようなMQTTトピックフィルターで、どのパブリッシュされたメッセージをストリームに取り込むかを決定します。マッチするメッセージのみが取り込まれ、1つのメッセージが複数のストリームに属することもあります。

    TIP

    トピックフィルターはストリームの識別子ではなく、名前付きストリームの設定メタデータです。

  • ストリームサブスクリプション

    ストリームからメッセージを消費するための特別なMQTTサブスクリプションです。クライアントは以下のいずれかの形式でサブスクライブします:

    SUBSCRIBE $stream/<name>
    SUBSCRIBE $stream/<name>/<topic_filter>

    ここで:

    • <name> はストリーム名(必須)。
    • <topic_filter> は既存ストリームにサブスクライブする際は省略可能。
    • オートクリエーションが有効な場合、$stream/<name>/<topic_filter>はストリームが存在しなければ指定されたトピックフィルターで作成します。

    ストリームサブスクリプションは通常のMQTTサブスクリプションとは独立して動作し、External Subscription機構を通じて配信されます。

  • ストリームオフセット(リプレイ開始位置)

    リプレイ開始位置はトピックパスではなく、MQTT 5のユーザーサブスクリプションプロパティstream-offsetで指定します。

    stream-offsetプロパティはリプレイ開始位置を決定します。例:

    • タイムスタンプ
    • 論理オフセット
    • 最も古い位置や最新位置などの特別な位置(対応している場合)

    この設計により、トピック文字列からのオフセット解析が不要となり、MQTT 5プロパティによるリプレイ制御と整合します。

  • キー式

    各受信メッセージに対して評価されるユーザー定義式で、キーを抽出します。式はメッセージの内容やメタデータを参照可能です。抽出されたキーはストレージパーティション内でのメッセージ順序保証に使われます。ラストバリューセマンティクスが有効な場合、キーは上書き範囲を定義し、同じキーの新しいメッセージが古いものを置き換えます。

MQTT Streamsのアーキテクチャ

MQTT Streamsは、ブローカーコアと疎結合で連携し、既存のインフラを再利用する独立したEMQXアプリケーションとして実装されています。EMQXとの統合は内部フックとExternal Subscriptionフレームワークを通じて行われます。

TIP

External Subscriptionは、ライブMQTTパブリッシュ経路外から発生するメッセージソースをMQTTクライアントセッションに接続し、クライアントの動作を変えずに標準MQTTサブスクリプションを通じてメッセージを配信可能にするEMQXの仕組みです。

主なコンポーネント

  • Streams Registry:MQTTストリームのライフサイクルを管理し、ストリーム名、トピックフィルター、保持ポリシー、キー式などのメタデータを保持します。効率的なストリーム検索のためにMnesiaテーブルを使用します。
  • Streams Message Database:ストリームメッセージの永続ストレージを提供し、EMQXのパーシステントストレージ上に構築されています。メッセージを永続化し、保持制限を適用し、ラストバリューセマンティクスが有効な場合は適用し、保持ポリシーに従って効率的なメッセージ取得をサポートします。
  • Streams ExtSub Handler:メッセージストリームをMQTTクライアントセッションと統合します。パーシステントストレージからメッセージを取得し、External Subscriptionフレームワークを通じてサブスクライブクライアントに配信します。

MQTT Streamsデータフローダイアグラム

以下の図はMQTT Streamsコンポーネント間のデータフローを示しています:

streams_data_flow

パブリッシュフロー

  1. クライアントがMQTTトピックにメッセージをパブリッシュします。
  2. MQTT Streamsフックがトリガーされ、パブリッシュ処理を開始します。
  3. フックはStreams Registryに問い合わせ、メッセージトピックにマッチするストリームを特定します。
  4. マッチした各ストリームに対してメッセージを書き込み、パーシステントストレージに永続化します。

サブスクライブおよび消費フロー

  1. クライアントがストリームトピック($stream/<name>または$stream/<name>/<topic_filter>)にサブスクライブし、オプションでstream-offsetサブスクリプションプロパティを含めます。

    非推奨

    旧形式の$s/<offset>/<topic_filter>は後方互換のためサポートされていますが非推奨です。詳細は互換性の注意点をご覧ください。

  2. External Subscriptionフレームワークがサブスクリプションを処理し、ストリームトピック用のStreams ExtSubハンドラーを初期化します。

  3. ハンドラーは指定されたstream-offsetと保持ルールに従い、パーシステントストレージからメッセージを取得します。

  4. 取得したメッセージはExternal Subscriptionフレームワークに渡されます。

  5. ExtSubアプリケーションが標準MQTT配信を通じてクライアントにメッセージを届けます。

MQTT Streamsのコア機能

MQTT Streamsは、メッセージの保存、順序付け、保持、リプレイ消費のための配信方法を定義する一連の基本機能を提供します。

  • オフセットベースのリプレイ

    リプレイ開始位置はトピックパスではなくstream-offsetサブスクリプションプロパティで指定します。指定されたオフセット以前にパブリッシュされたメッセージはスキップされます。

  • 保持

    ストリームの保持ポリシーによりメッセージリプレイが制限されます。メッセージは時間またはサイズで制限された期間保持されます。期限切れメッセージは消費済みかどうかに関わらず自動的に削除されます。

  • キー単位の順序保証

    MQTT Streamsは単一のグローバルな配信順序を保証しません。同じキーを持つメッセージは常にパブリッシュ順に配信されますが、異なるキーのメッセージは任意の順序で配信される可能性があります。

  • ラストバリューセマンティクス

    ストリームはラストバリューセマンティクスを有効にできます。同じキーを持つメッセージは古いものを上書きし、最新の値のみを保持します。キーが解決できないメッセージは通常通り保存されます。

  • MQTTネイティブ配信

    ストリームメッセージは標準のMQTTメカニズムを使って配信されます。パブリッシャーは動作を変更する必要がありません。サブスクライバーへのメッセージ配信はExternal Subscriptionを通じて統合されています。

互換性の注意点

既存のデプロイメントに関する互換性の考慮事項を説明します。

名前付きストリーム

  • すべてのストリームは明示的に名前付きリソースとなりました。
  • ストリーム名は許可された文字セットルールに従う必要があります。

旧ストリーム

以前に作成された名前なしストリームには、トピックフィルターから派生した名前が自動的に割り当てられます。

派生名は/<topic_filter>となります。

非推奨のプレフィックス

ストリームサブスクライブ用の$sプレフィックスは後方互換のため引き続きサポートされていますが非推奨です。

新規デプロイメントでは$stream/<name>を使用してください。

典型的なユースケース

  • 過去データのリプレイ:過去のMQTTイベントを再処理し、デバッグや新しいビジネスロジックに活用。
  • 時系列解析:センサーデータを保存・リプレイし、分析や予知保全に利用。
  • イベントソーシング:すべての状態変化を不変のイベントログとして永続化。
  • IoTデジタルツイン:物理デバイスの最新状態をデジタル形式で維持。
  • 設定同期:デバイスが常に最新の設定を受け取ることを保証。

次のステップ

MQTT Streamsの基本を理解したら、実践方法を学びましょう: