# MQTT Stream User Guide

This page describes how to configure and operate MQTT Stream, including flow setup, data persistence, and historical queries.

For internal architecture and Stream plugin development, refer to [MQTT Stream Design and Implementation](mqtt-stream-design.md).

## How MQTT Stream Works

Once MQTT Stream is configured, MQTT messages published to the configured topics are copied through a side channel into a local data pipeline. Normal message forwarding is not affected.

From a user’s perspective, the workflow is as follows:

1. MQTT messages are ingested and forwarded normally by EMQX Edge.
2. Messages matching configured topics are copied to the internal Exchange.
3. Messages are first buffered in memory using a Ring Queue.
4. When the buffer reaches a configured threshold, batch processing is triggered. The batch is encoded by a Stream plugin and written to local storage.
5. Historical data can later be queried and replayed by time range.
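The buffering in steps 3 and 4 can be pictured with a small simulation. This is a minimal Python sketch, not EMQX Edge's C implementation. The `RingQueue` class and its `push` method are hypothetical names. The sketch models only the capacity-and-flush behavior of Full Operation `2`, where a full queue hands its entire contents back as one batch. It does not model the actual ring-buffer memory layout.

```python
from collections import deque


class RingQueue:
    """Hypothetical model: buffer messages in memory, return one batch when full."""

    def __init__(self, cap: int) -> None:
        if cap <= 0:
            raise ValueError("cap must be positive")
        self.cap = cap
        self._buf: deque = deque()

    def push(self, msg):
        """Buffer msg; once the buffer reaches cap, return and clear the whole batch."""
        self._buf.append(msg)
        if len(self._buf) < self.cap:
            return None  # still buffering in memory, no disk I/O yet
        batch = list(self._buf)
        self._buf.clear()  # the queue is empty again after handing off the batch
        return batch

    def __len__(self) -> int:
        return len(self._buf)


if __name__ == "__main__":
    rq = RingQueue(cap=1000)
    batches = []
    for i in range(1500):
        batch = rq.push(f"message {i}")
        if batch is not None:
            batches.append(batch)  # this is where a batch would be encoded and written
    print(len(batches), len(batches[0]), len(rq))  # 1 1000 500
```

With a capacity of 1000 and 1,500 published messages, this model flushes exactly one batch. The remaining 500 messages stay in memory until enough new messages arrive to fill the queue again.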

The design goals of this workflow are:

- **High performance**: avoid per-message disk writes and reduce I/O overhead
- **High reliability**: preserve data even during network failures
- **Queryability**: enable time-range-based retrieval of historical data

## Configure MQTT Stream

Each configured stream is called a Flow and defines a complete data pipeline: which MQTT topic to capture, how to buffer the data, and where to persist it. You can configure a Flow through the Dashboard or by editing the configuration file directly.

### Configure via Dashboard

In the Dashboard, navigate to **MQTT Stream** and click the **Flow** tab. Click **Add** to open the Add Flow form.

::: tip Note
Configuration changes take effect only after restarting EMQX Edge.
:::

Fill in the form fields as described below.

#### Basic

| Field | Description |
|---|---|
| **Instance Name** | Optional display name for this flow instance. |
| **Flow URL** | Internal communication address for the flow. This is not the broker address used by MQTT clients. Keep the default (`tcp://127.0.0.1:10000`) unless instructed otherwise. |
| **Query Frequency** | Limits the query frequency for this flow. The value N means up to N queries every 5 seconds. For example, `5` allows up to 5 queries every 5 seconds. Leave empty to use the system default. |
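To illustrate what "up to N queries every 5 seconds" means, the sketch below uses a simple fixed 5-second window. This is an assumption: this page does not describe the limiting algorithm EMQX Edge actually uses, which may differ (for example, a sliding window). `QueryLimiter` is a hypothetical name, and the injectable `clock` exists only so the sketch can be tested deterministically.

```python
import time


class QueryLimiter:
    """Hypothetical fixed-window limiter: at most `limit` queries per 5-second window."""

    WINDOW_SECONDS = 5.0

    def __init__(self, limit: int, clock=time.monotonic) -> None:
        self.limit = limit
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def allow(self) -> bool:
        now = self.clock()
        if now - self.window_start >= self.WINDOW_SECONDS:
            # A new 5-second window starts, so the query counter resets.
            self.window_start = now
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False  # over the budget for the current window
```

With `limit=5`, a sixth query inside the same 5-second window is rejected. Queries are accepted again once the next window begins.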

#### Flow

| Field | Description |
|---|---|
| **Topic** | The MQTT topic to capture. Only messages published to this topic enter the MQTT Stream pipeline. The `/` character is not supported. |
| **Stream Type** | The Stream plugin used for data encoding and decoding. `0` = RAW Stream; `1` = SPI Stream; `2` = CANP Stream. If a custom plugin is registered on the backend, enter its plugin ID. |

#### Ring Queue

| Field | Description |
|---|---|
| **Capacity** | Maximum number of messages the ring queue can hold. When this threshold is reached, the Full Operation behavior is triggered. |
| **Full Operation** | Defines what happens when the ring queue reaches capacity. Currently only `2 - Return to AIO and Write to File` is supported and cannot be changed. |

#### Parquet

| Field | Description |
|---|---|
| **Compression** | Compression algorithm for Parquet files. `zstd` is the default and recommended for a good balance of speed and compression ratio. Other options: `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`. |
| **Directory** | Directory where Parquet files are stored. Required. Defaults to `/tmp/edge-parquet`. The directory is created automatically if it does not exist. |
| **File Name Prefix** | Optional prefix for generated Parquet file names. Use normal filename characters and avoid path separators. |
| **File Count** | Maximum number of Parquet files to retain. Once the limit is reached, file rotation applies. |
| **Total Written File Size** | Total size limit for all written Parquet files. Select the unit (MB, GB, etc.) from the dropdown. |

#### Parquet Encryption

When **Enable Encryption** is toggled on, an encryption block is generated for the Parquet configuration to control file encryption. The following fields become available:

| Field | Description |
|---|---|
| **Key ID** | Key retrieval metadata used to identify the encryption key in use. |
| **Encryption Key** | Encryption key for Parquet files. Raw AES keys and backend-supported Base64/wrapped keys are accepted. |
| **Encryption Type** | The encryption algorithm. Supported values: `AES_GCM_CTR_V1` and `AES_GCM_V1`. Defaults to `AES_GCM_V1`. |

![Add Flow form](./assets/mqtt-stream-add-flow.png)

### Save and Restart

Click **Save** to save the Flow configuration, then restart EMQX Edge for the changes to take effect. Once restarted:

- The MQTT broker begins listening for client connections.
- The Exchange, Ring Queue, and persistence components are initialized automatically.
- The MQTT Stream data pipeline becomes active.

### Configure via Config File

As an alternative to the Dashboard, you can configure MQTT Stream directly in the NanoMQ configuration file. This is the typical approach for production deployments or containerized environments where configuration is managed as code.

The following example configures an Exchange with a Ring Queue and Parquet persistence for the `canudp` topic:

```hocon
exchange_client.mq1 {
    exchange_url = "tcp://127.0.0.1:10000"
    limit_frequency = 5

    exchange {
        topic = "canudp"
        name = "canudp"
        streamType = 0

        ringbus {
            name = "ringbus"
            cap = 1000
            fullOp = 2  # Only supported value: 2 = RB_FULL_RETURN (return all messages to upper layer)
        }

        parquet {
            compress = zstd
            dir = "/tmp/edge-parquet"
            file_name_prefix = "edge"
            file_count = 1500
            file_size = 100MB
        }
    }
}
```

Key configuration parameters are summarized below.

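| Parameter | Description |
|---|---|
| `exchange_url` | Internal address of the Exchange server. Corresponds to the **Flow URL** field in the Dashboard. |
| `limit_frequency` | Query frequency limit: up to N queries every 5 seconds. Corresponds to **Query Frequency** in the Dashboard. |
| `exchange.topic` | The MQTT topic to capture. The `/` character is not supported. |
| `exchange.streamType` | Stream encoding type. `0` = RAW, `1` = SPI, `2` = CANP. |
| `ringbus.name` | Ring Queue name. Required. EMQX Edge fails to start if it is missing or empty. |
| `ringbus.cap` | Ring Queue capacity in number of messages. Required. |
| `ringbus.fullOp` | Behavior when the Ring Queue is full. Must be set to `2` (Return to AIO and Write to File). |
| `parquet.compress` | Compression algorithm: `zstd` (recommended), `uncompressed`, `snappy`, `gzip`, `brotli`, or `lz4`. |
| `parquet.dir` | Directory where Parquet files are written. EMQX Edge creates it automatically if it does not exist. |
| `parquet.file_name_prefix` | Optional prefix for generated Parquet file names. Use normal filename characters and avoid path separators. |
| `parquet.file_count` | Maximum number of Parquet files to retain. |
| `parquet.file_size` | Total size limit for all written Parquet files, for example `100MB`. |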

After editing the configuration file, restart EMQX Edge for the changes to take effect.
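Some configuration mistakes, such as a missing `ringbus.name` or an unsupported `fullOp`, only surface at startup (see Troubleshooting). A pre-restart check can therefore save a cycle. The sketch below assumes the HOCON block has already been parsed into a Python dict; parsing HOCON itself requires a third-party library and is not shown. The function name `check_flow` is hypothetical. The rules encode only the constraints stated on this page, and a custom plugin ID is assumed to be a non-negative integer.

```python
def check_flow(conf: dict) -> list[str]:
    """Return problems found in one exchange_client.<name> block (empty list if none)."""
    problems = []
    ex = conf.get("exchange", {})

    topic = ex.get("topic", "")
    if not topic:
        problems.append("exchange.topic is required")
    elif "/" in topic:
        problems.append("exchange.topic must not contain '/'")

    stream_type = ex.get("streamType", 0)  # RAW (0) is the default stream type
    if not isinstance(stream_type, int) or stream_type < 0:
        problems.append("exchange.streamType must be 0, 1, 2 or a custom plugin ID")

    rb = ex.get("ringbus", {})
    if not rb.get("name"):
        problems.append("ringbus.name is required")
    cap = rb.get("cap")
    if not isinstance(cap, int) or cap <= 0:
        problems.append("ringbus.cap must be a positive integer")
    if rb.get("fullOp") != 2:
        problems.append("ringbus.fullOp must be 2")

    pq = ex.get("parquet", {})
    if not pq.get("dir"):
        problems.append("parquet.dir is required")
    prefix = pq.get("file_name_prefix", "")
    if any(sep in prefix for sep in ("/", "\\")):
        problems.append("parquet.file_name_prefix must not contain path separators")
    return problems


# Dict equivalent of the exchange_client.mq1 example above.
example = {
    "exchange_url": "tcp://127.0.0.1:10000",
    "limit_frequency": 5,
    "exchange": {
        "topic": "canudp",
        "name": "canudp",
        "streamType": 0,
        "ringbus": {"name": "ringbus", "cap": 1000, "fullOp": 2},
        "parquet": {"compress": "zstd", "dir": "/tmp/edge-parquet",
                    "file_name_prefix": "edge", "file_count": 1500, "file_size": "100MB"},
    },
}

if __name__ == "__main__":
    print(check_flow(example))  # []
```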

## Stream Types

The `streamType` parameter determines how MQTT message payloads are encoded when written to Parquet, and defines the resulting Parquet schema. Three built-in stream types are available:

| streamType | Name | Parquet Schema | Use Case |
|:---:|---|---|---|
| `0` | RAW | `ts`, `data` | General MQTT message capture; payload stored as-is |
| `1` | SPI | `ts` + dynamic columns per packet type ID (4-digit hex) | Vehicle SPI bus data with multiple ECU packet types |
| `2` | CANP | `ts` + dynamic columns per CAN ID (4-digit hex) | CAN/CAN-FD bus data with multiple CAN IDs |

### RAW Stream (streamType = 0)

RAW is the default stream type. Each MQTT message is written as a single row with two fixed columns:

- `ts`: message timestamp (uint64, milliseconds)
- `data`: raw payload bytes (binary)

Use RAW when you do not need to parse message structure and want to store payloads verbatim.

### SPI Stream (streamType = 1)

SPI Stream parses vehicle SPI bus frames and stores each packet type as a separate Parquet column. The expected frame format is:

```text
| 0x55 (1B) | type+id (2B) | update+len (1B) | payload (len B) |
              type (4 bit) | id (12 bit) | update (1 bit) | len (7 bit)
```

The Parquet schema consists of `ts` plus one column per packet type ID observed in the data. Column names are 4-digit hex strings, for example `0055`, `0057`. Use SPI Stream for in-vehicle gateways collecting data from multiple ECUs over a shared SPI bus.
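The frame layout above can be decoded with the standard `struct` module. This is a minimal sketch, not the SPI Stream plugin itself, and it rests on several assumptions:

- Frames are concatenated back to back in one payload.
- The 2-byte `type+id` field is big-endian.
- The column name is the 12-bit `id` rendered as 4 lowercase hex digits. The examples above show only digits, so the letter case is a guess.

`parse_spi`, `SpiFrame`, and `column_name` are hypothetical names.

```python
import struct
from typing import NamedTuple

SPI_SYNC = 0x55  # leading byte of every frame


class SpiFrame(NamedTuple):
    type: int      # upper 4 bits of the type+id field
    id: int        # lower 12 bits of the type+id field
    update: bool   # top bit of the update+len byte
    payload: bytes


def parse_spi(buf: bytes) -> list[SpiFrame]:
    """Split a buffer of back-to-back SPI frames (big-endian type+id is assumed)."""
    frames = []
    pos = 0
    while pos < len(buf):
        if len(buf) - pos < 4:
            raise ValueError(f"truncated header at offset {pos}")
        sync, type_id, update_len = struct.unpack_from(">BHB", buf, pos)
        if sync != SPI_SYNC:
            raise ValueError(f"bad sync byte 0x{sync:02x} at offset {pos}")
        start = pos + 4
        end = start + (update_len & 0x7F)  # low 7 bits give the payload length
        if end > len(buf):
            raise ValueError(f"truncated payload at offset {pos}")
        frames.append(SpiFrame(type=type_id >> 12, id=type_id & 0x0FFF,
                               update=bool(update_len >> 7), payload=buf[start:end]))
        pos = end
    return frames


def column_name(frame: SpiFrame) -> str:
    # Assumption: the 12-bit ID becomes a 4-digit hex column name such as "0055".
    return f"{frame.id:04x}"
```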

### CANP Stream (streamType = 2)

CANP Stream parses CAN protocol frames and stores each CAN ID as a separate Parquet column. The expected frame format is:

```text
| ts (8B) | len (4B) | [ tsdiff (1B) | busid (1B) | canid (2B) | len (2B) | payload (len B) ] ... |
```

The Parquet schema consists of `ts` plus one column per CAN ID observed in the data. Column names are 4-digit hex strings, for example `0123`, `0456`. CANP Stream also supports a v2.1 compact format that packs multiple frames sharing the same CAN ID into a single cell. Use CANP Stream for CAN/CAN-FD bus data acquisition on vehicle edge nodes.
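A similar sketch applies to the CANP layout. Several points are assumptions that the frame diagram does not state:

- All multi-byte fields are little-endian.
- The 4-byte header `len` is the byte length of the record area that follows.
- `tsdiff` is kept as a raw value, because its exact relation to the header `ts` is not specified here.

The v2.1 compact format is not handled. `parse_canp`, `CanRecord`, and `group_by_column` are hypothetical names.

```python
import struct
from typing import NamedTuple

# Byte order and the meaning of the header len are assumptions.
HEADER = struct.Struct("<QI")    # ts (8B), len (4B)
RECORD = struct.Struct("<BBHH")  # tsdiff (1B), busid (1B), canid (2B), len (2B)


class CanRecord(NamedTuple):
    tsdiff: int
    busid: int
    canid: int
    payload: bytes


def parse_canp(buf: bytes) -> tuple[int, list[CanRecord]]:
    """Decode one CANP message into (header ts, records)."""
    if len(buf) < HEADER.size:
        raise ValueError("buffer shorter than the CANP header")
    ts, body_len = HEADER.unpack_from(buf, 0)
    end = HEADER.size + body_len
    if end > len(buf):
        raise ValueError("declared length exceeds the buffer")
    records = []
    pos = HEADER.size
    while pos < end:
        if end - pos < RECORD.size:
            raise ValueError(f"truncated record header at offset {pos}")
        tsdiff, busid, canid, plen = RECORD.unpack_from(buf, pos)
        start = pos + RECORD.size
        if start + plen > end:
            raise ValueError(f"truncated payload at offset {pos}")
        records.append(CanRecord(tsdiff, busid, canid, buf[start:start + plen]))
        pos = start + plen
    return ts, records


def group_by_column(records: list[CanRecord]) -> dict[str, list[bytes]]:
    """Group payloads by CAN ID using 4-digit hex column names such as "0123"."""
    columns: dict[str, list[bytes]] = {}
    for rec in records:
        columns.setdefault(f"{rec.canid:04x}", []).append(rec.payload)
    return columns
```

Grouping by CAN ID mirrors the one-column-per-CAN-ID schema described above. Each key corresponds to a column name you could later pass in a query `schema`.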

## Verify MQTT Stream

This section verifies that MQTT Stream works by publishing test data and checking the persisted output.

### Send Test Data

Publish messages to the configured topic using MQTTX CLI. The example below publishes messages to the `canudp` topic using a single persistent connection:

```bash
mqttx bench pub -t "canudp" -h 127.0.0.1 -p 1883 -m "message" -L 1500 -c 1
```

The message count must exceed the Ring Queue **Capacity** value to trigger a batch flush to disk. Using a single persistent connection (`-c 1`) avoids connection churn that can interfere with the Ring Queue pipeline. Replace `canudp` with the topic configured in your Flow.

### Parquet File Naming

Generated Parquet files follow this naming convention:

```text
{prefix}_{topic}-{start_ts}~{end_ts}_{seq}_{hash}.parquet
```

| Field | Description |
|---|---|
| `prefix` | Value of `file_name_prefix` in the configuration. |
| `topic` | The Exchange topic name. |
| `start_ts` | Timestamp of the earliest message in the file (milliseconds). |
| `end_ts` | Timestamp of the latest message in the file (milliseconds). |
| `seq` | Monotonically increasing file sequence number. |
| `hash` | Content hash for deduplication and integrity verification. |

Example:

```text
edge_canudp-1781145590395~1781145948876_0_3ff7819a161625c40d495038fd85c248.parquet
```
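Because each file name encodes its topic and time range, you can select the files that cover a time window without opening them. This is useful when exporting data for offline analysis. The sketch below assumes that a non-empty `file_name_prefix` is configured and that the prefix contains no `_`. The names `parse_name`, `overlaps`, and `files_for_range` are hypothetical.

```python
import re
from typing import NamedTuple

# Assumes a non-empty prefix without "_"; the topic may contain any characters.
PARQUET_NAME = re.compile(
    r"^(?P<prefix>[^_]+)_(?P<topic>.+)-(?P<start_ts>\d+)~(?P<end_ts>\d+)"
    r"_(?P<seq>\d+)_(?P<hash>[0-9a-fA-F]+)\.parquet$"
)


class ParquetName(NamedTuple):
    prefix: str
    topic: str
    start_ts: int
    end_ts: int
    seq: int
    hash: str


def parse_name(name: str) -> ParquetName:
    m = PARQUET_NAME.match(name)
    if m is None:
        raise ValueError(f"not an MQTT Stream Parquet file name: {name}")
    g = m.groupdict()
    return ParquetName(g["prefix"], g["topic"], int(g["start_ts"]),
                       int(g["end_ts"]), int(g["seq"]), g["hash"])


def overlaps(info: ParquetName, start_ts: int, end_ts: int) -> bool:
    """True if the file's [start_ts, end_ts] range intersects the query range (inclusive)."""
    return info.start_ts <= end_ts and info.end_ts >= start_ts


def files_for_range(names, topic: str, start_ts: int, end_ts: int) -> list[str]:
    """Return file names for `topic` that overlap the range, ordered by seq."""
    hits = []
    for name in names:
        try:
            info = parse_name(name)
        except ValueError:
            continue  # skip files that do not follow the naming convention
        if info.topic == topic and overlaps(info, start_ts, end_ts):
            hits.append((info.seq, name))
    return [name for _, name in sorted(hits)]
```

For example, `files_for_range(os.listdir("/tmp/edge-parquet"), "canudp", t0, t1)` would list candidate files for the default directory. Here `t0` and `t1` are millisecond timestamps you supply.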

### Observe Logs and Persisted Data

When the Ring Queue reaches capacity:

- Batch processing is triggered.
- The buffered batch is encoded and written to local Parquet files.

Check the EMQX Edge logs for entries confirming that the MQTT Stream pipeline has been activated.

Parquet files are then generated in the configured directory. Each file typically corresponds to one batch returned from the Ring Queue.

### Verification Checklist

MQTT Stream is functioning correctly when:

- MQTT messages are published successfully.
- The Ring Queue reaches its threshold and triggers batch processing.
- Parquet files appear in the configured directory.

Parquet files can be inspected using tools such as:

- `parquet-tools`
- Pandas / PyArrow (Python)
- Apache Spark or Flink

## Query Historical Data

After data has been persisted, MQTT Stream supports time-range-based historical queries for troubleshooting, replay, and analysis. Queries are performed through the Dashboard and do not affect live MQTT message forwarding.

### Query Data in the Dashboard

In the Dashboard, navigate to **MQTT Stream** and click the **Query** tab.

Fill in the query fields:

| Field | Description |
|---|---|
| **Topic** | The topic to query. Required. The `/` character is not supported. |
| **Time Range** | Required. Select a start and end time using the date-time picker, or choose a preset: Last 5 minutes, Last 15 minutes, Last 1 hour, Last 24 hours, or Last 7 days. |
| **Schema** | Optional. Specifies which columns to return as a comma-separated list. Leave empty to return all available columns. Available columns depend on the stream type configured for the topic. |

Click **Exact Search** to run the query. Results matching the specified topic and time range are displayed in the results area.

![Query results](./assets/mqtt-stream-query-results.png)

Payloads are shown as Base64-encoded strings. Click **View** on any row and switch to **Decoded Text** to see the original message content.

![Payload Viewer](./assets/mqtt-stream-payload-viewer.png)

#### Schema Values by Stream Type

Schema is a column filter: when set, only the specified columns are returned. When left empty, all available columns are returned.

The `ts` column (millisecond timestamp) is always available regardless of stream type. Additional columns depend on the `streamType` of the Flow:

| Stream Type | Available Columns | Example Schema Value |
|---|---|---|
| RAW (`0`) | `ts`, `data` | `data` or `ts,data` |
| SPI (`1`) | `ts` + packet type IDs present in the data (4-digit hex) | `ts,0055,0057` |
| CANP (`2`) | `ts` + CAN IDs present in the data (4-digit hex) | `ts,0123,0456` |

For SPI and CANP stream types, the available column names depend on what packet type IDs or CAN IDs were observed when the data was written. To discover available columns, run a query without specifying a schema first and inspect the returned field names.

### Query via REST API

Historical data can also be queried programmatically using the REST API.

**Endpoint**

```text
GET /api/v4/mqtt_stream
```

**Required Parameters**

| Parameter | Type | Description |
|---|---|---|
| `topic` | String | The MQTT topic to query. Must match the Exchange topic name. |
| `start_ts` | uint64 | Start of the time range, inclusive (milliseconds). |
| `end_ts` | uint64 | End of the time range, inclusive (milliseconds). |

**Optional Parameters**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `schema` | String | All columns | Comma-separated list of columns to return, for example `data` or `ts,0055`. |
| `limit` | uint64 | `10000` | Maximum number of records to return. |
| `offset` | uint64 | `0` | Pagination offset. |

**Response** (example truncated to the first two rows)

```json
{
    "count": 20,
    "dataArray": [
        {"ts": 1781145590395, "data": "bWVzc2FnZSAx"},
        {"ts": 1781145596370, "data": "bWVzc2FnZSAy"}
    ]
}
```

- `count`: total number of records returned after applying `offset` and `limit`.
- `dataArray`: array of result rows. Column values other than `ts` are Base64-encoded; `ts` is returned as a number (milliseconds). If `schema` is specified, only the requested columns are returned.
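For scripted access, a client needs only the Python standard library. This is a sketch, not an official SDK. The host, port, and `admin:public` credentials are the ones used in the example requests on this page. The functions `build_url`, `decode_rows`, and `query` are hypothetical. `decode_rows` performs the same Base64 decoding as the Dashboard's **Decoded Text** view, and it passes a `None` value through unchanged in case a column is absent from a row.

```python
import base64
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8081/api/v4/mqtt_stream"  # host/port from the examples on this page


def build_url(topic, start_ts, end_ts, schema=None, limit=None, offset=None):
    params = {"topic": topic, "start_ts": start_ts, "end_ts": end_ts}
    if schema:
        params["schema"] = ",".join(schema)
    if limit is not None:
        params["limit"] = limit
    if offset is not None:
        params["offset"] = offset
    # safe="," keeps schema lists such as ts,0055 unescaped in the query string.
    return BASE_URL + "?" + urllib.parse.urlencode(params, safe=",")


def decode_rows(body: dict) -> list[dict]:
    """Base64-decode every column except ts, which the API returns as a plain number."""
    rows = []
    for row in body.get("dataArray", []):
        rows.append({k: v if k == "ts" or v is None else base64.b64decode(v)
                     for k, v in row.items()})
    return rows


def query(topic, start_ts, end_ts, user="admin", password="public", **kw):
    req = urllib.request.Request(build_url(topic, start_ts, end_ts, **kw))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req.add_header("Authorization", f"Basic {token}")  # HTTP Basic auth, like curl --basic
    # urlopen raises urllib.error.HTTPError for 400 Bad Request.
    with urllib.request.urlopen(req) as resp:
        if resp.status == 204:
            return []  # no data in the requested range
        return decode_rows(json.load(resp))
```

For a RAW stream, `query("canudp", 1781145590395, 1781145948876)` would return rows whose `data` values are the original payload bytes. For example, `bWVzc2FnZSAx` decodes to `b"message 1"`.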

**Example Requests**

Query all columns for a RAW stream:

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=1781145590395&end_ts=1781145948876"
```

Query specific SPI packet type columns:

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=1781145590395&end_ts=1781145948876&schema=ts,0055,0057"
```

Paginate results (second page of 100 records):

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=0&end_ts=9999999999999&limit=100&offset=100"
```
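To retrieve a large range in pages, advance `offset` by `limit` until a page comes back shorter than `limit`. In the minimal sketch below, `fetch_page(limit, offset)` is a hypothetical callable that performs one request with those parameters. It should return the decoded `dataArray` rows, or an empty list on `204 No Content`.

```python
from typing import Callable


def fetch_all(fetch_page: Callable[[int, int], list], page_size: int = 100) -> list:
    """Collect all rows by advancing offset until a short (or empty) page is returned."""
    rows = []
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        rows.extend(page)
        if len(page) < page_size:
            return rows  # a short page means there is nothing left to fetch
        offset += page_size
```

When the total is an exact multiple of `page_size`, this costs one extra request that returns no rows. That request is how the loop learns it has reached the end.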

**Error Codes**

| HTTP Status | Meaning |
|---|---|
| `200 OK` | Query succeeded. |
| `204 No Content` | No data matched the specified time range. |
| `400 Bad Request` | Missing required parameters, invalid timestamp format, or invalid schema value. |

### Typical Use Cases

- Replaying historical data after device or system failures.
- Compensating for data gaps caused by network outages.
- Exporting locally persisted data for offline analysis.

## Troubleshooting

**EMQX Edge fails to start with `NULL s->ex_node!`**

The `ringbus.name` or `ringbus.cap` field is missing or empty in the configuration. Both fields are required. A missing `name` causes Ring Queue initialization to fail, which in turn prevents the Exchange from binding correctly. Set both fields and restart.

**Parquet files are not generated**

Check the following:

1. EMQX Edge was compiled with `-DENABLE_PARQUET=ON`.
2. The `parquet.dir` path is writable.
3. If `parquet.file_name_prefix` is set, it contains only normal filename characters and no path separators.
4. The number of messages published exceeds `ringbus.cap`. The Ring Queue does not flush until it is full.

**REST API returns `400 Bad Request`**

Confirm that `topic`, `start_ts`, and `end_ts` are all provided and that `start_ts` is less than or equal to `end_ts`. The `topic` value must exactly match the Exchange topic name configured in the Flow.