
MQTT Stream User Guide

This page describes how to configure and operate MQTT Stream, including flow setup, data persistence, and historical queries.

For internal architecture and Stream plugin development, refer to MQTT Stream Design and Implementation.

How MQTT Stream Works

Once MQTT Stream is configured, MQTT messages on the configured topics are copied through a side channel into a local data pipeline. Normal message forwarding is not affected.

From a user’s perspective, the workflow is as follows:

  1. MQTT messages are ingested and forwarded normally by EMQX Edge.
  2. Messages matching configured topics are copied to the internal Exchange.
  3. Messages are first buffered in memory using a Ring Queue.
  4. When the buffer reaches a configured threshold, batch processing is triggered. The batch is encoded by a Stream plugin and written to local storage.
  5. Historical data can later be queried and replayed by time range.
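The buffering in steps 3 and 4 can be modeled in a few lines. The sketch below is a hypothetical Python model, not EMQX Edge code: the class name, the exact-topic match, and the flush callback (standing in for Stream-plugin encoding plus the Parquet write) are assumptions made for illustration.

```python
from typing import Callable, List, Tuple

Message = Tuple[int, str, bytes]  # (timestamp_ms, topic, payload)


class RingQueueSketch:
    """Hypothetical model of the buffer-then-flush behavior described above.

    Messages on the configured topic are appended to an in-memory buffer.
    When the buffer holds `capacity` messages, the whole batch is handed to
    `flush` (standing in for encoding and the Parquet write) and the buffer
    is emptied, mirroring "return all messages to the upper layer".
    """

    def __init__(self, topic: str, capacity: int,
                 flush: Callable[[List[Message]], None]) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.topic = topic
        self.capacity = capacity
        self.flush = flush
        self._buffer: List[Message] = []

    def on_publish(self, ts_ms: int, topic: str, payload: bytes) -> None:
        # Normal forwarding is not modeled; only the side-channel copy is.
        if topic != self.topic:
            return
        self._buffer.append((ts_ms, topic, payload))
        if len(self._buffer) >= self.capacity:
            batch, self._buffer = self._buffer, []
            self.flush(batch)

    def pending(self) -> int:
        """Number of messages buffered but not yet flushed."""
        return len(self._buffer)
```

With a capacity of 3, publishing 7 matching messages produces two flushed batches of 3 and leaves 1 message pending, which is why a short test run may not produce any files.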

The design goals of this workflow are:

  • High performance: avoid per-message disk writes and reduce I/O overhead
  • High reliability: preserve data even during network failures
  • Queryability: enable time-range-based retrieval of historical data

Configure MQTT Stream

Each configured stream is called a Flow and defines a complete data pipeline: which MQTT topic to capture, how to buffer the data, and where to persist it. You can configure a Flow through the Dashboard or by editing the configuration file directly.

Configure via Dashboard

In the Dashboard, navigate to MQTT Stream and click the Flow tab. Click Add to open the Add Flow form.

Note

Configuration changes take effect only after restarting EMQX Edge.

Fill in the form fields as described below.

Basic

| Field | Description |
| --- | --- |
| Instance Name | Optional display name for this flow instance. |
| Flow URL | Internal communication address for the flow. This is not the broker address used by MQTT clients. Keep the default (tcp://127.0.0.1:10000) unless instructed otherwise. |
| Query Frequency | Limits the query frequency for this flow. The value N means up to N queries every 5 seconds. For example, 5 allows up to 5 queries every 5 seconds. Leave empty to use the system default. |
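One way to read "up to N queries every 5 seconds" is a fixed-window counter. The sketch below assumes that interpretation; EMQX Edge may implement the limit differently (for example, as a sliding window), and QueryFrequencyLimiter is a hypothetical name.

```python
import time
from typing import Callable, Optional


class QueryFrequencyLimiter:
    """Fixed-window sketch of "up to N queries every 5 seconds".

    Assumption: the window is a fixed 5-second bucket that restarts on the
    first query after the previous bucket expires.
    """

    WINDOW_S = 5.0

    def __init__(self, limit: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.clock = clock
        self._window_start: Optional[float] = None
        self._count = 0

    def allow(self) -> bool:
        """Return True if one more query fits in the current window."""
        now = self.clock()
        if self._window_start is None or now - self._window_start >= self.WINDOW_S:
            # Start a new 5-second window.
            self._window_start = now
            self._count = 0
        if self._count < self.limit:
            self._count += 1
            return True
        return False
```

With a limit of 5, the sixth query inside the same 5-second window is rejected, and queries are accepted again once the window has elapsed.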

Flow

| Field | Description |
| --- | --- |
| Topic | The MQTT topic to capture. Only messages published to this topic enter the MQTT Stream pipeline. The / character is not supported. |
| Stream Type | The Stream plugin used for data encoding and decoding. 0 = RAW Stream; 1 = SPI Stream; 2 = CANP Stream. If a custom plugin is registered on the backend, enter its plugin ID. |

Ring Queue

| Field | Description |
| --- | --- |
| Capacity | Maximum number of messages the ring queue can hold. When this threshold is reached, the Full Operation behavior is triggered. |
| Full Operation | Defines what happens when the ring queue reaches capacity. Currently only 2 (Return to AIO and Write to File) is supported, and it cannot be changed. |

Parquet

| Field | Description |
| --- | --- |
| Compression | Compression algorithm for Parquet files. zstd is the default and is recommended for a good balance of speed and compression ratio. Other options: uncompressed, snappy, gzip, brotli, lz4. |
| Directory | Directory where Parquet files are stored. Required. Defaults to /tmp/edge-parquet. The directory is created automatically if it does not exist. |
| File Name Prefix | Optional prefix for generated Parquet file names. Use normal filename characters and avoid path separators. |
| File Count | Maximum number of Parquet files to retain. Once the limit is reached, file rotation applies. |
| Total Written File Size | Total size limit for all written Parquet files. Select the unit (MB, GB, etc.) from the dropdown. |

Parquet Encryption

When Enable Encryption is toggled on, an encryption block is generated for the Parquet configuration to control file encryption. The following fields become available:

| Field | Description |
| --- | --- |
| Key ID | Key retrieval metadata used to identify the encryption key in use. |
| Encryption Key | Encryption key for Parquet files. Raw AES keys and backend-supported Base64/wrapped keys are accepted. |
| Encryption Type | The encryption algorithm. Supported values: AES_GCM_CTR_V1 and AES_GCM_V1. If not set, AES_GCM_V1 is used. |
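If you need a fresh key for the Encryption Key field, a random AES key can be generated with Python's secrets module. Parquet modular encryption uses AES with 128-, 192-, or 256-bit keys. The sketch assumes your build accepts a Base64-encoded raw key; confirm the accepted key formats before relying on it. generate_parquet_key and aes_key_length are hypothetical helpers.

```python
import base64
import secrets

# AES accepts 128-, 192-, or 256-bit keys (16, 24, or 32 bytes).
VALID_AES_KEY_BYTES = (16, 24, 32)


def generate_parquet_key(num_bytes: int = 32) -> str:
    """Return a random AES key as a Base64 string.

    Assumption: the Encryption Key field accepts a Base64-encoded raw AES key.
    """
    if num_bytes not in VALID_AES_KEY_BYTES:
        raise ValueError(f"AES keys must be 16, 24, or 32 bytes, got {num_bytes}")
    return base64.b64encode(secrets.token_bytes(num_bytes)).decode("ascii")


def aes_key_length(b64_key: str) -> int:
    """Decode a Base64 key, check it is a valid AES length, and return that length."""
    raw = base64.b64decode(b64_key, validate=True)
    if len(raw) not in VALID_AES_KEY_BYTES:
        raise ValueError(f"decoded key is {len(raw)} bytes; expected 16, 24, or 32")
    return len(raw)
```

Anyone holding the key can decrypt the files, so store it the same way you store other credentials.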

[Screenshot: Add Flow form]

Save and Restart

Click Save to save the Flow configuration, then restart EMQX Edge for the changes to take effect. Once restarted:

  • The MQTT broker begins listening for client connections.
  • The Exchange, Ring Queue, and persistence components are initialized automatically.
  • The MQTT Stream data pipeline becomes active.

Configure via Config File

As an alternative to the Dashboard, you can configure MQTT Stream directly in the NanoMQ configuration file. This is the typical approach for production deployments or containerized environments where configuration is managed as code.

The following example configures an Exchange with a Ring Queue and Parquet persistence for the canudp topic:

```hocon
exchange_client.mq1 {
    exchange_url = "tcp://127.0.0.1:10000"
    limit_frequency = 5

    exchange {
        topic = "canudp"
        name = "canudp"
        streamType = 0

        ringbus {
            name = "ringbus"
            cap = 1000
            fullOp = 2  # Only supported value: 2 = RB_FULL_RETURN (return all messages to upper layer)
        }

        parquet {
            compress = zstd
            dir = "/tmp/edge-parquet"
            file_name_prefix = "edge"
            file_count = 1500
            file_size = 100MB
        }
    }
}
```

Key configuration parameters are summarized below.

| Parameter | Description |
| --- | --- |
| exchange_url | Internal address for the Exchange server. Corresponds to the Flow URL field in the Dashboard. |
| exchange.topic | The MQTT topic to capture. |
| exchange.streamType | Stream encoding type. 0 = RAW, 1 = SPI, 2 = CANP. |
| ringbus.name | Ring Queue name. Required; EMQX Edge fails to start if it is missing or empty. |
| ringbus.cap | Ring Queue capacity in number of messages. Required. |
| ringbus.fullOp | Behavior when the Ring Queue is full. Must be set to 2 (Return to AIO and Write to File). |
| parquet.dir | Directory where Parquet files are written. EMQX Edge creates it automatically if it does not exist. |
| parquet.file_name_prefix | Optional prefix for generated Parquet file names. Use normal filename characters and avoid path separators. |
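The rules stated in this guide (no / in the topic, a non-empty ringbus.name, a positive ringbus.cap, fullOp = 2, and no path separators in file_name_prefix) can be checked before restarting. The function below is a hypothetical pre-flight check. It takes one exchange_client block that has already been parsed into nested Python dicts; parsing HOCON itself is assumed to be done by a separate library.

```python
from typing import Any, Dict, List


def check_exchange_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one exchange_client.<name> block.

    `cfg` is the block already parsed into nested dicts, e.g.
    {"exchange_url": ..., "exchange": {"topic": ..., "ringbus": {...}, "parquet": {...}}}.
    """
    problems: List[str] = []
    ex = cfg.get("exchange", {})

    topic = ex.get("topic", "")
    if not topic:
        problems.append("exchange.topic is required")
    elif "/" in topic:
        problems.append("exchange.topic must not contain '/'")

    rb = ex.get("ringbus", {})
    if not rb.get("name"):
        problems.append("ringbus.name is required")
    cap = rb.get("cap")
    if not isinstance(cap, int) or cap <= 0:
        problems.append("ringbus.cap must be a positive integer")
    if rb.get("fullOp") != 2:
        problems.append("ringbus.fullOp must be 2")

    prefix = ex.get("parquet", {}).get("file_name_prefix", "")
    if "/" in prefix or "\\" in prefix:
        problems.append("parquet.file_name_prefix must not contain path separators")
    return problems
```

An empty list means the block passes these checks; it does not prove the configuration is otherwise valid.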

After editing the configuration file, restart EMQX Edge for the changes to take effect.

Stream Types

The streamType parameter determines how MQTT message payloads are encoded when written to Parquet, and defines the resulting Parquet schema. Three built-in stream types are available:

| streamType | Name | Parquet Schema | Use Case |
| --- | --- | --- | --- |
| 0 | RAW | ts, data | General MQTT message capture; payload stored as-is |
| 1 | SPI | ts + dynamic columns per packet type ID (4-digit hex) | Vehicle SPI bus data with multiple ECU packet types |
| 2 | CANP | ts + dynamic columns per CAN ID (4-digit hex) | CAN/CAN-FD bus data with multiple CAN IDs |

RAW Stream (streamType = 0)

RAW is the default stream type. Each MQTT message is written as a single row with two fixed columns:

  • ts: message timestamp (uint64, milliseconds)
  • data: raw payload bytes (binary)

Use RAW when you do not need to parse message structure and want to store payloads verbatim.

SPI Stream (streamType = 1)

SPI Stream parses vehicle SPI bus frames and stores each packet type as a separate Parquet column. The expected frame format is:

```text
| 0x55 (1B) | type+id (2B) | update+len (1B) | payload (len B) |
              type (4 bit) | id (12 bit) | update (1 bit) | len (7 bit)
```

The Parquet schema consists of ts plus one column per packet type ID observed in the data. Column names are 4-digit hex strings, for example 0055, 0057. Use SPI Stream for in-vehicle gateways collecting data from multiple ECUs over a shared SPI bus.
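The frame layout above can be decoded with the struct module. The sketch below is a hypothetical parser, not the SPI Stream plugin itself. It assumes details the layout does not state: multi-byte fields are big-endian, frames are concatenated with no padding, and the column name is the whole 16-bit type+id field written as 4-digit hex.

```python
import struct
from dataclasses import dataclass
from typing import List

SPI_HEADER = 0x55


@dataclass
class SpiPacket:
    column: str    # 4-digit hex column name (assumed: the 16-bit type+id field)
    pkt_type: int  # upper 4 bits of the type+id field
    pkt_id: int    # lower 12 bits of the type+id field
    update: bool   # top bit of the update+len byte
    payload: bytes


def parse_spi_frames(buf: bytes) -> List[SpiPacket]:
    """Parse back-to-back SPI frames laid out as in the diagram above."""
    packets: List[SpiPacket] = []
    pos = 0
    while pos < len(buf):
        if len(buf) - pos < 4:
            raise ValueError(f"truncated frame header at offset {pos}")
        # 1-byte header, 2-byte type+id, 1-byte update+len (big-endian assumed).
        header, type_id, update_len = struct.unpack_from(">BHB", buf, pos)
        if header != SPI_HEADER:
            raise ValueError(f"bad header byte 0x{header:02x} at offset {pos}")
        length = update_len & 0x7F
        start = pos + 4
        if start + length > len(buf):
            raise ValueError(f"truncated payload at offset {start}")
        packets.append(SpiPacket(
            column=f"{type_id:04X}",
            pkt_type=type_id >> 12,
            pkt_id=type_id & 0x0FFF,
            update=bool(update_len & 0x80),
            payload=buf[start:start + length],
        ))
        pos = start + length
    return packets
```

Grouping the returned packets by column gives one list per Parquet column, which is the shape the SPI schema describes.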

CANP Stream (streamType = 2)

CANP Stream parses CAN protocol frames and stores each CAN ID as a separate Parquet column. The expected frame format is:

```text
| ts (8B) | len (4B) | [ tsdiff (1B) | busid (1B) | canid (2B) | len (2B) | payload (len B) ] ... |
```

The Parquet schema consists of ts plus one column per CAN ID observed in the data. Column names are 4-digit hex strings, for example 0123, 0456. CANP Stream also supports a v2.1 compact format that packs multiple frames sharing the same CAN ID into a single cell. Use CANP Stream for CAN/CAN-FD bus data acquisition on vehicle edge nodes.
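A message in the layout above can be split into its CAN frames as follows. This is a hypothetical sketch, not the CANP Stream plugin. It assumes little-endian fields and that the 4-byte len counts the bytes of the record section after the 12-byte header; it does not handle the v2.1 compact format.

```python
import struct
from dataclasses import dataclass
from typing import List, Tuple

HEADER = struct.Struct("<QI")    # ts (8B), len (4B)
RECORD = struct.Struct("<BBHH")  # tsdiff (1B), busid (1B), canid (2B), len (2B)


@dataclass
class CanFrame:
    column: str    # 4-digit hex CAN ID, used as the Parquet column name
    tsdiff: int
    bus_id: int
    payload: bytes


def parse_canp(buf: bytes) -> Tuple[int, List[CanFrame]]:
    """Split one CANP message into its base timestamp and CAN frames."""
    ts, body_len = HEADER.unpack_from(buf, 0)
    end = HEADER.size + body_len
    if end > len(buf):
        raise ValueError("len field exceeds buffer size")
    frames: List[CanFrame] = []
    pos = HEADER.size
    while pos < end:
        if end - pos < RECORD.size:
            raise ValueError(f"truncated record header at offset {pos}")
        tsdiff, bus_id, can_id, length = RECORD.unpack_from(buf, pos)
        pos += RECORD.size
        if pos + length > end:
            raise ValueError(f"truncated payload at offset {pos}")
        frames.append(CanFrame(f"{can_id:04X}", tsdiff, bus_id, buf[pos:pos + length]))
        pos += length
    return ts, frames
```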

Verify MQTT Stream

This section verifies MQTT Stream by sending test data and checking the persisted output.

Send Test Data

Publish messages to the configured topic using MQTTX CLI. The example below publishes messages to the canudp topic using a single persistent connection:

```bash
mqttx bench pub -t "canudp" -h 127.0.0.1 -p 1883 -m "message" -L 1500 -c 1
```

The Ring Queue flushes to disk only when it is full, so the number of published messages must exceed the Ring Queue Capacity (here, 1500 messages against the example capacity of 1000). Using a single persistent connection (-c 1) avoids connection churn that can interfere with the Ring Queue pipeline. Replace canudp with the topic configured in your Flow.

Parquet File Naming

Generated Parquet files follow this naming convention:

```text
{prefix}_{topic}-{start_ts}~{end_ts}_{seq}_{hash}.parquet
```

| Field | Description |
| --- | --- |
| prefix | Value of file_name_prefix in the configuration. |
| topic | The Exchange topic name. |
| start_ts | Timestamp of the earliest message in the file (milliseconds). |
| end_ts | Timestamp of the latest message in the file (milliseconds). |
| seq | Monotonically increasing file sequence number. |
| hash | Content hash for deduplication and integrity verification. |

Example:

```text
edge_canudp-1781145590395~1781145948876_0_3ff7819a161625c40d495038fd85c248.parquet
```
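Because start_ts and end_ts are part of the name, a script can tell which files cover a time range without opening them. The helpers below are hypothetical. The regular expression matches from the right so topics containing - or _ still parse, and it assumes the prefix itself contains no underscore.

```python
import re
from typing import NamedTuple

# {prefix}_{topic}-{start_ts}~{end_ts}_{seq}_{hash}.parquet
_NAME_RE = re.compile(
    r"^(?P<prefix>[^_]*)_(?P<topic>.+)-(?P<start_ts>\d+)~(?P<end_ts>\d+)"
    r"_(?P<seq>\d+)_(?P<content_hash>[0-9a-fA-F]+)\.parquet$"
)


class ParquetName(NamedTuple):
    prefix: str
    topic: str
    start_ts: int
    end_ts: int
    seq: int
    content_hash: str


def parse_parquet_name(name: str) -> ParquetName:
    """Split an MQTT Stream Parquet file name into its fields."""
    m = _NAME_RE.match(name)
    if m is None:
        raise ValueError(f"not an MQTT Stream Parquet file name: {name}")
    return ParquetName(m["prefix"], m["topic"], int(m["start_ts"]),
                       int(m["end_ts"]), int(m["seq"]), m["content_hash"])


def overlaps(name: ParquetName, start_ts: int, end_ts: int) -> bool:
    """True if the file's [start_ts, end_ts] span intersects the query range."""
    return name.start_ts <= end_ts and start_ts <= name.end_ts
```

For the example file above, the parsed span is 358481 ms (about six minutes) of messages.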

Observe Logs and Persisted Data

When the Ring Queue reaches capacity:

  • Batch processing is triggered.
  • The buffered batch is encoded and written to local Parquet files.

Relevant log entries confirm that the MQTT Stream pipeline has been activated.

Parquet files are generated in the configured directory. Each file typically corresponds to one batch returned from the Ring Queue.

Verification Checklist

MQTT Stream is functioning correctly when:

  • MQTT messages are published successfully.
  • The Ring Queue reaches its threshold and triggers batch processing.
  • Parquet files appear in the configured directory.
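As a quick check before reaching for the tools below, a standard-library script can confirm that files exist and end with a Parquet magic number. Plaintext Parquet files end with the 4-byte marker PAR1, and files written with an encrypted footer use PARE. This is only a sanity check, not a full validation, and find_parquet_files is a hypothetical helper.

```python
from pathlib import Path
from typing import List

# Trailing magic: b"PAR1" for plaintext footers, b"PARE" for encrypted footers.
_MAGICS = (b"PAR1", b"PARE")


def find_parquet_files(directory: str) -> List[Path]:
    """Return *.parquet files in `directory` whose last 4 bytes are a Parquet magic.

    Only the trailing magic is checked; the file contents are not parsed.
    """
    found: List[Path] = []
    for path in sorted(Path(directory).glob("*.parquet")):
        with path.open("rb") as f:
            f.seek(0, 2)          # jump to end of file to get its size
            if f.tell() < 8:      # too small to hold leading and trailing magic
                continue
            f.seek(-4, 2)
            if f.read(4) in _MAGICS:
                found.append(path)
    return found
```

Running it against /tmp/edge-parquet (or your parquet.dir) after the test publish should list at least one file.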

Parquet files can be inspected using tools such as:

  • parquet-tools
  • Pandas / PyArrow (Python)
  • Apache Spark or Flink

Query Historical Data

After data has been persisted, MQTT Stream supports time-range-based historical queries for troubleshooting, replay, and analysis. Queries can be run from the Dashboard or through the REST API and do not affect live MQTT message forwarding.

Query Data in the Dashboard

In the Dashboard, navigate to MQTT Stream and click the Query tab.

Fill in the query fields:

| Field | Description |
| --- | --- |
| Topic | The topic to query. Required. The / character is not supported. |
| Time Range | Required. Select a start and end time using the date-time picker, or choose a preset: Last 5 minutes, Last 15 minutes, Last 1 hour, Last 24 hours, or Last 7 days. |
| Schema | Optional. Specifies which columns to return as a comma-separated list. Leave empty to return all available columns. Available columns depend on the stream type configured for the topic. |
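The presets correspond to millisecond start and end timestamps, the same units the REST API uses for start_ts and end_ts. The sketch below assumes presets are computed back from the current wall-clock time; preset_range is a hypothetical helper that is useful for building an equivalent REST query.

```python
import time
from typing import Dict, Optional, Tuple

# Dashboard presets mapped to their length in milliseconds.
PRESETS_MS: Dict[str, int] = {
    "Last 5 minutes": 5 * 60 * 1000,
    "Last 15 minutes": 15 * 60 * 1000,
    "Last 1 hour": 60 * 60 * 1000,
    "Last 24 hours": 24 * 60 * 60 * 1000,
    "Last 7 days": 7 * 24 * 60 * 60 * 1000,
}


def preset_range(preset: str, now_ms: Optional[int] = None) -> Tuple[int, int]:
    """Return (start_ts, end_ts) in epoch milliseconds for a Dashboard preset."""
    if now_ms is None:
        now_ms = time.time_ns() // 1_000_000
    return now_ms - PRESETS_MS[preset], now_ms
```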

Click Exact Search to run the query. Results matching the specified topic and time range are displayed in the results area.

[Screenshot: Query results]

Payloads are shown as Base64-encoded strings. Click View on any row and switch to Decoded Text to see the original message content.

[Screenshot: Payload Viewer]

Schema Values by Stream Type

Schema is a column filter: when set, only the specified columns are returned. When left empty, all available columns are returned.

The ts column (millisecond timestamp) is always available regardless of stream type. Additional columns depend on the streamType of the Flow:

| Stream Type | Available Columns | Example Schema Value |
| --- | --- | --- |
| RAW (0) | ts, data | data or ts,data |
| SPI (1) | ts + packet type IDs present in the data (4-digit hex) | ts,0055,0057 |
| CANP (2) | ts + CAN IDs present in the data (4-digit hex) | ts,0123,0456 |

For SPI and CANP stream types, the available column names depend on what packet type IDs or CAN IDs were observed when the data was written. To discover available columns, run a query without specifying a schema first and inspect the returned field names.
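The discovery step can be scripted: collect the field names from an unfiltered query result, then join the ones you want into a Schema value. The sketch assumes result rows shaped like the REST API's dataArray (one JSON object per row); discover_columns and schema_value are hypothetical helpers.

```python
from typing import Any, Dict, Iterable, List


def discover_columns(rows: Iterable[Dict[str, Any]]) -> List[str]:
    """Collect every column name seen across result rows, with ts first."""
    seen = set()
    for row in rows:
        seen.update(row.keys())
    rest = sorted(c for c in seen if c != "ts")
    return (["ts"] if "ts" in seen else []) + rest


def schema_value(columns: Iterable[str]) -> str:
    """Join column names into the comma-separated Schema format."""
    return ",".join(columns)
```

Taking the union across all rows matters for SPI and CANP data, where a given row may only carry some of the packet or CAN IDs.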

Query via REST API

Historical data can also be queried programmatically using the REST API.

Endpoint

```text
GET /api/v4/mqtt_stream
```

Required Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| topic | String | The MQTT topic to query. Must match the Exchange topic name. |
| start_ts | uint64 | Start of the time range, inclusive (milliseconds). |
| end_ts | uint64 | End of the time range, inclusive (milliseconds). |

Optional Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| schema | String | All columns | Comma-separated list of columns to return, for example data or ts,0055. |
| limit | uint64 | 10000 | Maximum number of records to return. |
| offset | uint64 | 0 | Pagination offset. |
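When scripting queries, building the URL with urllib.parse avoids hand-escaping topic names. The sketch below applies the documented checks (topic present, start_ts less than or equal to end_ts) before anything is sent. build_query_url is a hypothetical helper, and its default base URL is taken from the curl examples in this section; adapt it to your host and port. Commas are kept literal so the schema parameter matches those examples.

```python
from typing import Dict, Iterable, Optional
from urllib.parse import urlencode

BASE_URL = "http://localhost:8081/api/v4/mqtt_stream"  # from the curl examples


def build_query_url(topic: str, start_ts: int, end_ts: int,
                    schema: Optional[Iterable[str]] = None,
                    limit: Optional[int] = None,
                    offset: Optional[int] = None,
                    base_url: str = BASE_URL) -> str:
    """Build a GET URL for /api/v4/mqtt_stream after checking required parameters."""
    if not topic:
        raise ValueError("topic is required")
    if start_ts > end_ts:
        raise ValueError("start_ts must be less than or equal to end_ts")
    params: Dict[str, object] = {"topic": topic, "start_ts": start_ts, "end_ts": end_ts}
    if schema is not None:
        params["schema"] = ",".join(schema)
    if limit is not None:
        params["limit"] = limit
    if offset is not None:
        params["offset"] = offset
    # safe="," keeps commas in the schema list unescaped, as in the curl examples.
    return f"{base_url}?{urlencode(params, safe=',')}"
```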

Response

An abbreviated example (only the first two rows of dataArray are shown):

```json
{
    "count": 20,
    "dataArray": [
        {"ts": 1781145590395, "data": "bWVzc2FnZSAx"},
        {"ts": 1781145596370, "data": "bWVzc2FnZSAy"}
    ]
}
```

  • count: number of records returned after applying offset and limit.
  • dataArray: array of result rows. ts is returned as a number; all other column values are Base64-encoded. If schema is specified, only the requested columns are returned.
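The sketch below decodes a response body such as the one above. It leaves ts untouched, since it is a number in the example response, and Base64-decodes every other column to bytes. decode_rows is a hypothetical helper; whether a decoded payload is readable UTF-8 text depends on what your publishers sent.

```python
import base64
import json
from typing import Any, Dict, List


def decode_rows(body: str) -> List[Dict[str, Any]]:
    """Parse a /api/v4/mqtt_stream response and Base64-decode non-ts columns."""
    doc = json.loads(body)
    rows: List[Dict[str, Any]] = []
    for row in doc.get("dataArray", []):
        decoded: Dict[str, Any] = {}
        for col, value in row.items():
            if col == "ts" or value is None:
                decoded[col] = value  # keep timestamps (and any nulls) as-is
            else:
                decoded[col] = base64.b64decode(value)
        rows.append(decoded)
    return rows
```

For the two rows shown above, the data column decodes to b"message 1" and b"message 2".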

Example Requests

Query all columns for a RAW stream:

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=1781145590395&end_ts=1781145948876"
```

Query specific SPI packet type columns (this assumes the topic is configured with streamType = 1):

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=1781145590395&end_ts=1781145948876&schema=ts,0055,0057"
```

Paginate results (second page of 100 records):

```bash
curl -i --basic -u admin:public -X GET \
  "http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=0&end_ts=9999999999999&limit=100&offset=100"
```
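To read a result set larger than one page, a client can advance offset by limit until a short page or a 204 No Content arrives. This is a hypothetical standard-library sketch. http_fetch sends HTTP Basic credentials the same way curl -u admin:public does, and the loop assumes that an offset past the end yields either 204 or an empty dataArray; both cases are handled.

```python
import base64
import json
import urllib.request
from typing import Any, Callable, Dict, Iterator, List, Tuple

Fetch = Callable[[str], Tuple[int, str]]


def http_fetch(url: str, user: str = "admin", password: str = "public") -> Tuple[int, str]:
    """GET `url` with HTTP Basic auth; urlopen raises HTTPError on 4xx/5xx (e.g. 400)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode("ascii")
    req = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})
    with urllib.request.urlopen(req) as resp:
        return resp.status, resp.read().decode("utf-8")


def iter_pages(query_url: str, page_size: int = 100,
               fetch: Fetch = http_fetch) -> Iterator[List[Dict[str, Any]]]:
    """Yield pages of rows; `query_url` must already include topic, start_ts, end_ts."""
    offset = 0
    while True:
        status, body = fetch(f"{query_url}&limit={page_size}&offset={offset}")
        if status == 204 or not body.strip():
            return  # 204 No Content: nothing (more) in this time range
        rows = json.loads(body).get("dataArray", [])
        if not rows:
            return
        yield rows
        if len(rows) < page_size:
            return  # a short page means this was the last one
        offset += page_size
```

For example, iterating iter_pages("http://localhost:8081/api/v4/mqtt_stream?topic=canudp&start_ts=0&end_ts=9999999999999") issues the same requests as the curl example above, one page at a time.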

Error Codes

| HTTP Status | Meaning |
| --- | --- |
| 200 OK | Query succeeded. |
| 204 No Content | No data matched the specified time range. |
| 400 Bad Request | Missing required parameters, invalid timestamp format, or invalid schema value. |

Typical Use Cases

  • Replaying historical data after device or system failures.
  • Compensating for data gaps caused by network outages.
  • Exporting locally persisted data for offline analysis.

Troubleshooting

EMQX Edge fails to start with NULL s->ex_node!

The ringbus.name or ringbus.cap field is missing or empty in the configuration. Both fields are required. A missing name causes Ring Queue initialization to fail, which in turn prevents the Exchange from binding correctly. Set both fields and restart.

Parquet files are not generated

Check the following:

  1. EMQX Edge was compiled with -DENABLE_PARQUET=ON.
  2. The parquet.dir path is writable.
  3. If parquet.file_name_prefix is set, it contains only normal filename characters and no path separators.
  4. The number of messages published exceeds ringbus.cap. The Ring Queue does not flush until it is full.
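Items 2 to 4 can be checked on the edge host with a small script such as the hypothetical helper below. Note that os.access reports permissions for the user running the script, which may differ from the user EMQX Edge runs as, and the build flag in item 1 cannot be checked this way.

```python
import os
from typing import List


def diagnose_no_parquet(parquet_dir: str, prefix: str,
                        published: int, cap: int) -> List[str]:
    """Return likely reasons no Parquet files were written (items 2 to 4 above)."""
    issues: List[str] = []
    if os.path.isdir(parquet_dir):
        if not os.access(parquet_dir, os.W_OK):
            issues.append(f"{parquet_dir} is not writable by this user")
    else:
        # EMQX Edge creates the directory, so its parent must be writable.
        parent = os.path.dirname(os.path.abspath(parquet_dir))
        if not os.access(parent, os.W_OK):
            issues.append(f"{parquet_dir} does not exist and {parent} is not writable")
    if os.sep in prefix or "/" in prefix:
        issues.append("file_name_prefix contains a path separator")
    if published <= cap:
        issues.append(f"only {published} messages published; "
                      f"the Ring Queue flushes only after {cap}")
    return issues
```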

REST API returns 400 Bad Request

Confirm that topic, start_ts, and end_ts are all provided and that start_ts is less than or equal to end_ts. The topic value must exactly match the Exchange topic name configured in the Flow.