Durable Storage の設計
EMQX 6.0 では、MQTT メッセージ配信の高い信頼性と永続性を保証するために設計された、専用のデータベース抽象化レイヤーである Optimized Durable Storage(DS)を導入しました。DS はストリーミングサービス(Kafka など)とキー・バリュー・ストアの強みを組み合わせ、MQTT データの保存、再生、管理に最適化された堅牢で高性能な基盤を提供します。
アーキテクチャ:バックエンドとストレージ階層
Durable Storage は実装に依存しない設計で、バックエンドという概念を用いて異なるデータベース管理システムにまたがってデータを保存可能にしています。
組み込みバックエンド
EMQX はサードパーティサービスに依存しない2つの組み込みバックエンドを提供しています:
builtin_localバックエンドは RocksDB をストレージエンジンとして使用し、単一ノードのデプロイ向けです。builtin_raftバックエンドはbuiltin_localを拡張し、クラスタリングと異なるサイト間でのデータレプリケーションをサポートします。
データストレージ階層
EMQX の組み込み耐久性機能を支えるデータベースストレージエンジンは、階層的な構造でデータを整理しています。以下の図は、EMQX クラスター全体に分散された Durable Storage データベースの配置を示しています。

内部的に、DS は水平スケーラビリティと時間的パーティショニングの両方を考慮した多層階層構造でデータを管理します。この構造はアプリケーションからは透過的であり、分散した EMQX ノード間で効率的なデータ管理を実現します。
DS の階層構造は以下のように表現できます:
Database (DB)
データベースはデータの最上位の論理コンテナです。各 DS データベースは独立しており、自身のシャード、スラブ、ストリームを管理し、必要に応じて作成、管理、削除が可能です。例として:
- Sessions DB は耐久セッション状態を保存します。
- Messages DB は対応する MQTT メッセージデータを保持します。
単一の EMQX クラスターは複数の DS データベースをホストできます。
Shard
シャードは Durable Storage データベースの水平パーティションです。データはパブリッシャーのクライアント ID に基づいてシャード間に分散され、並列処理と高可用性を実現します。各 EMQX ノードは1つ以上のシャードをホストでき、シャードの総数は EMQX の初回起動時に設定される n_shards パラメータで決まります。
シャードはレプリケーションの基本単位でもあります。各シャードは durable_storage.messages.replication_factor 設定に従い複数ノードにレプリケートされ、すべてのレプリカが同一のメッセージセットを保持することで冗長性とフォールトトレランスを確保します。
Generation
ジェネレーションは論理的かつ時間ベースのデータベースパーティションです。異なる期間に書き込まれたデータは別々のジェネレーションにまとめられます。新しいメッセージは常に現在のジェネレーションに書き込まれ、古いジェネレーションは不変かつ読み取り専用になります。EMQX は以下の主な目的で定期的に新しいジェネレーションを作成します:
- 後方互換性とデータマイグレーション: 新しいデータは改善されたエンコーディングなどで新ジェネレーションに追記され、古いジェネレーションは不変のまま読み取り専用となります。
- 時間ベースのデータ保持: 各ジェネレーションが特定の時間範囲に対応するため、期限切れデータはジェネレーション単位で効率的に削除可能です。
ジェネレーションはスラブと概念的に関連しますが、物理的なストレージ単位ではありません。代わりに、各シャード内のスラブを整理する時間的境界を定義します。
また、ジェネレーションは設定されたストレージレイアウトに応じて内部構造やデータ保存方法が異なる場合があります。現時点では DS は高スループットのワイルドカードおよび単一トピックサブスクライブに最適化された単一のレイアウトをサポートしています。将来的には異なるワークロード向けの追加レイアウトが導入される予定です。新しいジェネレーションで使用されるレイアウトは durable_storage.messages.layout パラメータで設定し、各レイアウトエンジンは独自の設定オプションを提供します。
Slab
スラブはシャード ID とジェネレーション ID の両方で識別される物理的なデータパーティションです。各スラブは1つ以上の Durable Storage ストリームの耐久コンテナとして機能します。スラブ内のすべてのデータは同一のエンコーディングスキーマを共有し、追加のメタデータ保存を不要にします。スラブ内では原子性と一貫性が保証されます。
例:shard 2, gen 3 はそのジェネレーションの時間範囲内に書き込まれたすべてのストリームを格納する特定のスラブを表します。
Stream
Durable Storage ストリームは各スラブ内のバッチ処理およびシリアライズの論理単位です。ストリームは類似したトピック構造を持つ Topic–Timestamp–Value (TTV) の組をグループ化し、時間順かつ決定論的なチャンクでの読み取りを可能にします。1つの Durable Storage ストリームは複数のトピックのメッセージを含む場合があり、異なるストレージレイアウトはトピックをストリームにマッピングするための異なる戦略を適用します。
Durable Storage ストリームは Durable Storage におけるサブスクライブおよびイテレーションの基本単位でもあり、ワイルドカードトピックフィルターの効率的な処理と順序付けられたデータの一貫した再生を可能にします。耐久セッションはストリームからバッチ単位でメッセージを読み取り、バッチサイズは durable_sessions.batch_size 設定で制御されます。
Topic–Timestamp–Value
最小の保存単位で、単一の MQTT メッセージを表します。各 TTV は以下を含みます:
- Topic: MQTT のセマンティクスに準拠。
- Timestamp: 書き込み時刻または論理的順序キー。
- Value: 任意のバイナリデータ。
Durable Storage データベースグループ
EMQX 6.0.2 以降、Durable Storage はリソース管理と運用安全性向上のためにデータベースグループの概念を導入しました。
データベースグループは、ノード上の1つ以上の Durable Storage データベースに対して、ディスク容量やメモリバッファなどのストレージリソースを統合的に管理します。デフォルトでは、各 Durable Storage データベースは自身のデータベースグループに割り当てられ、その名前はデータベース名と同じであり、従来の動作を維持します。
データベースグループはデータの構造やアクセス方法を変更しません。論理データモデル(シャード、ジェネレーション、スラブ、ストリーム)はそのままです。リソース管理のためのガバナンス層として機能し、単一データベースのグループであってもクォータ制御やリソース会計の一貫した拡張可能な境界を確立します。
設計動機
Durable Storage の永続データは RocksDB の SST(Stored String Table)ファイルに保存されます。書き込み先行ログはサイズが制限されますが、SST ファイルは新規データの書き込みに伴い無制限に増大する可能性があります。
データベースグループは、同一ノード上で稼働する複数の Durable Storage データベースの RAM とディスク使用量を明示的に制御し、リソース消費の境界を明確にするために導入されました。
データベースグループは以下の課題に対応します:
- 複数データベースで共通のディスク使用制限を共有可能にする
- データ永続化前の書き込み受け入れ制御を強制する
- グループ単位の可観測性の基盤を提供する
データベースグループは、特定ノード上にレプリカを持つすべての Durable Storage シャードのリソース使用を制限します。RAM 制限はノードローカルで適用され、グループの総メモリ消費に直接影響します。ディスク制限はローカルレプリカが書き込む SST ファイルに適用されます。データはレプリケートされるため、ディスク使用量は論理的データサイズではなく物理的ストレージ消費を表します。
データベースグループモデル
各 Durable Storage データベースは必ず1つのデータベースグループに属します。複数のデータベースが同一グループに属することも可能で、グループ内のすべてのデータベースは同一のストレージバックエンドを使用しなければなりません。
データベースグループは以下の共有リソースを所有・管理します:
- SST ファイルのディスク使用量(ソフトクォータ)
- RocksDB の書き込みバッファ(memtable)メモリ
- RocksDB のバックグラウンドスレッドプール
リソース使用状況はデータベースやシャード単位ではなく、グループ単位で追跡・制御されます。
概念的には、データベースグループは以下の階層を導入します:
Database Group
└── Database (DB)
└── Shards
└── Slabs
└── Streams
└── TTVストレージクォータ
Durable Storage はディスク使用量制御の主な手段としてソフトクォータを使用します。
ソフトクォータの適用
ストレージクォータは Durable Storage リーダーが書き込みトランザクションを受け入れる前に適用されます:
- 書き込みを含むトランザクションが送信されると、リーダーはデータベースグループの現在の SST ディスク使用量をチェックします。
- クォータを超過する場合、リーダーはトランザクションを拒否します。受け入れた場合はすべてのレプリカに一貫してレプリケート・適用されます。
- 読み取り専用トランザクションは引き続き受け入れられます。
- データ削除のみのトランザクションも受け入れられます。
書き込みパス
DS へのデータ書き込みは、append-only モードまたは ACID トランザクションのいずれかを使用できます。
Append-Only モード
このモードはデータの追記のみをサポートし、高スループットシナリオで最小限のオーバーヘッドを提供します。
ACID トランザクション
トランザクションは 楽観的同時実行制御(OCC) に基づき、クライアントが通常は競合しないデータサブセットを操作すると仮定します。競合が発生した場合、1つのトランザクションのみがコミットに成功し、他は中止され再試行されます。
トランザクションの流れ:
- 開始: クライアントプロセス(Tx)がリーダーノードにトランザクションコンテキスト(リーダーの term と最後にコミットされたシリアル番号を含む)作成を要求します。
- 操作: Durable Storage トランザクションは Erlang 関数で表現されます。この関数内でクライアントはデータを読み取り(アクセスしたトピックと時間範囲の情報がトランザクションコンテキストに追加される)、書き込みや削除をスケジュールできます。コミット前条件(特定の TTV の存在・非存在チェック)も設定可能です。読み取りは即時実行され、書き込み・削除はコミット・レプリケーション完了時に実際に反映されます。
- 送信と検証: クライアントは操作リストをリーダーに送信します。
- リーダーは最新のデータスナップショットに対してコミット前条件を検証します。
- 読み取りが最近の書き込みと競合しないかもチェックします。
- 「調理(準備)」とログ記録: 成功するとリーダーはトランザクションを「調理」します:
- 書き込まれる各 TTV をストリームに割り当て、必要に応じて新規ストリームを作成します。
- すべてのレプリカに決定論的に適用可能な低レベルのストレージ変異リストを作成します。
- コミット: 「調理済み」トランザクションのバッチが Raft ログ(
builtin_raft)または RocksDB 書き込み先行ログ(WAL)に追加されます。 - 結果通知: 成功時にトランザクションプロセスに通知されます。競合があればトランザクションは中止され再試行されます。
書き込みフラッシュ制御:
バッファを Raft ログにフラッシュする頻度は以下で制御されます:
flush_interval:調理済みトランザクションがバッファ内に留まる最大時間max_items:保留中トランザクションの最大数idle_flush_interval:一定時間新規データが追加されない場合の早期フラッシュ
以下は builtin_raft バックエンド内のトランザクションライフサイクルの概要です。

読み取りパス
DS からのデータ読み取りはストリームを中心に行われます。
- MQTT トピックのデータにアクセスするには、まず
get_streamsAPI を使ってトピックに関連付けられたストリームの一覧を取得します。この間接的な手法により、DS は類似トピックをグループ化しメタデータ量を最小化します。次に、各ストリームに対して指定した開始時刻で イテレーター を作成します。イテレーターはストリーム内の読み取り位置を追跡する小さなデータ構造です。 nextAPI を使ってデータを読み取り、データチャンクと次のチャンクを指す更新済みイテレーターが返されます。
ワイルドカードトピックフィルターによる読み取り
ワイルドカードトピックフィルターへの効率的なサブスクライブを可能にするため、DS は類似構造のトピックを同じストリームにグループ化します。これは Learned Topic Structure(LTS)アルゴリズムを用い、トピックを 静的 部分と 可変 部分に分割することで実現されます。
- 例: クライアントが
metrics/<hostname>/cpu/socket/1/core/16にデータをパブリッシュする場合、十分なデータがあれば LTS アルゴリズムは静的トピック部分をmetrics/+/cpu/socket/+/core/+と推定し、ホスト名、ソケット、コアを可変部分として扱います。 - 利点: これにより
metrics/my_host/cpu/#やmetrics/+/cpu/socket/1/core/+といった効率的なクエリが可能になります。
リアルタイムサブスクリプション
読み取り側はサブスクリプション機構を使ってリアルタイムにデータにアクセスすることも可能です。subscribe API はイテレーターに基づき、クライアントがポーリングする代わりに DS がデータをプッシュします。
DS は2つのサブスクライバーのプールを管理しています:
- キャッチアップサブスクライバー は過去データを読み取り、終端に達するとリアルタイムサブスクライバーになります。
- リアルタイムサブスクリプション はイベントベースで、新規データが DS に書き込まれたときのみアクティブになります。
両プールはストリームとトピックごとにサブスクライバーをグループ化し、複数サブスクライバーへのリソースを共有します。この手法により、ディスク読み取り時の IOPS を節約し、リモートクライアントへの送信時のネットワーク帯域も削減します。メッセージのバッチ、サブスクリプション ID のリスト、スパースなディスパッチマトリックスがクラスター内のリモートノードに送信され、そこからローカルクライアントにメッセージが配信されます。

さらなる情報
Durable Storage は EMQX の高信頼性および永続性関連機能の中核データ基盤として機能し、上位レイヤーの機能に対して統一されたストレージ、再生、一貫性保証を提供します。主な機能には以下があります:
- MQTT 耐久セッション:セッション状態と未配信メッセージを永続化する DS ベースの仕組み。
- メッセージキュー:EMQX クラスター全体で順序付けられたメッセージ配信、メッセージ再生、高可用性を提供する組み込みメッセージキュー機能。
- 共有サブスクリプション:同一グループ内の複数サブスクライバー間でメッセージを分散するロードバランシングサブスクリプション機構。