Skip to content

ESP32 + MicroPythonでの接続

本記事では、ESP32などのマイコン上でMicroPythonのumqttモジュールを使用し、MQTTサーバーとの接続、サブスクライブ、メッセージの送受信といった機能を実現する方法を主に紹介します。

MicroPythonは、Python3プログラミング言語の軽量かつ効率的な実装で、C言語で書かれており、マイコン上での動作に最適化されています。MicroPythonはPython標準ライブラリの一部を含み、さらにbluetoothやmachineなどの特定のライブラリを提供し、ESP32やRaspberry Pi Picoなどの異なるハードウェアプラットフォームで共通のAPIを使って基盤となるハードウェアを制御できます。

umqttは、MicroPython向けのシンプルなMQTTクライアントで、メッセージコールバックをサポートし、メッセージ受信に対してブロッキングとノンブロッキングの2つの実装を提供しています。ただし、現時点ではMQTT v3.1.1のみ対応しており、QoS 2は未対応です。

前提条件

1. MQTT ブローカーのデプロイ

EMQXが提供するFree Public MQTT Serverを直接利用できます。これはEMQXのMQTT IoTクラウドプラットフォームを基盤に構築されています。サーバーの接続情報は以下の通りです。

  • 接続アドレス: broker.emqx.io
  • TCPポート: 1883
  • TLS/SSLポート: 8883

または、ご自身でデプロイメントを作成することも可能です。作成完了後、デプロイメントの概要画面で接続に関する情報(接続アドレス、ポートなど)を確認できます。

デプロイメントが稼働していることを確認したら、クライアントが接続時に使用するユーザー名とパスワードを、Access Control -> 認証にて追加してください。これによりクライアントがEMQXにアクセス可能になります。

2. MicroPythonファームウェアのインストール

ESP32、ESP8266、Raspberry Pi PicoなどのハードウェアプラットフォームにMicroPythonをインストールする方法は本記事の主題ではありませんが、必要です。まだインストールしていない場合は、以下を参照してください。

ESP32へのMicroPythonインストール方法

ESP8266へのMicroPythonインストール方法

Raspberry Pi PicoへのMicroPythonインストール方法

接続

ネットワークへのアクセス

まず、デバイスをWiFiネットワークに接続し、外部のMQTTサーバーにアクセスできるようにします。

wifi.pyファイルを作成し、以下のコードを追加してください。"NAME OF YOUR WIFI NETWORK""PASSWORD OF YOUR WIFI NETWORK"はご自身のWiFiネットワーク名とパスワードに置き換えてください。

import network
import time

def connect():
	ssid = 'NAME OF YOUR WIFI NETWORK'
	password = 'PASSWORD OF YOUR WIFI NETWORK'
	wlan = network.WLAN(network.STA_IF)
	wlan.active(True)
	wlan.connect(ssid, password)
	while wlan.isconnected() == False:
		print('Waiting for connection...')
		time.sleep(1)
	print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))

main.pyファイルでこのモジュールをインポートし、wifi.connect()を実行することで、デバイス起動時に自動的にネットワークに接続できます。

import wifi

wifi.connect()

main.pyはご自身で作成してください。

接続パラメータ

次に、MQTTサーバーのアドレス、ポート、接続に使用するクライアントID、ユーザー名、パスワードなどの接続情報を初期化します。最後に、後でパブリッシュやサブスクライブに使用するトピックも指定します。

SERVER = "xxxx.ala.cn-hangzhou.emqxsl.cn"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id = random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"

接続先のMQTTサーバーで認証が有効でない場合は、ユーザー名とパスワードの設定は不要です。

TCPでの接続

先に初期化した接続パラメータを使って接続します。

python
from umqtt.simple import MQTTClient

def connect():
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

SSL/TLSでの接続

サーバー証明書の正当性を検証するために、クライアント側で信頼できるCA証明書を指定する必要があります。無料のパブリックMQTTサーバーを利用する場合は、こちらからCA証明書をダウンロードできます。独自にデプロイメントを作成した場合は、デプロイメント概要ページからCA証明書をダウンロードしてください。

MicroPythonは現時点でPEM形式の証明書のみサポートしているため、ダウンロードした証明書の形式変換は不要です。以下のコードは、読み込んだCA証明書の内容をcadataにセットし、このCA証明書を信頼することを示しています。cert_reqsssl.CERT_REQUIREDに設定することで、ハンドシェイク時にサーバーが証明書を送信することを要求します。

with open('emqxsl-ca.crt', 'rb') as f:
    cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata

EMQX PlatformのServerlessインスタンスをデプロイしている場合は、server_hostnameオプションで接続先アドレスをSNI(Server Name Indication)として設定する必要があります。ServerlessはSNIでテナントを識別するため、これを設定しないと接続が失敗します。MicroPythonはデフォルトでSNIを送信しません。

ssl_params["server_hostname"] = SERVER

完全な接続コードは以下の通りです。

python
def connect():
  	with open('emqxsl-ca.crt', 'rb') as f:
        cadata = f.read()
    ssl_params = dict()
    ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
    ssl_params["cadata"] = cadata
    ssl_params["server_hostname"] = SERVER
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

cert_reqscadataパラメータを使用するには、MicroPythonバージョン1.20.0以上が必要です。これらのパラメータを指定しない場合、クライアントはサーバーの正当性を検証できず、中間者攻撃のリスクが高まります。

コールバックの設定とトピックのサブスクライブ

次に、メッセージ受信時に呼び出されるon_messageコールバック関数を実装します。ここにメッセージ処理のコードを記述できます。例では、受信したトピックとメッセージ内容を単純に表示しています。

メッセージの取りこぼしを防ぐため、トピックをサブスクライブする前にクライアントのコールバックをon_messageに設定することを推奨します。

python
def on_message(topic, msg):
    print("Received '{payload}' from topic '{topic}'\n".format(
        payload = msg.decode(), topic = topic.decode()))

def subscribe(client):
    client.set_callback(on_message)
    client.subscribe(TOPIC)

循環的なパブリッシュと受信

この例では、同じクライアントを使ってメッセージの循環的なパブリッシュと受信を行います。以下のコードの主な役割は、新しいメッセージ内容を継続的に構築してパブリッシュし、その後wait_msg()を呼び出してMQTTサーバーから転送されるメッセージをブロックして待つことです。

メッセージが到着するとon_messageコールバックが呼び出されてメッセージ内容を表示します。コールバック終了後、クライアントは1秒待ってから次のループに入ります。

def loop_publish(client):
    msg_count = 0
    while True:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        result = client.publish(TOPIC, msg)
        print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
        client.wait_msg()
        msg_count += 1
        time.sleep(1)

メイン関数

メイン関数では、まずwifi.connect()を呼び出してデバイスをネットワークに接続し、その後に先ほど実装した接続、サブスクライブ、循環パブリッシュの各関数を順に呼び出します。

def run():
    wifi.connect()
    client = connect()
    subscribe(client)
    loop_publish(client)

if __name__ == "__main__":
    run()

全コード

WiFi接続用コード:

python
import network
import time

def connect():
	ssid = 'NAME OF YOUR WIFI NETWORK'
	password = 'PASSWORD OF YOUR WIFI NETWORK'
	wlan = network.WLAN(network.STA_IF)
	wlan.active(True)
	wlan.connect(ssid, password)
	while wlan.isconnected() == False:
		print('Waiting for connection...')
		time.sleep(1)
	print('Connected on {ip}'.format(ip=wlan.ifconfig()[0]))

MQTTクライアント用コード:

python
import json
import random
import ssl
import time
import wifi

from umqtt.simple import MQTTClient

SERVER = "broker.emqx.io"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id=random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"

def on_message(topic, msg):
    print("Received '{payload}' from topic '{topic}'\n".format(
        payload = msg.decode(), topic = topic.decode()))

def connect():
    with open('broker.emqx.io-ca.crt', 'rb') as f:
        cadata = f.read()
    ssl_params = dict()
    ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
    ssl_params["cadata"] = cadata
    ssl_params["server_hostname"] = SERVER
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

def subscribe(client):
    client.set_callback(on_message)
    client.subscribe(TOPIC)

def loop_publish(client):
    msg_count = 0
    while True:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        result = client.publish(TOPIC, msg)
        print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
        client.wait_msg()
        msg_count += 1
        time.sleep(1)

def run():
    wifi.connect()
    client = connect()
    subscribe(client)
    loop_publish(client)

if __name__ == "__main__":
    run()

動作確認

コードを実行すると、コンソールには以下のように出力されます。

Connected on 192.168.0.32
Connected to MQTT Broker "broker.emqx.io"
Send '{"msg": 0}' to topic 'raspberry/mqtt'
Received '{"msg": 0}' from topic 'raspberry/mqtt'

Send '{"msg": 1}' to topic 'raspberry/mqtt'
Received '{"msg": 1}' from topic 'raspberry/mqtt'

Send '{"msg": 2}' to topic 'raspberry/mqtt'
Received '{"msg": 2}' from topic 'raspberry/mqtt'

さらに

これで、MicroPythonのumqttモジュールを使ってESP32からEMQX Platformに正常に接続できました。サンプルコードはこちらからダウンロード可能です。また、他言語のデモ例もGitHubで多数公開されています。