ESP32 + MicroPython で接続する
この記事では、ESP32 などのマイコンで MicroPython のモジュール umqtt を使い、MQTT サーバーへの接続、サブスクライブ、メッセージの送受信などの機能を実現する方法を主に紹介します。
MicroPython は、Python3 言語を C 言語で実装した軽量かつ効率的な環境で、マイコン上での動作に最適化されています。MicroPython にはコアな Python 標準ライブラリの一部が含まれており、さらに ESP32 や Raspberry Pi Pico などのハードウェアプラットフォームで基盤となるハードウェアを制御するための bluetooth や machine といった専用ライブラリも提供されています。
umqtt は MicroPython 向けのシンプルな MQTT クライアントで、メッセージのコールバックをサポートし、メッセージ受信に対してブロッキングとノンブロッキングの2つの実装を提供しています。ただし、現時点では MQTT v3.1.1 のみ対応しており、QoS 2 は未対応です。
前提条件
接続の前に、ブローカーとクライアントの準備が必要です。以下の前提条件を完了していることを確認してください。
- MQTT ブローカーのデプロイ
- MicroPython ファームウェアのインストール
MQTT ブローカーのデプロイ
アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。
サーバレスデプロイメント
EMQX Cloudコンソールでサーバレスデプロイメントを作成します。
デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。
ブローカーアドレス
ポート番号(サーバレスではTLSポートのみ対応)
サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート
8883を使用してください。デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細はサーバレスポートガイドを参照してください。
Dedicated Flex または BYOC デプロイメント
EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。
作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。
ブローカーアドレス
MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)
デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。
MicroPython ファームウェアのインストール
ESP32、ESP8266、Raspberry Pi Pico などのハードウェアプラットフォームに MicroPython をインストールする方法は本記事の主題ではありませんが、必須です。まだインストールしていない場合は、以下を参考にしてください。
ESP32 に MicroPython をインストールする方法
ESP8266 に MicroPython をインストールする方法
Raspberry Pi Pico に MicroPython をインストールする方法
接続
ネットワークへのアクセス
まず、デバイスを WiFi ネットワークに接続し、外部の MQTT サーバーへ接続できるようにします。
wifi.py ファイルを作成し、以下のコードを追加してください。NAME OF YOUR WIFI NETWORK と PASSWORD OF YOUR WIFI NETWORK はご自身の WiFi ネットワーク名とパスワードに置き換えてください。
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))このモジュールを main.py ファイルでインポートし、wifi.connect() を実行すれば、デバイスは起動時に自動的にネットワークに接続します。
import wifi
wifi.connect()
main.pyはご自身で作成してください。
接続パラメータ
次に、MQTT サーバーのアドレス、ポート、接続に使うクライアントID、ユーザー名、パスワードなどの接続情報を初期化します。最後に、パブリッシュやサブスクライブで使用するトピックも指定します。
SERVER = "xxxx.ala.cn-hangzhou.emqxsl.cn"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id = random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"接続先の MQTT サーバーで認証が無効の場合は、ユーザー名とパスワードの設定は不要です。
TCP で接続する
先ほど初期化した接続パラメータを使って接続します。
from umqtt.simple import MQTTClient
def connect():
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return clientSSL/TLS で接続する
サーバー証明書の正当性を検証するために、クライアント側で信頼できる CA 証明書を指定する必要があります。無料のパブリック MQTT サーバーを利用する場合は、こちら から CA 証明書をダウンロードできます。独自にデプロイした場合は、デプロイ概要ページから CA 証明書をダウンロードしてください。
MicroPython は現在 PEM 形式の証明書のみ対応しているため、ダウンロードした証明書の形式変換は不要です。以下のコードは、CA 証明書ファイルの内容を読み込み cadata にセットし、この CA 証明書を信頼することを示しています。cert_reqs を ssl.CERT_REQUIRED に設定することで、ハンドシェイク時にサーバーが証明書を送信することを要求します。
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadataEMQX Cloud のサーバレスインスタンスを利用する場合は、server_hostname オプションで SNI(Server Name Indication)を接続先アドレスに設定する必要があります。これはサーバレスが SNI によってテナントを識別するため必須の設定です。MicroPython はデフォルトで SNI を送信しないため、設定しないと接続に失敗します。
ssl_params["server_hostname"] = SERVER完全な接続コードは以下の通りです。
def connect():
with open('emqxsl-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
cert_reqsとcadataパラメータを使用するには、MicroPython バージョン 1.20.0 以上が必要です。これらのパラメータを指定しない場合、クライアントはサーバーの正当性を検証できず、中間者攻撃のリスクがあります。
コールバックを設定してトピックをサブスクライブする
次に、メッセージ到着時に呼ばれる on_message コールバック関数を実装します。ここにメッセージ処理のコードを記述できます。この例では、トピックとメッセージの内容を単純に表示しています。
メッセージの取りこぼしを防ぐため、トピックをサブスクライブする前にクライアントのコールバックを on_message に設定することを推奨します。
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)循環的にパブリッシュと受信を行う
この例では、同じクライアントを使ってメッセージの循環的なパブリッシュと受信を行います。以下のコードの主な役割は、新しいメッセージ内容を継続的に構築してパブリッシュし、その後 wait_msg() を呼び出して MQTT サーバーから転送されるメッセージをブロックして待つことです。
メッセージが届くと on_message コールバックが呼ばれてメッセージ内容を表示し、コールバック終了後は 1 秒待ってから次のループに入ります。
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)メイン関数
メイン関数では、まず wifi.connect() を呼んでデバイスをネットワークに接続し、その後に接続、サブスクライブ、循環パブリッシュの各関数を順に呼び出します。
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()全コード
WiFi 接続コード:
import network
import time
def connect():
ssid = 'NAME OF YOUR WIFI NETWORK'
password = 'PASSWORD OF YOUR WIFI NETWORK'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
print('Connected on {ip}'.format(ip=wlan.ifconfig()[0]))MQTT クライアントコード:
import json
import random
import ssl
import time
import wifi
from umqtt.simple import MQTTClient
SERVER = "broker.emqx.io"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id=random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"
def on_message(topic, msg):
print("Received '{payload}' from topic '{topic}'\n".format(
payload = msg.decode(), topic = topic.decode()))
def connect():
with open('broker.emqx.io-ca.crt', 'rb') as f:
cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata
ssl_params["server_hostname"] = SERVER
client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
client.connect()
print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
return client
def subscribe(client):
client.set_callback(on_message)
client.subscribe(TOPIC)
def loop_publish(client):
msg_count = 0
while True:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
result = client.publish(TOPIC, msg)
print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
client.wait_msg()
msg_count += 1
time.sleep(1)
def run():
wifi.connect()
client = connect()
subscribe(client)
loop_publish(client)
if __name__ == "__main__":
run()動作確認
コードを実行すると、コンソールには以下のように表示されます。
Connected on 192.168.0.32
Connected to MQTT Broker "broker.emqx.io"
Send '{"msg": 0}' to topic 'raspberry/mqtt'
Received '{"msg": 0}' from topic 'raspberry/mqtt'
Send '{"msg": 1}' to topic 'raspberry/mqtt'
Received '{"msg": 1}' from topic 'raspberry/mqtt'
Send '{"msg": 2}' to topic 'raspberry/mqtt'
Received '{"msg": 2}' from topic 'raspberry/mqtt'さらに
これで MicroPython の umqtt モジュールを使い、ESP32 から EMQX Cloud に正常に接続できました。サンプルコードは こちら からダウンロード可能です。また、他の言語でのデモ例も GitHub に多数ありますのでご参照ください。