Skip to content

Python SDK

Python SDK 适合网关程序、验证脚本或已有 Python 服务。代码包已处理 MQTT 连接、命令订阅、参数校验、命令响应、状态上报和事件上报;接入时只需把默认状态更新逻辑替换为真实传感器、执行器或已有服务调用。

适用场景

  • 用网关程序把一组设备或外部系统接入设备智能体。
  • 用脚本快速验证设备规格、命令、遥测和事件。
  • 已有 Python 服务负责读取设备数据或调用业务系统。

代码包内容

内容作用
src/main.py设备入口,负责连接、订阅、响应、状态和事件上报
src/voice_client.py设备端语音 WebSocket 客户端
device-spec.json当前设备规格,作为命令校验和字段映射依据
.env.exampleMQTT、namespaceproductIddeviceId 等连接配置
README.md当前代码包的安装、运行和开发说明
_references/参考用的共享 SDK 源码,便于查看消息结构

真实接入时,修改 src/main.py 中的状态生成、命令处理和事件触发逻辑。

接入步骤

  1. 下载 Python SDK 代码包。
  2. 复制 .env.example.env,按需替换 MQTT 地址和认证信息。
  3. 启动程序,确认设备能上线。
  4. 修改 apply_command_to_state(),把默认状态更新替换为真实设备逻辑。
  5. 回到设备智能体工作区,测试命令、状态和事件。
bash
cp .env.example .env
uv run device-agent-toolkit

也可以直接运行入口文件:

bash
uv run python src/main.py

实现设备逻辑

默认代码会根据 device-spec.json 校验命令和参数,然后把命令参数合并到当前状态中。真实接入时,更新 apply_command_to_state()

python
def apply_command_to_state(device_spec, state, command, params):
    next_state = deepcopy(state)

    if command == "set_temperature":
        target = params["target_temperature"]
        call_thermostat_service(target)
        next_state["target_temperature"] = target

    if "updated_at" in next_state:
        next_state["updated_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    return next_state

这个函数用于执行真实设备动作,例如读取传感器、调用网关接口、控制继电器,或把命令转换成已有系统的 API 调用。返回的新状态会用于命令响应和状态上报。

如果设备规格中定义了事件,可以在业务逻辑中调用 publish_event()

python
publish_event("temperature_alarm", {
    "current_temperature": 32.5,
    "level": "warning",
})

语音接入代码

Python SDK 包含 src/voice_client.py,用于从设备端接入语音对话。它连接 /ws/voice,发送 16 kHz 单声道 Int16LE PCM 音频,并通过事件回调接收识别文本、智能体回复和 TTS 音频。

python
from voice_client import VoiceClient

voice = VoiceClient(
    ws_url="ws://127.0.0.1:3001/ws/voice",
    device_id="device-001",
    product_id="agent-001",
)

await voice.connect()
await voice.start_listening("manual")
await voice.send_audio(pcm_chunk)
await voice.stop_listening()

真实设备需要把麦克风采集和扬声器播放接到这几个调用上。语音服务、音色和鉴权配置见 语音交互

视觉识别代码

src/main.py 内置了拍照识别命令流程。当设备规格中存在以下命令时,程序会优先执行视觉识别,而不是普通状态更新:

  • capture_and_recognize
  • take_photo_vision
  • vision_recognize
  • photo_identify

设备端会按顺序读取命令中的 imageDataUrlimageBase64,或 .env 中的 VISION_FALLBACK_IMAGE_DATA_URL。如果都没有,就调用 capture_local_vision_image()。真实接入时,在这个函数中读取摄像头、截图或图像文件,并返回一张图片。

python
def capture_local_vision_image():
    return {
        "mimeType": "image/jpeg",
        "imageBase64": read_camera_frame_as_base64(),
        "source": "sdk-camera",
    }

程序会把图片上传到 /api/vision/frames,再调用 /api/chat 并携带 visionRefs。这适合一次命令触发一次拍照识别,不是连续视频流。

验证接入

程序启动后,回到设备智能体工作区确认:

  1. 设备列表中出现这台设备,并且状态为在线。
  2. 当前数据能看到 Python 程序上报的字段。
  3. 通过对话下发命令后,apply_command_to_state() 中的逻辑被执行。
  4. 如果调用了 publish_event(),最近上报事件中能看到对应记录。