Paho Pythonで接続する
本ドキュメントでは、Pythonプロジェクトでpaho-mqttクライアントライブラリを使用してMQTTサーバーに接続し、サブスクライブ、サブスクライブ解除、メッセージの送受信を行う方法について説明します。
paho-mqttは、現在Pythonで最も利用されているMQTTクライアントライブラリです。Python 2.7.9+または3.6+のクライアントクラスでMQTT V5.0、V3.1、V3.1.1をサポートしています。また、MQTTサーバーに対して単発のメッセージを簡単にパブリッシュできるヘルパー関数も提供しています。
前提条件
Paho Pythonクライアントを使用してPythonアプリケーションをEMQX Cloudに接続する前に、以下の前提条件を満たしていることを確認してください。
- MQTTブローカーのデプロイ
- Pythonのバージョン確認
MQTTブローカーのデプロイ
アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。
サーバレスデプロイメント
EMQX Cloudコンソールでサーバレスデプロイメントを作成します。
デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。
ブローカーアドレス
ポート番号(サーバレスではTLSポートのみ対応)
サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート
8883を使用してください。デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細はサーバレスポートガイドを参照してください。
Dedicated Flex または BYOC デプロイメント
EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。
作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。
ブローカーアドレス
MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)
デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。
Pythonバージョンの確認
本プロジェクトはPython 3.8で開発およびテストを行っています。以下のコマンドでPythonのバージョンを確認してください。
➜ ~ python3 --version
Python 3.8.6MQTTクライアントのインストール
- PipはPythonで書かれたパッケージ管理システムで、ソフトウェアパッケージのインストールと管理に使用されます。以下のコマンドでpaho-mqttをインストールしてください。
pip install paho-mqtt- Paho MQTTクライアントをインポートします。
from paho.mqtt import client as mqtt_clientTCPポートで接続する
コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは1883ではない可能性がありますので、必ずポート番号を確認してください。
- MQTTブローカー接続のホスト、ポート、トピックを設定します。同時に、Pythonの関数
random.randintを呼び出してMQTTクライアントIDをランダムに生成します。
broker = 'broker.emqx.io'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'- 接続コールバック関数
on_connectを作成します。
この関数はクライアント接続後に呼び出され、引数のrcによって接続成功かどうかを判定します。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# クライアントIDを設定
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return clientSSL/TLSポートで接続する
コンソールのデプロイメント概要で該当するアドレスとポート情報を確認してください。ベーシックエディションの場合、ポートは8883ではない可能性がありますので、必ずポート番号を確認してください。
ここではSSL/TLSの片方向認証での接続方法を紹介します。双方向認証を使用する場合は、こちらを参照してください。
- MQTTブローカー接続のホスト、ポート、トピックを設定します。同時に、Pythonの関数
random.randintを呼び出してMQTTクライアントIDをランダムに生成します。
broker = 'broker.emqx.io'
port = 8883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'- CA証明書を設定します。サーバレスデプロイメントを使用している場合は、コンソールのデプロイメント概要からCA証明書ファイルをダウンロードできます。Dedicated Flexデプロイメントの場合は、TLS/SSLの設定を参照してください。
- 接続コールバック関数
on_connectを作成します。
この関数はクライアント接続後に呼び出され、引数のrcによって接続成功かどうかを判定します。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# クライアントIDを設定
client = mqtt_client.Client(client_id)
# CA証明書を設定
client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return clientパブリッシュとサブスクライブ
このセクションでは、MQTTブローカーに正常に接続した後のトピックのサブスクライブとメッセージのパブリッシュ方法を紹介します。
トピックのサブスクライブ
- サブスクライブするトピックとQoSレベルを設定します。
- メッセージコールバック関数
on_messageを作成します。この関数はクライアントがMQTTブローカーからメッセージを受信した後に呼び出されます。受信したトピック名とメッセージを表示します。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic, qos=0)
client.on_message = on_messageトピックのサブスクライブ解除
以下のコードでトピックのサブスクライブを解除できます。解除するトピックとQoSレベルを指定してください。
def unsubscribe(client: mqtt_client):
client.on_message = None
client.unsubscribe(topic)メッセージのパブリッシュ
- メッセージをパブリッシュする際に、MQTTブローカーにトピックとペイロードを通知します。
- まず、whileループを定義します。このループ内でMQTTクライアントの
publish関数を使い、1秒ごとにトピックpython/mqttにメッセージを送信します。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1メッセージの受信
以下のコードは、クライアントがメッセージイベントをリッスンし、メッセージ受信後にコールバック関数を実行して、受信したメッセージとトピックをコンソールに出力します。
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.on_message = on_messageMQTTブローカーからの切断
クライアントが能動的に切断したい場合は、以下のコードを使用してください。
def disconnect(client: mqtt_client):
client.loop_stop()
client.disconnect()上記は主要なコードのみを掲載しています。プロジェクトの完全なコードはこちらを参照してください。ダウンロードして体験できます。
完全なコード例
メッセージをパブリッシュするコード
# python 3.8
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'emqx'
password = '**********'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()サブスクライブするコード
# python3.8
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# クライアントIDをランダムに生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'emqx'
password = '**********'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()テスト
メッセージのパブリッシュ
メッセージパブリッシュのコードを実行すると、クライアントが接続しメッセージを正常にパブリッシュしていることが確認できます。

サブスクライブ

その他
以上はpaho-mqttクライアントライブラリを使ってEMQX Cloudに接続する方法の紹介です。
サンプルのソースコードはこちらからダウンロードできます。
他の言語のサンプルもGitHubでご覧いただけます。