Skip to content

メッセージキュー

EMQX 6.0で導入されたメッセージキュー機能は、MQTTのサブスクライブ/パブリッシュパターンを耐久性のあるキューセマンティクスで拡張し、信頼性の高い非同期メッセージ配信を可能にします。RabbitMQのようなエンタープライズグレードのメッセージキューで一般的に見られる機能を、追加のインフラなしでネイティブMQTTの機能に付加します。

本ページでは、EMQXのメッセージキュー機能の設計動機、主要概念、内部アーキテクチャ、メッセージフロー、実際の利用シナリオについて包括的に解説します。

メッセージキューとは?

EMQXのメッセージキューは、サブスクライバーの有無に依存せずMQTTメッセージを保存する名前付きの耐久性のあるサーバー側バッファです。各キューは一意のキュー名で識別され、トピックフィルターはどのパブリッシュされたメッセージをキューに格納するかを定義します(キューの識別子としては機能しません)。設定されたトピックフィルターに一致するメッセージは、キューの保持および配信ポリシーに従って自動的に永続化されます。

従来のMQTTの動作と異なり、メッセージキューはクライアントがオンラインでなくてもメッセージを保持します。クライアントは特別なサブスクリプション形式 $queue/<name> または $queue/<name>/<topic_filter> をサブスクライブすることで、これらのメッセージを消費できます。

メッセージキューのルーティング概要

なぜメッセージキューを使うのか?

MQTTは軽量で広く採用されているパブリッシュ/サブスクライブプロトコルですが、デフォルトの動作ではメッセージ配信がサブスクライバーのオンライン状態に強く依存しており、非同期や遅延消費のシナリオでは制約があります。

MQTTの制約

MQTTは共有サブスクリプション$share/{group}/topic)を通じてキューのような機能を一部サポートしていますが、以下の制約があります。

  • オンラインのサブスクライバーがいない場合、メッセージは保持されません。
  • TTL(有効期限)、キューサイズ制限、オーバーフロー制御の組み込みサポートがありません。
  • キーごとに最新値のみを保持するようなメッセージの重複排除機能がありません。
  • キューの明示的なライフサイクル管理がありません。

これらの制約により、以下のようなパターンの実装が困難です。

  • デバイスがオンラインになる前にコマンドを送信する。
  • 常時接続していないワーカーにタスクを送る。
  • 最新の状態や設定更新のみを保持する。

メッセージキューによるMQTTの拡張

メッセージキューはEMQXのMQTTプロトコルを拡張し、サブスクライバーのオンライン状態に関わらずメッセージを永続化して後続処理を可能にします。主な特徴は以下の通りです。

  • クライアントがオフラインでもメッセージを永続化:キューは厳密な順序付けを保証しませんが、信頼性の高い非同期配信を実現し、軽量MQTT通信とエンタープライズ向けメッセージングのギャップを埋めます。
  • 明示的なキュー宣言とプロパティ設定:各キューはTTL、サイズ制限、配信戦略などを設定でき、メッセージの保持・配信を細かく制御可能です。
  • オプションの最新値セマンティクス:同一キーのメッセージは古いものを上書きし、最新の状態や設定更新のみを保持できます。

メッセージキューの概念

  • キュー名
    キューを識別するMQTTトピックまたはトピックフィルター。マッチするトピックにパブリッシュされたメッセージは自動的にキューに格納されます。

    メッセージキューを明示的に識別する一意の識別子です。

    キュー名に使用できる文字は以下に限定されます。

    • 英数字(A–Za–z0–9
    • アンダースコア(_
    • ハイフン(-
    • ドット(.

    TIP

    EMQX 6.1.1以降、キューはトピックフィルターではなく名前で指定します。トピックフィルターはキューの設定の一部であり、識別子ではありません。

  • メッセージの永続化
    サブスクライバーが接続していなくてもメッセージは保持されます。デフォルトでは最新値セマンティクスが適用されます。キー式がない通常のキューでは、受信順にメッセージが保存されます。

    devices/+/command のようなMQTTトピックフィルターで、どのパブリッシュメッセージをキューに書き込むかを決定します。設定されたフィルターに一致するメッセージのみがキューに格納されます。1つのメッセージが複数のキューに一致し、複数のキューに書き込まれることもあります。

    TIP

    トピックフィルターは名前付きキューの設定メタデータであり、キュー作成後に変更できません。

  • キューサブスクリプション

    キューからメッセージを消費するための特別なMQTTサブスクリプションです。クライアントは以下の形式でサブスクライブします。

    SUBSCRIBE $queue/<name>
    SUBSCRIBE $queue/<name>/<topic_filter>
    • <name> はキュー名(必須)。
    • <topic_filter> は既存キューにサブスクライブする場合は省略可能。
    • 自動作成が有効な場合、$queue/<name>/<topic_filter> により指定されたトピックフィルターでキューを新規作成できます。

    キューサブスクリプションは通常のMQTTサブスクリプションとは独立して動作し、メッセージキューのコンシューマーメカニズムで処理されます。

  • 最新値セマンティクス

    キュー宣言時にキューキー式を設定することで有効化できるオプション機能です。EMQXはメッセージごとにキューキーを抽出し、同じキーの新しいメッセージが未消費の既存メッセージを上書きします。状態管理や設定更新のように最新値のみが重要なケースに適しています。

    詳細はキューキー式をご覧ください。

  • キュー宣言

    耐久性のあるキューを作成し、トピックフィルター、配信戦略、保持制限、キー式などの設定で動作を定義するプロセスです。

  • キュー削除

    キューとその保存されたメッセージおよび関連状態をすべて削除する操作です。

  • キューのプロパティ

    メッセージ保持時間や配信戦略など、キューの動作を制御するカスタマイズ可能な設定です。

EMQXのメッセージキュー機能は疎結合な拡張として実装されており、内部フックを使ってパブリッシュやサブスクライブ操作をインターセプトします。これらのフックはレジストリやストレージ層と連携し、メッセージの永続化と配信を信頼性高く行います。

メッセージキュー内のすべてのメッセージは、パブリッシュやサブスクライブ時のQoSレベルに関わらずQoS 1(少なくとも1回配信)で配信されます。これにより信頼性の高い配信とキューの配信動作の統一が保証されます。

  • メッセージ永続化

    サブスクライバーが接続していなくてもメッセージは保持されます。デフォルトでは最新値セマンティクスが適用されます。キー式を設定しない通常のキューでは、受信順にメッセージが保存されます。

  • メッセージキューメッセージDB
    キューにパブリッシュされた実際のメッセージを保存し、EMQXの耐久ストレージ上に構築されています。

EMQXのメッセージキュー機能は疎結合な拡張として実装され、内部フックを使ってパブリッシュおよびサブスクライブ操作をインターセプトします。これらのフックはレジストリやストレージ層と連携し、メッセージの永続化と配信を信頼性高く実現します。

  • メッセージキューコンシューマー
    キューからメッセージを取得し、設定されたディスパッチ戦略に基づいて接続中のサブスクライバーに配信します。

  • メッセージキューサブスクリプションレジストリ
    どのチャネル(クライアント)がどのキューにサブスクライブしているかを追跡し、各チャネルのコンテキストにサブスクリプション状態を保存します。

  • メッセージキューレジストリ:すべてのメッセージキューのライフサイクルを管理し、キューの作成、削除、検索を担当します。

  • メッセージキューメッセージDB:キューにパブリッシュされた実際のメッセージを保存し、EMQXの耐久ストレージ上に構築されています。

  • メッセージキュー状態ストレージ:消費進捗やキューメタデータ(TTL、プロパティなど)を永続化します。

  • メッセージキューコンシューマー:キューからメッセージを取得し、接続されたサブスクライバーに配信戦略に従って配信します。

  • メッセージキューサブスクリプションレジストリ:どのチャネル(クライアント)がどのキューにサブスクライブしているかを追跡し、各チャネルのコンテキストにサブスクリプション状態を保存します。

  • メッセージキューフック:パブリッシュおよびサブスクライブイベントにフックし、メッセージをキューやコンシューマーにルーティングします。

メッセージキューデータフローダイアグラム

以下の図は主要コンポーネント間のデータフローを示しています。

message-queue-data-flow

パブリッシュのワークフロー

  1. クライアントが some/topic のような通常のトピックにメッセージをパブリッシュします。
  2. 内部のMQフックがトリガーされ、メッセージを処理します。
  3. フックはメッセージキューレジストリを参照し、パブリッシュされたトピックにマッチするキューを検索します。
  4. マッチするキューがあれば、メッセージはキューのメッセージDBに書き込まれます。

サブスクライブと消費のワークフロー

  1. クライアントが $queue/<name> または $queue/<name>/<topic_filter> を使ってキューにサブスクライブします。
  2. MQフックがトリガーされ、サブスクリプションを処理します。
  3. フックはキュー名でキューを解決し、クライアントセッションのコンテキスト内にサブスクリプションを初期化し、メッセージキューコンシューマーへの接続を確立します。
  4. キューに対応するコンシューマープロセスが存在しない場合、新たにメッセージキューコンシューマーが起動されます。
  5. コンシューマーはメッセージ消費の進捗を復元し、メッセージDBからデータの取得を開始します。
  6. コンシューマーは設定された配信戦略に従い、受信したメッセージをサブスクライバーのクライアントセッションに配信します。
  7. サブスクライバーのクライアントセッションは標準のMQTTメカニズムを通じてクライアントにメッセージを届けます。

メッセージキューのコア機能

EMQXのメッセージキュー機能は、信頼性が高く疎結合で設定可能なメッセージ配信を実現する一連のコア機能を提供します。

  • メッセージのエンキュー

    キューの設定トピックフィルターにマッチするトピックにパブリッシュされたメッセージは自動的にキューに格納されます。

    キューキー式(最新値セマンティクス)を設定している場合、EMQXは各メッセージに対して式を評価します。

    • キーが抽出できれば、同じキーの未消費メッセージを置き換えます。
    • 最新値キューでキーが評価できない場合、そのメッセージは破棄されます。
  • メッセージのデキュー

    サブスクライブしたクライアントは設定された配信戦略に従ってキューからメッセージを受け取ります。メッセージキュー内のすべてのメッセージはQoS 1(少なくとも1回配信)で配信され、信頼性を確保します。クライアントがメッセージをアックすると、キューから削除されます。

    • キーが導出されれば、同じキーの未消費メッセージを置き換えます。
    • 最新値キューでキーが評価できなければ、そのメッセージは破棄されます。

    メッセージをサブスクライバー間でどのように分配するかを定義できます。

    • random:ランダムに分配

    • round_robin:順番にローテーション

    • least_inflight:処理中メッセージが少ないサブスクライバーを優先

    • random:ランダムに配布

    • round_robin:利用可能なサブスクライバー間で順番に配布

    • least_inflight:進行中メッセージ数が少ないサブスクライバーを優先

    キューの作成、更新、削除、照会などのフルライフサイクル操作はREST APIで利用可能です。

利用ケース

メッセージキューは、デバイスやコンシューマーが常時オンラインでないIoTやイベント駆動型アプリケーションで重要な信頼性の高い非同期メッセージングパターンを実現します。

  • デバイスコマンドキューイング:クラウドアプリケーションがIoTデバイス向けコマンドをキューに蓄積し、デバイスがオフラインでもコマンドが失われないようにする。
  • バッチ処理:大規模データや処理を小さなタスクに分割し、ワーカークライアントに並列または遅延処理で配布する。
  • センサーデータ処理:高頻度のセンサーデータを一時的にキューに蓄積し、後でバッチ処理や集約、解析を行う。
  • 最新設定の配信:デバイスが常に最新の設定コマンドを取得・処理するようにし、同じ設定項目/キーの古い未処理コマンドはキュー内で上書きまたは無効化する。

関連機能のリファレンス

メッセージキューはMQTTを基盤とし、EMQXの他のメッセージング機能を補完します。

  • 共有サブスクリプション:複数サブスクライバー間でメッセージを分配しますが、クライアントがオンラインでない場合はメッセージを保持しません。
  • 保持メッセージ:トピックごとに最後のメッセージを保持しますが、新規サブスクライバーに1つの保持メッセージのみを配信します。
  • MQTT耐久セッション:個々のクライアントのセッション状態(サブスクリプションやQoS 1/2メッセージ)を再接続間で保持します。
  • ルールエンジン:SQLライクなルールでキュー内メッセージをフィルタリング・処理し、変換や転送を行います。

互換性に関する注意

以下はEMQX 6.1.1で導入された互換性に関するポイントです。

名前付きキュー

EMQX 6.1.1以降、すべてのキューは明示的に名前付きリソースとなり、キューの識別は一意の名前に基づきます。トピックフィルターは識別子ではありません。

旧キュー

以前に作成された名前なしキューは、トピックフィルターから派生した名前が自動的に割り当てられます。

派生名の形式:

/<topic_filter>

この派生名は既存の $q/<topic_filter> サブスクリプションとの後方互換性を維持します。

非推奨のプレフィックス

$q プレフィックスは旧サブスクリプションで引き続きサポートされますが非推奨です。

新規環境では以下を使用してください。

$queue/<name>

共有サブスクリプション制限

メッセージキューが有効な場合、$queue/ プレフィックスはキューサブスクリプション専用となり、共有サブスクリプションでは使用できません。

次のステップ

メッセージキューの基本を理解したら、実際の利用方法を学びましょう。