MQTT 协议
本文以 Python MQTT 客户端(paho-mqtt)为例,介绍如何使用 MQTT 协议接入 FlowMQ 。
TIP
下文所有示例中的 your-flowmq-host 均为占位符,请根据实际情况进行替换。
1. 连接 FlowMQ
1.1 安装依赖
bash
python3 -m pip install paho-mqtt1.2 建立连接(MQTT 5.0)
python
import ssl
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883 # TLS 通常使用 8883,见本文 TLS 小节
CLIENT_ID = "demo-client"
def on_connect(client, userdata, flags, reason_code, properties=None):
print("connected, reason_code=", reason_code)
# 连接成功后订阅
client.subscribe("sensors/+/telemetry", qos=1)
def on_message(client, userdata, msg):
print("recv", msg.topic, msg.qos, msg.payload)
client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
# 如需用户名密码认证(示例)
# client.username_pw_set("username", "password")
client.connect(HOST, PORT, keepalive=60)
client.loop_forever()2. 发布消息
2.1 基础发布
python
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883
client = mqtt.Client(client_id="pub-1", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)
topic = "sensors/device-001/telemetry"
payload = b"{\"temp\": 23.5, \"ts\": 1700000000}"
info = client.publish(topic, payload=payload, qos=1, retain=False)
info.wait_for_publish()
print("published", info.mid)
client.disconnect()2.2 带 MQTT 5.0 属性的发布(Properties)
以下示例展示消息过期间隔、响应主题与用户属性等典型用法。
python
from paho.mqtt import client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
HOST = "your-flowmq-host"
PORT = 1883
client = mqtt.Client(client_id="pub-2", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)
props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 30 # 30 秒后过期
props.ResponseTopic = "reply/device-001" # 响应主题
props.UserProperty = [("source", "edge"), ("format", "json")]
client.publish(
"cmd/device-001",
payload=b"reboot",
qos=1,
retain=False,
properties=props,
)
client.disconnect()3. 订阅
3.1 基础订阅
python
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883
def on_message(client, userdata, msg):
print(f"[{msg.topic}] {msg.payload.decode(errors='ignore')}")
client = mqtt.Client(client_id="sub-1", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.subscribe("alerts/#", qos=1)
client.loop_forever()3.2 通配符订阅(+ 与 #)
+:单层通配符,例如sensors/+/telemetry#:多层通配符(必须位于末尾),例如sensors/#
3.3 订阅端读取 MQTT 5.0 属性
python
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 1883
def on_message(client, userdata, msg):
props = getattr(msg, "properties", None)
user_props = None
if props is not None:
user_props = getattr(props, "UserProperty", None)
print("topic=", msg.topic, "payload=", msg.payload)
print("user_properties=", user_props)
client = mqtt.Client(client_id="sub-props", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.subscribe("cmd/#", qos=1)
client.loop_forever()4. QoS 语义与选型
- QoS 0(At most once / 至多一次)
- 低延迟、低开销
- 可能丢消息;适合高频遥测、允许采样丢失的场景
- QoS 1(At least once / 至少一次)
- 可能重复投递;应用侧需要具备幂等处理能力
- 适合大多数业务消息与命令下发
- QoS 2(Exactly once / 恰好一次)
- 协议交互更复杂,吞吐与延迟成本更高
- 适合强一致、不可重复的关键交易类消息(谨慎使用)
5. Retained、LWT 与消息属性
5.1 Retained(保留消息)
- Retained 消息会作为该 Topic 的"最新状态"保存。
- 新订阅者订阅该 Topic 后会立即收到最近一次 retained 消息。
适用:设备在线状态、配置快照、最后一次测量值等。
5.2 LWT(遗嘱消息)
- 当客户端非正常断开时,Broker 会发布预先设置的遗嘱消息。
- 常用于"设备离线通知"。
paho-mqtt 示例:
python
from paho.mqtt import client as mqtt
client = mqtt.Client(client_id="with-lwt", protocol=mqtt.MQTTv5)
client.will_set("presence/device-001", payload=b"offline", qos=1, retain=True)
client.connect("your-flowmq-host", 1883, 60)
client.loop_forever()5.3 MQTT 5.0 Message Properties
常用属性包括:
MessageExpiryInterval:消息过期时间ResponseTopic/CorrelationData:请求-响应相关UserProperty:携带业务元数据(键值对)
6. 错误处理与排障建议
- 连接失败:
- 检查
HOST/PORT、网络连通性、防火墙策略 - 检查认证信息(用户名/密码、证书、权限控制)
- 检查
- 订阅无消息:
- 检查 Topic 是否一致(大小写、层级、前缀)
- 检查权限控制是否允许订阅
- 检查是否使用了过宽/过窄通配符
7. TLS/SSL 连接
生产环境建议启用 TLS 以保护传输安全。
python
import ssl
from paho.mqtt import client as mqtt
HOST = "your-flowmq-host"
PORT = 8883
client = mqtt.Client(client_id="tls-client", protocol=mqtt.MQTTv5)
# 示例:使用企业 CA 证书校验服务端
client.tls_set(
ca_certs="/etc/ssl/certs/flowmq-ca.pem",
certfile=None,
keyfile=None,
tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
client.tls_insecure_set(False)
client.connect(HOST, PORT, 60)
client.subscribe("sensors/#", qos=1)
client.loop_forever()