# MQTT Stream Quick Start

This quick start walks through a typical vehicle-edge deployment to demonstrate how to enable MQTT Stream in EMQX Edge, persist MQTT data locally, and verify historical queries.

After completing this guide, you will be able to confirm that:

- MQTT Stream is correctly enabled.
- MQTT messages for a specific topic are successfully persisted to disk.
- Historical data can be queried by time range.

## Scenario: Vehicle Edge Data Persistence and Replay

A vehicle-side edge deployment is a representative use case for MQTT Stream.

In a typical in-vehicle setup, an onboard gateway connects to multiple CAN / CAN-FD buses and continuously collects high-frequency sensor and diagnostic data from subsystems such as the battery, powertrain, and vehicle control units. This data is published as MQTT messages to a local EMQX Edge instance.

In this scenario, MQTT Stream is used to build a reliable local data pipeline on the vehicle:

- Side-capture MQTT messages for specific topics (for example, `canudp`)
- Buffer high-frequency data locally in batches
- Persist data to local disk in batch form
- Replay historical data by time window for troubleshooting and analysis

This quick start uses a minimal configuration to validate the complete workflow end to end.

## Prerequisites

Before you begin, make sure that:

- EMQX Edge 1.4 or later is installed and running. The examples in this guide use Docker and assume a container named `emqx-edge`, but any supported installation method works.
- MQTTX CLI is installed. See [MQTTX CLI](https://mqttx.app/cli) for installation instructions.
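
Before continuing, it can help to confirm that the required command-line tools are on your `PATH`. The following is a minimal sketch: `require_cmd` is a hypothetical helper name introduced here, and `docker` only matters if you follow the Docker-based examples.

```bash
# Hypothetical helper: succeed if the named command is on PATH,
# otherwise print a message to stderr and fail.
require_cmd() {
  if command -v "$1" >/dev/null 2>&1; then
    return 0
  fi
  echo "missing required command: $1" >&2
  return 1
}

# Check the tools used in this guide and report anything missing.
for tool in mqttx docker; do
  require_cmd "$tool" || echo "install $tool before continuing"
done
```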

## Add a Flow in the Dashboard

In the Dashboard, navigate to **MQTT Stream** and click the **Flow** tab. Click **Add** to open the Add Flow form.

Fill in the following fields:

**Basic**

- **Instance Name**: enter a name for this flow instance, such as `canudp-flow`.
- **Flow URL**: `tcp://127.0.0.1:10000` (keep the default)

**Flow**

- **Topic**: `canudp`
- **Stream Type**: `0`

**Ring Queue**

- **Capacity**: `10`
- **Full Operation**: `2 - Return to AIO and Write to File`

**Parquet**

- **Directory**: `/tmp/edge-parquet` (or any absolute path that EMQX Edge can write to; when running in Docker, this is a path inside the container)

Leave all other fields at their defaults. Click **Save**.

![quick_start_add_flow](./assets/quick-start-add-flow.png)

::: tip Note
Restart EMQX Edge after saving for the configuration to take effect. For example, if you are running EMQX Edge in Docker:

```bash
docker restart emqx-edge
```
:::

After the restart, the Exchange, Ring Queue, and persistence components are initialized automatically.
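
If you script the restart, you may want to wait until the MQTT listener accepts connections again before publishing. The sketch below assumes the default listener on `127.0.0.1:1883` (the address used by the publish command in this guide) and relies on bash's `/dev/tcp` feature. `wait_for_port` is a hypothetical helper, not part of EMQX Edge.

```bash
# Hypothetical helper: poll until HOST:PORT accepts TCP connections,
# giving up after roughly TIMEOUT attempts (one attempt per second).
# Requires bash, because it relies on the /dev/tcp pseudo-device.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} waited=0
  until (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
    waited=$((waited + 1))
    [ "$waited" -ge "$timeout" ] && return 1
    sleep 1
  done
}
```

For example, `docker restart emqx-edge && wait_for_port 127.0.0.1 1883 30` returns successfully once the port is reachable, or fails after about 30 seconds. An open port only shows that the listener is up; it does not by itself prove that the flow components have finished initializing.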

## Publish Test Data

Use MQTTX CLI to publish 20 test messages to the `canudp` topic:

```bash
mqttx bench pub -t "canudp" -h 127.0.0.1 -p 1883 -m "message" -L 20 -c 1
```

The message count must exceed the Ring Queue **Capacity** value (`10` in this guide) to trigger a batch flush to disk, which is why the command publishes 20 messages. Using a single long-lived connection (`-c 1`) avoids connection churn that can interfere with the Ring Queue pipeline.
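
To keep the message count tied to the configured capacity, you can derive one from the other. This is a small sketch using the values from this guide. The variable names are illustrative, and doubling the capacity simply mirrors the 20-to-10 ratio used above rather than a documented requirement.

```bash
CAPACITY=10                 # Ring Queue Capacity configured in the flow
COUNT=$((CAPACITY * 2))     # 20 messages, above the capacity

# Print the command first so it can be reviewed before running it.
echo mqttx bench pub -t canudp -h 127.0.0.1 -p 1883 -m message -L "$COUNT" -c 1
```

Remove the leading `echo` to actually publish the messages.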

## Verify Persisted Data

Once the Ring Queue threshold is reached, buffered messages are written to local Parquet files in batches. Check the configured directory for generated files:

```bash
docker exec emqx-edge ls /tmp/edge-parquet
```

The presence of Parquet files confirms that MQTT Stream is capturing messages and persisting them to disk.
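
For a scripted check, the directory listing can be filtered and counted. The helper below is a hypothetical sketch that assumes the generated files use a `.parquet` extension, which this guide does not state explicitly. Adjust the pattern if your files are named differently.

```bash
# Hypothetical helper: read file names on stdin and print how many
# end in ".parquet" (the extension is an assumption).
# "|| true" keeps the exit status at 0 when grep finds no matches.
count_parquet() {
  grep -c '\.parquet$' || true
}
```

For example, `docker exec emqx-edge ls /tmp/edge-parquet | count_parquet` prints the number of matching files, and a value greater than zero indicates that at least one batch has been flushed.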

## Query Historical Data

Once data has been persisted, you can query it from the Dashboard.

Navigate to **MQTT Stream** and click the **Query** tab. Enter `canudp` in the **Topic** field, set a **Time Range** covering the period when you published test data, and leave **Schema** empty. Click **Exact Search**.

::: tip
If no results appear, widen the time range. The system clock on your EMQX Edge host may differ from your local time.
:::
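
To check whether the clocks actually differ, you can compare Unix timestamps from both sides. This is a rough sketch: `clock_skew` is a hypothetical helper, and the usage example assumes that the `date` command is available inside the container.

```bash
# Hypothetical helper: print the absolute difference, in seconds,
# between two Unix timestamps.
clock_skew() {
  local diff=$(( $1 - $2 ))
  echo "${diff#-}"   # strip a leading minus sign, if any
}
```

For example, `clock_skew "$(date +%s)" "$(docker exec emqx-edge date +%s)"` prints the difference in seconds. Unix timestamps are independent of time zone, so if the value is small but results are still missing, compare the output of `date` and `docker exec emqx-edge date` to spot a time zone offset.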

![quick_start_query](./assets/mqtt-stream-query-results.png)

If results are returned, the full MQTT Stream pipeline is functioning correctly: from message capture and Ring Queue batching through Parquet persistence and Dashboard query. Payloads are displayed as Base64-encoded strings. Click **View** on any row and switch to **Decoded Text** to see the original message content.
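
You can also decode a payload outside the Dashboard. The sketch below assumes standard Base64 encoding and uses the encoded form of the `message` payload published earlier in this guide. Older macOS versions of `base64` may need `-D` instead of `-d`.

```bash
# Base64 encoding of the test payload "message" used in this guide.
encoded='bWVzc2FnZQ=='

# Decode it back to the original text.
decoded=$(printf '%s' "$encoded" | base64 -d)
echo "$decoded"   # prints: message
```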
