# 使用 ESP32 + MicroPython 连接
本文主要介绍如何在 ESP32 等微控制器中通过 MicroPython 的 umqtt 模块,实现与 MQTT 服务器的连接、订阅、收发消息等功能。
MicroPython (opens new window) 是 Python3 编程语言的精简高效实现,用 C 编写,经过优化以在微控制器上运行。MicroPython 包含了精选的核心 Python 标准库,也提供了 bluetooth、machine 这类特定的库,以便在 ESP32、Raspberry Pi Pico 这些不同的硬件平台中使用通用的 API 控制硬件底层。
umqtt (opens new window) 是一个用于 MicroPython 的简单 MQTT 客户端,支持消息回调,并且为接收消息提供了阻塞和非阻塞的两种实现。但目前它仅支持 MQTT v3.1.1,并且尚不支持 QoS 2。
# 前置准备
# 1. MQTT 服务器部署
您可以直接使用 EMQX 提供的 免费公共 MQTT 服务器 (opens new window),该服务基于 EMQX 的 MQTT 物联网云平台 (opens new window) 创建。服务器接入信息如下:
- 连接地址: broker.emqx.io
- TCP 端口: 1883
- TLS/SSL 端口: 8883
您也可以自行 创建部署,创建完成后您可以在部署概览中查看连接相关的信息,包括连接地址、端口等。
在确认部署处于运行状态后,您还需要在部署控制台 认证鉴权
> 认证
中添加客户端将在连接时使用的用户名和密码,以允许该客户端接入 EMQX。
# 2. 安装 MicroPython 固件
如何在 ESP32、ESP8266、Raspberry Pi Pico 这些硬件平台上安装 MicroPython 不是本文的重点,但它是必要的。如果您还没有安装 MicroPython,那么可以先参考这里:
如何在 ESP32 中安装 MicroPython? (opens new window)
如何在 ESP8266 中安装 MicroPython? (opens new window)
如何在 Raspberry Pi Pico 中安装 MicroPython? (opens new window)
# 连接
# 接入网络
首先,您需要让设备接入 WiFi 网络,这样才能连接到外部的 MQTT Server。
您可以创建一个 wifi.py
文件然后添加以下代码,然后将 "NAME OF YOUR WIFI NETWORK" 和 "PASSWORD OF YOUR WIFI NETWORK" 替换成自己的 WiFi 网络名称和密码:
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))
2
3
4
5
6
7
8
9
10
11
12
13
您可以在 main.py
文件中导入这个模块并运行 wifi.connect()
,以便让设备在每次启动后自动连接到网络:
import wifi
wifi.connect()
2
3
main.py
需要自行创建。
# 连接参数
接下来,您需要初始化一些连接信息,分别是 MQTT 服务器地址、端口以及连接时使用的 Client ID、用户名和密码。最后,还有一个您稍后发布和订阅将要使用的主题:
SERVER = "xxxx.ala.cn-hangzhou.emqxsl.cn"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id = random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"
2
3
4
5
6
如果连接的 MQTT Server 没有启用认证,那么您可以无需设置用户名和密码。
# 使用 TCP 连接
使用前面初始化的连接参数发起连接:
from umqtt.simple import MQTTClient
def connect():
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
2
3
4
5
6
7
# 使用 SSL/TLS 连接
为了校验服务端证书是否合法,您需要为客户端指定信任的 CA 证书,如果您使用的是免费公共 MQTT 服务器,CA 证书可以从 此处 (opens new window) 下载。如果您选择创建自己的部署,那么 CA 证书需要在部署概览页面中下载。
MicroPython 目前仅支持 PEM 格式的证书,所以您不需要对下载得到的证书进行格式转换。以下代码表示,您将 cadata
设置为读取到的 CA 证书文件内容,表示信任该 CA 证书,将 cert_reqs
设置为 ssl.CERT_REQUIRED
,表示客户端将要求服务端在握手时发送证书。
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
2
3
4
5
如果您部署的是 EMQX Cloud Serverless,那么还需要通过 server_hostname
选项将 SNI 设置为您的连接地址,这一步是非常必要的,因为 Serverless 需要根据 SNI 来区分租户,而 MicroPython 默认不会发送 SNI,这将导致您连接失败:
ssl_params["server_hostname"] = SERVER
完整的连接代码如下:
def connect():
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
2
3
4
5
6
7
8
9
10
11
如果想要使用
cert_reqs
、cadata
参数, 您必须使用 MicroPython 1.20.0 及以上版本。而如果未指定这些参数,客户端将不具备验证服务端身份的能力,这将引入一定的中间人攻击的风险。
# 设置回调并订阅主题
接下来,您还需要实现 on_message
回调函数,它将在消息到达时被调用,您可以在这里实现消息的处理代码。在本示例中我们仅仅打印了消息的主题和内容。
您最好在订阅主题前将客户端的回调设置为 on_message
函数,以免错过消息:
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)
2
3
4
5
6
7
# 循环发布和接收
在本示例中,我们使用同一个客户端来进行循环的消息发布和接收。以下代码的主要作用就是不断构造新的消息内容并发布,然后调用 wait_msg()
阻塞地等待从 MQTT 服务端转发的消息。一旦消息到达,就会触发 on_message
回调打印消息内容,回调完成后客户端将等待一秒然后进入下一次循环:
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)
2
3
4
5
6
7
8
9
10
11
12
# 主函数
在主函数中,我们首先调用 wifi.connect()
让设备接入网络,然后依次调用前面实现的连接、订阅和循环发布函数:
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()
2
3
4
5
6
7
8
# 完整代码
WiFi 连接代码:
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip=wlan.ifconfig()[0]))
2
3
4
5
6
7
8
9
10
11
12
13
MQTT 客户端代码:
import json
import random
import ssl
import time
import wifi
from umqtt.simple import MQTTClient
SERVER = "broker.emqx.io"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id=random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def connect():
with open('broker.emqx.io-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 测试验证
运行代码,控制台输出如下:
Waiting for connection...
Waiting for connection...
Connected on 192.168.0.145
Connected to MQTT Broker "xxxx.ala.cn-hangzhou.emqxsl.cn"
Send '{"msg": 0}' to topic 'raspberry/mqtt'
Received '{"msg": 0}' from topic 'raspberry/mqtt'
Send '{"msg": 1}' to topic 'raspberry/mqtt'
Received '{"msg": 1}' from topic 'raspberry/mqtt'
Send '{"msg": 2}' to topic 'raspberry/mqtt'
Received '{"msg": 2}' from topic 'raspberry/mqtt'
2
3
4
5
6
7
8
9
10
11
12
# 更多内容
现在,您已经可以在 ESP32 中通过 MicroPython 的 umqtt
模块成功连接到 EMQX Cloud。您可以在 这里 (opens new window) 下载完整的示例代码。同时也可以在 GitHub (opens new window) 上找到更多其他语言的 Demo 示例。