Skip to content

MQTT 协议

本文以 Python MQTT 客户端(paho-mqtt)为例,介绍如何使用 MQTT 协议接入 FlowMQ 。

TIP

下文所有示例中的 your-flowmq-host 均为占位符,请根据实际情况进行替换。

1. 连接 FlowMQ

1.1 安装依赖

bash
python3 -m pip install paho-mqtt

1.2 建立连接(MQTT 5.0)

python
import ssl
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883  # TLS 通常使用 8883,见本文 TLS 小节
CLIENT_ID = "demo-client"

def on_connect(client, userdata, flags, reason_code, properties=None):
    print("connected, reason_code=", reason_code)
    # 连接成功后订阅
    client.subscribe("sensors/+/telemetry", qos=1)

def on_message(client, userdata, msg):
    print("recv", msg.topic, msg.qos, msg.payload)

client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message

# 如需用户名密码认证(示例)
# client.username_pw_set("username", "password")

client.connect(HOST, PORT, keepalive=60)
client.loop_forever()

2. 发布消息

2.1 基础发布

python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="pub-1", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

topic = "sensors/device-001/telemetry"
payload = b"{\"temp\": 23.5, \"ts\": 1700000000}"

info = client.publish(topic, payload=payload, qos=1, retain=False)
info.wait_for_publish()
print("published", info.mid)

client.disconnect()

2.2 带 MQTT 5.0 属性的发布(Properties)

以下示例展示消息过期间隔、响应主题与用户属性等典型用法。

python
from paho.mqtt import client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

HOST = "your-flowmq-host"
PORT = 1883

client = mqtt.Client(client_id="pub-2", protocol=mqtt.MQTTv5)
client.connect(HOST, PORT, 60)

props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 30          # 30 秒后过期
props.ResponseTopic = "reply/device-001"  # 响应主题
props.UserProperty = [("source", "edge"), ("format", "json")]

client.publish(
    "cmd/device-001",
    payload=b"reboot",
    qos=1,
    retain=False,
    properties=props,
)

client.disconnect()

3. 订阅

3.1 基础订阅

python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_message(client, userdata, msg):
    print(f"[{msg.topic}] {msg.payload.decode(errors='ignore')}")

client = mqtt.Client(client_id="sub-1", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)

client.subscribe("alerts/#", qos=1)
client.loop_forever()

3.2 通配符订阅(+#

  • +:单层通配符,例如 sensors/+/telemetry
  • #:多层通配符(必须位于末尾),例如 sensors/#

3.3 订阅端读取 MQTT 5.0 属性

python
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 1883

def on_message(client, userdata, msg):
    props = getattr(msg, "properties", None)
    user_props = None
    if props is not None:
        user_props = getattr(props, "UserProperty", None)
    print("topic=", msg.topic, "payload=", msg.payload)
    print("user_properties=", user_props)

client = mqtt.Client(client_id="sub-props", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect(HOST, PORT, 60)
client.subscribe("cmd/#", qos=1)
client.loop_forever()

4. QoS 语义与选型

  • QoS 0(At most once / 至多一次)
    • 低延迟、低开销
    • 可能丢消息;适合高频遥测、允许采样丢失的场景
  • QoS 1(At least once / 至少一次)
    • 可能重复投递;应用侧需要具备幂等处理能力
    • 适合大多数业务消息与命令下发
  • QoS 2(Exactly once / 恰好一次)
    • 协议交互更复杂,吞吐与延迟成本更高
    • 适合强一致、不可重复的关键交易类消息(谨慎使用)

5. Retained、LWT 与消息属性

5.1 Retained(保留消息)

  • Retained 消息会作为该 Topic 的"最新状态"保存。
  • 新订阅者订阅该 Topic 后会立即收到最近一次 retained 消息。

适用:设备在线状态、配置快照、最后一次测量值等。

5.2 LWT(遗嘱消息)

  • 当客户端非正常断开时,Broker 会发布预先设置的遗嘱消息。
  • 常用于"设备离线通知"。

paho-mqtt 示例:

python
from paho.mqtt import client as mqtt

client = mqtt.Client(client_id="with-lwt", protocol=mqtt.MQTTv5)
client.will_set("presence/device-001", payload=b"offline", qos=1, retain=True)
client.connect("your-flowmq-host", 1883, 60)
client.loop_forever()

5.3 MQTT 5.0 Message Properties

常用属性包括:

  • MessageExpiryInterval:消息过期时间
  • ResponseTopic / CorrelationData:请求-响应相关
  • UserProperty:携带业务元数据(键值对)

6. 错误处理与排障建议

  • 连接失败:
    • 检查 HOST/PORT、网络连通性、防火墙策略
    • 检查认证信息(用户名/密码、证书、权限控制)
  • 订阅无消息:
    • 检查 Topic 是否一致(大小写、层级、前缀)
    • 检查权限控制是否允许订阅
    • 检查是否使用了过宽/过窄通配符

7. TLS/SSL 连接

生产环境建议启用 TLS 以保护传输安全。

python
import ssl
from paho.mqtt import client as mqtt

HOST = "your-flowmq-host"
PORT = 8883

client = mqtt.Client(client_id="tls-client", protocol=mqtt.MQTTv5)

# 示例:使用企业 CA 证书校验服务端
client.tls_set(
    ca_certs="/etc/ssl/certs/flowmq-ca.pem",
    certfile=None,
    keyfile=None,
    tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
client.tls_insecure_set(False)

client.connect(HOST, PORT, 60)
client.subscribe("sensors/#", qos=1)
client.loop_forever()