MicroPython + Raspberry Pi で接続する
本記事では、Raspberry Pi(Raspberry Pi 4)上で MicroPython を使用し、クライアントと MQTT サーバー間で接続、サブスクライブ、パブリッシュする機能を実現する方法を紹介します。
MicroPython は、Python 3 プログラミング言語の完全なソフトウェア実装で、C 言語で書かれており、MCU(マイクロコントローラユニット)ハードウェア上で動作するフル Python コンパイラとランタイムシステムに最適化されています。ユーザーにはインタラクティブなプロンプト(REPL)が提供され、サポートされているコマンドを即座に実行できます。選択されたコア Python ライブラリに加え、MicroPython には低レベルハードウェアへのアクセスを可能にするモジュールが含まれており、マイクロコントローラや制約のある環境で動作するよう最適化された Python 3 言語の簡略実装で、Python 標準ライブラリの一部も含まれています。
Raspberry Pi は、英国の Raspberry Pi Foundation によって開発された ARM ベースのマイクロコンピュータマザーボードです。USB インターフェースやイーサネットインターフェースを備え、キーボード、マウス、ネットワークケーブルの接続が可能です。マザーボードは PC の基本機能を持ち、Raspberry Pi は Wi-Fi、Bluetooth、多数の GPIO を統合しており、教育、家庭用エンターテインメント、IoT などで広く利用されています。
本記事では、Raspberry Pi 4 上の MicroPython クライアントを TCP ポートおよび TLS/SSL ポート経由で MQTT ブローカーに接続する方法を示します。サーバーレスのデプロイメントの場合は TLS/SSL ポート接続のデモをご参照ください。TCP ポート経由の接続設定は TLS/SSL ポート経由の設定と異なりますが、パブリッシュおよびサブスクライブ機能のコードは同じです。
前提条件
接続前に、ブローカーとクライアントの準備を行う必要があります。
MQTT ブローカーの準備
EMQX が提供する無料のパブリック MQTT ブローカーを利用できます。このサービスはEMQX プラットフォームを基に構築されています。ブローカーのアクセス情報は以下の通りです:
- ブローカー: broker.emqx.io
- TCP ポート: 1883
- SSL/TLS ポート: 8883
また、デプロイメントを作成することも可能です。デプロイメントの概要で接続情報を確認してください。デプロイメントが稼働中であることを確認し、TCP ポートまたは TLS/SSL ポートを使って MQTT サーバーへの接続をテストしてください。
独自にデプロイメントを作成する場合は、アクセス制御 -> 認証を確認し、認証用のユーザー名とパスワードを設定してください。
MicroPython のインストール
MicroPython をインストールしコードを書くために、Raspberry Pi 4 上で以下のインストールを完了する必要があります。本記事で使用する Raspberry Pi OS は Raspberry Pi OS with desktop(Debian バージョン: 10、64-bit)です。
Raspberry Pi の OS が Debian バージョン 10(buster)ベースの場合、以下のコマンドで MicroPython を直接インストールできます。
bashsudo apt-get update sudo apt-get -y install micropython
TIP
インストール時に
E: Unable to locate package micropython
エラーが発生した場合は、snap を使うか、ソースコードからビルドしてインストールしてください。Raspberry Pi OS が Debian バージョン 11(bullseye)ベースの場合は、snap を使って MicroPython をインストールします。
bashsudo apt update sudo apt install snapd sudo reboot sudo snap install core sudo snap install micropython
インストール完了後、ターミナルで micropython
を実行し、MicroPython x.x
(x は数字)が表示されればインストール成功です。
MQTT クライアントライブラリのインストール
MQTT サーバーへの接続を簡単にするために、umqtt.simple
ライブラリをインストールします。
micropython -m upip install umqtt.simple
TCP ポートで接続する
このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TCP ポート経由で接続する方法を紹介します。完全なコード例は以下の通りです。
サブスクライブ
任意のエディタを開き、以下のコードを入力して sub.py
ファイルとして保存してください。
# sub.py
import time
from umqtt.simple import MQTTClient
SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'
def sub(topic, msg):
print('received message %s on topic %s' % (msg, topic))
def main(server=SERVER):
client = MQTTClient(ClientID, server, 1883, user, password)
client.set_callback(sub)
client.connect()
print('Connected to MQTT Broker "%s"' % (server))
client.subscribe(topic)
while True:
if True:
client.wait_msg()
else:
client.check_msg()
time.sleep(1)
if __name__ == "__main__":
main()
パブリッシュ
任意のエディタを開き、以下のコードを入力して pub.py
ファイルとして保存してください。
# pub.py
import time
from umqtt.simple import MQTTClient
server="broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'
def connect():
print('Connected to MQTT Broker "%s"' % (server))
client = MQTTClient(ClientID, server, 1883, user, password)
client.connect()
return client
def reconnect():
print('Failed to connect to MQTT broker, Reconnecting...' % (server))
time.sleep(5)
client.reconnect()
try:
client = connect()
except OSError as e:
reconnect()
while True:
print('send message %s on topic %s' % (msg, topic))
client.publish(topic, msg, qos=0)
time.sleep(1)
TLS/SSL ポートで接続する
このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TLS/SSL ポート経由で接続する方法を紹介します。TCP ポートと TLS/SSL ポートの接続設定は若干異なりますが、パブリッシュおよびサブスクライブのコードは同じです。完全なコード例は以下の通りです。
サブスクライブ
任意のエディタを開き、以下のコードを入力して sub-tls.py
ファイルとして保存してください。
# sub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient
SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b"hello"
def sub(topic, msg):
print('received message %s on topic %s' % (msg, topic))
def main(server=SERVER):
client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
client.set_callback(sub)
client.connect()
print('Connected to MQTT Broker "%s"' % (server))
client.subscribe(topic)
while True:
if True:
client.wait_msg()
else:
client.check_msg()
time.sleep(1)
if __name__ == "__main__":
main()
パブリッシュ
任意のエディタを開き、以下のコードを入力して pub-tls.py
ファイルとして保存してください。
# pub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient
server = "broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b'{"msg":"hello"}'
def connect():
print('Connected to MQTT Broker "%s"' % server)
client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
try:
client.connect()
return client
except Exception as e:
print('Failed to connect to MQTT broker:', e)
raise
def reconnect():
print('Failed to connect to MQTT broker. Reconnecting...')
time.sleep(5)
client = connect()
return client
try:
client = connect()
except Exception:
client = reconnect()
while True:
try:
print('Sending message %s on topic %s' % (msg, topic))
client.publish(topic, msg, qos=0)
time.sleep(1)
except Exception as e:
print('Failed to publish message:', e)
client = reconnect()
テスト
以下のテストには MQTT 5.0 クライアントツール MQTTX を使用します。
サブスクライブのテスト
MQTT サーバーへの接続に成功したら、Raspberry Pi と MQTTX を使って接続をテストできます。
ターミナルを開き、MicroPython のコードを実行してメッセージを待ち受けます。
bashmicropython sub.py
MQTTX クライアントで MQTT サーバーに接続し、トピック
raspberry/mqtt
にメッセージを送信します。Raspberry Pi の MicroPython で受信したメッセージを確認します。
パブリッシュのテスト
MQTTX クライアントでトピック
raspberry/mqtt
をサブスクライブします。ターミナルで MicroPython のコードを実行し、メッセージをパブリッシュします。
bashmicropython pub.py
MQTTX クライアントで Raspberry Pi から送信されたメッセージを確認します。
まとめ
以上で、MicroPython プロジェクトにおける MQTT 接続を作成し、クライアントを使って MQTT サーバーに接続、サブスクライブ、メッセージの送受信を行うシナリオをシミュレートしました。サンプルのソースコードはこちらからダウンロードできます。また、GitHubでは他の言語のデモ例も多数公開されています。