Skip to content

MQTTによるA2Aの仕組み

このページでは、MQTTによるA2Aの基本概念について説明します。エージェント、クライアント、ブローカーの構成、エージェント同士の識別と検出方法、Agent Cardの内容、エージェント間の通信パターンなどを解説します。これらの概念を理解することは、EMQXのA2Aレジストリを活用するための基礎となります。

アーキテクチャ

MQTTによるA2Aは、ブローカー中心のモデルで3つの参加者が存在します。

  • エージェント(レスポンダー):自身のAgent Cardを検出トピックにリテインドメッセージとしてパブリッシュし、リクエストトピックをサブスクライブして、受信したタスクリクエストに応答します。
  • クライアントエージェント(リクエスター):検出トピックをサブスクライブして利用可能なエージェントを探索し、タスクリクエストを送信し、返信を受信します。
  • MQTTブローカー(EMQX):すべてのメッセージをルーティングし、A2AレジストリにAgent Cardを記録し、認証と認可を実施し、検出メッセージにライブネスメタデータを付加します。

エージェントの識別

各エージェントは3階層の階層構造で識別されます。

{org_id} / {unit_id} / {agent_id}
  • org_id:エージェントが所属する組織(例:com.example
  • unit_id:組織内の部署やチーム、デプロイ環境などの区分(例:factory-a
  • agent_id:組織・部署内で一意のエージェント識別子(例:iot-ops-agent-001

3つのセグメントはすべて ^[A-Za-z0-9_.-]+$ にマッチし、/+#、空白文字を含んではいけません。エージェントのMQTTクライアントIDは、これらを組み合わせた形式 {org_id}/{unit_id}/{agent_id} を使用します。

トピックモデル

トピック用途
$a2a/v1/discovery/{org_id}/{unit_id}/{agent_id}エージェントの登録および検出(リテインド)
$a2a/v1/request/{org_id}/{unit_id}/{agent_id}特定エージェントへのタスクリクエスト受信
$a2a/v1/reply/{org_id}/{unit_id}/{agent_id}/{suffix}推奨される返信トピックパターン(下記注記参照)
$a2a/v1/event/{org_id}/{unit_id}/{agent_id}要求なしのイベントパブリッシュ
$a2a/v1/request/{org_id}/{unit_id}/pool/{pool_id}ロードバランスされた共有プールトピック

注記

返信トピックは固定のプロトコルトピックではありません。リクエスターは任意のトピックを返信トピックとして使用できます。レスポンダーはMQTT v5のResponse Topicプロパティから返信先を知ります。上記パターンは一貫性を保ちACL設定を簡素化するための推奨例です。

検出サブスクリプションはワイルドカードを使って範囲を指定します。

$a2a/v1/discovery/com.example/+/+     # 組織内のすべてのエージェント
$a2a/v1/discovery/com.example/factory-a/+  # 部署内のすべてのエージェント

Agent Card

Agent Cardは、エージェントが自身の検出トピックにパブリッシュするJSONドキュメントです。エージェントの識別情報、機能、HTTPエンドポイント、任意のセキュリティメタデータを記述します。EMQXは受信時にA2Aレジストリにカードを記録します。

最低限必要なフィールド:

フィールド説明
name文字列人間が読めるエージェント名
description文字列エージェントの概要説明
version文字列バージョン文字列(例:"1.0.0")
url文字列(URI)エージェントのエンドポイントURI。任意
skills配列少なくとも1つのスキルオブジェクト。各スキルはidnamedescriptionを持つ

最小限のAgent Cardの例:

json
{
  "name": "IoT Operations Agent",
  "description": "工場のテレメトリを監視し、修復アクションを調整します。",
  "version": "1.2.3",
  "url": "mqtts://broker.example.com:8883",
  "skills": [
    {
      "id": "device-diagnostics",
      "name": "デバイス診断",
      "description": "テレメトリを解析し、デバイス異常を検出します。"
    }
  ]
}

capabilitiessecuritySchemessupportedInterfaces、拡張パラメータを含む完全なAgent Cardスキーマは、A2A仕様をご参照ください。

エージェントのライブネス

Agent Cardはエージェントが切断された後もリテインドメッセージとして残ります。EMQXは接続状態を追跡し、検出メッセージをサブスクライバーに転送する際にMQTT v5のユーザープロパティを付加します。

ユーザープロパティ意味
a2a-statusonlineエージェントのMQTT接続がアクティブ
a2a-statusofflineエージェントが切断済み(正常切断またはLWTによる)
a2a-status-sourcebrokerEMQXが状態を設定
a2a-status-sourceagentエージェント自身が状態を設定
a2a-status-sourcelwt予期しない切断(Last Will)を反映

エージェントは検出トピックにa2a-status=offlinea2a-status-source=lwtのLast Willメッセージを設定し、異常切断時にサブスクライバーへ自動通知されるようにしてください。

インタラクションパターン

MQTTによるA2Aは、エージェント間で以下のインタラクションパターンをサポートします。いずれもMQTT v5のResponse TopicCorrelation Dataプロパティを使用し、リクエスター側で生成するTask.idによりタスクの状態をライフサイクル全体で追跡します。

パターン説明
1リクエスト1レスポンスリクエスターがタスクリクエストをパブリッシュし、レスポンダーが指定されたResponse Topicに単一の返信をパブリッシュ
ストリーミングレスポンスレスポンダーが複数のステータスや成果物更新メッセージをパブリッシュし、最終的なタスク完了状態に達するまで続ける
マルチターン会話関連タスクをTask.context_idでグループ化し、中断したタスクの再開を可能にする
共有プールディスパッチ複数のエージェントインスタンスがプールトピックを共有し、MQTT共有サブスクリプションでロードバランスされたリクエスト処理を行う
タスクハンドオーバーレスポンダーが進行中のタスクをa2a-responder-agent-idユーザープロパティを使って別インスタンスに委譲する
OAuth 2.0認可リクエストごとにベアラートークンをa2a-authorization MQTTユーザープロパティとして渡す
エンドツーエンドセキュリティ任意のubsp-v1セキュリティプロファイルにより、信頼できないブローカー環境でもペイロードのエンドツーエンド暗号化を提供

各パターンの詳細な仕様は、MQTTによるA2Aトランスポート仕様をご覧ください。

例:工場アラート対応のワークフロー

2つのエージェントが協力して工場フロアのアラートを処理します。異常を検知して診断タスクを委譲するモニターエージェントと、タスクを処理して結果をストリーム配信する修理エージェントです。

ステップ1:両エージェントが登録。 各エージェントは自身のAgent Cardを検出トピックにリテインドメッセージとしてパブリッシュします。EMQXはカードを記録し、両エージェントをオンライン状態としてマークします。

ステップ2:モニターエージェントが修理エージェントを検出。 モニターエージェントは$a2a/v1/discovery/com.example/factory-a/+をサブスクライブし、修理エージェントのリテインドカードを即座に受信、設備故障の診断が可能であることを確認します。

ステップ3:モニターエージェントがタスクリクエストを送信。 モーターline-7から異常振動の読み取りが届きます。モニターエージェントは修理エージェントのリクエストトピックに、ユニークなTask.idとMQTTのResponse Topicプロパティを設定してリクエストをパブリッシュします。

ステップ4:修理エージェントがステータス更新をストリーム配信。 修理エージェントは返信トピックに進捗更新をパブリッシュし、最終的にcompletedステータス(ベアリング摩耗検出、検査予定)を送信します。各更新は元のCorrelation Dataをエコーし、モニターエージェントがリクエストと対応付けられるようにします。