MQTT 統合
このページでは、MQTT を使用してデバイスを EMQX Fleets と統合する方法について説明します。接続設定、必要なサブスクライブ、パブリッシュ形式、およびシャドウ同期、コマンド、ジョブのフローをカバーしています。
前提条件
- デバイスが Fleets のデプロイメントで Thing として登録されていること。
- EMQX ブローカーの接続情報(ホスト、ポート、認証情報)を EMQX ブローカーのデプロイメントから取得していること。
- デバイスプラットフォームに対応した MQTT クライアントライブラリが利用可能であること。
接続設定
標準の MQTT を使用して EMQX ブローカーに接続します。
| パラメーター | 値 |
|---|---|
| Host | EMQX ブローカーのデプロイメントから取得 |
| Port | EMQX ブローカーのデプロイメントから取得(通常は MQTT は 1883、TLS 経由の MQTT は 8883) |
| Client ID | Thing 登録時に設定した Thing の MQTT クライアント ID |
| Username / Password | ブローカーの 認証 設定で構成されたもの |
| Clean Session | false(再接続時にキューイングされたメッセージを受信するため推奨) |
TIP
永続セッション(clean session = false)を使用すると、デバイスがオフライン中にパブリッシュされたシャドウのデルタ通知、コマンドリクエスト、ジョブ通知を受信できます。
必須サブスクライブ
接続後、以下のトピックをサブスクライブします。{thingName} は Thing の名前に置き換えてください。
$emqx/things/{thingName}/shadow/update/delta
$emqx/things/{thingName}/shadow/update
$emqx/things/{thingName}/jobs/notify
$emqx/things/{thingName}/jobs/notify-next
$emqx/things/{thingName}/jobs/get/accepted
$emqx/things/{thingName}/jobs/get/rejected
$emqx/things/{thingName}/jobs/start-next/accepted
$emqx/things/{thingName}/jobs/start-next/rejected
$emqx/things/{thingName}/jobs/+/get/accepted
$emqx/things/{thingName}/jobs/+/get/rejected
$emqx/things/{thingName}/jobs/+/update/accepted
$emqx/things/{thingName}/jobs/+/update/rejected
$emqx/commands/things/{thingName}/executions/+/requestデバイス状態の報告
シャドウ更新トピックにパブリッシュして、デバイスの現在の状態を報告します(QoS 1)。
Topic: $emqx/things/{thingName}/shadow/updateペイロード:
{
"state": {
"reported": {
"temperature": 21.5,
"humidity": 58,
"targetTemperature": 22
}
},
"clientToken": "optional-token-for-correlation",
"version": 3
}state.reported: 必須。デバイスの現在の状態をフラットまたはネストされた JSON オブジェクトで表現します。clientToken: 任意。レスポンスでエコーバックされる相関用の任意の文字列です。version: 任意。楽観的ロック用のシャドウバージョン。指定し、かつサーバーのバージョンと一致しない場合は更新が拒否されます。省略または0にすると自動インクリメントされます。
イベントのパブリッシュ
以下のトピックにイベントをパブリッシュします(QoS 1)。
Topic: $emqx/things/{thingName}/events/{eventType}ペイロード:
{
"eventType": "highTemperature",
"severity": "warn",
"data": {
"currentTemp": 45.2
}
}- ペイロード内の
eventTypeはトピックの{eventType}セグメントと一致する必要があります。 severity:info、warn、errorのいずれか。timestamp: 任意(Unix ミリ秒)。省略した場合はブローカーの取り込み時間が使用されます。
シャドウ同期
Desired State の更新
クラウドが Desired State を更新すると、Fleets はデルタトピックにパブリッシュします。
Topic: $emqx/things/{thingName}/shadow/update/deltaペイロード:
{
"version": 4,
"timestamp": 1748390400,
"state": {
"targetTemperature": 24
},
"metadata": {
"targetTemperature": { "timestamp": 1748390400 }
}
}state: Desired と Reported が異なるフィールドのみを含みます。timestamp: Unix 秒。
デルタ値をデバイスに適用し、その後シャドウ更新トピックを通じて更新済み状態を報告してください。
起動手順
起動時または再接続時に、デバイスがオフライン中に設定された Desired State があるかどうかを確認するために現在のシャドウを取得します。
GET /api/v1/thing-datas/{thingName}/shadow
Authorization: Basic <apiKey:apiSecret>保留中のデルタがあれば適用し、通常の動作に入ります。
完全なフローダイアグラムについては Device Shadow Sync を参照してください。
コマンド {#commands}
コマンドの受信
デバイスにコマンドが送信されると、Fleets は以下にパブリッシュします。
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/requestペイロード:
{
"commandId": "...",
"action": "setTemperature",
"params": { "value": 24 },
"timestamp": 1748390400,
"ttl": 30
}action: 実行するコマンド名。params: コマンドの入力パラメーター。ttl: コマンドのタイムアウト秒数。
コマンドレスポンスの送信
アクション実行後、レスポンスをパブリッシュします(QoS 1)。
Topic: $emqx/commands/things/{thingName}/executions/{executionId}/responseペイロード:
{
"status": "SUCCEEDED",
"result": {
"currentTemperature": 24
}
}status:SUCCEEDED、FAILED、REJECTED、TIMED_OUT、CANCELEDのいずれか。result: 任意。出力データを含む任意の JSON オブジェクト。- MQTT ペイロードに
thingNameやexecutionIdを含めないでください。Fleets はトピックパスからこれらを抽出します。
ttl 内にレスポンスが受信されない場合、実行ステータスは TIMED_OUT になります。
ジョブ {#jobs}
ジョブはプルベースのプロトコルに従います。デバイスは保留中のジョブをポーリングし、進捗を報告します。
ジョブ通知の受信
ジョブがデバイスにキューイングされると、Fleets は以下にパブリッシュします。
Topic: $emqx/things/{thingName}/jobs/notifyこの通知を受信したら、保留中ジョブリストをリクエストします。
Topic: $emqx/things/{thingName}/jobs/get
Payload: {"thingName": "{thingName}", "clientToken": "req-1"}レスポンスは以下で受信します。
$emqx/things/{thingName}/jobs/get/acceptedジョブの開始
次の保留中ジョブを開始するには:
Topic: $emqx/things/{thingName}/jobs/start-nextペイロード:
{
"thingName": "{thingName}",
"statusDetails": { "progress": "starting" },
"stepTimeoutInMinutes": 60,
"clientToken": "req-2"
}statusDetails: 任意。実行の初期ステータス詳細。stepTimeoutInMinutes: 任意。ステップのタイムアウト(1~10080分)。
ジョブドキュメントは以下で受信します。
$emqx/things/{thingName}/jobs/start-next/acceptedジョブステータスの報告
ジョブ実行後(または進捗更新時)にパブリッシュします。
Topic: $emqx/things/{thingName}/jobs/{jobId}/updateペイロード:
{
"thingName": "{thingName}",
"status": "SUCCEEDED",
"statusDetails": {
"progress": "done"
},
"expectedVersion": 1,
"clientToken": "req-3"
}status:IN_PROGRESS、SUCCEEDED、FAILED、REJECTEDのいずれか。statusDetails: 任意。実行状態に関する任意のキー・バリュー詳細。expectedVersion: 任意。楽観的ロック。指定し、かつサーバーバージョンと一致しない場合は更新が拒否されます。