Skip to content

ESP32 + MicroPython で接続する

この記事では、ESP32 などのマイコンで MicroPython のモジュール umqtt を使い、MQTT サーバーへの接続、サブスクライブ、メッセージの送受信などの機能を実現する方法を主に紹介します。

MicroPython は、Python3 言語を C 言語で実装した軽量かつ効率的な環境で、マイコン上での動作に最適化されています。MicroPython にはコアな Python 標準ライブラリの一部が含まれており、さらに ESP32 や Raspberry Pi Pico などのハードウェアプラットフォームで基盤となるハードウェアを制御するための bluetooth や machine といった専用ライブラリも提供されています。

umqtt は MicroPython 向けのシンプルな MQTT クライアントで、メッセージのコールバックをサポートし、メッセージ受信に対してブロッキングとノンブロッキングの2つの実装を提供しています。ただし、現時点では MQTT v3.1.1 のみ対応しており、QoS 2 は未対応です。

前提条件

接続の前に、ブローカーとクライアントの準備が必要です。以下の前提条件を完了していることを確認してください。

  • MQTT ブローカーのデプロイ
  • MicroPython ファームウェアのインストール

MQTT ブローカーのデプロイ

アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。

サーバレスデプロイメント

  1. EMQX Cloudコンソールでサーバレスデプロイメントを作成します。

  2. デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。

    • ブローカーアドレス

    • ポート番号(サーバレスではTLSポートのみ対応)

  3. サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート8883を使用してください。

  4. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細はサーバレスポートガイドを参照してください。

Dedicated Flex または BYOC デプロイメント

  1. EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。

  2. 作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。

    • ブローカーアドレス

    • MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)

  3. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。

MicroPython ファームウェアのインストール

ESP32、ESP8266、Raspberry Pi Pico などのハードウェアプラットフォームに MicroPython をインストールする方法は本記事の主題ではありませんが、必須です。まだインストールしていない場合は、以下を参考にしてください。

ESP32 に MicroPython をインストールする方法

ESP8266 に MicroPython をインストールする方法

Raspberry Pi Pico に MicroPython をインストールする方法

接続

ネットワークへのアクセス

まず、デバイスを WiFi ネットワークに接続し、外部の MQTT サーバーへ接続できるようにします。

wifi.py ファイルを作成し、以下のコードを追加してください。NAME OF YOUR WIFI NETWORKPASSWORD OF YOUR WIFI NETWORK はご自身の WiFi ネットワーク名とパスワードに置き換えてください。

import network
import time

def connect():
	ssid = 'NAME OF YOUR WIFI NETWORK'
	password = 'PASSWORD OF YOUR WIFI NETWORK'
	wlan = network.WLAN(network.STA_IF)
	wlan.active(True)
	wlan.connect(ssid, password)
	while wlan.isconnected() == False:
		print('Waiting for connection...')
		time.sleep(1)
	print('Connected on {ip}'.format(ip = wlan.ifconfig()[0]))

このモジュールを main.py ファイルでインポートし、wifi.connect() を実行すれば、デバイスは起動時に自動的にネットワークに接続します。

import wifi

wifi.connect()

main.py はご自身で作成してください。

接続パラメータ

次に、MQTT サーバーのアドレス、ポート、接続に使うクライアントID、ユーザー名、パスワードなどの接続情報を初期化します。最後に、パブリッシュやサブスクライブで使用するトピックも指定します。

SERVER = "xxxx.ala.cn-hangzhou.emqxsl.cn"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id = random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"

接続先の MQTT サーバーで認証が無効の場合は、ユーザー名とパスワードの設定は不要です。

TCP で接続する

先ほど初期化した接続パラメータを使って接続します。

python
from umqtt.simple import MQTTClient

def connect():
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

SSL/TLS で接続する

サーバー証明書の正当性を検証するために、クライアント側で信頼できる CA 証明書を指定する必要があります。無料のパブリック MQTT サーバーを利用する場合は、こちら から CA 証明書をダウンロードできます。独自にデプロイした場合は、デプロイ概要ページから CA 証明書をダウンロードしてください。

MicroPython は現在 PEM 形式の証明書のみ対応しているため、ダウンロードした証明書の形式変換は不要です。以下のコードは、CA 証明書ファイルの内容を読み込み cadata にセットし、この CA 証明書を信頼することを示しています。cert_reqsssl.CERT_REQUIRED に設定することで、ハンドシェイク時にサーバーが証明書を送信することを要求します。

with open('emqxsl-ca.crt', 'rb') as f:
    cadata = f.read()
ssl_params = dict()
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
ssl_params["cadata"] = cadata

EMQX Cloud のサーバレスインスタンスを利用する場合は、server_hostname オプションで SNI(Server Name Indication)を接続先アドレスに設定する必要があります。これはサーバレスが SNI によってテナントを識別するため必須の設定です。MicroPython はデフォルトで SNI を送信しないため、設定しないと接続に失敗します。

ssl_params["server_hostname"] = SERVER

完全な接続コードは以下の通りです。

python
def connect():
  	with open('emqxsl-ca.crt', 'rb') as f:
        cadata = f.read()
    ssl_params = dict()
    ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
    ssl_params["cadata"] = cadata
    ssl_params["server_hostname"] = SERVER
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

cert_reqscadata パラメータを使用するには、MicroPython バージョン 1.20.0 以上が必要です。これらのパラメータを指定しない場合、クライアントはサーバーの正当性を検証できず、中間者攻撃のリスクがあります。

コールバックを設定してトピックをサブスクライブする

次に、メッセージ到着時に呼ばれる on_message コールバック関数を実装します。ここにメッセージ処理のコードを記述できます。この例では、トピックとメッセージの内容を単純に表示しています。

メッセージの取りこぼしを防ぐため、トピックをサブスクライブする前にクライアントのコールバックを on_message に設定することを推奨します。

python
def on_message(topic, msg):
    print("Received '{payload}' from topic '{topic}'\n".format(
        payload = msg.decode(), topic = topic.decode()))

def subscribe(client):
    client.set_callback(on_message)
    client.subscribe(TOPIC)

循環的にパブリッシュと受信を行う

この例では、同じクライアントを使ってメッセージの循環的なパブリッシュと受信を行います。以下のコードの主な役割は、新しいメッセージ内容を継続的に構築してパブリッシュし、その後 wait_msg() を呼び出して MQTT サーバーから転送されるメッセージをブロックして待つことです。

メッセージが届くと on_message コールバックが呼ばれてメッセージ内容を表示し、コールバック終了後は 1 秒待ってから次のループに入ります。

def loop_publish(client):
    msg_count = 0
    while True:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        result = client.publish(TOPIC, msg)
        print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
        client.wait_msg()
        msg_count += 1
        time.sleep(1)

メイン関数

メイン関数では、まず wifi.connect() を呼んでデバイスをネットワークに接続し、その後に接続、サブスクライブ、循環パブリッシュの各関数を順に呼び出します。

def run():
    wifi.connect()
    client = connect()
    subscribe(client)
    loop_publish(client)

if __name__ == "__main__":
    run()

全コード

WiFi 接続コード:

python
import network
import time

def connect():
	ssid = 'NAME OF YOUR WIFI NETWORK'
	password = 'PASSWORD OF YOUR WIFI NETWORK'
	wlan = network.WLAN(network.STA_IF)
	wlan.active(True)
	wlan.connect(ssid, password)
	while wlan.isconnected() == False:
		print('Waiting for connection...')
		time.sleep(1)
	print('Connected on {ip}'.format(ip=wlan.ifconfig()[0]))

MQTT クライアントコード:

python
import json
import random
import ssl
import time
import wifi

from umqtt.simple import MQTTClient

SERVER = "broker.emqx.io"
PORT = 8883
CLIENT_ID = 'micropython-client-{id}'.format(id=random.getrandbits(8))
USERNAME = 'emqx'
PASSWORD = 'public'
TOPIC = "raspberry/mqtt"

def on_message(topic, msg):
    print("Received '{payload}' from topic '{topic}'\n".format(
        payload = msg.decode(), topic = topic.decode()))

def connect():
    with open('broker.emqx.io-ca.crt', 'rb') as f:
        cadata = f.read()
    ssl_params = dict()
    ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
    ssl_params["cadata"] = cadata
    ssl_params["server_hostname"] = SERVER
    client = MQTTClient(CLIENT_ID, SERVER, PORT, USERNAME, PASSWORD, ssl = True, ssl_params = ssl_params)
    client.connect()
    print('Connected to MQTT Broker "{server}"'.format(server = SERVER))
    return client

def subscribe(client):
    client.set_callback(on_message)
    client.subscribe(TOPIC)

def loop_publish(client):
    msg_count = 0
    while True:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        result = client.publish(TOPIC, msg)
        print("Send '{msg}' to topic '{topic}'".format(msg = msg, topic = TOPIC))
        client.wait_msg()
        msg_count += 1
        time.sleep(1)

def run():
    wifi.connect()
    client = connect()
    subscribe(client)
    loop_publish(client)

if __name__ == "__main__":
    run()

動作確認

コードを実行すると、コンソールには以下のように表示されます。

Connected on 192.168.0.32
Connected to MQTT Broker "broker.emqx.io"
Send '{"msg": 0}' to topic 'raspberry/mqtt'
Received '{"msg": 0}' from topic 'raspberry/mqtt'

Send '{"msg": 1}' to topic 'raspberry/mqtt'
Received '{"msg": 1}' from topic 'raspberry/mqtt'

Send '{"msg": 2}' to topic 'raspberry/mqtt'
Received '{"msg": 2}' from topic 'raspberry/mqtt'

さらに

これで MicroPython の umqtt モジュールを使い、ESP32 から EMQX Cloud に正常に接続できました。サンプルコードは こちら からダウンロード可能です。また、他の言語でのデモ例も GitHub に多数ありますのでご参照ください。