Skip to content

MQTT 共有サブスクライブ

EMQX は MQTT の共有サブスクライブ機能を実装しています。共有サブスクライブとは、複数のサブスクライバー間で負荷分散を実現するためのサブスクライブモードです。クライアントは複数のサブスクリプショングループに分けられ、メッセージはすべてのサブスクリプショングループに転送されますが、各グループ内のクライアントには同時に1つのメッセージしか届きません。元のトピックにプレフィックスを付けることで、複数のサブスクライバーに対して共有サブスクライブを有効にできます。EMQX は共有サブスクライブのプレフィックスとして、グループ用の $share/<group-name>/ とグループ用でない $queue/ の2種類の形式をサポートしています。

共有サブスクライブの2つのプレフィックス形式の例は以下の通りです。

プレフィックス形式プレフィックス実際のトピック名
グループ用共有サブスクライブ$share/abc/t/1$share/abc/t/1
グループ用でない共有サブスクライブ$queue/t/1$queue/t/1

クライアントツールを使って EMQX に接続し、このメッセージングサービスを試すことができます。本節では共有サブスクライブの仕組みを紹介し、MQTTX DesktopMQTTX CLI を使ってクライアントをシミュレートし、共有サブスクライブ機能を試す方法を説明します。

グループ用共有サブスクライブ

元のトピックに $share/<group-name> のプレフィックスを付けることで、サブスクライバーのグループ単位で共有サブスクライブを有効にできます。グループ名は任意の文字列を指定可能です。EMQX は異なるグループに同時にメッセージを転送し、同じグループに属するサブスクライバーは負荷分散されてメッセージを受信します。

例えば、サブスクライバー s1s2s3 がグループ g1 のメンバーで、サブスクライバー s4s5 がグループ g2 のメンバーであり、すべてのサブスクライバーが元のトピック t1 をサブスクライブしているとします。共有サブスクライブのトピックは $share/g1/t1$share/g2/t1 となります。EMQX が元のトピック t1 にメッセージ msg1 をパブリッシュすると:

  • EMQX は msg1 をグループ g1g2 の両方に送信します。
  • s1s2s3 のうちのいずれか1つだけが msg1 を受信します。
  • s4s5 のうちのいずれか1つだけが msg1 を受信します。
グループ用共有サブスクライブ

グループ用でない共有サブスクライブ

$queue/ プレフィックス付きの共有サブスクライブトピックは、グループに属さないサブスクライバー向けです。これは $share プレフィックス付き共有サブスクライブトピックの特別なケースと考えられます。すべてのサブスクライバーが $share/$queue のようなサブスクリプショングループに属していると理解できます。

グループ用でない共有サブスクライブ

共有サブスクライブとセッション

クライアントがパーシステントセッションを持ち、共有サブスクライブにサブスクライブしている場合、クライアントが切断している間もセッションは共有サブスクライブトピックにパブリッシュされたメッセージを受信し続けます。クライアントが長時間切断状態にあり、メッセージのパブリッシュ頻度が高い場合、セッション状態内の内部メッセージキューがオーバーフローする可能性があります。この問題を避けるために、共有サブスクライブにはクリーンセッション(clean_session=true)の使用を推奨します。クリーンセッションはクライアント切断後すぐに期限切れになります。

MQTT v5 を使用するクライアントでは、セッション有効期限を短く設定(0以外の場合)することが望ましいです。これにより、クライアントは一時的に切断しても、切断期間中にパブリッシュされたメッセージを再接続時に受信できます。セッションが期限切れになると、送信キュー内の QoS1 と QoS2 のメッセージや、インフライトキュー内の QoS1 メッセージは同じグループ内の他のセッションに再配信されます。最後のセッションが期限切れになると、すべての保留中メッセージは破棄されます。

パーシステントセッションの詳細は、MQTT Persistent Session and Clean Session Explained をご参照ください。

共有サブスクライブ戦略の設定

EMQX は共有サブスクリプショングループ内でメッセージをどのようにサブスクライバーに分配するかを細かく制御できます。この動作は共有サブスクライブ戦略によって定義され、メッセージを受け取るサブスクライバーの選択アルゴリズムを決定します。この戦略を調整することで、異なるワークロード、クライアントの配置パターン、クラスター構成に応じてメッセージフローを最適化できます。

共有サブスクライブ分配戦略

共有サブスクライブ分配戦略は、EMQX が共有サブスクリプショングループ内のサブスクライバーにメッセージをどのように配信するかを決定します。

mqtt.shared_subscription_strategy オプションで設定可能です。

戦略説明
randomグループ内のサブスクライバーをランダムに選択してメッセージを配信します。全体的に均等な分配を実現しますが、決定論的ではありません。
round_robin(デフォルト)各共有サブスクリプショングループ内で順番にサブスクライバーにメッセージを配信します。EMQX は各パブリッシャークライアントごとに別々のラウンドロビン位置を保持するため、異なるパブリッシャーからのメッセージが同じサブスクライバーに連続して配信されることがあります。
round_robin_per_groupround_robin と似ていますが、ラウンドロビンの進行状況を各ノードで独立して追跡します。これにより、クラスター内の異なるノードからパブリッシュされたメッセージが同じサブスクライバーに配信される可能性があります。各ノードが別々のパブリッシャーロードを処理するクラスター展開に有用です。
stickyサブスクライバーが切断またはセッション終了するまで、同じサブスクライバーに継続的にメッセージを配信します。この戦略は、可能な限りメッセージフローを単一のサブスクライバーに保つことで、不要な再処理やクライアント間の状態共有を回避します。
初期サブスクライバーは mqtt.shared_subscription_initial_sticky_pick 設定によって決定されます。
localメッセージが処理されるノードに接続されているサブスクライバーへの配信を優先します。ローカルサブスクライバーが存在しない場合は、クラスター内のサブスクライバーからランダムに選択します。これによりノード間のトラフィックとレイテンシを削減します。
hash_clientidパブリッシャーのクライアントIDのハッシュを使い、そのパブリッシャーからのすべてのメッセージを同じサブスクライバーに一貫してルーティングします。クライアント単位のデータストリームに対して決定論的なルーティングを提供します。
hash_topicパブリッシュされたトピックのハッシュを使い、同じトピック名のすべてのメッセージを同じサブスクライバーにルーティングします。トピックベースのメッセージシャーディングやステートフル処理に有用です。

EMQX はラウンドロビンの位置やスティッキーサブスクライバーの割り当てなどの分配状態をパブリッシャー接続ごとに追跡します。パブリッシャークライアントが切断して再接続すると、この状態はリセットされ再初期化されます。

初期スティッキーピック

共有サブスクライブ戦略が sticky に設定されている場合、EMQX は mqtt.shared_subscription_initial_sticky_pick に基づいてメッセージを受け取る初期サブスクライバーを決定します。

この設定は、EMQX がローカルサブスクライバーを優先するか、決定論的ルーティングのためにハッシュを使うか、ランダムに選択するかを制御します。

ダッシュボードでの分配戦略設定

EMQX ダッシュボードで以下の手順で設定できます:

  1. Management -> MQTT Settings -> General に移動します。
  2. Allowed Shared Subscription が有効になっていることを確認します。
  3. ドロップダウンリストから希望の Shared Subscription Strategy を選択します。デフォルトは round_robin です。
  4. 選択した戦略が sticky の場合は、適切な Shared Subscription Initial Sticky Pick 方法を選択します。デフォルトは random です。
  5. Save Changes をクリックします。

これらの設定は即時反映され、クライアント側の設定変更なしに EMQX のメッセージ分配ロジックを調整できます。

MQTTX Desktop で共有サブスクライブを試す

前提条件

以下の手順では、元のトピックに $share プレフィックスを付けて、異なるグループのサブスクライバーが同じトピックの共有サブスクライブを行い、共有サブスクライブからメッセージを受信する方法を示します。

本デモでは、1つのクライアント接続 demo をパブリッシャーとして作成し、トピック t/1 にメッセージをパブリッシュします。さらに、4つのクライアント接続をサブスクライバーとして作成し、Subscriber1Subscriber2Subscriber3Subscriber4 とします。サブスクライバーはグループ ab に分けられ、両グループともトピック t/1 をサブスクライブします。

  1. EMQX と MQTTX Desktop を起動し、New Connection をクリックしてパブリッシャー用のクライアント接続を作成します。

    • Name フィールドに Demo と入力します。
    • Host にローカルホストの 127.0.0.1 を入力します(本デモの例として)。
    • 他の設定はデフォルトのままにして Connect をクリックします。

    TIP

    MQTT 接続の作成に関する詳細な手順は MQTTX Desktop を参照してください。

    新規接続の一般設定
  2. New Connection をクリックして、4つのサブスクライバー用接続を作成します。Name をそれぞれ Subscriber1Subscriber2Subscriber3Subscriber4 に設定します。

  3. Connections ペインで各 Subscriber 接続を順に選択し、New Subscription をクリックして共有サブスクライブを作成します。以下のルールに従って Topic テキストボックスに正しいトピックを入力します。

    複数のサブスクライバーでグループを形成するには、サブスクライブするトピック t/1 の前にグループ名 {group} を追加します。全員が同じトピックをサブスクライブするには、グループ名の前に $share プレフィックスを追加します。

    New Subscription ウィンドウで:

    • Subscriber1Subscriber2Topic$share/a/t/1 を設定します。
    • Subscriber3Subscriber4Topic$share/b/t/1 を設定します。

    これらの例のトピックにおいて:

    • プレフィックス $share は共有サブスクライブであることを示します。
    • {group}ab で、任意の名前に変更可能です。
    • t/1 は元のトピック名です。

    他の設定はデフォルトのままにして Confirm ボタンをクリックします。

    新規共有サブスクライブ
  4. 先に作成した Demo 接続をクリックします。

    • トピック t/1 でメッセージを送信します。グループ a のクライアント Subscriber1 とグループ b のクライアント Subscriber4 がメッセージを受信するはずです。

      共有サブスクライブでメッセージ受信1
    • 同じメッセージを再度送信します。グループ a のクライアント Subscriber2 とグループ b のクライアント Subscriber3 がメッセージを受信するはずです。

      共有サブスクライブでメッセージ受信2

TIP

共有サブスクライブのメッセージがパブリッシュされると、EMQX は異なるグループに同時にメッセージを転送しますが、同じグループ内のサブスクライバーのうち1つだけがメッセージを受信します。

MQTTX CLI で共有サブスクライブを試す

  1. 4つのサブスクライバーを2つのグループに分けてトピック t/1 をサブスクライブします:

    bash
    # クライアント A と B はトピック `$share/my_group1/t/1` をサブスクライブ
    mqttx sub -t '$share/my_group1/t/1' -h 'localhost' -p 1883
    
    ## クライアント C と D はトピック `$share/my_group2/t/1` をサブスクライブ
    mqttx sub -t '$share/my_group2/t/1' -h 'localhost' -p 1883
  2. 新しいクライアントを使って、ペイロード 1234 の4つのメッセージを元のトピック t/1 にパブリッシュします:

    bash
    mqttx pub -t 't/1' -m '1' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '2' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '3' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '4' -h 'localhost' -p 1883
  3. 各サブスクリプショングループ内のクライアントが受信するメッセージを確認します:

    • サブスクリプショングループ1(A と B)とサブスクリプショングループ2(C と D)は同時にメッセージを受信します。
    • 同じグループ内のサブスクライバーのうち1つだけがメッセージを受信します。