ESP32 + MicroPythonでの接続
本記事では、ESP32などのマイコン上でMicroPythonのumqtt
モジュールを使用し、MQTTサーバーとの接続、サブスクライブ、メッセージの送受信といった機能を実現する方法を主に紹介します。
MicroPythonは、Python3プログラミング言語の軽量かつ効率的な実装で、C言語で書かれており、マイコン上での動作に最適化されています。MicroPythonはPython標準ライブラリの一部を含み、さらにbluetoothやmachineなどの特定のライブラリを提供し、ESP32やRaspberry Pi Picoなどの異なるハードウェアプラットフォームで共通のAPIを使って基盤となるハードウェアを制御できます。
umqttは、MicroPython向けのシンプルなMQTTクライアントで、メッセージコールバックをサポートし、メッセージ受信に対してブロッキングとノンブロッキングの2つの実装を提供しています。ただし、現時点ではMQTT v3.1.1のみ対応しており、QoS 2は未対応です。
前提条件
1. MQTT ブローカーのデプロイ
EMQXが提供するFree Public MQTT Serverを直接利用できます。これはEMQXのMQTT IoTクラウドプラットフォームを基盤に構築されています。サーバーの接続情報は以下の通りです。
- 接続アドレス: broker.emqx.io
- TCPポート: 1883
- TLS/SSLポート: 8883
または、ご自身でデプロイメントを作成することも可能です。作成完了後、デプロイメントの概要画面で接続に関する情報(接続アドレス、ポートなど)を確認できます。
デプロイメントが稼働していることを確認したら、クライアントが接続時に使用するユーザー名とパスワードを、Access Control -> 認証にて追加してください。これによりクライアントがEMQXにアクセス可能になります。
2. MicroPythonファームウェアのインストール
ESP32、ESP8266、Raspberry Pi PicoなどのハードウェアプラットフォームにMicroPythonをインストールする方法は本記事の主題ではありませんが、必要です。まだインストールしていない場合は、以下を参照してください。
Raspberry Pi PicoへのMicroPythonインストール方法
接続
ネットワークへのアクセス
まず、デバイスをWiFiネットワークに接続し、外部のMQTTサーバーにアクセスできるようにします。
wifi.py
ファイルを作成し、以下のコードを追加してください。"NAME OF YOUR WIFI NETWORK"
と"PASSWORD OF YOUR WIFI NETWORK"
はご自身のWiFiネットワーク名とパスワードに置き換えてください。
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))
main.py
ファイルでこのモジュールをインポートし、wifi.connect()
を実行することで、デバイス起動時に自動的にネットワークに接続できます。
import wifi
wifi.connect()
main.py
はご自身で作成してください。
接続パラメータ
次に、MQTTサーバーのアドレス、ポート、接続に使用するクライアントID、ユーザー名、パスワードなどの接続情報を初期化します。最後に、後でパブリッシュやサブスクライブに使用するトピックも指定します。
SERVER = "xxxx.ala.cn-hangzhou.emqxsl.cn"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id = random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"
接続先のMQTTサーバーで認証が有効でない場合は、ユーザー名とパスワードの設定は不要です。
TCPでの接続
先に初期化した接続パラメータを使って接続します。
from umqtt.simple import MQTTClient
def connect():
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
SSL/TLSでの接続
サーバー証明書の正当性を検証するために、クライアント側で信頼できるCA証明書を指定する必要があります。無料のパブリックMQTTサーバーを利用する場合は、こちらからCA証明書をダウンロードできます。独自にデプロイメントを作成した場合は、デプロイメント概要ページからCA証明書をダウンロードしてください。
MicroPythonは現時点でPEM形式の証明書のみサポートしているため、ダウンロードした証明書の形式変換は不要です。以下のコードは、読み込んだCA証明書の内容をcadata
にセットし、このCA証明書を信頼することを示しています。cert_reqs
をssl.CERT_REQUIRED
に設定することで、ハンドシェイク時にサーバーが証明書を送信することを要求します。
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
EMQX PlatformのServerlessインスタンスをデプロイしている場合は、server_hostname
オプションで接続先アドレスをSNI(Server Name Indication)として設定する必要があります。ServerlessはSNIでテナントを識別するため、これを設定しないと接続が失敗します。MicroPythonはデフォルトでSNIを送信しません。
ssl_params["server_hostname"] = SERVER
完全な接続コードは以下の通りです。
def connect():
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
cert_reqs
とcadata
パラメータを使用するには、MicroPythonバージョン1.20.0以上が必要です。これらのパラメータを指定しない場合、クライアントはサーバーの正当性を検証できず、中間者攻撃のリスクが高まります。
コールバックの設定とトピックのサブスクライブ
次に、メッセージ受信時に呼び出されるon_message
コールバック関数を実装します。ここにメッセージ処理のコードを記述できます。例では、受信したトピックとメッセージ内容を単純に表示しています。
メッセージの取りこぼしを防ぐため、トピックをサブスクライブする前にクライアントのコールバックをon_message
に設定することを推奨します。
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)
循環的なパブリッシュと受信
この例では、同じクライアントを使ってメッセージの循環的なパブリッシュと受信を行います。以下のコードの主な役割は、新しいメッセージ内容を継続的に構築してパブリッシュし、その後wait_msg()
を呼び出してMQTTサーバーから転送されるメッセージをブロックして待つことです。
メッセージが到着するとon_message
コールバックが呼び出されてメッセージ内容を表示します。コールバック終了後、クライアントは1秒待ってから次のループに入ります。
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)
メイン関数
メイン関数では、まずwifi.connect()
を呼び出してデバイスをネットワークに接続し、その後に先ほど実装した接続、サブスクライブ、循環パブリッシュの各関数を順に呼び出します。
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()
全コード
WiFi接続用コード:
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip=wlan.ifconfig()[0]))
MQTTクライアント用コード:
import json
import random
import ssl
import time
import wifi
from umqtt.simple import MQTTClient
SERVER = "broker.emqx.io"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id=random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def connect():
with open('broker.emqx.io-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()
動作確認
コードを実行すると、コンソールには以下のように出力されます。
Connected on 192.168.0.32
Connected to MQTT Broker "broker.emqx.io"
Send '{"msg": 0}' to topic 'raspberry/mqtt'
Received '{"msg": 0}' from topic 'raspberry/mqtt'
Send '{"msg": 1}' to topic 'raspberry/mqtt'
Received '{"msg": 1}' from topic 'raspberry/mqtt'
Send '{"msg": 2}' to topic 'raspberry/mqtt'
Received '{"msg": 2}' from topic 'raspberry/mqtt'
さらに
これで、MicroPythonのumqttモジュールを使ってESP32からEMQX Platformに正常に接続できました。サンプルコードはこちらからダウンロード可能です。また、他言語のデモ例もGitHubで多数公開されています。