---
title: MQTT 协议
---

# MQTT 协议

本文以 Python MQTT 客户端（paho-mqtt）为例，介绍如何使用 MQTT 协议接入 FlowMQ 。

::: tip
下文所有示例中的 `your-flowmq-host` 均为占位符，请根据实际情况进行替换。
:::

## 1. 连接 FlowMQ


### 1.1 安装依赖

```bash
python3 -m pip install paho-mqtt
```

### 1.2 建立连接（MQTT 5.0）

```python
import ssl
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883  # TLS 通常使用 8883，见本文 TLS 小节
CLIENT_ID = "demo-client"

def on_connect(client, userdata, flags, reason_code, properties=None):
    print("connected, reason_code=", reason_code)
    # 连接成功后订阅
    client.subscribe("sensors/+/telemetry", qos=1)

def on_message(client, userdata, msg):
    print("recv", msg.topic, msg.qos, msg.payload)

client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message

# 如需用户名密码认证（示例）
# client.username_pw_set("username", "password")

client.connect(HOST, PORT, keepalive=60)
client.loop_forever()
```

## 2. 发布消息

### 2.1 基础发布

```python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="pub-1", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

topic = "sensors/device-001/telemetry"
payload = b"{\"temp\": 23.5, \"ts\": 1700000000}"

info = client.publish(topic, payload=payload, qos=1, retain=False)
info.wait_for_publish()
print("published", info.mid)

client.disconnect()
```

### 2.2 带 MQTT 5.0 属性的发布（Properties）

以下示例展示消息过期间隔、响应主题与用户属性等典型用法。

```python
from paho.mqtt import client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="pub-2", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 30          # 30 秒后过期
props.ResponseTopic = "reply/device-001"  # 响应主题
props.UserProperty = [("source", "edge"), ("format", "json")]

client.publish(
    "cmd/device-001",
    payload=b"reboot",
    qos=1,
    retain=False,
    properties=props,
)

client.disconnect()
```

## 3. 订阅

### 3.1 基础订阅

```python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_message(client, userdata, msg):
    print(f"[{msg.topic}] {msg.payload.decode(errors='ignore')}")

client = mqtt.Client(client_id="sub-1", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)

client.subscribe("alerts/#", qos=1)
client.loop_forever()
```

### 3.2 通配符订阅（`+` 与 `#`）

- `+`：单层通配符，例如 `sensors/+/telemetry`
- `#`：多层通配符（必须位于末尾），例如 `sensors/#`


### 3.3 订阅端读取 MQTT 5.0 属性

```python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_message(client, userdata, msg):
    props = getattr(msg, "properties", None)
    user_props = None
    if props is not None:
        user_props = getattr(props, "UserProperty", None)
    print("topic=", msg.topic, "payload=", msg.payload)
    print("user_properties=", user_props)

client = mqtt.Client(client_id="sub-props", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.subscribe("cmd/#", qos=1)
client.loop_forever()
```

## 4. QoS 语义与选型

- **QoS 0（At most once / 至多一次）**
  - 低延迟、低开销
  - 可能丢消息；适合高频遥测、允许采样丢失的场景
- **QoS 1（At least once / 至少一次）**
  - 可能重复投递；应用侧需要具备幂等处理能力
  - 适合大多数业务消息与命令下发
- **QoS 2（Exactly once / 恰好一次）**
  - 协议交互更复杂，吞吐与延迟成本更高
  - 适合强一致、不可重复的关键交易类消息（谨慎使用）

## 5. Retained、LWT 与消息属性

### 5.1 Retained（保留消息）

- Retained 消息会作为该 Topic 的"最新状态"保存。
- 新订阅者订阅该 Topic 后会立即收到最近一次 retained 消息。

适用：设备在线状态、配置快照、最后一次测量值等。

### 5.2 LWT（遗嘱消息）

- 当客户端非正常断开时，Broker 会发布预先设置的遗嘱消息。
- 常用于"设备离线通知"。

paho-mqtt 示例：

```python
from paho.mqtt import client as mqtt

client = mqtt.Client(client_id="with-lwt", protocol=mqtt.MQTTv5)
client.will_set("presence/device-001", payload=b"offline", qos=1, retain=True)
client.connect("your-flowmq-host", 1883, 60)
client.loop_forever()
```

### 5.3 MQTT 5.0 Message Properties

常用属性包括：

- `MessageExpiryInterval`：消息过期时间
- `ResponseTopic` / `CorrelationData`：请求-响应相关
- `UserProperty`：携带业务元数据（键值对）

## 6. 错误处理与排障建议

- 连接失败：
  - 检查 `HOST/PORT`、网络连通性、防火墙策略
  - 检查认证信息（用户名/密码、证书、权限控制）
- 订阅无消息：
  - 检查 Topic 是否一致（大小写、层级、前缀）
  - 检查权限控制是否允许订阅
  - 检查是否使用了过宽/过窄通配符

## 7. TLS/SSL 连接

生产环境建议启用 TLS 以保护传输安全。

```python
import ssl
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 8883

client = mqtt.Client(client_id="tls-client", protocol=mqtt.MQTTv5)

# 示例：使用企业 CA 证书校验服务端
client.tls_set(
    ca_certs="/etc/ssl/certs/flowmq-ca.pem",
    certfile=None,
    keyfile=None,
    tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
client.tls_insecure_set(False)

client.connect(HOST, PORT, 60)
client.subscribe("sensors/#", qos=1)
client.loop_forever()
```
