MQTT Integration

This page describes how to integrate a device with EMQX Fleets using MQTT. It covers connection setup, required subscriptions, publish formats, and the flows for shadow sync, commands, and jobs.

Prerequisites

  • The device is registered as a Thing in your Fleets deployment.
  • You have the connection details (host, port, credentials) for your EMQX Broker deployment.
  • An MQTT client library is available for your device platform.

Connection Setup

Connect to the EMQX Broker using standard MQTT:

| Parameter | Value |
| --- | --- |
| Host | From your EMQX Broker deployment |
| Port | From your EMQX Broker deployment (typically 1883 for MQTT, 8883 for MQTT over TLS) |
| Client ID | The Thing's MQTT Client ID (set when registering the Thing) |
| Username / Password | Configured in the Broker's Authentication settings |
| Clean Session | false (recommended, to receive queued messages on reconnect) |

TIP

Using a persistent session (clean session = false) ensures the device receives shadow delta notifications, command requests, and job notifications that were published while it was offline.
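As a rough illustration, the connection parameters can be collected in one place before being handed to whichever MQTT client library the device uses. The sketch below is library-agnostic; the class name BrokerConnection and its fields are hypothetical, and the port fallback simply mirrors the typical values listed in the table (1883 for plain MQTT, 8883 for TLS).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BrokerConnection:
    """Hypothetical container for the connection parameters listed above."""

    host: str
    client_id: str            # the Thing's MQTT Client ID
    username: str
    password: str
    use_tls: bool = True
    port: Optional[int] = None  # None means "fall back to the typical port"
    clean_session: bool = False  # persistent session, as recommended

    def effective_port(self) -> int:
        """Return the configured port, or the typical default for the transport."""
        if self.port is not None:
            return self.port
        return 8883 if self.use_tls else 1883
```

The actual host, port, and credentials should always come from your EMQX Broker deployment; the defaults here are only a convenience.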

Required Subscriptions

Subscribe to the following topics after connecting, replacing {thingName} with your Thing's name:

text
$emqx/things/{thingName}/shadow/update/delta
$emqx/things/{thingName}/shadow/update
$emqx/things/{thingName}/jobs/notify
$emqx/things/{thingName}/jobs/notify-next
$emqx/things/{thingName}/jobs/get/accepted
$emqx/things/{thingName}/jobs/get/rejected
$emqx/things/{thingName}/jobs/start-next/accepted
$emqx/things/{thingName}/jobs/start-next/rejected
$emqx/things/{thingName}/jobs/+/get/accepted
$emqx/things/{thingName}/jobs/+/get/rejected
$emqx/things/{thingName}/jobs/+/update/accepted
$emqx/things/{thingName}/jobs/+/update/rejected
$emqx/commands/things/{thingName}/executions/+/request
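The placeholder substitution can be done once at startup. The following is a minimal sketch that expands the topic list above for a given Thing name; the function name required_subscriptions is hypothetical, and the check that rejects names containing MQTT wildcard or separator characters is an assumption added for safety, not a documented Fleets rule.

```python
# Topic filters copied from the list above, with {thingName} left as a placeholder.
TOPIC_TEMPLATES = (
    "$emqx/things/{thingName}/shadow/update/delta",
    "$emqx/things/{thingName}/shadow/update",
    "$emqx/things/{thingName}/jobs/notify",
    "$emqx/things/{thingName}/jobs/notify-next",
    "$emqx/things/{thingName}/jobs/get/accepted",
    "$emqx/things/{thingName}/jobs/get/rejected",
    "$emqx/things/{thingName}/jobs/start-next/accepted",
    "$emqx/things/{thingName}/jobs/start-next/rejected",
    "$emqx/things/{thingName}/jobs/+/get/accepted",
    "$emqx/things/{thingName}/jobs/+/get/rejected",
    "$emqx/things/{thingName}/jobs/+/update/accepted",
    "$emqx/things/{thingName}/jobs/+/update/rejected",
    "$emqx/commands/things/{thingName}/executions/+/request",
)


def required_subscriptions(thing_name: str) -> list[str]:
    """Return the topic filters a device should subscribe to after connecting."""
    # Assumption: a Thing name must not contain '/', '+' or '#', since those
    # characters would change the meaning of the topic filter.
    if not thing_name or any(ch in thing_name for ch in "/+#"):
        raise ValueError(f"unsuitable thing name: {thing_name!r}")
    return [t.replace("{thingName}", thing_name) for t in TOPIC_TEMPLATES]
```

The returned list can then be passed to the subscribe call of your MQTT client library, one filter at a time or in a single batch if the library supports it.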

Report Device State

Report the device's current state by publishing to the shadow update topic (QoS 1):

text
Topic: $emqx/things/{thingName}/shadow/update

Payload:

json
{
  "state": {
    "reported": {
      "temperature": 21.5,
      "humidity": 58,
      "targetTemperature": 22
    }
  },
  "clientToken": "optional-token-for-correlation",
  "version": 3
}
  • state.reported: Required. The device's current state as a flat or nested JSON object.
  • clientToken: Optional. An arbitrary string echoed back in the response for correlation.
  • version: Optional. The shadow version, used for optimistic locking. If provided and it does not match the server's current version, the update is rejected. Omit it or set it to 0 to let the server auto-increment the version.
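One way to assemble this message is sketched below. The helper name build_shadow_report is hypothetical; it only builds the topic and JSON payload described above and leaves publishing (at QoS 1) to your MQTT client library. Optional fields are left out of the payload entirely when not supplied.

```python
import json
from typing import Any, Optional


def build_shadow_report(
    thing_name: str,
    reported: dict[str, Any],
    client_token: Optional[str] = None,
    version: Optional[int] = None,
) -> tuple[str, str]:
    """Return (topic, payload) for a shadow state report."""
    topic = f"$emqx/things/{thing_name}/shadow/update"
    payload: dict[str, Any] = {"state": {"reported": reported}}
    if client_token is not None:
        payload["clientToken"] = client_token  # echoed back for correlation
    if version is not None:
        payload["version"] = version  # enables the optimistic-lock check
    return topic, json.dumps(payload)
```

For example, build_shadow_report("thermostat-1", {"temperature": 21.5}) yields the update topic for thermostat-1 and a payload containing only state.reported.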

Publish Events

Publish events to the events topic (QoS 1), replacing {eventType} with the type of the event:

text
Topic: $emqx/things/{thingName}/events/{eventType}

Payload:

json
{
  "eventType": "highTemperature",
  "severity": "warn",
  "data": {
    "currentTemp": 45.2
  }
}
  • eventType in the payload must match the {eventType} segment in the topic.
  • severity: info, warn, or error.
  • timestamp: Optional (Unix milliseconds). If omitted, the broker ingestion time is used.
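Because the event type appears both in the topic and in the payload, it can help to derive both from a single argument so they cannot drift apart. The sketch below does that and checks the severity against the three values listed above; build_event is a hypothetical helper name.

```python
import json
from typing import Any, Optional

ALLOWED_SEVERITIES = ("info", "warn", "error")


def build_event(
    thing_name: str,
    event_type: str,
    severity: str,
    data: dict[str, Any],
    timestamp_ms: Optional[int] = None,
) -> tuple[str, str]:
    """Return (topic, payload) for an event, keeping eventType consistent."""
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"severity must be one of {ALLOWED_SEVERITIES}")
    topic = f"$emqx/things/{thing_name}/events/{event_type}"
    payload: dict[str, Any] = {
        "eventType": event_type,  # same value as the topic segment
        "severity": severity,
        "data": data,
    }
    if timestamp_ms is not None:
        payload["timestamp"] = timestamp_ms  # Unix milliseconds
    return topic, json.dumps(payload)
```

When timestamp_ms is omitted, the field is not sent, so the broker ingestion time applies as described above.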

Shadow Sync

Desired State Updates

When the cloud updates the desired state, Fleets publishes to the delta topic:

text
Topic: $emqx/things/{thingName}/shadow/update/delta

Payload:

json
{
  "version": 4,
  "timestamp": 1748390400,
  "state": {
    "targetTemperature": 24
  },
  "metadata": {
    "targetTemperature": { "timestamp": 1748390400 }
  }
}
  • state: Contains only the fields where desired and reported differ.
  • timestamp: Unix seconds.

Apply the delta values to the device, then report the updated state back via the shadow update topic.
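A minimal sketch of this apply-then-report step follows. It assumes the device keeps its state as a flat dictionary and that a shallow merge of the delta is sufficient; nested state would need a recursive merge. The function name apply_delta is hypothetical, and the hardware-specific part (actually changing the setpoint, for instance) is left out.

```python
import json
from typing import Any


def apply_delta(current_state: dict[str, Any], delta_message: str) -> tuple[dict[str, Any], str]:
    """Merge a delta message into the local state and build the report payload.

    Returns the new local state and the JSON payload to publish on the
    shadow update topic.
    """
    delta = json.loads(delta_message)
    # Assumption: a shallow merge is enough because the state is flat.
    new_state = {**current_state, **delta["state"]}
    report = json.dumps({"state": {"reported": new_state}})
    return new_state, report
```

Reporting the full state after the merge keeps the example simple; reporting only the changed fields is another option if your payloads are large.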

Startup Procedure

On startup or reconnect, fetch the current shadow over the HTTP API to check for any desired state that was set while the device was offline:

text
GET /api/v1/thing-datas/{thingName}/shadow
Authorization: Basic <base64(apiKey:apiSecret)>

Apply any pending delta before entering normal operation.
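HTTP Basic authentication sends the Base64 encoding of the string apiKey:apiSecret in the Authorization header. The sketch below builds such a request with the Python standard library without sending it. The base URL is an assumption (this page does not state the API host), and build_shadow_request is a hypothetical helper name; the shape of the response body is not covered here.

```python
import base64
import urllib.request
from urllib.parse import quote


def build_shadow_request(
    base_url: str, thing_name: str, api_key: str, api_secret: str
) -> urllib.request.Request:
    """Build (but do not send) the GET request for a Thing's current shadow."""
    credentials = f"{api_key}:{api_secret}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    # base_url is an assumption, e.g. the address of your Fleets API endpoint.
    url = f"{base_url.rstrip('/')}/api/v1/thing-datas/{quote(thing_name, safe='')}/shadow"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )
```

The request can then be sent with urllib.request.urlopen(request), ideally with a timeout and error handling suited to the device.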

See Device Shadow Sync for a complete flow diagram.

Commands

Receive a Command

When a command is sent to the device, Fleets publishes to:

text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/request

Payload:

json
{
  "commandId": "...",
  "action": "setTemperature",
  "params": { "value": 24 },
  "timestamp": 1748390400,
  "ttl": 30
}
  • action: The command name to execute.
  • params: Input parameters for the command.
  • ttl: Seconds before the command times out.

Send a Command Response

After executing the action, publish the response (QoS 1):

text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/response

Payload:

json
{
  "status": "SUCCEEDED",
  "result": {
    "currentTemperature": 24
  }
}
  • status: SUCCEEDED, FAILED, REJECTED, TIMED_OUT, or CANCELED.
  • result: Optional. Any JSON object with output data.
  • Do not include thingName or executionId in the MQTT payload; Fleets extracts these from the topic path.

If no response is received within the ttl, the execution status becomes TIMED_OUT.
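The request and response topics differ only in their last segment, so a handler can derive the response topic from the topic the request arrived on. The following sketch shows one possible dispatch routine. The names handle_command and handlers are hypothetical, as are the choices to answer REJECTED for an unknown action and to put an error string in result on failure; ttl enforcement is not shown.

```python
import json
import re
from typing import Any, Callable

REQUEST_TOPIC = re.compile(
    r"\$emqx/commands/things/(?P<thing>[^/]+)/executions/(?P<execution>[^/]+)/request"
)


def handle_command(
    topic: str,
    payload: str,
    handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]],
) -> tuple[str, str]:
    """Run the handler for a command request; return (response_topic, payload)."""
    match = REQUEST_TOPIC.fullmatch(topic)
    if match is None:
        raise ValueError(f"not a command request topic: {topic!r}")
    request = json.loads(payload)
    handler = handlers.get(request["action"])
    if handler is None:
        # Assumption: unknown actions are reported as REJECTED.
        response: dict[str, Any] = {"status": "REJECTED"}
    else:
        try:
            response = {"status": "SUCCEEDED", "result": handler(request.get("params", {}))}
        except Exception as exc:  # report any handler failure to Fleets
            response = {"status": "FAILED", "result": {"error": str(exc)}}
    response_topic = (
        f"$emqx/commands/things/{match['thing']}/executions/{match['execution']}/response"
    )
    return response_topic, json.dumps(response)
```

Note that the response payload carries neither thingName nor executionId; both travel in the topic, as required above.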

Jobs

Jobs follow a pull-based protocol. Fleets notifies the device when a job is queued; the device then requests its pending jobs, starts the next one, and reports progress.

Receive a Job Notification

When a job is queued for the device, Fleets publishes to:

text
Topic: $emqx/things/{thingName}/jobs/notify

On receiving this notification, request the list of pending jobs by publishing to:

text
Topic: $emqx/things/{thingName}/jobs/get
Payload: {"thingName": "{thingName}", "clientToken": "req-1"}

Read the response on:

text
$emqx/things/{thingName}/jobs/get/accepted
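The request and its reply topics share a common prefix, which the sketch below makes explicit. Both function names are hypothetical. The format of the accepted and rejected payloads is not described on this page, so the example stops at building the request.

```python
import json


def build_jobs_get(thing_name: str, client_token: str) -> tuple[str, str]:
    """Return (topic, payload) for requesting the pending jobs list."""
    topic = f"$emqx/things/{thing_name}/jobs/get"
    payload = json.dumps({"thingName": thing_name, "clientToken": client_token})
    return topic, payload


def jobs_get_reply_topics(thing_name: str) -> tuple[str, str]:
    """Return the (accepted, rejected) topics on which the reply arrives."""
    base = f"$emqx/things/{thing_name}/jobs/get"
    return f"{base}/accepted", f"{base}/rejected"
```

Using a fresh clientToken per request makes it easier to tell replies apart if several requests are in flight.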

Start a Job

To start the next pending job, publish to:

text
Topic: $emqx/things/{thingName}/jobs/start-next

Payload:

json
{
  "thingName": "{thingName}",
  "statusDetails": { "progress": "starting" },
  "stepTimeoutInMinutes": 60,
  "clientToken": "req-2"
}
  • statusDetails: Optional. Initial status details for the execution.
  • stepTimeoutInMinutes: Optional. Step timeout in minutes (1–10080, that is, up to 7 days).

Read the job document on:

text
$emqx/things/{thingName}/jobs/start-next/accepted
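A start-next request can be built with a small helper that also checks the documented timeout range before anything is sent. This is a sketch only: build_start_next is a hypothetical name, and treating clientToken as a mandatory argument is a choice made for this example rather than a documented requirement.

```python
import json
from typing import Any, Optional

MIN_STEP_TIMEOUT_MINUTES = 1
MAX_STEP_TIMEOUT_MINUTES = 10080  # 7 days


def build_start_next(
    thing_name: str,
    client_token: str,
    status_details: Optional[dict[str, Any]] = None,
    step_timeout_minutes: Optional[int] = None,
) -> tuple[str, str]:
    """Return (topic, payload) for starting the next pending job."""
    payload: dict[str, Any] = {"thingName": thing_name, "clientToken": client_token}
    if status_details is not None:
        payload["statusDetails"] = status_details
    if step_timeout_minutes is not None:
        if not MIN_STEP_TIMEOUT_MINUTES <= step_timeout_minutes <= MAX_STEP_TIMEOUT_MINUTES:
            raise ValueError("stepTimeoutInMinutes must be between 1 and 10080")
        payload["stepTimeoutInMinutes"] = step_timeout_minutes
    return f"$emqx/things/{thing_name}/jobs/start-next", json.dumps(payload)
```

Validating the range on the device avoids a round trip that would presumably end on the start-next/rejected topic.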

Report Job Status

After executing the job (or during execution, to report progress), publish to:

text
Topic: $emqx/things/{thingName}/jobs/{jobId}/update

Payload:

json
{
  "thingName": "{thingName}",
  "status": "SUCCEEDED",
  "statusDetails": {
    "progress": "done"
  },
  "expectedVersion": 1,
  "clientToken": "req-3"
}
  • status: IN_PROGRESS, SUCCEEDED, FAILED, or REJECTED.
  • statusDetails: Optional. Arbitrary key-value details about the execution state.
  • expectedVersion: Optional. Optimistic lock; if provided and it does not match the server version, the update is rejected.
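The status update can be assembled the same way as the other job messages. The sketch below validates the status against the four values listed above and includes expectedVersion only when the caller wants the optimistic-lock check. The name build_job_update is hypothetical.

```python
import json
from typing import Any, Optional

JOB_STATUSES = ("IN_PROGRESS", "SUCCEEDED", "FAILED", "REJECTED")


def build_job_update(
    thing_name: str,
    job_id: str,
    status: str,
    client_token: str,
    status_details: Optional[dict[str, Any]] = None,
    expected_version: Optional[int] = None,
) -> tuple[str, str]:
    """Return (topic, payload) for reporting the status of a job execution."""
    if status not in JOB_STATUSES:
        raise ValueError(f"status must be one of {JOB_STATUSES}")
    payload: dict[str, Any] = {
        "thingName": thing_name,
        "status": status,
        "clientToken": client_token,
    }
    if status_details is not None:
        payload["statusDetails"] = status_details
    if expected_version is not None:
        payload["expectedVersion"] = expected_version  # optimistic lock
    return f"$emqx/things/{thing_name}/jobs/{job_id}/update", json.dumps(payload)
```

A long-running job might call this several times with IN_PROGRESS and different statusDetails before sending the final SUCCEEDED or FAILED update.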

Next Steps