Skip to content

MQTT 統合

このページでは、MQTT を使用してデバイスを EMQX Fleets と統合する方法について説明します。接続設定、必要なサブスクライブ、パブリッシュ形式、およびシャドウ同期、コマンド、ジョブのフローをカバーしています。

前提条件

  • デバイスが Fleets のデプロイメントで Thing として登録されていること。
  • EMQX ブローカーの接続情報(ホスト、ポート、認証情報)を EMQX ブローカーのデプロイメントから取得していること。
  • デバイスプラットフォームに対応した MQTT クライアントライブラリが利用可能であること。

接続設定

標準の MQTT を使用して EMQX ブローカーに接続します。

パラメーター
HostEMQX ブローカーのデプロイメントから取得
PortEMQX ブローカーのデプロイメントから取得(通常は MQTT は 1883、TLS 経由の MQTT は 8883)
Client IDThing 登録時に設定した Thing の MQTT クライアント ID
Username / Passwordブローカーの 認証 設定で構成されたもの
Clean Sessionfalse(再接続時にキューイングされたメッセージを受信するため推奨)

TIP

永続セッション(clean session = false)を使用すると、デバイスがオフライン中にパブリッシュされたシャドウのデルタ通知、コマンドリクエスト、ジョブ通知を受信できます。

必須サブスクライブ

接続後、以下のトピックをサブスクライブします。{thingName} は Thing の名前に置き換えてください。

text
$emqx/things/{thingName}/shadow/update/delta
$emqx/things/{thingName}/shadow/update
$emqx/things/{thingName}/jobs/notify
$emqx/things/{thingName}/jobs/notify-next
$emqx/things/{thingName}/jobs/get/accepted
$emqx/things/{thingName}/jobs/get/rejected
$emqx/things/{thingName}/jobs/start-next/accepted
$emqx/things/{thingName}/jobs/start-next/rejected
$emqx/things/{thingName}/jobs/+/get/accepted
$emqx/things/{thingName}/jobs/+/get/rejected
$emqx/things/{thingName}/jobs/+/update/accepted
$emqx/things/{thingName}/jobs/+/update/rejected
$emqx/commands/things/{thingName}/executions/+/request

デバイス状態の報告

シャドウ更新トピックにパブリッシュして、デバイスの現在の状態を報告します(QoS 1)。

text
Topic: $emqx/things/{thingName}/shadow/update

ペイロード:

json
{
  "state": {
    "reported": {
      "temperature": 21.5,
      "humidity": 58,
      "targetTemperature": 22
    }
  },
  "clientToken": "optional-token-for-correlation",
  "version": 3
}
  • state.reported: 必須。デバイスの現在の状態をフラットまたはネストされた JSON オブジェクトで表現します。
  • clientToken: 任意。レスポンスでエコーバックされる相関用の任意の文字列です。
  • version: 任意。楽観的ロック用のシャドウバージョン。指定し、かつサーバーのバージョンと一致しない場合は更新が拒否されます。省略または 0 にすると自動インクリメントされます。

イベントのパブリッシュ

以下のトピックにイベントをパブリッシュします(QoS 1)。

text
Topic: $emqx/things/{thingName}/events/{eventType}

ペイロード:

json
{
  "eventType": "highTemperature",
  "severity": "warn",
  "data": {
    "currentTemp": 45.2
  }
}
  • ペイロード内の eventType はトピックの {eventType} セグメントと一致する必要があります。
  • severity: infowarnerror のいずれか。
  • timestamp: 任意(Unix ミリ秒)。省略した場合はブローカーの取り込み時間が使用されます。

シャドウ同期

Desired State の更新

クラウドが Desired State を更新すると、Fleets はデルタトピックにパブリッシュします。

text
Topic: $emqx/things/{thingName}/shadow/update/delta

ペイロード:

json
{
  "version": 4,
  "timestamp": 1748390400,
  "state": {
    "targetTemperature": 24
  },
  "metadata": {
    "targetTemperature": { "timestamp": 1748390400 }
  }
}
  • state: Desired と Reported が異なるフィールドのみを含みます。
  • timestamp: Unix 秒。

デルタ値をデバイスに適用し、その後シャドウ更新トピックを通じて更新済み状態を報告してください。

起動手順

起動時または再接続時に、デバイスがオフライン中に設定された Desired State があるかどうかを確認するために現在のシャドウを取得します。

text
GET /api/v1/thing-datas/{thingName}/shadow
Authorization: Basic <apiKey:apiSecret>

保留中のデルタがあれば適用し、通常の動作に入ります。

完全なフローダイアグラムについては Device Shadow Sync を参照してください。

コマンド {#commands}

コマンドの受信

デバイスにコマンドが送信されると、Fleets は以下にパブリッシュします。

text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/request

ペイロード:

json
{
  "commandId": "...",
  "action": "setTemperature",
  "params": { "value": 24 },
  "timestamp": 1748390400,
  "ttl": 30
}
  • action: 実行するコマンド名。
  • params: コマンドの入力パラメーター。
  • ttl: コマンドのタイムアウト秒数。

コマンドレスポンスの送信

アクション実行後、レスポンスをパブリッシュします(QoS 1)。

text
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/response

ペイロード:

json
{
  "status": "SUCCEEDED",
  "result": {
    "currentTemperature": 24
  }
}
  • status: SUCCEEDEDFAILEDREJECTEDTIMED_OUTCANCELED のいずれか。
  • result: 任意。出力データを含む任意の JSON オブジェクト。
  • MQTT ペイロードに thingNameexecutionId を含めないでください。Fleets はトピックパスからこれらを抽出します。

ttl 内にレスポンスが受信されない場合、実行ステータスは TIMED_OUT になります。

ジョブ {#jobs}

ジョブはプルベースのプロトコルに従います。デバイスは保留中のジョブをポーリングし、進捗を報告します。

ジョブ通知の受信

ジョブがデバイスにキューイングされると、Fleets は以下にパブリッシュします。

text
Topic: $emqx/things/{thingName}/jobs/notify

この通知を受信したら、保留中ジョブリストをリクエストします。

text
Topic: $emqx/things/{thingName}/jobs/get
Payload: {"thingName": "{thingName}", "clientToken": "req-1"}

レスポンスは以下で受信します。

text
$emqx/things/{thingName}/jobs/get/accepted

ジョブの開始

次の保留中ジョブを開始するには:

text
Topic: $emqx/things/{thingName}/jobs/start-next

ペイロード:

json
{
  "thingName": "{thingName}",
  "statusDetails": { "progress": "starting" },
  "stepTimeoutInMinutes": 60,
  "clientToken": "req-2"
}
  • statusDetails: 任意。実行の初期ステータス詳細。
  • stepTimeoutInMinutes: 任意。ステップのタイムアウト(1~10080分)。

ジョブドキュメントは以下で受信します。

text
$emqx/things/{thingName}/jobs/start-next/accepted

ジョブステータスの報告

ジョブ実行後(または進捗更新時)にパブリッシュします。

text
Topic: $emqx/things/{thingName}/jobs/{jobId}/update

ペイロード:

json
{
  "thingName": "{thingName}",
  "status": "SUCCEEDED",
  "statusDetails": {
    "progress": "done"
  },
  "expectedVersion": 1,
  "clientToken": "req-3"
}
  • status: IN_PROGRESSSUCCEEDEDFAILEDREJECTED のいずれか。
  • statusDetails: 任意。実行状態に関する任意のキー・バリュー詳細。
  • expectedVersion: 任意。楽観的ロック。指定し、かつサーバーバージョンと一致しない場合は更新が拒否されます。

次のステップ