MQTTデータをHTTPサーバーに取り込む
HTTPサーバーデータ統合は、EMQXを外部HTTPサービスと迅速に連携させるための機能です。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も提供します。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知、アラート通知、データ統合などのシナリオに対応可能です。
本ページでは、HTTPサーバーとのデータ統合の機能と特徴を詳細に解説し、HTTPサーバーデータ統合の設定方法について実践的なガイドを提供します。
TIP
HTTPサービスとの連携が必要だがルールによるデータ処理が不要なユーザーには、より簡単で使いやすいWebhookの利用を推奨します。
動作概要
HTTPサーバーデータ統合はEMQXの標準機能で、簡単な設定により外部HTTPサービスと連携できます。HTTPサービス側では、任意のプログラミング言語やフレームワークでコードを書き、カスタムで柔軟かつ複雑なデータ処理ロジックを実装できます。

EMQXはルールエンジンとSinkを介してデバイスのイベントやメッセージをHTTPサーバーに転送します。ワークフローは以下の通りです。
- デバイスがEMQXに接続:IoTデバイスが正常に接続すると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
- デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
- ルールエンジンがメッセージを処理:ルールエンジンはトピックフィルターに基づいてメッセージをマッチングし、フィールドのフィルタリング、データ形式の変換、追加コンテキストによるメッセージの強化など設定されたルールで処理します。
- HTTPサーバーへのブリッジング:ルールは処理済みメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。リクエストヘッダー、リクエストボディ、URLはルールの出力から動的に構築可能です。
イベントやメッセージデータがHTTPサーバーに送信された後、以下のような柔軟な処理が可能です。
- デバイス管理システムでデバイス状態の更新やイベント記録を行う。
- メッセージデータをデータベースに書き込み保存する。
- SQLルールで検出した異常データに基づきアラートや通知システムを起動する。
特徴とメリット
EMQXのHTTPサーバー統合を利用することで、以下のようなメリットがあります。
- より多くの下流システムへのデータ配信拡張:HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携し、複数システム間でのデータ配信を実現します。
- リアルタイム応答と業務プロセスのトリガー:HTTPサービスを通じて外部システムはMQTTデータをリアルタイムで受信し、業務プロセスを迅速に起動できます。例えば、アラートデータを受け取って業務フローをトリガーするケースなどです。
- カスタムデータ処理:外部システム側で受信データの二次処理が可能なため、EMQXの機能に制限されない複雑な業務ロジックを実装できます。
- 疎結合な連携:HTTPサービスはシンプルなHTTPインターフェースを利用するため、システム連携の疎結合なアプローチを提供します。
まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様で柔軟なアプリケーション開発ニーズに応えます。
はじめる前に
このセクションでは、HTTPサーバーデータ統合を作成する前に必要な準備、簡単なHTTPサーバーの構築方法を説明します。
前提条件
簡単なHTTPサーバーの構築
- Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスは
POST /リクエストを受け取り、リクエスト内容を表示した後に200 OKを返します。
from flask import Flask, json, request
api = Flask(__name__)
@api.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
api.run()- 上記コードを
http_server.pyファイルとして保存し、以下のコマンドでサーバーを起動します。
pip install flask
python3 http_server.pyコネクターの作成
このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。
- ダッシュボードの左メニューから Integration -> Connector をクリックします。
- ページ右上の Create をクリックします。
- コネクタータイプとして HTTP Server を選択し、Next をクリックします。
- コネクター名を入力します。名前は英数字の組み合わせとし、例:
httpserver。 - URL にHTTPサーバーのアドレスを設定します。例:
http://localhost:5000。 - その他の設定はデフォルトのままにします。
- 高度な設定(任意):詳細はSinkの特徴を参照してください。
- Createをクリックする前に、Test Connectivityを押してコネクターがHTTPサーバーに接続できるか確認できます。
- Createをクリックしてコネクターの設定を完了します。
コネクター作成後、ルールをこのコネクターを使って作成するかどうかのダイアログが表示されます。
- Create Ruleをクリックするとルール作成ページに直接移動し、統合設定を続行できます。
- あるいはBack To Connector Listをクリックしてコネクター一覧に戻り、後からIntegration -> Rulesでルールを作成できます。
本例ではCreate Ruleをクリックして続行します。
HTTPサーバーSinkを使ったルールの作成
このセクションでは、ルールを作成しHTTPサーバーSinkを設定してMQTTメッセージをHTTPサーバーに送信する方法を説明します。
Create Ruleをクリックすると自動的にCreate Ruleページに遷移し、HTTPサーバーSinkの設定用のAction paneが表示され、先ほど作成したコネクターが選択された状態になります。
Type of ActionとActionは自動で
HTTP ServerとCreate Actionが入力され、新しいSink作成が設定されます。Sinkの名前と説明を入力します。Connectorは先ほど作成した
httpserverが自動入力されます。HTTPリクエストを設定します。
- URL Path:
/ - Method:
POST
最終的なリクエストURLはコネクターのURLとこのパスを組み合わせて構築されます。
- URL Path:
MQTTメッセージデータをHTTPサーバーに送信するためのRequest Bodyを設定します。
json{ "topic": "${topic}", "payload": ${payload}, "clientid": "${clientid}", "qos": ${qos}, "timestamp": ${timestamp} }テンプレート内の変数はルールSQLで選択したフィールドから値が埋め込まれます。
フォールバックアクション(任意):メッセージ送信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
Createをクリックする前に、Test Connectivityを押してSinkがHTTPサーバーに接続できるか確認できます。
CreateをクリックしてSinkの設定を完了します。新しいSinkはCreate RuleページのAction Outputsセクションに表示されます。
Rule IDを入力します。システムがランダム生成するか、自分で任意に定義できます(任意)。例:
my_rule。SQLエディターに以下のSQL文を入力します。
bashSELECT * FROM "t/#"このルールは
"t/#"以下のトピックにパブリッシュされたすべてのMQTTメッセージにマッチします。TIP
独自のSQL文を指定する場合は、Sinkで必要なすべてのフィールドが
SELECT句に含まれていることを確認してください。ルール設定を確認後、Saveをクリックしてルールを生成します。
ルール作成後、t/#以下のトピックにパブリッシュされたメッセージはルールで処理され、設定したHTTPサーバーに転送されます。
また、Integration -> Flow DesignerでルールとHTTPサーバーSinkのデータフロートポロジーを確認できます。
ルールのテスト
MQTTXを使い、トピック
t/1にメッセージを送信してオンライン/オフラインイベントをトリガーします。bashmqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'ダッシュボードのRuleページでルール名をクリックし統計情報を確認します。新しい受信メッセージ数と送信メッセージ数が1件ずつ増えていれば、HTTPサーバーSinkによるメッセージ処理と転送が成功しています。
HTTPサーバーがリクエストを受信していることを確認します。
PythonのHTTPサーバーが起動中であれば、ターミナルに以下のような出力が表示されます。
textpython3 http_server.py * Serving Flask app 'http_server' * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: off * Running on http://127.0.0.1:5000 (Press CTRL+C to quit) got post request: b'{"topic":"t/1","payload":{"msg":"hello HTTP Server"},"clientid":"emqx_c","qos":0,"timestamp":1700000000000}'表示された内容は、EMQXがMQTTメッセージをJSON形式でHTTPサーバーに転送したことを示しています。リクエストボディのフィールドはSinkのリクエストボディテンプレートで設定した変数に対応しています。