# MQTT Integration

This page describes how to integrate a device with EMQX Fleets using MQTT. It covers connection setup, required subscriptions, publish formats, and the flows for shadow sync, commands, and jobs.

## Prerequisites

- The device is registered as a Thing in your Fleets deployment.
- You have the connection details (host, port, and credentials) for your EMQX Broker deployment.
- An MQTT client library is available for your device platform.

## Connection Setup

Connect to the EMQX Broker over standard MQTT with the following parameters:

| Parameter | Value |
|---|---|
| **Host** | From your EMQX Broker deployment |
| **Port** | From your EMQX Broker deployment (typically 1883 for MQTT, 8883 for MQTT over TLS) |
| **Client ID** | The Thing's MQTT Client ID (set when registering the Thing) |
| **Username / Password** | Configured in the Broker's **Authentication** settings |
| **Clean Session** | `false` (recommended, to receive queued messages on reconnect) |

::: tip

With a persistent session (`clean session = false`), the broker queues QoS 1 and QoS 2 messages for the device's subscriptions while it is offline. Shadow delta notifications, command requests, and job notifications published during that time are then delivered when the device reconnects.

:::

## Required Subscriptions

Subscribe to the following topics after connecting, replacing `{thingName}` with your Thing's name:

```text
$emqx/things/{thingName}/shadow/update/delta
$emqx/things/{thingName}/shadow/update
$emqx/things/{thingName}/jobs/notify
$emqx/things/{thingName}/jobs/notify-next
$emqx/things/{thingName}/jobs/get/accepted
$emqx/things/{thingName}/jobs/get/rejected
$emqx/things/{thingName}/jobs/start-next/accepted
$emqx/things/{thingName}/jobs/start-next/rejected
$emqx/things/{thingName}/jobs/+/get/accepted
$emqx/things/{thingName}/jobs/+/get/rejected
$emqx/things/{thingName}/jobs/+/update/accepted
$emqx/things/{thingName}/jobs/+/update/rejected
$emqx/commands/things/{thingName}/executions/+/request
```
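The list above can be generated per device rather than hard-coded. The following is a minimal sketch; the function name `subscription_topics` and the check that rejects `/`, `+`, and `#` in the Thing name are illustrative assumptions, not part of Fleets.

```python
# Topic filters copied from the list above; "{thing}" is filled in per device.
SUBSCRIPTION_TEMPLATES = [
    "$emqx/things/{thing}/shadow/update/delta",
    "$emqx/things/{thing}/shadow/update",
    "$emqx/things/{thing}/jobs/notify",
    "$emqx/things/{thing}/jobs/notify-next",
    "$emqx/things/{thing}/jobs/get/accepted",
    "$emqx/things/{thing}/jobs/get/rejected",
    "$emqx/things/{thing}/jobs/start-next/accepted",
    "$emqx/things/{thing}/jobs/start-next/rejected",
    "$emqx/things/{thing}/jobs/+/get/accepted",
    "$emqx/things/{thing}/jobs/+/get/rejected",
    "$emqx/things/{thing}/jobs/+/update/accepted",
    "$emqx/things/{thing}/jobs/+/update/rejected",
    "$emqx/commands/things/{thing}/executions/+/request",
]


def subscription_topics(thing_name: str) -> list[str]:
    """Return the topic filters a device should subscribe to."""
    # Assumption: a Thing name containing a topic separator or wildcard
    # would change the meaning of the filter, so it is rejected here.
    if not thing_name or any(ch in thing_name for ch in "/+#"):
        raise ValueError(f"unsuitable Thing name for a topic: {thing_name!r}")
    return [template.format(thing=thing_name) for template in SUBSCRIPTION_TEMPLATES]
```

The returned list can be passed to the subscribe call of whichever MQTT client library the device uses.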

## Report Device State

Report the device's current state by publishing to the shadow update topic (QoS 1):

```text
Topic: $emqx/things/{thingName}/shadow/update
```

Payload:
```json
{
  "state": {
    "reported": {
      "temperature": 21.5,
      "humidity": 58,
      "targetTemperature": 22
    }
  },
  "clientToken": "optional-token-for-correlation",
  "version": 3
}
```

- `state.reported`: Required. The device's current state as a flat or nested JSON object.
- `clientToken`: Optional. An arbitrary string echoed back in the response for correlation.
- `version`: Optional. The shadow version for optimistic locking. If provided and it does not match the server version, the update is rejected. Omit or set to `0` to auto-increment.
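As an illustration of the fields described above, the sketch below builds the topic and JSON payload for a state report. The helper name `shadow_update` is hypothetical; optional fields are left out of the payload when not supplied, which for `version` corresponds to the auto-increment behavior.

```python
import json
from typing import Any, Optional


def shadow_update(
    thing_name: str,
    reported: dict,
    client_token: Optional[str] = None,
    version: Optional[int] = None,
) -> tuple[str, str]:
    """Build (topic, payload) for a shadow state report."""
    topic = f"$emqx/things/{thing_name}/shadow/update"
    body: dict[str, Any] = {"state": {"reported": reported}}
    if client_token is not None:
        body["clientToken"] = client_token  # echoed back for correlation
    if version is not None:
        body["version"] = version  # optimistic lock; omitted means auto-increment
    return topic, json.dumps(body)
```

The result would then be published at QoS 1, for example with `client.publish(topic, payload, qos=1)` in paho-mqtt.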

## Publish Events

Publish events to the following topic (QoS 1):

```text
Topic: $emqx/things/{thingName}/events/{eventType}
```

Payload:
```json
{
  "eventType": "highTemperature",
  "severity": "warn",
  "data": {
    "currentTemp": 45.2
  }
}
```

- `eventType` in the payload must match the `{eventType}` segment in the topic.
- `severity`: `info`, `warn`, or `error`.
- `timestamp`: Optional (Unix milliseconds). If omitted, the broker ingestion time is used.
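Because the `eventType` must appear in both the topic and the payload, deriving both from one argument avoids a mismatch. This is a sketch only; `build_event` is a hypothetical helper, and validating `severity` on the device side is an assumption about what is useful rather than documented Fleets behavior.

```python
import json
from typing import Any, Optional

SEVERITIES = {"info", "warn", "error"}


def build_event(
    thing_name: str,
    event_type: str,
    severity: str,
    data: dict,
    timestamp_ms: Optional[int] = None,
) -> tuple[str, str]:
    """Build (topic, payload) for an event, keeping eventType consistent."""
    if severity not in SEVERITIES:
        raise ValueError(f"severity must be one of {sorted(SEVERITIES)}")
    # The same event_type value feeds the topic segment and the payload field.
    topic = f"$emqx/things/{thing_name}/events/{event_type}"
    body: dict[str, Any] = {"eventType": event_type, "severity": severity, "data": data}
    if timestamp_ms is not None:
        body["timestamp"] = timestamp_ms  # Unix milliseconds
    return topic, json.dumps(body)
```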

## Shadow Sync

### Desired State Updates

When the cloud updates the desired state, Fleets publishes to the delta topic:

```text
Topic: $emqx/things/{thingName}/shadow/update/delta
```

Payload:
```json
{
  "version": 4,
  "timestamp": 1748390400,
  "state": {
    "targetTemperature": 24
  },
  "metadata": {
    "targetTemperature": { "timestamp": 1748390400 }
  }
}
```

- `state`: Contains only the fields where desired and reported differ.
- `timestamp`: Unix seconds.

Apply the delta values to the device, then report the updated state back via the shadow update topic.
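The apply-then-report step can be sketched as follows, treating the device state as a plain dictionary. The names `merge` and `apply_delta` are hypothetical, and a real device would also drive its hardware before reporting. The report omits `version`, relying on the auto-increment behavior of the shadow update topic.

```python
import json


def merge(base: dict, changes: dict) -> dict:
    """Return a copy of base with changes applied, recursing into nested objects."""
    merged = dict(base)
    for key, value in changes.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def apply_delta(local_state: dict, delta_payload: str) -> tuple[dict, str]:
    """Apply a delta message and build the follow-up report payload."""
    delta = json.loads(delta_payload)
    # "state" holds only the fields where desired and reported differ.
    new_state = merge(local_state, delta["state"])
    report = json.dumps({"state": {"reported": new_state}})
    return new_state, report
```

The `report` string is the payload to publish on `$emqx/things/{thingName}/shadow/update`.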

### Startup Procedure

On startup or reconnect, fetch the current shadow over the HTTP API to check for any desired state that was set while the device was offline:

```text
GET /api/v1/thing-datas/{thingName}/shadow
Authorization: Basic <base64(apiKey:apiSecret)>
```

Apply any pending delta before entering normal operation.
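A sketch of preparing that request with Python's standard library is shown below. It assumes the `Authorization` header follows standard HTTP Basic encoding (base64 of `apiKey:apiSecret`); `base_url` and the helper name `shadow_request` are placeholders for your deployment. The response format is not described on this page, so the sketch stops at building the request.

```python
import base64
import urllib.request


def shadow_request(
    base_url: str, thing_name: str, api_key: str, api_secret: str
) -> urllib.request.Request:
    """Build the GET request that fetches the current shadow."""
    # Assumption: standard HTTP Basic credentials, base64("apiKey:apiSecret").
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    credentials = base64.b64encode(raw).decode("ascii")
    url = f"{base_url.rstrip('/')}/api/v1/thing-datas/{thing_name}/shadow"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Basic {credentials}"},
        method="GET",
    )
```

Passing the returned object to `urllib.request.urlopen` sends the request.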

See [Device Shadow Sync](./device_shadow_sync.md) for a complete flow diagram.

## Commands {#commands}

### Receive a Command

When a command is sent to the device, Fleets publishes to:

```text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/request
```

Payload:
```json
{
  "commandId": "...",
  "action": "setTemperature",
  "params": { "value": 24 },
  "timestamp": 1748390400,
  "ttl": 30
}
```

- `action`: The command name to execute.
- `params`: Input parameters for the command.
- `ttl`: Seconds before the command times out.
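Since the `executionId` arrives only in the topic, a handler needs to extract it before it can respond. The sketch below shows one way to do that; the function name `parse_command_request` and the shape of the returned dictionary are illustrative choices.

```python
import json
import re
from typing import Optional

# Matches the request topic and captures the Thing name and execution ID.
REQUEST_TOPIC = re.compile(
    r"^\$emqx/commands/things/([^/]+)/executions/([^/]+)/request$"
)


def parse_command_request(topic: str, payload: bytes) -> Optional[dict]:
    """Return the command details, or None if the topic is not a command request."""
    match = REQUEST_TOPIC.match(topic)
    if match is None:
        return None
    body = json.loads(payload)
    return {
        "thingName": match.group(1),
        "executionId": match.group(2),
        "action": body["action"],
        "params": body.get("params", {}),
        "ttl": body.get("ttl"),
    }
```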

### Send a Command Response

After executing the action, publish the response (QoS 1):

```text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/response
```

Payload:
```json
{
  "status": "SUCCEEDED",
  "result": {
    "currentTemperature": 24
  }
}
```

- `status`: `SUCCEEDED`, `FAILED`, `REJECTED`, `TIMED_OUT`, or `CANCELED`.
- `result`: Optional. Any JSON object with output data.
- Do not include `thingName` or `executionId` in the MQTT payload. Fleets extracts these from the topic path.

If no response is received within the `ttl`, the execution status becomes `TIMED_OUT`.
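The response can be built from the execution ID taken from the request topic. This is a minimal sketch with a hypothetical helper name, `command_response`; it keeps `thingName` and `executionId` out of the payload, as required above.

```python
import json
from typing import Any, Optional

COMMAND_STATUSES = {"SUCCEEDED", "FAILED", "REJECTED", "TIMED_OUT", "CANCELED"}


def command_response(
    thing_name: str,
    execution_id: str,
    status: str,
    result: Optional[dict] = None,
) -> tuple[str, str]:
    """Build (topic, payload) for a command response."""
    if status not in COMMAND_STATUSES:
        raise ValueError(f"status must be one of {sorted(COMMAND_STATUSES)}")
    # Identifiers go in the topic path only, never in the payload.
    topic = f"$emqx/commands/things/{thing_name}/executions/{execution_id}/response"
    body: dict[str, Any] = {"status": status}
    if result is not None:
        body["result"] = result
    return topic, json.dumps(body)
```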

## Jobs {#jobs}

Jobs follow a pull-based protocol: Fleets notifies the device that a job is queued, and the device then requests its pending jobs, starts each one, and reports progress.

### Receive a Job Notification

When a job is queued for the device, Fleets publishes to:

```text
Topic: $emqx/things/{thingName}/jobs/notify
```

On receiving this notification, request the pending jobs list:

```text
Topic: $emqx/things/{thingName}/jobs/get
Payload: {"thingName": "{thingName}", "clientToken": "req-1"}
```

Read the response on:
```text
$emqx/things/{thingName}/jobs/get/accepted
```

### Start a Job

To start the next pending job:

```text
Topic: $emqx/things/{thingName}/jobs/start-next
```

Payload:
```json
{
  "thingName": "{thingName}",
  "statusDetails": { "progress": "starting" },
  "stepTimeoutInMinutes": 60,
  "clientToken": "req-2"
}
```

- `statusDetails`: Optional. Initial status details for the execution.
- `stepTimeoutInMinutes`: Optional. Step timeout in minutes (1–10080).

Read the job document on:
```text
$emqx/things/{thingName}/jobs/start-next/accepted
```
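The start-next request can be assembled as in the sketch below. `start_next_job` is a hypothetical helper; checking the 1 to 10080 minute range on the device before publishing is an assumption about good practice, not a documented requirement.

```python
import json
from typing import Any, Optional


def start_next_job(
    thing_name: str,
    client_token: str,
    step_timeout_minutes: Optional[int] = None,
    status_details: Optional[dict] = None,
) -> tuple[str, str]:
    """Build (topic, payload) for starting the next pending job."""
    body: dict[str, Any] = {"thingName": thing_name, "clientToken": client_token}
    if status_details is not None:
        body["statusDetails"] = status_details
    if step_timeout_minutes is not None:
        # Documented range for stepTimeoutInMinutes is 1 to 10080.
        if not 1 <= step_timeout_minutes <= 10080:
            raise ValueError("stepTimeoutInMinutes must be between 1 and 10080")
        body["stepTimeoutInMinutes"] = step_timeout_minutes
    topic = f"$emqx/things/{thing_name}/jobs/start-next"
    return topic, json.dumps(body)
```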

### Report Job Status

After executing the job (or during execution for progress updates), publish:

```text
Topic: $emqx/things/{thingName}/jobs/{jobId}/update
```

Payload:
```json
{
  "thingName": "{thingName}",
  "status": "SUCCEEDED",
  "statusDetails": {
    "progress": "done"
  },
  "expectedVersion": 1,
  "clientToken": "req-3"
}
```

- `status`: `IN_PROGRESS`, `SUCCEEDED`, `FAILED`, or `REJECTED`.
- `statusDetails`: Optional. Arbitrary key-value details about the execution state.
- `expectedVersion`: Optional. Optimistic lock; if provided and it does not match the server version, the update is rejected.
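A status report can be built the same way. In this sketch, `job_update` is a hypothetical helper, and `expectedVersion` is included only when the caller supplies it, so the optimistic lock stays opt-in.

```python
import json
from typing import Any, Optional

JOB_STATUSES = {"IN_PROGRESS", "SUCCEEDED", "FAILED", "REJECTED"}


def job_update(
    thing_name: str,
    job_id: str,
    status: str,
    client_token: str,
    status_details: Optional[dict] = None,
    expected_version: Optional[int] = None,
) -> tuple[str, str]:
    """Build (topic, payload) for a job status report."""
    if status not in JOB_STATUSES:
        raise ValueError(f"status must be one of {sorted(JOB_STATUSES)}")
    topic = f"$emqx/things/{thing_name}/jobs/{job_id}/update"
    body: dict[str, Any] = {
        "thingName": thing_name,
        "status": status,
        "clientToken": client_token,
    }
    if status_details is not None:
        body["statusDetails"] = status_details
    if expected_version is not None:
        body["expectedVersion"] = expected_version  # optimistic lock
    return topic, json.dumps(body)
```

A device might call this once with `IN_PROGRESS` while the job runs and again with `SUCCEEDED` or `FAILED` when it finishes.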

## Next Steps

- [HTTPS Integration](./https_integration.md)
- [Device Shadow Sync](./device_shadow_sync.md)
- [Device Shadow](../device_shadow.md)
