Skip to content

Paho Pythonで接続する

本ドキュメントでは、Pythonプロジェクトでpaho-mqttクライアントライブラリを使用してMQTTサーバーに接続し、サブスクライブ、サブスクライブ解除、メッセージの送受信を行う方法について説明します。

paho-mqttは、現在Pythonで最も利用されているMQTTクライアントライブラリです。Python 2.7.9+または3.6+のクライアントクラスでMQTT V5.0、V3.1、V3.1.1をサポートしています。また、MQTTサーバーに対して単発のメッセージを簡単にパブリッシュできるヘルパー関数も提供しています。

前提条件

Paho Pythonクライアントを使用してPythonアプリケーションをEMQX Cloudに接続する前に、以下の前提条件を満たしていることを確認してください。

  • MQTTブローカーのデプロイ
  • Pythonのバージョン確認

MQTTブローカーのデプロイ

アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。

サーバレスデプロイメント

  1. EMQX Cloudコンソールでサーバレスデプロイメントを作成します。

  2. デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。

    • ブローカーアドレス

    • ポート番号(サーバレスではTLSポートのみ対応)

  3. サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート8883を使用してください。

  4. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細はサーバレスポートガイドを参照してください。

Dedicated Flex または BYOC デプロイメント

  1. EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。

  2. 作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。

    • ブローカーアドレス

    • MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)

  3. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。

Pythonバージョンの確認

本プロジェクトはPython 3.8で開発およびテストを行っています。以下のコマンドでPythonのバージョンを確認してください。

➜  ~ python3 --version             
Python 3.8.6

MQTTクライアントのインストール

  1. PipはPythonで書かれたパッケージ管理システムで、ソフトウェアパッケージのインストールと管理に使用されます。以下のコマンドでpaho-mqttをインストールしてください。
bash
pip install paho-mqtt
  1. Paho MQTTクライアントをインポートします。
python
from paho.mqtt import client as mqtt_client

TCPポートで接続する

コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは1883ではない可能性がありますので、必ずポート番号を確認してください。

  • MQTTブローカー接続のホスト、ポート、トピックを設定します。同時に、Pythonの関数 random.randint を呼び出してMQTTクライアントIDをランダムに生成します。
python
broker = 'broker.emqx.io'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
  • 接続コールバック関数 on_connect を作成します。
    この関数はクライアント接続後に呼び出され、引数の rc によって接続成功かどうかを判定します。
python
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # クライアントIDを設定
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

SSL/TLSポートで接続する

コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは8883ではない可能性がありますので、必ずポート番号を確認してください。

ここではSSL/TLSの片方向認証での接続方法を紹介します。双方向認証を使用する場合は、こちらを参照してください。

  • MQTTブローカー接続のホスト、ポート、トピックを設定します。同時に、Pythonの関数 random.randint を呼び出してMQTTクライアントIDをランダムに生成します。
python
broker = 'broker.emqx.io'
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
  • CA証明書を設定します。サーバレスデプロイメントを使用している場合は、コンソールのデプロイメント概要からCA証明書ファイルをダウンロードできます。Dedicated Flexデプロイメントの場合は、TLS/SSLの設定を参照してください。
  • 接続コールバック関数 on_connect を作成します。
    この関数はクライアント接続後に呼び出され、引数の rc によって接続成功かどうかを判定します。
python
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # クライアントIDを設定
    client = mqtt_client.Client(client_id)
    # CA証明書を設定
    client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

パブリッシュとサブスクライブ

このセクションでは、MQTTブローカーに正常に接続した後のトピックのサブスクライブとメッセージのパブリッシュ方法を紹介します。

トピックのサブスクライブ

  • サブスクライブするトピックとQoSレベルを設定します。
  • メッセージコールバック関数 on_message を作成します。この関数はクライアントがMQTTブローカーからメッセージを受信した後に呼び出されます。受信したトピック名とメッセージを表示します。
python
def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic, qos=0)
    client.on_message = on_message

トピックのサブスクライブ解除

以下のコードでトピックのサブスクライブを解除できます。解除するトピックとQoSレベルを指定してください。

python
def unsubscribe(client: mqtt_client):
    client.on_message = None
    client.unsubscribe(topic)

メッセージのパブリッシュ

  • メッセージをパブリッシュする際に、MQTTブローカーにトピックとペイロードを通知します。
  • まず、whileループを定義します。このループ内でMQTTクライアントの publish 関数を使い、1秒ごとにトピック python/mqtt にメッセージを送信します。
python
def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
           print(f"Send `{msg}` to topic `{topic}`")
        else:
           print(f"Failed to send message to topic {topic}")
        msg_count += 1

メッセージの受信

以下のコードは、クライアントがメッセージイベントをリッスンし、メッセージ受信後にコールバック関数を実行して、受信したメッセージとトピックをコンソールに出力します。

python
def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        
client.on_message = on_message

MQTTブローカーからの切断

クライアントが能動的に切断したい場合は、以下のコードを使用してください。

python
def disconnect(client: mqtt_client):
    client.loop_stop()
    client.disconnect()

上記は主要なコードのみを掲載しています。プロジェクトの完全なコードはこちらを参照してください。ダウンロードして体験できます。

完全なコード例

メッセージをパブリッシュするコード

python
# python 3.8

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

サブスクライブするコード

python
# python3.8

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'emqx'
password = '**********'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

テスト

メッセージのパブリッシュ

メッセージパブリッシュのコードを実行すると、クライアントが接続しメッセージを正常にパブリッシュしていることが確認できます。

python_pub

サブスクライブ

python_sub

その他

以上はpaho-mqttクライアントライブラリを使ってEMQX Cloudに接続する方法の紹介です。
サンプルのソースコードはこちらからダウンロードできます。
他の言語のサンプルもGitHubでご覧いただけます。