Skip to content

MicroPython + Raspberry Pi で接続する

本記事では、Raspberry Pi(Raspberry Pi 4)上で MicroPython を使用し、クライアントと MQTT サーバー間で接続、サブスクライブ、パブリッシュする機能を実現する方法を紹介します。

MicroPython は、Python 3 プログラミング言語の完全なソフトウェア実装で、C 言語で書かれており、MCU(マイクロコントローラユニット)ハードウェア上で動作するフル Python コンパイラとランタイムシステムに最適化されています。ユーザーにはインタラクティブなプロンプト(REPL)が提供され、サポートされているコマンドを即座に実行できます。選択されたコア Python ライブラリに加え、MicroPython には低レベルハードウェアへのアクセスを可能にするモジュールが含まれており、マイクロコントローラや制約のある環境で動作するよう最適化された Python 3 言語の簡略実装で、Python 標準ライブラリの一部も含まれています。

Raspberry Pi は、英国の Raspberry Pi Foundation によって開発された ARM ベースのマイクロコンピュータマザーボードです。USB インターフェースやイーサネットインターフェースを備え、キーボード、マウス、ネットワークケーブルの接続が可能です。マザーボードは PC の基本機能を持ち、Raspberry Pi は Wi-Fi、Bluetooth、多数の GPIO を統合しており、教育、家庭用エンターテインメント、IoT などで広く利用されています。

本記事では、Raspberry Pi 4 上の MicroPython クライアントを TCP ポートおよび TLS/SSL ポート経由で MQTT ブローカーに接続する方法を示します。サーバーレスのデプロイメントの場合は TLS/SSL ポート接続のデモをご参照ください。TCP ポート経由の接続設定は TLS/SSL ポート経由の設定と異なりますが、パブリッシュおよびサブスクライブ機能のコードは同じです。

前提条件

接続前に、ブローカーとクライアントの準備を行う必要があります。

MQTT ブローカーの準備

EMQX が提供する無料のパブリック MQTT ブローカーを利用できます。このサービスはEMQX プラットフォームを基に構築されています。ブローカーのアクセス情報は以下の通りです:

  • ブローカー: broker.emqx.io
  • TCP ポート: 1883
  • SSL/TLS ポート: 8883

また、デプロイメントを作成することも可能です。デプロイメントの概要で接続情報を確認してください。デプロイメントが稼働中であることを確認し、TCP ポートまたは TLS/SSL ポートを使って MQTT サーバーへの接続をテストしてください。

独自にデプロイメントを作成する場合は、アクセス制御 -> 認証を確認し、認証用のユーザー名とパスワードを設定してください。

MicroPython のインストール

MicroPython をインストールしコードを書くために、Raspberry Pi 4 上で以下のインストールを完了する必要があります。本記事で使用する Raspberry Pi OS は Raspberry Pi OS with desktop(Debian バージョン: 10、64-bit)です。

  1. Raspberry Pi の OS が Debian バージョン 10(buster)ベースの場合、以下のコマンドで MicroPython を直接インストールできます。

    bash
    sudo apt-get update
    sudo apt-get -y install micropython

    TIP

    インストール時に E: Unable to locate package micropython エラーが発生した場合は、snap を使うか、ソースコードからビルドしてインストールしてください。

  2. Raspberry Pi OS が Debian バージョン 11(bullseye)ベースの場合は、snap を使って MicroPython をインストールします。

    bash
    sudo apt update
    sudo apt install snapd
    sudo reboot
    sudo snap install core
    sudo snap install micropython

インストール完了後、ターミナルで micropython を実行し、MicroPython x.x(x は数字)が表示されればインストール成功です。

micropython_installation

MQTT クライアントライブラリのインストール

MQTT サーバーへの接続を簡単にするために、umqtt.simple ライブラリをインストールします。

bash
micropython -m upip install umqtt.simple

TCP ポートで接続する

このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TCP ポート経由で接続する方法を紹介します。完全なコード例は以下の通りです。

サブスクライブ

任意のエディタを開き、以下のコードを入力して sub.py ファイルとして保存してください。

python
# sub.py
import time
from umqtt.simple import MQTTClient

SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'

def sub(topic, msg):
    print('received message %s on topic %s' % (msg, topic))

def main(server=SERVER):
    client = MQTTClient(ClientID, server, 1883, user, password)
    client.set_callback(sub)
    client.connect()
    print('Connected to MQTT Broker "%s"' % (server))
    client.subscribe(topic)
    while True:
        if True:
            client.wait_msg()
        else:
            client.check_msg()
            time.sleep(1)

if __name__ == "__main__":
    main()

パブリッシュ

任意のエディタを開き、以下のコードを入力して pub.py ファイルとして保存してください。

python
# pub.py
import time
from umqtt.simple import MQTTClient

server="broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'

def connect():
    print('Connected to MQTT Broker "%s"' % (server))
    client = MQTTClient(ClientID, server, 1883, user, password)
    client.connect()
    return client

def reconnect():
    print('Failed to connect to MQTT broker, Reconnecting...' % (server))
    time.sleep(5)
    client.reconnect()

try:
    client = connect()
except OSError as e:
    reconnect()

while True:
  print('send message %s on topic %s' % (msg, topic))
  client.publish(topic, msg, qos=0)
  time.sleep(1)

TLS/SSL ポートで接続する

このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TLS/SSL ポート経由で接続する方法を紹介します。TCP ポートと TLS/SSL ポートの接続設定は若干異なりますが、パブリッシュおよびサブスクライブのコードは同じです。完全なコード例は以下の通りです。

サブスクライブ

任意のエディタを開き、以下のコードを入力して sub-tls.py ファイルとして保存してください。

python
# sub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient

SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b"hello"

def sub(topic, msg):
    print('received message %s on topic %s' % (msg, topic))

def main(server=SERVER):
    client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
    client.set_callback(sub)
    client.connect()
    print('Connected to MQTT Broker "%s"' % (server))
    client.subscribe(topic)
    while True:
        if True:
            client.wait_msg()
        else:
            client.check_msg()
            time.sleep(1)

if __name__ == "__main__":
    main()

パブリッシュ

任意のエディタを開き、以下のコードを入力して pub-tls.py ファイルとして保存してください。

python
# pub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient

server = "broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b'{"msg":"hello"}'

def connect():
    print('Connected to MQTT Broker "%s"' % server)
    client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
    try:
        client.connect()
        return client
    except Exception as e:
        print('Failed to connect to MQTT broker:', e)
        raise

def reconnect():
    print('Failed to connect to MQTT broker. Reconnecting...')
    time.sleep(5)
    client = connect()
    return client

try:
    client = connect()
except Exception:
    client = reconnect()

while True:
    try:
        print('Sending message %s on topic %s' % (msg, topic))
        client.publish(topic, msg, qos=0)
        time.sleep(1)
    except Exception as e:
        print('Failed to publish message:', e)
        client = reconnect()

テスト

以下のテストには MQTT 5.0 クライアントツール MQTTX を使用します。

サブスクライブのテスト

MQTT サーバーへの接続に成功したら、Raspberry Pi と MQTTX を使って接続をテストできます。

  1. ターミナルを開き、MicroPython のコードを実行してメッセージを待ち受けます。

    bash
    micropython sub.py

    micropython_connection

  2. MQTTX クライアントで MQTT サーバーに接続し、トピック raspberry/mqtt にメッセージを送信します。

    micropython_mqttx_pub

  3. Raspberry Pi の MicroPython で受信したメッセージを確認します。

    micropython_receive

パブリッシュのテスト

  1. MQTTX クライアントでトピック raspberry/mqtt をサブスクライブします。

  2. ターミナルで MicroPython のコードを実行し、メッセージをパブリッシュします。

    bash
    micropython pub.py

    micropython_send

  3. MQTTX クライアントで Raspberry Pi から送信されたメッセージを確認します。

    micropython_mqttx_sub

まとめ

以上で、MicroPython プロジェクトにおける MQTT 接続を作成し、クライアントを使って MQTT サーバーに接続、サブスクライブ、メッセージの送受信を行うシナリオをシミュレートしました。サンプルのソースコードはこちらからダウンロードできます。また、GitHubでは他の言語のデモ例も多数公開されています。