MQTT Stream Design and Implementation
This page explains how MQTT Stream works internally. It covers the end-to-end data pipeline, Stream plugin architecture, custom plugin development, and the query channel protocol.
It is intended for developers and system integrators who need to understand the internal design or extend MQTT Stream beyond default behavior.
End-to-End Data Pipeline
Once MQTT Stream is enabled, messages published to a configured topic enter an independent data processing pipeline. Normal MQTT forwarding is unaffected at every stage.
Startup: Configuration-Driven Initialization
MQTT Stream data paths are fully initialized at startup based on configuration. The system does not create or modify pipelines at runtime.
During startup:
- Exchange definitions, topic filters, Stream plugin types, and Ringbus parameters are parsed from configuration.
- An Exchange instance and a Ringbus buffer are created for each configured data path.
- Local communication channels between the broker and Exchange are established.
- Each data path is bound to its configured Stream plugin.
This model keeps the pipeline structure stable at runtime, avoiding resource churn and ensuring predictable performance under high throughput.
Message Capture: Broker and Exchange Separation
The broker and Exchange have strictly separate responsibilities.
The broker handles MQTT protocol processing and real-time message forwarding. When a PUBLISH message matches a configured topic, the broker side-copies it into the MQTT Stream pipeline without altering the original message or forwarding path.
Each side-copied message is assigned a monotonically increasing millisecond timestamp at capture time. This timestamp serves as the message's global key throughout the pipeline (for buffering, persistence, and querying), eliminating the need for sorting or deduplication later.
Exchange receives the side-copied messages and is solely responsible for buffering, batching, persistence, and historical queries. The broker remains unaware of these operations.
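The page does not show how the capture-time key is generated. The sketch below shows one way to keep keys strictly increasing. It assumes the key comes from a millisecond wall clock and is bumped by one whenever the clock has not moved past the previous key. `struct key_gen` and `key_gen_next` are hypothetical names, and the caller supplies the current time in milliseconds.

```c
#include <stdint.h>

/* Hypothetical per-data-path key generator state (illustration only). */
struct key_gen {
    uint64_t last_key; /* last key handed out; 0 before the first capture */
};

/* Return a key that is >= now_ms and strictly greater than the previous key.
 * If several messages arrive within the same millisecond (or the clock steps
 * backwards), the key is bumped by 1 ms so that ordering stays total. */
static uint64_t key_gen_next(struct key_gen *g, uint64_t now_ms)
{
    uint64_t key = (now_ms > g->last_key) ? now_ms : g->last_key + 1;
    g->last_key = key;
    return key;
}
```

With this approach, a data path that sustains more than 1,000 messages per second makes the keys drift ahead of the wall clock until the rate drops.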
Ringbus Buffering and Batch Triggering
Writing high-frequency messages to disk one by one would severely limit throughput. MQTT Stream uses Ringbus as an in-memory first-stage buffer to absorb write pressure.
Ringbus accepts messages sequentially by timestamp up to a configured capacity. It is intentionally ignorant of encoding and storage semantics: it exists solely to accept data quickly and hand it off in batches.
When the buffer reaches capacity, the `fullOp` setting determines what happens. The recommended mode is `FULL_RETURN` (value `2`, "Return to AIO"):
- The entire batch is handed off to the upper processing layer at once.
- Ringbus clears itself immediately and resumes accepting new messages.
- All encoding and persistence logic stays in the upper layer, keeping Ringbus simple and making it easy to support multiple backends.
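The `FULL_RETURN` hand-off can be modeled as a fixed-capacity buffer. Once full, it passes its entire contents to a callback and then resets. This is an illustrative model, not the Ringbus implementation. `struct ringbus`, `rb_push`, `rb_flush`, the callback type, and the capacity of 4 are all hypothetical. `rb_flush` stands in for the role that `hook_last_flush` plays for leftover data.

```c
#include <stddef.h>
#include <stdint.h>

#define RB_CAP 4 /* hypothetical capacity; the real value comes from configuration */

struct rb_entry {
    uint64_t    key;     /* capture timestamp */
    const void *payload;
    uint32_t    len;
};

struct ringbus {
    struct rb_entry slots[RB_CAP];
    size_t          count;
};

/* Receives a whole batch. Slots are reused afterwards, so the callback must
 * copy (or take ownership of) anything it needs before returning. */
typedef void (*rb_batch_cb)(const struct rb_entry *batch, size_t n, void *arg);

/* Append one message; when full, hand off the whole batch and reset. */
static void rb_push(struct ringbus *rb, uint64_t key, const void *payload,
                    uint32_t len, rb_batch_cb on_full, void *arg)
{
    rb->slots[rb->count++] = (struct rb_entry){ key, payload, len };
    if (rb->count == RB_CAP) {
        on_full(rb->slots, rb->count, arg);
        rb->count = 0; /* clear immediately and keep accepting messages */
    }
}

/* Explicitly hand off whatever is left (a partial batch). */
static void rb_flush(struct ringbus *rb, rb_batch_cb cb, void *arg)
{
    if (rb->count > 0) {
        cb(rb->slots, rb->count, arg);
        rb->count = 0;
    }
}

/* Example consumer: counts batches and messages and records the last key. */
struct batch_stats { size_t batches; size_t messages; uint64_t last_key; };

static void count_batch(const struct rb_entry *batch, size_t n, void *arg)
{
    struct batch_stats *s = arg;
    s->batches++;
    s->messages += n;
    s->last_key = batch[n - 1].key;
}
```

Keeping the buffer unaware of encoding means only the callback changes when a different backend is plugged in.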
Stream Plugin Encoding and Parquet Persistence
Once a batch is returned from Ringbus, the pipeline:
- Normalizes the batch into a `stream_data_in` structure (payload, length, and timestamp key per message).
- Calls `stream_encode(streamType, ...)` to dispatch to the configured Stream plugin.
- The plugin encodes the batch into a storage-ready representation (for example, a columnar Parquet structure).
- The encoded result is written asynchronously to local Parquet files.
Because Parquet is a columnar format, writing in batches maximizes compression efficiency, reduces file fragmentation, and enables fast time-range scans.
Query Path
Historical queries are handled through Exchange's local query interface and never pass through the MQTT forwarding path. A query spans both data still in memory (Ringbus) and data already persisted to disk. Exchange merges these sources and passes them to the Stream plugin for decoding before returning results to the caller.
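Capture keys are monotonic, so the in-memory and on-disk results for a query are each already ordered by key. One simple way to combine them for a range [start_key, end_key] is a standard two-way merge. The sketch below works on bare keys for brevity. `merge_range` is a hypothetical name, and Exchange's actual merge may differ.

```c
#include <stddef.h>
#include <stdint.h>

/* Merge two ascending key arrays (disk, mem) and keep keys in [start, end].
 * out must have room for nd + nm keys. Returns the number of keys written. */
static size_t merge_range(const uint64_t *disk, size_t nd,
                          const uint64_t *mem, size_t nm,
                          uint64_t start, uint64_t end, uint64_t *out)
{
    size_t i = 0, j = 0, n = 0;
    while (i < nd || j < nm) {
        uint64_t k;
        /* take the smaller head; ties go to the on-disk side */
        if (j >= nm || (i < nd && disk[i] <= mem[j]))
            k = disk[i++];
        else
            k = mem[j++];
        if (k > end)
            break;          /* output is ascending, so nothing later can match */
        if (k >= start)
            out[n++] = k;
    }
    return n;
}
```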
Stream Plugin Architecture
Stream plugins are the single source of data semantics in MQTT Stream. The core pipeline does not assume any fixed storage format or query protocol: all of that is delegated to plugins.
Each plugin implements three interfaces:
| Interface | Invocation point | Purpose |
|---|---|---|
| `encode` | Ringbus full, before write | Encode a batch of messages into a persistence-ready structure |
| `cmd_parser` | Query start | Parse a query command string into structured parameters |
| `decode` | Query result assembly | Decode persisted records into the final result format |
MQTT Stream includes two built-in plugins:
- RAW Stream (`streamType = 0`): General-purpose. Passes MQTT payloads with minimal transformation. Suitable for high-frequency data without complex semantic requirements.
- SPI Stream (`streamType = 1`): Targets automotive and SDV scenarios. Supports custom key structures and query protocols.
The streamType field in the Exchange configuration determines which plugin is used for the entire data path: encoding, persistence, query parsing, and result decoding.
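A table indexed by `streamType` is one straightforward way to get this per-data-path dispatch. The sketch below is hypothetical: `struct stream_plugin`, `registry_add`, `dispatch_encode`, the table size, and the toy `identity_fn` plugin are invented for illustration. It mirrors only the decode/encode/cmd_parser argument order used in the registration example under Custom Stream Plugin Development.

```c
#include <stddef.h>
#include <stdint.h>

typedef void *(*stream_fn)(void *data);

/* Hypothetical registry entry: one per streamType. */
struct stream_plugin {
    const char *name;
    stream_fn   encode;
    stream_fn   decode;
    stream_fn   cmd_parser;
};

#define MAX_STREAM_TYPES 16 /* hypothetical limit */

static struct stream_plugin registry[MAX_STREAM_TYPES];

/* Register a plugin under a type ID; fails if the ID is out of range or taken. */
static int registry_add(uint8_t type, const char *name,
                        stream_fn decode, stream_fn encode, stream_fn cmd_parser)
{
    if (type >= MAX_STREAM_TYPES || registry[type].encode != NULL)
        return -1;
    registry[type] = (struct stream_plugin){ name, encode, decode, cmd_parser };
    return 0;
}

/* The data path's streamType selects which encoder runs. */
static void *dispatch_encode(uint8_t type, void *batch)
{
    if (type >= MAX_STREAM_TYPES || registry[type].encode == NULL)
        return NULL;
    return registry[type].encode(batch);
}

/* Toy plugin function for illustration: returns its input unchanged. */
static void *identity_fn(void *data) { return data; }
```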
Encoding Flow
When Ringbus triggers FULL_RETURN (or when hook_last_flush is called explicitly to flush remaining data), the batch is passed to flush_smsg_to_disk(), which:
- Builds a `stream_data_in` from the returned messages, using `nni_msg_get_timestamp()` as the key for each entry.
- Calls `stream_encode(streamType, sdata)`, dispatching into the plugin's `encode` implementation. For RAW, `raw_encode` converts `stream_data_in` into `stream_data_out` and calls `parquet_data_alloc` to build the columnar structure.
- Passes the encoded result (`parquet_data*`) to `parquet_object_alloc` + `parquet_write_batch_async` for an asynchronous disk write.
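The first step of `flush_smsg_to_disk()` amounts to turning a list of messages into three parallel arrays. The sketch below uses a hypothetical `struct captured_msg` in place of an NNG message; its `ts` field stands for what `nni_msg_get_timestamp()` returns. The arrays point at the original payloads rather than copying them. `build_stream_data_in` and `free_stream_data_in` are illustrative names.

```c
#include <stdint.h>
#include <stdlib.h>

struct stream_data_in {
    void     **datas; /* payload pointer for each message */
    uint64_t  *keys;  /* timestamp key for each message */
    uint32_t  *lens;  /* payload length for each message */
    uint32_t   len;   /* number of messages in the batch */
};

/* Hypothetical stand-in for an NNG message. */
struct captured_msg {
    uint64_t  ts;   /* capture-time key */
    void     *body; /* MQTT payload */
    uint32_t  len;  /* payload length */
};

/* Fill *out with parallel arrays referencing msgs[0..n-1]. Returns 0 on success. */
static int build_stream_data_in(const struct captured_msg *msgs, uint32_t n,
                                struct stream_data_in *out)
{
    if (n == 0)
        return -1; /* nothing to flush */
    out->datas = malloc(n * sizeof(*out->datas));
    out->keys  = malloc(n * sizeof(*out->keys));
    out->lens  = malloc(n * sizeof(*out->lens));
    if (out->datas == NULL || out->keys == NULL || out->lens == NULL) {
        free(out->datas);
        free(out->keys);
        free(out->lens);
        return -1;
    }
    for (uint32_t i = 0; i < n; i++) {
        out->datas[i] = msgs[i].body;
        out->keys[i]  = msgs[i].ts;
        out->lens[i]  = msgs[i].len;
    }
    out->len = n;
    return 0;
}

/* Free the arrays only; the payloads belong to the messages. */
static void free_stream_data_in(struct stream_data_in *in)
{
    free(in->datas);
    free(in->keys);
    free(in->lens);
    in->datas = NULL;
    in->keys  = NULL;
    in->lens  = NULL;
    in->len   = 0;
}
```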
Decoding Flow
When a query command arrives at Exchange:
- `stream_cmd_parser(streamType, keystr)` parses the command string into a `cmd_data` structure containing `is_sync`, `start_key`, `end_key`, and `schema`. For RAW Stream, the default schema is `{"ts", "data"}` (see `raw_stream.c::parse_input_cmd`).
- Exchange retrieves matching records from Ringbus (in memory) via `exchange_client_get_msgs_fuzz`, and from Parquet files via `parquet_get_data_packets_in_range_by_column`.
- For each `parquet_data_ret*`, `stream_decode(streamType, ...)` is called to convert the columnar data into a `stream_decoded_data`, a continuous buffer suitable for the caller.
- Results are assembled into NNG messages and returned incrementally (async) or in aggregate (sync).
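The buffer layout returned by `stream_decode` is plugin-defined and is not specified on this page. As an illustration only, the sketch below packs rows into a single `stream_decoded_data` using a hypothetical `[8-byte key][4-byte length][payload]` record format in host byte order. The inputs are plain arrays because the layout of `parquet_data_ret` is not shown here, and `pack_rows` is an invented name.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

struct stream_decoded_data {
    void     *data; /* decoded buffer */
    uint32_t  len;  /* buffer length */
};

/* Pack rows as [uint64 key][uint32 len][payload] into one continuous buffer. */
static struct stream_decoded_data *pack_rows(const uint64_t *keys,
                                             void *const *payloads,
                                             const uint32_t *lens, uint32_t rows)
{
    size_t total = 0;
    for (uint32_t i = 0; i < rows; i++)
        total += sizeof(uint64_t) + sizeof(uint32_t) + lens[i];
    if (total > UINT32_MAX)
        return NULL; /* does not fit the 32-bit len field */

    struct stream_decoded_data *out = malloc(sizeof(*out));
    unsigned char *buf = malloc(total ? total : 1);
    if (out == NULL || buf == NULL) {
        free(out);
        free(buf);
        return NULL;
    }

    unsigned char *p = buf;
    for (uint32_t i = 0; i < rows; i++) {
        memcpy(p, &keys[i], sizeof(uint64_t));
        p += sizeof(uint64_t);
        memcpy(p, &lens[i], sizeof(uint32_t));
        p += sizeof(uint32_t);
        if (lens[i] > 0)
            memcpy(p, payloads[i], lens[i]);
        p += lens[i];
    }
    out->data = buf;
    out->len  = (uint32_t) total;
    return out;
}
```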
Custom Stream Plugin Development
Custom Stream plugins can be implemented without modifying the core pipeline. A plugin needs to implement three functions and register them at startup.
Interface Structs
```c
#include <stdbool.h>
#include <stdint.h>

/* parquet_data_packet is provided by the Parquet storage layer. */

/* Input to encode: one batch of messages from Ringbus */
struct stream_data_in {
    void     **datas; /* payload pointer for each message */
    uint64_t  *keys;  /* timestamp key for each message */
    uint32_t  *lens;  /* payload length for each message */
    uint32_t   len;   /* number of messages in the batch */
};

/* Output of encode: columnar structure for Parquet persistence */
struct stream_data_out {
    uint32_t               col_len;     /* number of columns */
    uint32_t               row_len;     /* number of rows */
    uint64_t              *ts;          /* timestamp column */
    char                 **schema;      /* column names */
    parquet_data_packet ***payload_arr; /* per-column payload */
};

/* Output of decode: continuous buffer returned to the caller */
struct stream_decoded_data {
    void     *data; /* decoded buffer */
    uint32_t  len;  /* buffer length */
};

/* Output of cmd_parser: structured query parameters */
struct cmd_data {
    bool      is_sync;    /* true = sync, false = async */
    uint64_t  start_key;  /* query start timestamp (ms) */
    uint64_t  end_key;    /* query end timestamp (ms) */
    uint32_t  schema_len; /* number of requested columns */
    char    **schema;     /* column names to return */
};
```

Plugin Implementation and Registration
Implement the three interface functions and register them at startup using stream_register:
```c
#include <stdlib.h>
#include <string.h>

/* stream_register() is declared by the MQTT Stream plugin headers. */

void *my_encode(void *data);     /* data: stream_data_in*   → return: parquet_data* */
void *my_decode(void *data);     /* data: parquet_data_ret* → return: stream_decoded_data* */
void *my_cmd_parser(void *data); /* data: const char*       → return: cmd_data* */

int my_stream_init(void)
{
    char *name = malloc(strlen("my_stream") + 1);
    if (name == NULL) {
        return -1;
    }
    strcpy(name, "my_stream");

    /* Use an ID that does not conflict with built-ins: 0 = RAW, 0x1 = SPI */
    int ret = stream_register(name, 0x2, my_decode, my_encode, my_cmd_parser);
    if (ret != 0) {
        /* name is freed here only if registration fails;
           on success it is handed to stream_register. */
        free(name);
        return -1;
    }
    return 0;
}
```

After registration, set `streamType = 2` in the corresponding Exchange configuration to activate the plugin for that data path.
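A custom `encode` usually begins by reshaping the row-oriented `stream_data_in` into the columnar `stream_data_out`. The sketch below covers only that step, for a single `data` column, and rests on several assumptions. `parquet_data_packet` is replaced by a stand-in struct, and `payload_arr` is indexed as `[column][row]`. `schema` lists only payload columns, with timestamps carried in `ts`. Packets reference the original payloads instead of copying them. The final conversion to `parquet_data*` (which RAW performs with `parquet_data_alloc`) is omitted because its signature is not documented here. `to_single_column` and `free_stream_data_out` are hypothetical names.

```c
#include <stdint.h>
#include <stdlib.h>

/* Stand-in for the real parquet_data_packet (assumption, for illustration). */
typedef struct {
    void     *data; /* points at the original payload; not copied */
    uint32_t  size;
} parquet_data_packet;

struct stream_data_in {
    void **datas; uint64_t *keys; uint32_t *lens; uint32_t len;
};

struct stream_data_out {
    uint32_t col_len; uint32_t row_len; uint64_t *ts; char **schema;
    parquet_data_packet ***payload_arr; /* assumed [column][row] */
};

static char *one_col_schema[] = { "data" };

/* Release everything to_single_column allocated (payloads are not owned). */
static void free_stream_data_out(struct stream_data_out *out)
{
    if (out == NULL)
        return;
    if (out->payload_arr != NULL) {
        if (out->payload_arr[0] != NULL) {
            for (uint32_t r = 0; r < out->row_len; r++)
                free(out->payload_arr[0][r]);
            free(out->payload_arr[0]);
        }
        free(out->payload_arr);
    }
    free(out->ts);
    free(out);
}

/* Hypothetical first half of an encode callback: rows -> one "data" column. */
static struct stream_data_out *to_single_column(const struct stream_data_in *in)
{
    uint32_t n = in->len;
    struct stream_data_out *out = calloc(1, sizeof(*out));
    if (out == NULL)
        return NULL;
    out->col_len = 1;
    out->row_len = n;
    out->schema  = one_col_schema;
    out->ts          = calloc(n ? n : 1, sizeof(uint64_t));
    out->payload_arr = calloc(1, sizeof(parquet_data_packet **));
    if (out->ts == NULL || out->payload_arr == NULL)
        goto fail;
    out->payload_arr[0] = calloc(n ? n : 1, sizeof(parquet_data_packet *));
    if (out->payload_arr[0] == NULL)
        goto fail;

    for (uint32_t r = 0; r < n; r++) {
        parquet_data_packet *pkt = malloc(sizeof(*pkt));
        if (pkt == NULL)
            goto fail;
        pkt->data = in->datas[r];
        pkt->size = in->lens[r];
        out->payload_arr[0][r] = pkt;
        out->ts[r] = in->keys[r];
    }
    return out;

fail:
    free_stream_data_out(out); /* unfilled slots are NULL thanks to calloc */
    return NULL;
}
```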
Where Each Interface Is Invoked
- `encode`: Called in `webhook_post.c::send_exchange_cb` / `hook_last_flush` after `RB_FULL_RETURN` is triggered, or when `hook_last_flush` is called explicitly to flush remaining data. Receives a `stream_data_in` batch and must return an encoded structure for Parquet persistence.
- `cmd_parser`: Called in `exchange_server.c::query_cb` when a query command arrives over the NNG pair0 socket. Receives the raw command string and must return a `cmd_data`.
- `decode`: Called in `query_send_sync` / `query_send_async` for each `parquet_data_ret*` retrieved from Parquet. Must return a `stream_decoded_data` with a continuous result buffer.
Query Channel and Protocol
EMQX Edge exposes a local NNG channel for querying Exchange data. The source repository includes a reference client at nng/demo/exchange_consumer/exchange_consumer.c.
This client:
- Connects to the Exchange Server via `nng_pair0` at the address specified by `exchange_url` (default: `tcp://127.0.0.1:10000`)
- Sends a single command string as the request payload
- Reads reply frames until a 2-byte EOF marker (`0x0B 0xAD`, see `exchange_server.c::query_send_eof`) is received
Each frame before EOF is one batch of decoded query results.
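The client's read loop can be modeled independently of the socket. In this sketch, a hypothetical `recv_frame_fn` hook stands in for receiving on the `nng_pair0` socket (in a real client it would wrap a call such as `nng_recv`). A scripted transport replaces the network so the loop can be exercised directly. `read_until_eof`, `script_recv`, and `sum_bytes` are illustrative names.

```c
#include <stddef.h>
#include <string.h>

/* EOF marker sent by the Exchange Server after the last result frame. */
static const unsigned char EXCHANGE_EOF[2] = { 0x0B, 0xAD };

static int is_eof_frame(const void *buf, size_t len)
{
    return len == sizeof(EXCHANGE_EOF) && memcmp(buf, EXCHANGE_EOF, len) == 0;
}

/* Hypothetical transport hook: yields the next frame; returns 0 on success. */
typedef int (*recv_frame_fn)(void *ctx, const void **buf, size_t *len);
/* Called once per result batch (every frame before EOF). */
typedef void (*on_batch_fn)(void *ctx, const void *buf, size_t len);

/* Read frames until EOF. Returns the number of result batches, or -1 if the
 * transport fails before EOF arrives. */
static long read_until_eof(recv_frame_fn recv, void *rctx,
                           on_batch_fn on_batch, void *bctx)
{
    long batches = 0;
    for (;;) {
        const void *buf;
        size_t len;
        if (recv(rctx, &buf, &len) != 0)
            return -1;
        if (is_eof_frame(buf, len))
            return batches;
        on_batch(bctx, buf, len);
        batches++;
    }
}

/* Scripted transport used to exercise the loop without a socket. */
struct script { const void **frames; const size_t *lens; size_t n, next; };

static int script_recv(void *ctx, const void **buf, size_t *len)
{
    struct script *s = ctx;
    if (s->next >= s->n)
        return -1;
    *buf = s->frames[s->next];
    *len = s->lens[s->next];
    s->next++;
    return 0;
}

/* Example batch handler: accumulates the total number of result bytes. */
static void sum_bytes(void *ctx, const void *buf, size_t len)
{
    (void) buf;
    *(size_t *) ctx += len;
}
```

Because EOF is recognized by content, this sketch assumes that no result frame is ever exactly the two bytes `0x0B 0xAD`.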
RAW Stream Data Layout
With RAW Stream (streamType = 0), data is stored as key/value pairs:
- key: 64-bit monotonically increasing millisecond timestamp, set at capture time via `nng_msg_set_timestamp`
- value: the original MQTT payload, stored in the `data` column of the Parquet file
Query Command Format
RAW Stream uses the following command format:
```
sync-<start_key>-<end_key>
async-<start_key>-<end_key>
```

- `sync`: all results for the key range are returned before the connection closes
- `async`: Exchange splits and schedules result delivery incrementally; suited for large time ranges
- `start_key` / `end_key`: millisecond timestamps; `end_key` must be ≥ `start_key`
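A parser for this format fits in a few lines. `parse_raw_cmd` below is a hypothetical sketch, not the actual RAW parser. It accepts only `sync-` or `async-` followed by two decimal keys and enforces `end_key` ≥ `start_key`. It points `schema` at the RAW default `{"ts", "data"}` held in a static array, whereas a real parser may allocate it instead.

```c
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

struct cmd_data {
    bool      is_sync;
    uint64_t  start_key;
    uint64_t  end_key;
    uint32_t  schema_len;
    char    **schema;
};

static char *raw_default_schema[] = { "ts", "data" };

/* Parse one unsigned decimal key (at least one digit, no sign or spaces). */
static int parse_key(const char **p, uint64_t *out)
{
    if (**p < '0' || **p > '9')
        return -1;
    errno = 0;
    char *end;
    unsigned long long v = strtoull(*p, &end, 10);
    if (errno == ERANGE)
        return -1;
    *out = (uint64_t) v;
    *p = end;
    return 0;
}

/* Parse "sync-<start>-<end>" or "async-<start>-<end>". Returns 0 on success. */
static int parse_raw_cmd(const char *cmd, struct cmd_data *out)
{
    const char *p;
    if (strncmp(cmd, "sync-", 5) == 0) {
        out->is_sync = true;
        p = cmd + 5;
    } else if (strncmp(cmd, "async-", 6) == 0) {
        out->is_sync = false;
        p = cmd + 6;
    } else {
        return -1;
    }

    if (parse_key(&p, &out->start_key) != 0 || *p++ != '-')
        return -1;
    if (parse_key(&p, &out->end_key) != 0 || *p != '\0')
        return -1;
    if (out->end_key < out->start_key)
        return -1; /* end_key must be >= start_key */

    out->schema_len = 2;
    out->schema = raw_default_schema;
    return 0;
}
```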
Example: query records within a 5-second window:
```
$ ./demo/exchange_consumer/exchange_consumer "sync-1700000000000-1700000005000"
Received 1234 bytes
Received 5678 bytes
...
```

`raw_cmd_parser` parses this into `is_sync = true`, `start_key = 1700000000000`, `end_key = 1700000005000`. Exchange reads all matching records from Parquet and Ringbus, decodes them, and returns frames until EOF.
To retrieve all persisted data, query the widest key range (9223372036854775807 is `INT64_MAX`):

```
$ ./demo/exchange_consumer/exchange_consumer "sync-0-9223372036854775807"
```

Custom plugins can go beyond this format by implementing their own `cmd_parser`, for example to support compound conditions, custom schemas, or pagination.