Skip to content

メッセージ再送信

メッセージ再送信はMQTTプロトコル仕様の一部です。

このプロトコルでは、通信当事者であるサーバーおよびクライアントが送信するPUBLISHパケットは、それぞれのQoS(サービス品質)レベルの要件を満たす必要があります。具体的には以下の通りです:

  • QoS 1: メッセージが少なくとも1回は配信されることを意味します。 つまり、送信者はピアからの確認を受け取らない限り、常にメッセージを再送信します。これにより、同じQoS 1メッセージがMQTTプロトコルの上位層(サービスのアプリケーション層)で複数回受信される可能性があります。
  • QoS 2: メッセージが正確に1回だけ配信されることを意味します。 つまり、上位層ではメッセージが1回だけ受信されます。

QoS 1およびQoS 2のPUBLISHパケットはMQTTプロトコルスタック層で再送信されますが、以下の点に注意してください:

  • QoS 1メッセージの再送信が発生した場合、再送信されたPUBLISHパケットもMQTTプロトコルスタックの上位層で受信されます。
  • QoS 2メッセージは何度再送信されても、MQTTプロトコルスタックの上位層では1つのPUBLISHパケットのみ受信されます。

基本設定

メッセージが再送信されるシナリオは主に2つあります:

  1. PUBLISHパケットをピアに送信した後、指定された時間内に応答がない場合、パケットを再送信する。
  2. セッションを維持している間にクライアントが再接続した場合、EMQXは未応答のメッセージを自動的に再送信し、正しいQoS処理を保証する。

設定ファイルで以下のように設定可能です:

設定項目オプション値デフォルト値説明
retry_intervalduration-30sタイムアウト間隔を待ち、応答がなければメッセージを再送信する間隔

一般的には上記内容を理解していれば十分です。

EMQXがMQTTプロトコルの再送信をどのように処理するかの詳細は、本記事の後半をご参照ください。

プロトコル仕様と設計

再送信対象

まず、EMQXの再送信メカニズム設計を理解する前に、QoS 1およびQoS 2の送信プロセスを理解している必要があります。未確認の場合は以下をご参照ください。
MQTTv3.1.1 - QoS 1: At least once delivery
MQTTv3.1.1 - QoS 2: Exactly once delivery

ここでは簡単に異なるQoSにおける再送信対象を説明します。

QoS 1

QoS 1はメッセージが少なくとも1回配信されることを要求するため、送信者が確認メッセージを受け取るまで、MQTTプロトコル層でメッセージが継続的に再送信される可能性があります。

処理の概略は以下の通りです:

               PUBLISH
#1 Sender  --------------->  Receiver       (*)
               PUBACK
#2 Sender  <---------------  Receiver
  • 2つのパケットが関与し、送信者と受信者でそれぞれ1回ずつ送信動作が行われます。両パケットは同じPacketIdを持ちます。
  • 行末に*が付いている場合、送信者は確認メッセージの待機がタイムアウトした際に再送信を開始する可能性があります。

つまり、QoS 1メッセージはPUBLISHメッセージのみ再送信されます。

QoS 2

QoS 2はメッセージが正確に1回だけ配信されることを要求するため、より複雑な処理が必要です。処理の概略は以下の通りです:

               PUBLISH
#1 Sender  --------------->  Receiver       (*)
               PUBREC
#2 Sender  <---------------  Receiver
               PUBREL
#3 Sender  --------------->  Receiver       (*)
               PUBCOMP
#4 Sender  <---------------  Receiver
  • 4つのパケットが関与し、送信者と受信者でそれぞれ2回ずつ送信動作が行われます。これら4つのパケットはすべて同じPacketIdを持ちます。
  • 行末に*が付いている場合、送信者は確認メッセージの待機がタイムアウトした際に再送信を開始する可能性があります。

つまり、QoS 2メッセージはPUBLISHパケットとPUBRELパケットのみ再送信されます。

まとめると:

  • 再送信動作は、メッセージ送信後に指定時間内に期待する応答が受信できなかった場合にトリガーされます。
  • 再送信対象は以下の3種類のみです:
    • QoS 1のPUBLISHパケット
    • QoS 2のPUBLISHパケット
    • QoS 2のPUBRELパケット

EMQXがPUBLISHメッセージの受信者として動作する場合、再送信操作は不要です。

インフライトウィンドウと最大受信値

この概念の定義と説明については、Inflight Window and Message Queueをご参照ください。

これらの概念を導入する目的は以下の理解のためです:

  1. EMQXが送信者として動作する場合、再送信されるメッセージはインフライトウィンドウに格納されているメッセージである必要がある。
  2. EMQXが受信者として動作し、送信者がメッセージを再送信した場合:
    • QoS 1では、EMQXは直接PUBACKで応答する。
    • QoS 2では、EMQXは最大受信メッセージキューに格納されたPUBLISHまたはPUBRELパケットを解放する。

メッセージの順序

上記の概念は理解しておくべきですが、最も注意すべきはメッセージが再送信された後のメッセージ順序の変化、特にQoS 1メッセージの場合です。例を示します:

現在のインフライトウィンドウサイズが2に設定されているとし、EMQXがクライアントに対して4つのQoS 1メッセージを配信しようとします。途中でクライアントプログラムやネットワークに問題が発生した場合、送信プロセスは以下のようになります:

#1  [4,3,2,1 || ]   ----->   []
#2  [4,3 || 2, 1]   ----->   [1, 2]
#3  [4 || 3, 2]     ----->   [1, 2, 3]
#4  [4 || 3, 2]     ----->   [1, 2, 3, 2, 3]
#5  [ || 4]         ----->   [1, 2, 3, 2, 3, 4]
#6  [ || ]          ----->   [1, 2, 3, 2, 3, 4]

このプロセスは6ステップあり、左側はEMQXのメッセージキューとインフライトウィンドウを||で区切って示しています。右側はクライアントが受信したメッセージの順序を示しています。各ステップの意味は以下の通りです:

  1. ブローカーは4つのメッセージをメッセージキューに格納する。
  2. ブローカーは順に12を送信し、インフライトウィンドウに格納する。クライアントはメッセージ1にのみ応答し、送信ストリームに問題が発生して以降の応答は送信されない。
  3. ブローカーはメッセージ1の応答を受け取り、インフライトウィンドウから1を削除し、3を送信する。引き続き23の応答を待つ。
  4. 応答待ちがタイムアウトし、ブローカーはメッセージ23を再送信する。クライアントは再送信されたメッセージ23を受信し正常に応答する。
  5. ブローカーはインフライトウィンドウから23を削除し、メッセージ4を送信する。クライアントはメッセージ4を受信し応答する。
  6. これで全メッセージ処理が完了する。クライアントが受信したメッセージの順序は[1, 2, 3, 2, 3, 4]であり、順にMQTTプロトコルスタックの上位層に通知される。

重複したメッセージが存在しますが、これはプロトコル仕様に完全に準拠しています。各メッセージの最初の出現は順序通りであり、繰り返し受信されるメッセージ23には再送信であることを示す識別ビットが付与されます。

MQTTプロトコルとEMQXはこのトピックをOrdered Topicとして扱います。詳細は以下を参照してください:
MQTTv3.1.1 - Message ordering

これにより、同じトピックかつQoSレベルでメッセージが順序通りに配信および応答されることが保証されます。

なお、すべてのトピックのQoS 1およびQoS 2メッセージを厳密に順序付けたい場合は、インフライトウィンドウの最大長を1に設定する必要がありますが、クライアントのスループットは低下します。

関連設定

以下は上記メカニズムで使用されるすべての設定項目です。すべて設定ファイルに含まれています:

設定項目オプション値デフォルト値説明
mqueue_store_qos0booltrue, falsetrueQoS 0メッセージをメッセージキューに保存するかどうか
max_mqueue_leninteger>= 01000メッセージキューの長さ
max_inflightinteger>= 00インフライトウィンドウのサイズ。デフォルト0は無制限を意味する
max_awaiting_relinteger>= 00最大受信数。デフォルト0は無制限を意味する
await_rel_timeoutduration> 0300sMax Receiveでの解放待ちの最大タイムアウト値。超過するとメッセージは破棄される