MQTT Streams クイックスタート
このページでは、EMQX 6.1 の MQTT Streams 機能の使い方を説明します。MQTTX を使ってクライアントをシミュレートし、EMQX ダッシュボードからストリームを作成・管理し、メッセージの保存、再生、圧縮の動作を体験します。
目的
このクイックスタートでは、EMQX MQTT Streams が以下を実現する方法を示します。
- サブスクライバーの有無にかかわらずメッセージを永続化する
- タイムスタンプに基づく再生をサポートする
- 状態指向のメッセージングのための Last-Value セマンティクスを有効にする
$stream/プレフィックスのサブスクライブによるストリームの自動作成を可能にする
前提条件
開始前に以下を準備してください。
- EMQX 6.1 以上が稼働していること
- MQTTX(または MQTT 5.0 対応のクライアント)
- EMQX ダッシュボードへのアクセス(デフォルト:
http://localhost:18083)
MQTT Streams 基本機能のテスト(レギュラーストリーム)
このセクションでは、MQTT Streams がメッセージを保存し、コンシューマーが過去のデータを再生できることを示します。
前提条件
開始前に、MQTT Streams 機能が有効であり、自動作成の設定が本例に影響しないことを確認してください。
左メニューの Streams に移動します。
メッセージストリームが無効の場合は、Settings をクリックします。Management -> MQTT Settings -> Streams ページにリダイレクトされます。
Enable Streams スイッチをオンにします。
自動作成設定を確認し、Regular Stream が使用されるようにします。
- Enable Auto Create Streams が無効、または
- Auto Create Stream Type が Regular Stream に設定されていること。
これにより、ストリームが Last-Value ストリームとして自動作成されることを防ぎます。Last-Value ストリームはキーごとに最新のメッセージのみを保持します。
変更した場合は、Save Changes をクリックして適用します。

ステップ 1: 名前付きストリームの作成
左メニューの Streams に移動します。
ページ上の Create Stream をクリックするか、右上の Create をクリックします。
Create Stream ダイアログで以下の設定を行います。
- Name:
my_stream - Topic Filter:
demo/stream - Last-Value Semantics: 無効
- Stream Key Expression:
message.from
他のオプションはデフォルトのままにします。
- Name:
Create をクリックします。

ステップ 2: メッセージのパブリッシュ
MQTTX CLI を使ってパブリッシャークライアントをシミュレートします。
MQTTX CLI がインストールされていることを確認します。詳細は インストール を参照してください。
EMQX に接続します。
bashmqttx conn -h 'localhost' -p 1883トピック
demo/streamに QoS 1 で複数のメッセージをパブリッシュします。例:
bashmqttx pub -t 'demo/stream' -h 'localhost' -p 1883 -q 1 -m '{"value": 1}' mqttx pub -t 'demo/stream' -h 'localhost' -p 1883 -q 1 -m '{"value": 2}' mqttx pub -t 'demo/stream' -h 'localhost' -p 1883 -q 1 -m '{"value": 3}'期待される出力:
bash✔ Connected ✔ Message published
レギュラーストリームなので、すべてのパブリッシュされたメッセージは保持ポリシーに従ってストリームに保存されます。
ステップ 3: ストリームからすべてのメッセージを再生
MQTTX CLI を使い、MQTT 5 のサブスクリプションユーザープロパティ stream-offset を設定して、ストリームの先頭からメッセージを再生します。
mqttx sub -t \$stream/my_stream -q 1 -h localhost -up "stream-offset: 0"期待される動作:
これまでにパブリッシュされたすべてのメッセージがパブリッシュ順に受信されます。
topic: demo/stream, qos: 0, size: 10B, userProperties: [
{ key: 'key', value: 'mqttx_28c50267' },
{ key: 'ts', value: '1772161077594532' }
]
{"value": 1}
topic: demo/stream, qos: 0, size: 10B, userProperties: [
{ key: 'key', value: 'mqttx_1989d120' },
{ key: 'ts', value: '1772161084921509' }
]
{"value": 2}
topic: demo/stream, qos: 0, size: 10B, userProperties: [
{ key: 'key', value: 'mqttx_085ea00d' },
{ key: 'ts', value: '1772161094020511' }
]
{"value": 3}これにより以下が確認できます。
- ストリームがレギュラーストリームであること
stream-offsetサブスクリプションプロパティが正しく機能していること- メッセージが圧縮や上書きされていないこと
異なる位置からのメッセージ再生
MQTT Streams では、サブスクライブ時に stream-offset を指定することで、メッセージ再生の開始位置を制御できます。
この例では、特定の時点以降にパブリッシュされたメッセージのみを再生する方法を示します。
ステップ 1: 現在のタイムスタンプを取得
新しいメッセージをパブリッシュする前に、現在の Unix タイムスタンプ(マイクロ秒単位)を記録します。
ミリ秒単位の現在時刻は以下で取得可能です。
Linux / macOS:
bashdate +%s000JavaScript:
javascriptDate.now()
例:
1772162409000この値に 1000 を掛けてマイクロ秒単位に変換し、再生開始位置として保存します。
ステップ 2: 新しいメッセージをパブリッシュ
ストリームにさらにメッセージをパブリッシュします。
mqttx pub -t 'demo/stream' -h 'localhost' -p 1883 -q 1 -m '{"value": 4}'
mqttx pub -t 'demo/stream' -h 'localhost' -p 1883 -q 1 -m '{"value": 5}'ステップ 3: 記録したタイムスタンプから再生
保存したタイムスタンプを stream-offset として指定し、ストリームにサブスクライブします。
mqttx sub -t \$stream/my_stream -q 1 -h localhost -up "stream-offset: 1772162409000000"期待される動作:
この時刻以降にパブリッシュされたメッセージのみがサブスクライバーに配信されます。
topic: demo/stream, qos: 1, size: 12B, userProperties: [
{ key: 'key', value: 'mqttx_a5508c54' },
{ key: 'ts', value: '1772163340159513' }
]
{"value": 4}
topic: demo/stream, qos: 1, size: 12B, userProperties: [
{ key: 'key', value: 'mqttx_e0848366' },
{ key: 'ts', value: '1772163350666523' }
]
{"value": 5}異なるコンシューマーは同じストリームを異なる位置から独立して再生できます。
- あるコンシューマーは先頭(
earliest)から再生 - 別のコンシューマーは特定のタイムスタンプから再生
- さらに別のコンシューマーは新しいメッセージのみ(
latest)を受信
各コンシューマーの再生位置は独立しており、他のサブスクライバーに影響を与えません。
これにより、MQTT Streams におけるコンシューマー制御の再生が示されます。
Last-Value セマンティクスのテスト
このセクションでは、Last-Value MQTT ストリームがキーごとに最新のメッセージのみを保持し、状態を表現するのに適していることを示します。
ステップ 1: 既存ストリームの削除
- ダッシュボードの Streams に移動します。
- ストリーム
my_streamを探します。 - Delete をクリックし、確認します。
ステップ 2: Last-Value メッセージストリームの作成
- Streams ページで Create をクリックします。
- 以下の設定を行います。
- Name:
device_stream - Topic Filter:
device/state - Data Retention Period:
7日 - Last-Value Semantics: 有効
- Stream Key Expression:
message.from
- Name:
- Create をクリックします。
キー式が message.from に設定されているため、同じキーのストリーム内で最新のメッセージのみが保持されます。
ステップ 3: 状態更新のパブリッシュ
同じクライアント ID -i device-1 からメッセージをパブリッシュします。
mqttx pub -t 'device/state' -h 'localhost' -p 1883 -q 1 -i device-1 -m '{"status": "online"}'
mqttx pub -t 'device/state' -h 'localhost' -p 1883 -q 1 -i device-1 -m '{"status": "offline"}'ストリームキー式がメッセージメタデータのクライアント ID をキーとして抽出するため、両メッセージは同じストリームキーを共有し、2つ目のメッセージが1つ目を上書きします。
ステップ 4: ストリームへのサブスクライブ
ストリームにサブスクライブし、先頭から再生します。
mqttx sub -t '$stream/device_stream' -h 'localhost' -p 1883 -q 1 -up "stream-offset: 0"期待される動作:
最新のメッセージのみが配信されます。
topic: device/state, qos: 1, size: 21B, userProperties: [
{ key: 'key', value: 'device-1' },
{ key: 'ts', value: '1772173666097076' }
]
{"status": "offline"}これにより、MQTT Streams が Last-Value セマンティクスを使った状態指向メッセージングをサポートしていることが示されます。
ストリームの自動作成
EMQX の MQTT Streams は、クライアントが $stream/ プレフィックス付きトピックをサブスクライブすると、自動的にストリームを作成できます。これにより、ダッシュボードで手動作成することなくストリームを動的にプロビジョニングできます。
このセクションでは、自動作成を有効にしてテストする方法を示します。
ダッシュボードの Management -> MQTT Settings -> Streams に移動します。
Enable Auto Create Streams がオンになっていることを確認します。
ストリームタイプを選択します。
- Regular Stream
- Last-Value Stream
自動作成で有効にできるタイプは一度に1つだけです。
他のオプションはデフォルトのままにします。
Save Changes をクリックします。
以下のコマンドでサブスクライブし、自動作成をトリガーします。
bashmqttx sub -h localhost -p 1883 -q 1 -t '$stream/auto_stream/demo/auto' -up "stream-offset: earliest"手動作成ストリームとは異なり、自動作成ストリームはサブスクリプションにトピックフィルター(この例では
demo/auto)が必要です。ストリームが存在しない場合、EMQX は以下を実行します。
auto_streamという名前の新しいストリームを作成- トピックフィルターを
demo/autoに設定 - 設定された自動作成タイプ(レギュラーまたはラストバリュー)を適用
ダッシュボードの Streams ページで自動作成が反映されていることを確認します。ストリーム一覧に「auto_stream」が表示されます。
