MQTTデータをHTTPサーバーに取り込む
HTTPサーバーデータ統合は、EMQXを外部HTTPサービスと迅速に連携させるための機能です。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も提供します。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知、アラート通知、データ統合などのシナリオに対応可能です。
本ページでは、HTTPサーバーデータ統合の機能と特徴を詳しく解説し、HTTPサーバーデータ統合の設定方法について実践的な手順を紹介します。
TIP
ルールを使ったデータ処理が不要でHTTPサービスとの連携のみを行いたい場合は、より簡単で使いやすいWebhookの利用を推奨します。
動作の仕組み
HTTPサーバーデータ統合はEMQXに標準搭載された機能で、簡単な設定で外部HTTPサービスと連携できます。HTTPサービス側では、好みのプログラミング言語やフレームワークでコードを記述し、柔軟かつ複雑なデータ処理ロジックを実装可能です。

EMQXはルールエンジンとSinkを介してデバイスのイベントやメッセージをHTTPサーバーに転送します。ワークフローは以下の通りです。
- デバイスがEMQXに接続: IoTデバイスが正常に接続すると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
- デバイスがメッセージをパブリッシュ: デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンがトリガーされます。
- ルールエンジンがメッセージを処理: ルールエンジンはトピックフィルターに基づいてメッセージをマッチングし、フィールドのフィルタリングやデータ形式の変換、追加コンテキストによるメッセージの拡充などの設定済みルールで処理します。
- HTTPサーバーへのブリッジング: ルールは処理済みメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。リクエストヘッダー、リクエストボディ、URLはルールの出力から動的に構築可能です。
イベントやメッセージデータがHTTPサーバーに送信された後は、以下のような柔軟な処理が行えます。
- デバイス管理システムでのデバイス状態更新やイベント記録
- メッセージデータのデータベースへの書き込みと保存
- SQLルールで検出された異常データに基づくアラートや通知システムのトリガー
特徴とメリット
EMQXのHTTPサーバー統合を利用することで、以下のようなビジネス上の利点があります。
- より多くの下流システムへのデータ配信拡張: HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携でき、複数システム間でのデータ分配が容易になります。
- リアルタイム応答と業務プロセスのトリガー: HTTPサービスを介して外部システムはMQTTデータをリアルタイムに受信し、業務プロセスを迅速に起動可能です。例としてアラートデータ受信による業務フローのトリガーがあります。
- カスタムデータ処理: 外部システム側で受信データに対し二次処理を行い、EMQXの機能に制限されない複雑なビジネスロジックを実装できます。
- 疎結合な連携: HTTPサービスはシンプルなHTTPインターフェースを使用し、システム間の疎結合な連携を実現します。
まとめると、HTTPサービスはリアルタイムで柔軟かつカスタマイズ可能なデータ統合機能を提供し、多様で柔軟なアプリケーション開発ニーズに応えます。
はじめる前に
このセクションでは、HTTPサーバーデータ統合の作成を開始する前に必要な準備事項について説明します。簡単なHTTPサーバーのセットアップも含みます。
前提条件
簡単なHTTPサーバーのセットアップ
- Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスは
POST /リクエストを受け取り、リクエスト内容を表示した後に200 OKを返します。
from flask import Flask, json, request
api = Flask(__name__)
@api.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
api.run()- 上記コードを
http_server.pyとして保存し、以下のコマンドでサーバーを起動します。
pip install flask
python3 http_server.pyコネクターの作成
このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。
- ダッシュボードの左メニューから Integration -> Connector をクリックします。
- ページ右上の Create をクリックします。
- コネクタータイプとして HTTP Server を選択し、Next をクリックします。
- コネクター名を入力します。名前は英数字の組み合わせとしてください。例:
httpserver - URL にHTTPサーバーのアドレスを設定します。例:
http://localhost:5000 - その他の設定はデフォルトのままにします。
- 詳細設定(任意):詳細はSinkの特徴を参照してください。
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがHTTPサーバーに接続できるか確認できます。
- Createをクリックしてコネクターの設定を完了します。
コネクター作成後、ルールをこのコネクターで作成するかどうかのダイアログが表示されます。
- Create Rule をクリックするとルール作成ページに直接移動し、連携設定を続行できます。
- Back To Connector List をクリックするとコネクター一覧に戻り、後で Integration -> Rules からルールを作成できます。
ここでは、Create Rule をクリックして続行します。
HTTPサーバーSinkを使ったルールの作成
このセクションでは、ルールを作成しHTTPサーバーSinkを設定してMQTTメッセージをHTTPサーバーに送信する方法を説明します。
Create Ruleをクリックすると自動的にCreate Ruleページに遷移し、HTTPサーバーSinkの設定用のAction paneが表示され、先ほど作成したコネクターが選択済みの状態になります。
Type of Action と Action は自動で
HTTP ServerとCreate Actionに設定され、新規Sinkが作成されます。Sinkの名前と説明を入力します。Connector は先ほど作成した
httpserverが自動入力されます。HTTPリクエストを設定します。
- URL Path:
/ - Method:
POST
最終的なリクエストURLはコネクターのURLとこのパスを組み合わせて構築されます。
- URL Path:
MQTTメッセージデータをHTTPサーバーに送信するためのRequest Bodyを設定します。
json{ "topic": "${topic}", "payload": ${payload}, "clientid": "${clientid}", "qos": ${qos}, "timestamp": ${timestamp} }テンプレート内の変数はルールSQLで選択されたフィールドから値が埋め込まれます。
フォールバックアクション(任意): メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてSinkがHTTPサーバーに接続できるか確認できます。
CreateをクリックしてSinkの設定を完了します。新しいSinkはCreate RuleページのAction Outputsセクションに表示されます。
Rule IDを入力します。システムがランダム生成するか、任意に指定可能です(例:
my_rule)。SQL Editorに以下のSQL文を入力します。
bashSELECT * FROM "t/#"このルールは、
t/#配下のトピックにパブリッシュされたすべてのMQTTメッセージにマッチします。TIP
独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドを
SELECT句に含めていることを確認してください。ルール設定を確認後、Saveをクリックしてルールを生成します。
ルール作成後、t/#配下のトピックにパブリッシュされたメッセージはルールで処理され、設定したHTTPサーバーに転送されます。
また、Integration -> Flow DesignerからルールとHTTPサーバーSinkのデータフロートポロジーを確認できます。
ルールのテスト
MQTTXを使ってトピック
t/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。bashmqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'ダッシュボードのRuleページでルール名をクリックし、統計情報を確認します。メトリクスに新しい受信メッセージと送信メッセージが1件ずつ表示されていれば、HTTPサーバーSinkによるメッセージ処理と転送が成功しています。
HTTPサーバーがリクエストを受信していることを確認します。
Python HTTPサーバーが起動中であれば、ターミナルに以下のような出力が表示されます。
textpython3 http_server.py * Serving Flask app 'http_server' * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: off * Running on http://127.0.0.1:5000 (Press CTRL+C to quit) got post request: b'{"topic":"t/1","payload":{"msg":"hello HTTP Server"},"clientid":"emqx_c","qos":0,"timestamp":1700000000000}'表示された内容は、EMQXがMQTTメッセージをJSON形式でHTTPサーバーに転送したことを示しています。リクエストボディのフィールドはSinkのリクエストボディテンプレートで設定した変数に対応しています。