Skip to content

Durable Sessions の設定と管理

本ドキュメントでは、EMQX における MQTT Durable Sessions 機能の設定、管理、および最適化に関するリファレンスと手順を提供します。セッションおよびストレージの設定も含みます。

設定パラメータ

MQTT Durable Sessions の設定は主に以下の2つのカテゴリに分かれています。

  • durable_sessions:MQTT クライアントのセッションに関する設定で、耐久ストレージからのデータ消費方法やデータ保持パラメータを含みます。
  • durable_storage:MQTT メッセージデータを保持する耐久ストレージシステムの設定を管理します。

Durable Sessions の設定

Dashboard で Durable Sessions のパラメータを設定できます。Dashboard の左メニューから Management -> MQTT Settings をクリックし、Durable Session タブを選択してパラメータを設定してください。

ダッシュボードのセッション設定
パラメータDashboard UI 表示名説明
durable_sessions.enableEnable Durable Sessionsセッションの耐久性を有効化します。この設定はホットコンフィグレーションでは変更できず、設定ファイルで指定する必要があります。変更を反映するには EMQX ノードの再起動が必要です。
durable_sessions.message_retention_periodMessage Retention PeriodDurable Sessions 内の MQTT メッセージの保持期間を定義します。注意:このパラメータはグローバル設定です。
durable_sessions.batch_sizeMessage Query Batch SizeDurable Sessions がストレージから消費するメッセージのバッチ最大サイズを制御します。
durable_sessions.idle_poll_intervalIdel Poll IntervalDurable Sessions が新しいメッセージをストレージに問い合わせる頻度を制御します。新しいメッセージが見つかると、クライアントのインフライトキューに空きがあれば即座に次のバッチを取得します。
durable_sessions.heartbeat_intervalSession Heartbeat Intervalセッションメタデータを保存する間隔を指定します。
durable_sessions.renew_streams_interval-セッションが新しいストリームをストレージに問い合わせる頻度を定義します。
durable_sessions.session_gc_intervalSession GC Intervalセッションを巡回し、期限切れのセッションを削除する間隔を指定します。

以下のパラメータは ゾーンごとにオーバーライド可能です。

  • durable_sessions.enable
  • durable_sessions.batch_size
  • durable_sessions.idle_poll_interval
  • durable_sessions.renew_streams_interval

Durable Storage の設定

<DS> は「durable storage(耐久ストレージ)」のプレースホルダーです。現在、利用可能な <DS> のパラメータは message です。

パラメータ説明
durable_storage.<DS>.data_dirEMQX がデータを保存するファイルシステム上のディレクトリ。
durable_storage.<DS>.n_shardsシャード数
durable_storage.<DS>.n_sitesサイト数
durable_storage.<DS>.replication_factorレプリケーションファクター は各シャードのレプリカ数を決定します。
durable_storage.<DS>.local_write_bufferメッセージのバッファリングに関するパラメータを含みます。詳細は ローカル書き込みバッファの設定 を参照してください。
durable_storage.<DS>.layoutEMQX がディスク上にデータを配置する方法を制御するパラメータを含みます。詳細は ストレージレイアウトの設定 を参照してください。

ローカル書き込みバッファの設定

EMQX はクライアントからの MQTT メッセージを耐久ストレージにバッチ単位で書き込み、スループットを最大化します。バッチ処理は durable_storage.<DS>.layout 設定サブツリーの以下のパラメータで制御されます。

パラメータ説明
max_itemsバッファのサイズがこの値に達したときにフラッシュされます。
flush_intervalバッファに少なくとも1件のメッセージがある場合、この間隔でフラッシュされます。

ストレージレイアウトの設定

ストレージレイアウトは EMQX がディスク上にデータをどのように配置するかを決定します。durable_storage.<DS>.layout.type パラメータを設定することで、新しい 世代で使用されるレイアウトを変更できます。この変更は既存の世代には影響しません。各レイアウトタイプの設定は durable_storage.<DS>.layout サブツリーに含まれます。現在利用可能なレイアウトタイプは wildcard_optimized のみです。

wildcard_optimized レイアウトタイプの設定

wildcard_optimized レイアウトは、多数の MQTT トピックに対するワイルドカードサブスクライブのマッチングを最適化することを目的としています。トピック構造に関する知識を自律的に蓄積し、軽量な機械学習アルゴリズムを活用してクライアントがサブスクライブしそうなワイルドカードトピックフィルターを予測します。その後、これらのトピックを統合されたストリームに整理し、一度のスイープで効率的に消費できるようにします。

パラメータ説明
bits_per_topic_levelトピックレベルのハッシュサイズを決定します。
epoch_bitsメッセージタイムスタンプ(マイクロ秒単位)の下位ビットを用いて計算されるエポック内のメッセージオフセットのビット数を定義します。
topic_index_bytesストリーム識別子のバイトサイズを指定します。

エポックの設定

Wildcard-optimized ストリームはエポックと呼ばれる時間区間に分割されます。各エポック内のメッセージは一度のスイープで処理できるため、効率とスループットが向上します。ただし、エポックが大きいと現在のエポック内のメッセージを即時に消費できず、レイテンシが増加します。

各エポックの時間間隔は以下の式で計算できます:epoch length (μs) = 2 ^ epoch_bits

Epoch Bitsエポック長さ
12 μs
24 μs
10約1 ms
17約100 ms
20約1 秒
21約2 秒
24約17 秒

デフォルトで epoch_bits は 20(約1秒)に設定されており、レイテンシと効率のバランスを取っています。この値を調整することで、レイテンシとスループットのトレードオフを微調整できます。

CLI コマンド

耐久ストレージの管理に利用できる CLI コマンドは以下の通りです。

emqx_ctl ds info

耐久ストレージの状態概要を表示します。

例:

bash
$ emqx_ctl ds info

THIS SITE:
D8894F95DC86DFDB

SITES:
.------------------.-------------------.----------.
: Site             : Node              : Status   :
:------------------:-------------------:----------:
: 5C6028D6CE9459C7 : 'emqx@n2.local'   : up       :
: D8894F95DC86DFDB : 'emqx@n1.local'   : up       :
: F4E92DEA197C8EBC : 'emqx@n3.local'   : (x) down :
`------------------`-------------------`----------`

SHARDS:
.-------------.------------------.-------------.
: DB/Shard    : Replicas         : Transitions :
:-------------:------------------:-------------:
:-messages/0--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/1--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/10-:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/11-:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/12-:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/2--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/3--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/4--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/5--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/6--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/7--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/8--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
:-messages/9--:------------------:-------------:
:             : 5C6028D6CE9459C7 :             :
`-------------`------------------`-------------`

このコマンドの出力には以下が含まれます。

  • THIS SITE:ローカル EMQX ノードが管理するサイトの ID。
  • SITES:既知のすべてのサイトのリスト。EMQX ノード名とそのステータスを含みます。
  • SHARDS:耐久ストレージのシャードと、そのレプリカが存在するサイト ID のリスト。

emqx_ctl ds set-replicas <DS> <Site1> <Site2> ...

クラスタ内で耐久ストレージのレプリカを保持するサイトのリストを設定します。実行すると、シャードをサイト間で公平に割り当てる操作計画を作成し、バックグラウンドで実行を続けます。

重要なお知らせ

耐久ストレージのレプリカリストの更新は、大量のデータをサイト間でコピーする可能性があるためコストがかかる場合があります。

例:

bash
$ emqx_ctl ds set-replicas messages 5C6028D6CE9459C7 D8894F95DC86DFDB F4E92DEA197C8EBC
ok

このコマンド実行後、ds info の出力は以下のようになる場合があります。

bash
$ emqx_ctl ds info

THIS SITE:
D8894F95DC86DFDB

SITES:
.------------------.-------------------.----------.
: Site             : Node              : Status   :
:------------------:-------------------:----------:
: 5C6028D6CE9459C7 : 'emqx@n2.local'   : up       :
: D8894F95DC86DFDB : 'emqx@n1.local'   : up       :
: F4E92DEA197C8EBC : 'emqx@n3.local'   : up       :
`------------------`-------------------`----------`

SHARDS:
.-------------.------------------.--------------------.
: DB/Shard    : Replicas         : Transitions        :
:-------------:------------------:--------------------:
:-messages/0--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/1--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/10-:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             :                  : + D8894F95DC86DFDB :
:-messages/11-:------------------:-------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/2--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/3--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             :                  : + D8894F95DC86DFDB :
:-messages/4--:------------------:-------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/5--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/6--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             :                  : + D8894F95DC86DFDB :
:-messages/7--:------------------:-------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/8--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             : D8894F95DC86DFDB :                    :
:-messages/9--:------------------:--------------------:
:             : 5C6028D6CE9459C7 : + F4E92DEA197C8EBC :
:             :                  : + D8894F95DC86DFDB :
`-------------`------------------`--------------------`

新たに追加された REPLICA TRANSITIONS セクションは保留中の操作を示します。すべての操作が完了すると、このリストは空になります。

emqx_ctl ds join <DS> <Site> / emqx_ctl ds leave <DS> <Site>

これらのコマンドは、耐久ストレージのレプリカサイトリストにサイトを追加または削除します。set_replicas コマンドと似ていますが、一度に1サイトずつ更新します。

例:

bash
$ emqx_ctl ds join messages B2A7DBB2413CD6EE
ok

詳細は Add Sites および Remove Sites を参照してください。

REST API

組み込みの Durable Sessions の管理および監視に利用可能な REST API エンドポイントは以下の通りです。

  • /ds/sites:既知のサイト一覧を取得します。
  • /ds/sites/:site:サイトの情報(ステータス、現在管理している EMQX ノード名など)を取得します。
  • /ds/storages:耐久ストレージの一覧を取得します。
  • /ds/storages/:ds:耐久ストレージおよびそのシャードの情報を取得します。
  • /ds/storages/:ds/replicas:耐久ストレージのレプリカを保持するサイトの一覧取得および更新を行います。
  • /ds/storages/:ds/replicas/:site:特定サイトの耐久ストレージレプリカの追加または削除を行います。

詳細は EMQX OpenAPI スキーマを参照してください。

メトリクス

Durable Sessions に関連する Prometheus メトリクスは以下の通りです。

emqx_ds_egress_batches

耐久ストレージへのメッセージバッチの書き込みが成功するたびにインクリメントされます。

emqx_ds_egress_messages

耐久ストレージへのメッセージの書き込み成功数をカウントします。

emqx_ds_egress_bytes

耐久ストレージに書き込まれたペイロードデータの総量をカウントします。注意:このメトリクスはメッセージペイロードのみを対象としているため、実際の書き込みデータ量はこれより多い場合があります。

emqx_ds_egress_batches_failed

耐久ストレージへの書き込みが何らかの理由で失敗するたびにインクリメントされます。

emqx_ds_egress_flush_time

耐久ストレージへのバッチ書き込みにかかる時間(μs)のローリング平均です。レプリケーション速度の重要な指標です。

emqx_ds_store_batch_time

ローカルの RocksDB ストレージへのバッチ書き込みにかかる時間(μs)のローリング平均です。emqx_ds_egress_flush_time と異なり、ネットワークレプリケーションのコストを除外しているため、ディスク I/O の効率を示す重要な指標です。

emqx_ds_builtin_next_time

耐久ストレージからメッセージバッチを消費するのにかかる時間(μs)のローリング平均です。

emqx_ds_storage_bitfield_lts_counter_seek および emqx_ds_storage_bitfield_lts_counter_next

これらのカウンターは「wildcard optimized」ストレージレイアウト固有のもので、ローカルストレージからのデータ消費効率を測定します。seek 操作は一般に遅いため、emqx_ds_storage_bitfield_lts_counter_next の増加速度が seek より速いことが望ましいです。

durable_storage.messages.layout.epoch_bits パラメータを増やすことで、この比率を改善できます。

emqx_ds_raft_db_shards_num

DB が分割されているシャード数を示します。

emqx_ds_raft_db_sites_num

DS DB がレプリケートされている現在および割り当てられたサイト数を示すゲージです。

通常、現在のサイト数は割り当てられたサイト数と等しいはずです。長期間異なる場合は、レプリカ転送に問題がある可能性があります。

emqx_ds_raft_shard_replication_factor

DS DB シャードのレプリカセット内のレプリカ数を追跡します。

この数が設定されたレプリケーションファクターを下回ると、耐久性が危険にさらされます。レプリカをより多くのサイトに再配置することを検討してください。

emqx_ds_raft_db_shards_online_num

このノードでアクティブに管理されている DS DB シャード数を追跡します。

この数は、このノードに割り当てられているシャード数と等しいはずです。異なる場合は可用性に問題がある可能性があります。詳細はログを確認してください。

emqx_ds_raft_shard_transition_queue_len

DS DB シャードの保留中のレプリカセット遷移数を追跡します。

この数が長期間ゼロでない場合は、レプリカ転送に問題があります。

emqx_ds_raft_shard_transitions

DB シャードのレプリカセット遷移の開始/完了/スキップ/クラッシュした回数をカウントします。

クラッシュした遷移は常にゼロであるべきです。そうでない場合はログを確認してください。

emqx_ds_raft_shard_transition_errors

DB シャードのレプリカセット遷移のオーケストレーション中に発生した一時的なエラーの数をカウントします。

このカウンターが増加する場合は、レプリカ転送に問題があります。ログを確認してください。

emqx_ds_raft_snapshot_reads

シャードがスナップショットレプリケーションのソースであった際のスナップショット読み取りの開始/完了回数をカウントします。

emqx_ds_raft_snapshot_read_errors

ソース DS DB シャードでのスナップショット読み取り中に発生し、スナップショットレプリケーションが中断されたエラーの数をカウントします。

エラーは発生しないことが期待されます。原因はログで確認してください。

emqx_ds_raft_snapshot_read_chunks

スナップショット転送のソース DS DB シャードで読み取られ、受信側に転送された個々のチャンク数をカウントします。

emqx_ds_raft_snapshot_read_chunk_bytes

ソース DS DB シャードでチャンクとして読み取られたバイト数をカウントします。

emqx_ds_raft_snapshot_writes

シャードがスナップショットレプリケーションの受信側であった際のスナップショット書き込みの開始/完了回数をカウントします。

emqx_ds_raft_snapshot_write_errors

受信側 DS DB シャードへのスナップショット書き込み中に発生し、スナップショットレプリケーションが中断されたエラーの数をカウントします。

これも増加しないことが期待されます。詳細はログを確認してください。

emqx_ds_raft_snapshot_write_chunks

ソース DS DB シャードから受信し、受信側に書き込まれた個々のチャンク数をカウントします。

emqx_ds_raft_snapshot_write_chunk_bytes

受信側 DS DB シャードにチャンクとして書き込まれたバイト数をカウントします。

emqx_ds_raft_current_timestamp_us

シャードサーバーが現在レプリケートしている最新の操作タイムスタンプ(マイクロ秒単位)を追跡します。

通常、各レプリカは同じタイムスタンプを持つはずです。異なる場合はレプリケーションに問題があります。

emqx_ds_raft_rasrv_state_changes

Raft サーバーが候補者/フォロワー/リーダーに変わった回数をカウントします。

頻繁な状態変化は不安定の兆候です。ログを確認してください。