Skip to content

Paho Pythonでの接続

本ドキュメントでは、Pythonプロジェクトでpaho-mqttクライアントライブラリを使用してMQTTサーバーに接続し、サブスクライブ、サブスクライブ解除、メッセージの送受信を行う方法について説明します。

paho-mqttは、現在Pythonで最も広く使われているMQTTクライアントライブラリです。Python 2.7.9+または3.6+でMQTT V5.0、V3.1、V3.1.1のクライアントクラスをサポートしています。また、MQTTサーバーへ単発のメッセージを簡単にパブリッシュするためのヘルパー関数も提供しています。

前提条件

MQTTブローカーのデプロイ

  • EMQXが提供する無料のパブリックMQTTブローカーを利用できます。このサービスはEMQXプラットフォームをベースに作成されています。ブローカーの接続情報は以下の通りです:

    • ブローカー:broker.emqx.io
    • TCPポート:1883
    • TLS/SSLポート:8883
    • WebSocketポート:8083
    • WebSocket TLS/SSLポート:8084
  • また、デプロイメントを作成することも可能です。接続情報はデプロイメント概要で確認してください。デプロイメントが稼働中であることを確認してください。同時に、WebSocketを使ってMQTTサーバーへの接続テストが可能です。独自にデプロイメントを作成する場合は、アクセス制御 -> 認証を確認し、認証用のユーザー名とパスワードを設定してください。

Pythonバージョンの確認

本プロジェクトはPython 3.8で開発およびテストしています。以下のコマンドでPythonのバージョンを確認してください。

➜  ~ python3 --version             
Python 3.8.6

MQTTクライアントのインストール

  1. PipはPythonで書かれたパッケージ管理システムで、ソフトウェアパッケージのインストールと管理に使用されます。以下のコマンドでpaho-mqttをインストールします。
bash
pip install paho-mqtt
  1. Paho MQTTクライアントをインポートします。
python
from paho.mqtt import client as mqtt_client

TCPポートでの接続

コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは1883ではない可能性があるため、必ずポートを確認してください。

  • MQTTブローカー接続のためにホスト、ポート、トピックを設定します。同時にPythonの関数random.randintを使ってMQTTクライアントIDをランダムに生成します。
python
broker = 'broker.emqx.io'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
  • 接続コールバック関数on_connectを作成します。
    この関数はクライアント接続後に呼び出され、引数のrcで接続成功かどうかを判定します。
python
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # クライアントIDを設定
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

SSL/TLSポートでの接続

コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは8883ではない可能性があるため、必ずポートを確認してください。

本節では、SSL/TLSの片方向認証でデプロイメントに接続する方法を紹介します。双方向認証を使用する場合は、こちらを参照してください。

  • MQTTブローカー接続のためにホスト、ポート、トピックを設定します。同時にPythonの関数random.randintを使ってMQTTクライアントIDをランダムに生成します。
python
broker = 'broker.emqx.io'
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
  • CA証明書の設定。Serverlessデプロイメントを使用している場合は、コンソールのデプロイメント概要からCA証明書ファイルをダウンロードできます。Dedicatedデプロイメントの場合は、TLS/SSLの設定を参照してください。
  • 接続コールバック関数on_connectを作成します。
    この関数はクライアント接続後に呼び出され、引数のrcで接続成功かどうかを判定します。
python
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # クライアントIDを設定
    client = mqtt_client.Client(client_id)
    # CA証明書を設定
    client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

パブリッシュとサブスクライブ

本節では、MQTTブローカーに正常に接続した後、トピックのサブスクライブとメッセージのパブリッシュ方法を紹介します。

トピックのサブスクライブ

  • サブスクライブするトピックとQoSレベルを設定します。
  • メッセージコールバック関数on_messageを作成します。この関数はクライアントがMQTTブローカーからメッセージを受信した後に呼び出されます。受信したトピック名とメッセージを出力します。
python
def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic, qos=0)
    client.on_message = on_message

トピックのサブスクライブ解除

以下のコードでトピックのサブスクライブを解除します。解除するトピックとQoSレベルを指定してください。

python
def unsubscribe(client: mqtt_client):
    client.on_message = None
    client.unsubscribe(topic)

メッセージのパブリッシュ

  • メッセージをパブリッシュする際に、MQTTブローカーにトピックとペイロードを通知します。
  • まずwhileループを定義し、その中でMQTTクライアントのpublish関数を使い、1秒ごとにトピックpython/mqttへメッセージを送信します。
python
def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
           print(f"Send `{msg}` to topic `{topic}`")
        else:
           print(f"Failed to send message to topic {topic}")
        msg_count += 1

メッセージの受信

以下のコードは、クライアントがメッセージイベントをリッスンし、メッセージを受信した後にコールバック関数を実行し、受信したメッセージとトピックをコンソールに出力します。

python
def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        
client.on_message = on_message

MQTTブローカーからの切断

クライアントが能動的に切断したい場合は、以下のコードを使用します。

python
def disconnect(client: mqtt_client):
    client.loop_stop()
    client.disconnect()

上記は主要なコードの抜粋です。プロジェクトの完全なコードはこちらを参照してください。ダウンロードして体験できます。

完全なコード例

メッセージパブリッシュのコード

python
# python 3.8

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

サブスクライブのコード

python
# python3.8

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'emqx'
password = '**********'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.tls_set(ca_certs='./server-ca.crt')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

テスト

メッセージのパブリッシュ

メッセージパブリッシュのコードを実行すると、クライアントが接続し、メッセージを正常にパブリッシュしていることが確認できます。

python_pub

サブスクライブ

python_sub

さらに詳しく

以上はpaho-mqttクライアントライブラリを使ってEMQXプラットフォームに接続する方法の紹介でした。
サンプルのソースコードはこちらからダウンロード可能です。
他の言語のサンプルもGitHubでご覧いただけます。