Skip to content

MicroPython + Raspberry Pi で接続する

本記事では、Raspberry Pi(Raspberry Pi 4)上で MicroPython を使用し、クライアントと MQTT サーバー間で接続、サブスクライブ、パブリッシュする機能の実現方法を紹介します。

MicroPython は、Python 3 プログラミング言語の完全なソフトウェア実装で、C 言語で書かれ、MCU(マイクロコントローラユニット)ハードウェア上で動作するフル Python コンパイラおよびランタイムシステムに最適化されています。ユーザーに対してインタラクティブなプロンプト(REPL)を提供し、サポートされているコマンドを即座に実行できます。選択されたコア Python ライブラリに加え、MicroPython には低レベルハードウェアへのアクセスを提供するモジュールが含まれており、マイクロコントローラや制約のある環境で動作するよう最適化された Python 標準ライブラリの一部を含む、Python 3 言語の簡素化された実装です。

Raspberry Pi は、英国の Raspberry Pi 財団によって開発された ARM ベースのマイクロコンピュータのマザーボードです。USB インターフェースやイーサネットインターフェースを備え、キーボード、マウス、ネットワークケーブルの接続が可能です。マザーボードは PC の基本機能を持ち、Raspberry Pi は Wi-Fi、Bluetooth、多数の GPIO を統合しており、教育、家庭用エンターテインメント、IoT など幅広く利用されています。

本記事では、Raspberry Pi 4 上の MicroPython クライアントを TCP ポートおよび TLS/SSL ポート経由で MQTT ブローカーに接続する方法を示します。サーバレス環境でのデプロイメントについては、TLS/SSL ポート接続のデモをご参照ください。TCP ポート接続の設定は TLS/SSL ポート接続とは異なりますが、パブリッシュおよびサブスクライブのコードは共通です。

前提条件

接続前に、ブローカーとクライアントの準備を行ってください。以下の前提条件を完了していることを確認してください。

  • MQTT ブローカーのデプロイ
  • MicroPython のインストール
  • MQTT クライアントライブラリのインストール

MQTT ブローカーのデプロイ

アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。

サーバレスデプロイメント

  1. EMQX Cloudコンソールでサーバレスデプロイメントを作成します。

  2. デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。

    • ブローカーアドレス

    • ポート番号(サーバレスではTLSポートのみ対応)

  3. サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート8883を使用してください。

  4. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細はサーバレスポートガイドを参照してください。

Dedicated Flex または BYOC デプロイメント

  1. EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。

  2. 作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。

    • ブローカーアドレス

    • MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)

  3. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。

MicroPython のインストール

MicroPython をインストールし、コードを書くために、Raspberry Pi 4 上で以下のインストールを完了する必要があります。本記事で使用する Raspberry Pi OS は Raspberry Pi OS with desktop(Debian バージョン: 10、64-bit)です。

  1. Raspberry Pi の OS が Debian バージョン 10(buster)ベースの場合、以下のコマンドで MicroPython を直接インストールできます。

    bash
    sudo apt-get update
    sudo apt-get -y install micropython

    TIP

    インストール中に E: Unable to locate package micropython エラーが発生した場合は、snap を使用するか、ソースコードからビルドしてインストールしてください。

  2. Raspberry Pi OS が Debian バージョン 11(bullseye)ベースの場合は、snap を使って MicroPython をインストールできます。

    bash
    sudo apt update
    sudo apt install snapd
    sudo reboot
    sudo snap install core
    sudo snap install micropython

インストール完了後、ターミナルで micropython を実行し、MicroPython x.x(x は数字)が表示されればインストール成功です。

micropython_installation

MQTT クライアントライブラリのインストール

MQTT サーバーへの接続を簡単にするため、umqtt.simple ライブラリをインストールします。

bash
micropython -m upip install umqtt.simple

TCP ポートで接続する

このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TCP ポート経由で接続する方法を紹介します。完全なコード例は以下の通りです。

サブスクライブ

任意のエディタで以下のコードを入力し、sub.py ファイルとして保存してください。

python
# sub.py
import time
from umqtt.simple import MQTTClient

SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'

def sub(topic, msg):
    print('received message %s on topic %s' % (msg, topic))

def main(server=SERVER):
    client = MQTTClient(ClientID, server, 1883, user, password)
    client.set_callback(sub)
    client.connect()
    print('Connected to MQTT Broker "%s"' % (server))
    client.subscribe(topic)
    while True:
        if True:
            client.wait_msg()
        else:
            client.check_msg()
            time.sleep(1)

if __name__ == "__main__":
    main()

パブリッシュ

任意のエディタで以下のコードを入力し、pub.py ファイルとして保存してください。

python
# pub.py
import time
from umqtt.simple import MQTTClient

server="broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = "raspberry/mqtt"
msg = b'{"msg":"hello"}'

def connect():
    print('Connected to MQTT Broker "%s"' % (server))
    client = MQTTClient(ClientID, server, 1883, user, password)
    client.connect()
    return client

def reconnect():
    print('Failed to connect to MQTT broker, Reconnecting...' % (server))
    time.sleep(5)
    client.reconnect()

try:
    client = connect()
except OSError as e:
    reconnect()

while True:
  print('send message %s on topic %s' % (msg, topic))
  client.publish(topic, msg, qos=0)
  time.sleep(1)

TLS/SSL ポートで接続する

このセクションでは、Raspberry Pi 上の MicroPython と MQTT サーバーを TLS/SSL ポート経由で接続する方法を紹介します。TCP ポートと TLS/SSL ポートの接続設定部分は若干異なりますが、パブリッシュおよびサブスクライブのコードは共通です。完全なコード例は以下の通りです。

サブスクライブ

任意のエディタで以下のコードを入力し、sub-tls.py ファイルとして保存してください。

python
# sub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient

SERVER="broker.emqx.io"
ClientID = f'raspberry-sub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b"hello"

def sub(topic, msg):
    print('received message %s on topic %s' % (msg, topic))

def main(server=SERVER):
    client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
    client.set_callback(sub)
    client.connect()
    print('Connected to MQTT Broker "%s"' % (server))
    client.subscribe(topic)
    while True:
        if True:
            client.wait_msg()
        else:
            client.check_msg()
            time.sleep(1)

if __name__ == "__main__":
    main()

パブリッシュ

任意のエディタで以下のコードを入力し、pub-tls.py ファイルとして保存してください。

python
# pub-tls.py
import time
import ussl
from umqtt.simple import MQTTClient

server = "broker.emqx.io"
ClientID = f'raspberry-pub-{time.time_ns()}'
user = "emqx"
password = "public"
topic = b'raspberry/mqtt'
msg = b'{"msg":"hello"}'

def connect():
    print('Connected to MQTT Broker "%s"' % server)
    client = MQTTClient(ClientID, server, 8883, user, password, ssl=True, ssl_params={'server_hostname': server})
    try:
        client.connect()
        return client
    except Exception as e:
        print('Failed to connect to MQTT broker:', e)
        raise

def reconnect():
    print('Failed to connect to MQTT broker. Reconnecting...')
    time.sleep(5)
    client = connect()
    return client

try:
    client = connect()
except Exception:
    client = reconnect()

while True:
    try:
        print('Sending message %s on topic %s' % (msg, topic))
        client.publish(topic, msg, qos=0)
        time.sleep(1)
    except Exception as e:
        print('Failed to publish message:', e)
        client = reconnect()

テスト

以下のテストでは、MQTT 5.0 クライアントツール MQTTX を使用します。

サブスクライブのテスト

MQTT サーバーに正常に接続した後、Raspberry Pi と MQTTX を使って接続をテストできます。

  1. ターミナルを開き、MicroPython のコードを実行してメッセージを待ち受けます。

    bash
    micropython sub.py

    micropython_connection

  2. MQTTX クライアントで MQTT サーバーに接続し、トピック raspberry/mqtt にメッセージを送信します。

    micropython_mqttx_pub

  3. Raspberry Pi 上の MicroPython で受信したメッセージを確認します。

    micropython_receive

パブリッシュのテスト

  1. MQTTX クライアントでトピック raspberry/mqtt をサブスクライブします。

  2. ターミナルで MicroPython のコードを実行し、メッセージをパブリッシュします。

    bash
    micropython pub.py

    micropython_send

  3. MQTTX クライアントで Raspberry Pi から送信されたメッセージを確認します。

    micropython_mqttx_sub

まとめ

以上で、MicroPython プロジェクト内に MQTT 接続を作成し、クライアントを使って MQTT サーバーに接続、サブスクライブ、メッセージの送受信を行うシナリオをシミュレートしました。サンプルのソースコードは こちら からダウンロード可能です。また、GitHub では他言語のデモ例も多数公開されています。