MQTT Streams Quick Start
This page walks you through the MQTT Streams feature in EMQX 6.1. You’ll use MQTTX to simulate clients, create and manage streams from the EMQX Dashboard, and see how messages are stored, replayed, and compacted.
Objectives
This quick start demonstrates how EMQX MQTT Streams can:
- Persist messages independently of subscriber availability
- Support timestamp-based replay
- Enable Last-Value semantics for state-oriented messaging
Prerequisites
Before starting, ensure you have:
- EMQX 6.1+ running
- MQTTX (or any MQTT 5.0-capable client)
- Access to the EMQX Dashboard (default: http://localhost:18083)
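Before opening MQTTX, it can help to confirm that the broker and the Dashboard are actually listening. The following is a minimal sketch, assuming EMQX runs on localhost with the ports used in this guide (1883 for MQTT, 18083 for the Dashboard). The helper name `is_port_open` is illustrative and not part of EMQX or MQTTX.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports taken from this guide; adjust them if your deployment differs.
    for name, port in (("MQTT listener", 1883), ("Dashboard", 18083)):
        status = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {status}")
```

A successful TCP connection only shows that something is listening on the port. It does not verify the EMQX version or that MQTT Streams is enabled.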
Test MQTT Streams Basic Features (Regular Stream)
This section demonstrates how MQTT Streams stores messages and allows consumers to replay historical data.
Prerequisite
Before starting, ensure that the MQTT Streams feature is enabled and that the auto-creation behavior will not interfere with this example.
Go to Message Stream in the left menu.
If Message Stream is disabled, click Settings. You will be redirected to the Management -> MQTT Settings -> Message Stream page.
Toggle the Enable Message Stream switch on.
Verify the auto-create settings to ensure a regular Message Stream is used:
- Enable Auto Create Message Stream is disabled, or
- Auto Create Message Stream Type is set to Regular Message Stream
This prevents the stream from being auto-created as a Last-Value Message Stream, which would retain only the most recent message per key.
If you make any changes, click Save Changes to apply them.

Step 1: Create an MQTT Stream
Navigate to Message Stream in the left menu.
Click Create Stream on the page, or click Create in the upper-right corner.
In the Create Message Stream dialog, configure the following settings:
- Topic Filter: demo/stream
- Data Retention Period: 1 day
- Last-Value Semantics: Disabled
- Stream Key Expression: message.from
Click Create.

Step 2: Publish Messages
Use MQTTX to simulate a client acting as a publisher:
Open MQTTX and create a client (for example, publisher).
Connect to EMQX (mqtt://localhost:1883).
Publish several messages to the topic demo/stream with QoS 1.
Examples:
Topic: demo/stream
QoS: 1
Payload: {"value": 1}
Payload: {"value": 2}
Payload: {"value": 3}
Since this is a regular stream, all messages are stored in the stream.
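If you prefer to generate the test payloads rather than type them, the sketch below builds the three publish requests from this step as plain dictionaries. The `build_messages` helper and the dictionary layout are assumptions made for illustration. They are not an MQTTX or EMQX format, and the code does not connect to a broker.

```python
import json


def build_messages(topic: str, values, qos: int = 1):
    """Build one publish request per value, with a JSON-encoded payload."""
    return [
        {"topic": topic, "qos": qos, "payload": json.dumps({"value": v})}
        for v in values
    ]


messages = build_messages("demo/stream", [1, 2, 3])
for msg in messages:
    # Prints the topic, QoS, and payload to paste into your MQTT client.
    print(msg["topic"], msg["qos"], msg["payload"])
```

Each payload string can be pasted into the MQTTX payload field or passed to whichever MQTT client library you use.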
Step 3: Replay All Messages from the Stream
Now simulate a consumer that replays stored messages.
Open a second MQTTX client (for example, consumer).
Connect to EMQX.
Subscribe to the stream topic using the earliest timestamp:
Topic: $s/0/demo/stream
QoS: 1
Expected Behavior: You should receive all previously published messages, in publish order:
{"value": 1}
{"value": 2}
{"value": 3}
This confirms that:
- The stream is a regular stream.
- Timestamp-based replay is working correctly.
- No messages were compacted or overwritten.
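One way to picture the behavior confirmed above is as an append-only log. The sketch below is a toy in-memory model, not EMQX's storage implementation. The class name `RegularStream` and the timestamps are made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class RegularStream:
    """Toy append-only log: every message is kept, in arrival order."""

    entries: list = field(default_factory=list)  # (timestamp_ms, payload) pairs

    def append(self, timestamp_ms: int, payload: str) -> None:
        self.entries.append((timestamp_ms, payload))

    def replay(self, start_ms: int = 0) -> list:
        """Return payloads stored at or after start_ms, oldest first."""
        return [payload for ts, payload in self.entries if ts >= start_ms]


stream = RegularStream()
# Illustrative timestamps; a real stream uses the actual publish time.
stream.append(1000, '{"value": 1}')
stream.append(2000, '{"value": 2}')
stream.append(3000, '{"value": 3}')

# Replaying from 0 returns every stored message in publish order.
print(stream.replay(0))
```

In this model nothing is overwritten or dropped, which matches the three observations listed above.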

Replay Messages from Different Positions
MQTT Streams allow consumers to control where message replay starts by specifying a timestamp when subscribing.
Publish additional messages from the publisher client:
{"value": 4}
{"value": 5}
In a new MQTTX client, subscribe to the stream using a later timestamp:
Topic: $s/1766477011000/demo/stream
QoS: 1
In this example, 1766477011000 is a Unix timestamp in milliseconds. Only messages published at or after this time are delivered to the subscriber.
TIP
- The timestamp must be a Unix timestamp in milliseconds.
- Use 0 to replay from the earliest available message.
- Use a later timestamp to replay only newer messages.
You can obtain the current timestamp in milliseconds using:
Linux / macOS: date +%s000
JavaScript: Date.now()
Click Confirm. Only messages published at or after the specified timestamp are delivered.
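The same timestamp handling can be sketched in Python. The helpers below compute a millisecond timestamp and build a subscription topic of the form `$s/<timestamp>/<topic>`. That pattern is inferred from the two examples in this guide, and the function names are illustrative rather than part of any EMQX or MQTTX API.

```python
import time
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def now_ms() -> int:
    """Current Unix time in milliseconds."""
    return time.time_ns() // 1_000_000


def to_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to Unix milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    return (moment - EPOCH) // timedelta(milliseconds=1)


def stream_topic(start_ms: int, topic: str) -> str:
    """Build a subscription topic: $s/<timestamp>/<topic>."""
    if start_ms < 0:
        raise ValueError("timestamp must be non-negative")
    return f"$s/{start_ms}/{topic}"


def parse_stream_topic(subscription: str):
    """Split $s/<timestamp>/<topic> back into (timestamp_ms, topic)."""
    prefix, start, topic = subscription.split("/", 2)
    if prefix != "$s":
        raise ValueError("not a stream subscription topic")
    return int(start), topic


print(stream_topic(0, "demo/stream"))  # $s/0/demo/stream
start = to_ms(datetime(2025, 12, 23, 8, 3, 31, tzinfo=timezone.utc))
print(stream_topic(start, "demo/stream"))  # $s/1766477011000/demo/stream
```

Requiring a timezone-aware datetime avoids silently interpreting a naive value in the local time zone, which would shift the replay start point.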

This demonstrates consumer-controlled replay, where different consumers can independently read the same stream from different points in time without affecting each other.
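To illustrate why consumers do not affect each other, the toy model below gives each consumer its own cursor into a shared, time-ordered log. This is a conceptual sketch only. The `Consumer` class, the integer payloads, and the timestamps are invented for the example and do not reflect how EMQX tracks subscribers internally.

```python
from bisect import bisect_left


class Consumer:
    """Toy reader with its own cursor into a shared, time-ordered log."""

    def __init__(self, log, start_ms: int):
        self.log = log
        timestamps = [ts for ts, _ in log]
        # Index of the first entry whose timestamp is at or after start_ms.
        self.position = bisect_left(timestamps, start_ms)

    def poll(self, max_messages: int = 10):
        """Return up to max_messages payloads and advance only this cursor."""
        batch = self.log[self.position:self.position + max_messages]
        self.position += len(batch)
        return [payload for _, payload in batch]


# Shared log of (timestamp_ms, payload) pairs; values are illustrative.
log = [(1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)]

from_start = Consumer(log, start_ms=0)
from_later = Consumer(log, start_ms=4000)

print(from_start.poll(2))  # [1, 2]
print(from_later.poll())   # [4, 5]
print(from_start.poll())   # [3, 4, 5]
```

Reading through `from_later` leaves `from_start` where it was, because the log itself is never modified by a read.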
Test Last-Value Semantics
This section demonstrates how Last-Value MQTT streams keep only the latest message per key, which is useful for representing state.
Step 1: Delete the Existing Stream
- Navigate to Message Stream in the Dashboard.
- Locate the stream with the topic filter
demo/stream. - Click Delete and confirm.
Step 2: Create a Last-Value Message Stream
- Click Create on the Message Stream page.
- Configure the following settings:
  - Topic Filter: device/state
  - Data Retention Period: 1 day
  - Last-Value Semantics: Enabled
  - Stream Key Expression: message.from
- Click Create.
The stream is now configured to retain only the latest message for each key.
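Last-Value retention can be pictured as a map from key to newest payload. The sketch below is a toy model, assuming that message.from resolves to the publishing client's ID, as the steps in this section suggest. The class `LastValueStream` and the second client `device-2` are hypothetical and appear only to show that keys are tracked separately.

```python
class LastValueStream:
    """Toy model: keep only the newest payload for each key."""

    def __init__(self):
        self.latest = {}  # key -> most recent payload

    def append(self, client_id: str, payload: str) -> None:
        # The key is the publisher's client ID (assumed meaning of
        # message.from), so a newer message replaces the older one.
        self.latest[client_id] = payload

    def snapshot(self) -> dict:
        """Return a copy of the current key -> payload mapping."""
        return dict(self.latest)


stream = LastValueStream()
stream.append("device-1", '{"status": "online"}')
stream.append("device-2", '{"status": "online"}')   # hypothetical second device
stream.append("device-1", '{"status": "offline"}')  # replaces device-1's entry

for key, payload in stream.snapshot().items():
    print(key, payload)
```

Only one entry per client survives, so a late subscriber sees the current state of each device rather than its full history.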
Step 3: Publish State Updates
In MQTTX, use a client with the client ID device-1.
Publish a message to device/state:
Topic: device/state
QoS: 1
Payload: {"status": "online"}
Publish another message from the same client:
{"status": "offline"}
Because the Stream Key Expression is message.from, both messages share the same key. The second message overwrites the first.
Step 4: Subscribe to the Stream
Open a new MQTTX client (for example, monitor).
Subscribe to the stream topic:
Topic: $s/0/device/state
QoS: 1
Expected Behavior:
Only the most recent message is delivered:
{"status": "offline"}
This demonstrates how MQTT Streams support state-oriented messaging patterns using Last-Value semantics.
