Paho Pythonでの接続
本ドキュメントでは、Pythonプロジェクトでpaho-mqttクライアントライブラリを使用してMQTTサーバーに接続し、サブスクライブ、サブスクライブ解除、メッセージの送受信を行う方法について説明します。
paho-mqttは、現在Pythonで最も広く使われているMQTTクライアントライブラリです。Python 2.7.9+または3.6+でMQTT V5.0、V3.1、V3.1.1のクライアントクラスをサポートしています。また、MQTTサーバーへ単発のメッセージを簡単にパブリッシュするためのヘルパー関数も提供しています。
前提条件
MQTTブローカーのデプロイ
EMQXが提供する無料のパブリックMQTTブローカーを利用できます。このサービスはEMQXプラットフォームをベースに作成されています。ブローカーの接続情報は以下の通りです:
- ブローカー:broker.emqx.io
- TCPポート:1883
- TLS/SSLポート:8883
- WebSocketポート:8083
- WebSocket TLS/SSLポート:8084
また、デプロイメントを作成することも可能です。接続情報はデプロイメント概要で確認してください。デプロイメントが稼働中であることを確認してください。同時に、WebSocketを使ってMQTTサーバーへの接続テストが可能です。独自にデプロイメントを作成する場合は、アクセス制御 -> 認証を確認し、認証用のユーザー名とパスワードを設定してください。
Pythonバージョンの確認
本プロジェクトはPython 3.8で開発およびテストしています。以下のコマンドでPythonのバージョンを確認してください。
➜ ~ python3 --version
Python 3.8.6
MQTTクライアントのインストール
- PipはPythonで書かれたパッケージ管理システムで、ソフトウェアパッケージのインストールと管理に使用されます。以下のコマンドでpaho-mqttをインストールします。
pip install paho-mqtt
- Paho MQTTクライアントをインポートします。
from paho.mqtt import client as mqtt_client
TCPポートでの接続
コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは1883ではない可能性があるため、必ずポートを確認してください。
- MQTTブローカー接続のためにホスト、ポート、トピックを設定します。同時にPythonの関数
random.randint
を使ってMQTTクライアントIDをランダムに生成します。
broker = 'broker.emqx.io'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
- 接続コールバック関数
on_connect
を作成します。
この関数はクライアント接続後に呼び出され、引数のrc
で接続成功かどうかを判定します。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# クライアントIDを設定
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
SSL/TLSポートでの接続
コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは8883ではない可能性があるため、必ずポートを確認してください。
本節では、SSL/TLSの片方向認証でデプロイメントに接続する方法を紹介します。双方向認証を使用する場合は、こちらを参照してください。
- MQTTブローカー接続のためにホスト、ポート、トピックを設定します。同時にPythonの関数
random.randint
を使ってMQTTクライアントIDをランダムに生成します。
broker = 'broker.emqx.io'
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
- CA証明書の設定。Serverlessデプロイメントを使用している場合は、コンソールのデプロイメント概要からCA証明書ファイルをダウンロードできます。Dedicatedデプロイメントの場合は、TLS/SSLの設定を参照してください。
- 接続コールバック関数
on_connect
を作成します。
この関数はクライアント接続後に呼び出され、引数のrc
で接続成功かどうかを判定します。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# クライアントIDを設定
client = mqtt_client.Client(client_id)
# CA証明書を設定
client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
パブリッシュとサブスクライブ
本節では、MQTTブローカーに正常に接続した後、トピックのサブスクライブとメッセージのパブリッシュ方法を紹介します。
トピックのサブスクライブ
- サブスクライブするトピックとQoSレベルを設定します。
- メッセージコールバック関数
on_message
を作成します。この関数はクライアントがMQTTブローカーからメッセージを受信した後に呼び出されます。受信したトピック名とメッセージを出力します。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic, qos=0)
client.on_message = on_message
トピックのサブスクライブ解除
以下のコードでトピックのサブスクライブを解除します。解除するトピックとQoSレベルを指定してください。
def unsubscribe(client: mqtt_client):
client.on_message = None
client.unsubscribe(topic)
メッセージのパブリッシュ
- メッセージをパブリッシュする際に、MQTTブローカーにトピックとペイロードを通知します。
- まずwhileループを定義し、その中でMQTTクライアントの
publish
関数を使い、1秒ごとにトピックpython/mqtt
へメッセージを送信します。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
メッセージの受信
以下のコードは、クライアントがメッセージイベントをリッスンし、メッセージを受信した後にコールバック関数を実行し、受信したメッセージとトピックをコンソールに出力します。
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.on_message = on_message
MQTTブローカーからの切断
クライアントが能動的に切断したい場合は、以下のコードを使用します。
def disconnect(client: mqtt_client):
client.loop_stop()
client.disconnect()
上記は主要なコードの抜粋です。プロジェクトの完全なコードはこちらを参照してください。ダウンロードして体験できます。
完全なコード例
メッセージパブリッシュのコード
# python 3.8
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
サブスクライブのコード
# python3.8
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'emqx'
password = '**********'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
テスト
メッセージのパブリッシュ
メッセージパブリッシュのコードを実行すると、クライアントが接続し、メッセージを正常にパブリッシュしていることが確認できます。
サブスクライブ
さらに詳しく
以上はpaho-mqttクライアントライブラリを使ってEMQXプラットフォームに接続する方法の紹介でした。
サンプルのソースコードはこちらからダウンロード可能です。
他の言語のサンプルもGitHubでご覧いただけます。