MQTT Integration
This page describes how to integrate a device with EMQX Fleets using MQTT. It covers connection setup, required subscriptions, publish formats, and the flows for shadow sync, commands, and jobs.
Prerequisites
- The device is registered as a Thing in your Fleets deployment.
- You have the EMQX Broker connection details (host, port, credentials) from your EMQX Broker deployment.
- An MQTT client library is available for your device platform.
Connection Setup
Connect to the EMQX Broker using standard MQTT:
| Parameter | Value |
|---|---|
| Host | From your EMQX Broker deployment |
| Port | From your EMQX Broker deployment (typically 1883 for MQTT, 8883 for MQTT over TLS) |
| Client ID | The Thing's MQTT Client ID (set when registering the Thing) |
| Username / Password | Configured in the Broker's Authentication settings |
| Clean Session | false (recommended, to receive queued messages on reconnect) |
TIP
Using a persistent session (clean session = false) ensures the device receives shadow delta notifications, command requests, and job notifications that were published while it was offline.
Required Subscriptions
Subscribe to the following topics after connecting, replacing {thingName} with your Thing's name:
$emqx/things/{thingName}/shadow/update/delta
$emqx/things/{thingName}/shadow/update
$emqx/things/{thingName}/jobs/notify
$emqx/things/{thingName}/jobs/notify-next
$emqx/things/{thingName}/jobs/get/accepted
$emqx/things/{thingName}/jobs/get/rejected
$emqx/things/{thingName}/jobs/start-next/accepted
$emqx/things/{thingName}/jobs/start-next/rejected
$emqx/things/{thingName}/jobs/+/get/accepted
$emqx/things/{thingName}/jobs/+/get/rejected
$emqx/things/{thingName}/jobs/+/update/accepted
$emqx/things/{thingName}/jobs/+/update/rejected
$emqx/commands/things/{thingName}/executions/+/request
Report Device State
Report the device's current state by publishing to the shadow update topic (QoS 1):
Topic: $emqx/things/{thingName}/shadow/update
Payload:
{
  "state": {
    "reported": {
      "temperature": 21.5,
      "humidity": 58,
      "targetTemperature": 22
    }
  },
  "clientToken": "optional-token-for-correlation",
  "version": 3
}
- state.reported: Required. The device's current state as a flat or nested JSON object.
- clientToken: Optional. An arbitrary string echoed back in the response for correlation.
- version: Optional. The shadow version for optimistic locking. If provided and it does not match the server version, the update is rejected. Omit or set to 0 to auto-increment.
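As a rough illustration, the payload above could be assembled with a small helper like the one below. The function name `build_shadow_update` is hypothetical and not part of any Fleets SDK; publishing the result at QoS 1 is left to whichever MQTT client library the device uses.

```python
import json


def build_shadow_update(thing_name, reported, client_token=None, version=None):
    """Return (topic, payload) for a shadow update. Helper name is illustrative."""
    topic = f"$emqx/things/{thing_name}/shadow/update"
    body = {"state": {"reported": reported}}
    if client_token is not None:
        # Echoed back in the response so the device can correlate it.
        body["clientToken"] = client_token
    if version is not None:
        # Only sent when optimistic locking is wanted; omitted otherwise.
        body["version"] = version
    return topic, json.dumps(body)
```

Leaving `version` out matches the auto-increment behaviour described above, which is usually the simpler choice for a device that is the only writer of its reported state.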
Publish Events
Publish events to the following topic (QoS 1):
Topic: $emqx/things/{thingName}/events/{eventType}
Payload:
{
  "eventType": "highTemperature",
  "severity": "warn",
  "data": {
    "currentTemp": 45.2
  }
}
- eventType in the payload must match the {eventType} segment in the topic.
- severity: info, warn, or error.
- timestamp: Optional (Unix milliseconds). If omitted, the broker ingestion time is used.
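One way to keep the topic segment and the payload's eventType in agreement is to derive both from a single argument. The sketch below assumes a hypothetical helper named `build_event`; it only builds the topic and payload and does not publish anything.

```python
import json

ALLOWED_SEVERITIES = {"info", "warn", "error"}


def build_event(thing_name, event_type, severity, data, timestamp_ms=None):
    """Return (topic, payload) for an event. Helper name is illustrative."""
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"severity must be one of {sorted(ALLOWED_SEVERITIES)}")
    # The same event_type feeds the topic and the payload, so they cannot drift apart.
    topic = f"$emqx/things/{thing_name}/events/{event_type}"
    body = {"eventType": event_type, "severity": severity, "data": data}
    if timestamp_ms is not None:
        # Unix milliseconds; when omitted, the broker ingestion time is used.
        body["timestamp"] = timestamp_ms
    return topic, json.dumps(body)
```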
Shadow Sync
Desired State Updates
When the cloud updates the desired state, Fleets publishes to the delta topic:
Topic: $emqx/things/{thingName}/shadow/update/delta
Payload:
{
  "version": 4,
  "timestamp": 1748390400,
  "state": {
    "targetTemperature": 24
  },
  "metadata": {
    "targetTemperature": { "timestamp": 1748390400 }
  }
}
- state: Contains only the fields where desired and reported differ.
- timestamp: Unix seconds.
Apply the delta values to the device, then report the updated state back via the shadow update topic.
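The apply-then-report step might look like the following sketch. It assumes a flat device state held in a dictionary and a hypothetical function name `handle_delta`; a real device would also drive its hardware before reporting. The full state is reported back here, which is an assumption rather than a documented requirement.

```python
import json


def handle_delta(thing_name, device_state, delta_payload):
    """Apply a delta message to device_state and build the follow-up report.

    device_state is mutated in place (shallow merge, so nested objects are
    replaced rather than merged). Returns (topic, payload) for the update.
    """
    delta = json.loads(delta_payload)
    device_state.update(delta.get("state", {}))
    topic = f"$emqx/things/{thing_name}/shadow/update"
    # version is omitted so the server auto-increments it.
    body = {"state": {"reported": dict(device_state)}}
    return topic, json.dumps(body)
```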
Startup Procedure
On startup or reconnect, fetch the current shadow over HTTP to check for any desired state that was set while the device was offline:
GET /api/v1/thing-datas/{thingName}/shadow
Authorization: Basic <apiKey:apiSecret>
Apply any pending delta before entering normal operation.
See Device Shadow Sync for a complete flow diagram.
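A minimal sketch of preparing the shadow request with the Python standard library follows. It assumes the credentials are encoded as standard HTTP Basic authentication (Base64 of apiKey:apiSecret); the base URL and the function name `build_shadow_request` are placeholders. The response format is not described on this page, so the sketch stops at building the request.

```python
import base64
import urllib.request


def build_shadow_request(base_url, thing_name, api_key, api_secret):
    """Build (but do not send) the GET request for a Thing's shadow."""
    # Assumption: standard HTTP Basic encoding of "apiKey:apiSecret".
    credentials = f"{api_key}:{api_secret}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    url = f"{base_url.rstrip('/')}/api/v1/thing-datas/{thing_name}/shadow"
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})
```

The request can then be sent with `urllib.request.urlopen(request)` once the real host is known.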
Commands {#commands}
Receive a Command
When a command is sent to the device, Fleets publishes to:
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/request
Payload:
{
  "commandId": "...",
  "action": "setTemperature",
  "params": { "value": 24 },
  "timestamp": 1748390400,
  "ttl": 30
}
- action: The command name to execute.
- params: Input parameters for the command.
- ttl: Seconds before the command times out.
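Because the subscription uses a + wildcard for the execution ID, the device has to recover that ID from the topic of each incoming message. The sketch below shows one way to do so; `parse_command_request` is a hypothetical name, and the code assumes Thing names and execution IDs contain no "/" characters.

```python
import json


def parse_command_request(topic, payload):
    """Extract the execution ID and command fields from a request message."""
    parts = topic.split("/")
    # Expected: $emqx/commands/things/{thingName}/executions/{executionId}/request
    if (
        len(parts) != 7
        or parts[:3] != ["$emqx", "commands", "things"]
        or parts[4] != "executions"
        or parts[6] != "request"
    ):
        raise ValueError(f"not a command request topic: {topic}")
    body = json.loads(payload)
    return {
        "thingName": parts[3],
        "executionId": parts[5],
        "action": body["action"],
        "params": body.get("params", {}),
        "ttl": body.get("ttl"),
    }
```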
Send a Command Response
After executing the action, publish the response (QoS 1):
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/response
Payload:
{
  "status": "SUCCEEDED",
  "result": {
    "currentTemperature": 24
  }
}
- status: SUCCEEDED, FAILED, REJECTED, TIMED_OUT, or CANCELED.
- result: Optional. Any JSON object with output data.
- Do not include thingName or executionId in the MQTT payload; Fleets extracts these from the topic path.
If no response is received within the ttl, the execution status becomes TIMED_OUT.
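The response message can be built in a similar way. In this sketch the helper name `build_command_response` is an assumption; note that thingName and executionId appear only in the topic, never in the payload.

```python
import json

COMMAND_STATUSES = {"SUCCEEDED", "FAILED", "REJECTED", "TIMED_OUT", "CANCELED"}


def build_command_response(thing_name, execution_id, status, result=None):
    """Return (topic, payload) for a command response. Helper name is illustrative."""
    if status not in COMMAND_STATUSES:
        raise ValueError(f"unknown command status: {status}")
    topic = f"$emqx/commands/things/{thing_name}/executions/{execution_id}/response"
    body = {"status": status}
    if result is not None:
        body["result"] = result
    return topic, json.dumps(body)
```

Publishing well before the ttl elapses matters, since a late response leaves the execution in TIMED_OUT.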
Jobs {#jobs}
Jobs follow a pull-based protocol. The device polls for pending jobs and reports progress.
Receive a Job Notification
When a job is queued for the device, Fleets publishes to:
Topic: $emqx/things/{thingName}/jobs/notify
On receiving this notification, request the pending jobs list:
Topic: $emqx/things/{thingName}/jobs/get
Payload: {"thingName": "{thingName}", "clientToken": "req-1"}
Read the response on:
$emqx/things/{thingName}/jobs/get/accepted
Start a Job
To start the next pending job:
Topic: $emqx/things/{thingName}/jobs/start-next
Payload:
{
  "thingName": "{thingName}",
  "statusDetails": { "progress": "starting" },
  "stepTimeoutInMinutes": 60,
  "clientToken": "req-2"
}
- statusDetails: Optional. Initial status details for the execution.
- stepTimeoutInMinutes: Optional. Step timeout in minutes (1–10080).
Read the job document on:
$emqx/things/{thingName}/jobs/start-next/accepted
Report Job Status
After executing the job (or during execution for progress updates), publish:
Topic: $emqx/things/{thingName}/jobs/{jobId}/update
Payload:
{
  "thingName": "{thingName}",
  "status": "SUCCEEDED",
  "statusDetails": {
    "progress": "done"
  },
  "expectedVersion": 1,
  "clientToken": "req-3"
}
- status: IN_PROGRESS, SUCCEEDED, FAILED, or REJECTED.
- statusDetails: Optional. Arbitrary key-value details about the execution state.
- expectedVersion: Optional. Optimistic lock; if provided and it does not match the server version, the update is rejected.
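The three job requests described in this section (get, start-next, update) can be sketched as small builder functions. All function names are hypothetical, and clientToken is always sent here only because the examples above include it; whether it is required is not stated on this page.

```python
import json

JOB_STATUSES = {"IN_PROGRESS", "SUCCEEDED", "FAILED", "REJECTED"}


def jobs_get_request(thing_name, client_token):
    """Request the list of pending jobs."""
    body = {"thingName": thing_name, "clientToken": client_token}
    return f"$emqx/things/{thing_name}/jobs/get", json.dumps(body)


def start_next_request(thing_name, client_token, status_details=None,
                       step_timeout_minutes=None):
    """Ask Fleets to start the next pending job."""
    body = {"thingName": thing_name, "clientToken": client_token}
    if status_details is not None:
        body["statusDetails"] = status_details
    if step_timeout_minutes is not None:
        if not 1 <= step_timeout_minutes <= 10080:
            raise ValueError("stepTimeoutInMinutes must be between 1 and 10080")
        body["stepTimeoutInMinutes"] = step_timeout_minutes
    return f"$emqx/things/{thing_name}/jobs/start-next", json.dumps(body)


def job_update_request(thing_name, job_id, status, client_token,
                       status_details=None, expected_version=None):
    """Report progress or a final status for a job execution."""
    if status not in JOB_STATUSES:
        raise ValueError(f"unknown job status: {status}")
    body = {"thingName": thing_name, "status": status, "clientToken": client_token}
    if status_details is not None:
        body["statusDetails"] = status_details
    if expected_version is not None:
        # Optimistic lock; a mismatch with the server version rejects the update.
        body["expectedVersion"] = expected_version
    return f"$emqx/things/{thing_name}/jobs/{job_id}/update", json.dumps(body)
```

Using a distinct clientToken per request makes it easier to match messages arriving on the accepted and rejected topics to the request that caused them.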