MQTTデータをHTTPサーバーに取り込む
HTTPサーバーデータ統合は、EMQXを外部のHTTPサービスと迅速に連携させる方法を提供します。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も備えています。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知やアラート通知、データ連携などのシナリオを実現します。
本ページでは、HTTPサーバーとのデータ統合の機能と特徴を詳しく解説し、HTTPサーバーデータ統合の設定方法について実践的なガイドを提供します。
TIP
HTTPサービスとの連携が必要で、ルールによるデータ処理が不要な場合は、よりシンプルで使いやすいWebhookの利用を推奨します。
動作の仕組み
HTTPサーバーデータ統合はEMQXの標準機能で、簡単な設定で外部HTTPサービスと連携できます。HTTPサービス側では、好みのプログラミング言語やフレームワークでコードを書き、柔軟かつ複雑なデータ処理ロジックを実装可能です。

EMQXはルールエンジンとSinkを通じてデバイスのイベントやデータをHTTPサーバーに転送します。ワークフローは以下の通りです:
- デバイスがEMQXに接続:IoTデバイスが正常に接続されると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
- デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
- ルールエンジンがメッセージを処理:組み込みのルールエンジンがトピックマッチングに基づき、特定のソースからのMQTTメッセージやイベントを処理します。ルールは対応する処理を行い、データ形式の変換や特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを実施します。
- HTTPサーバーへの転送:設定されたルールが処理済みのメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダーやボディ、URLを動的に構築でき、外部サービスとの柔軟な連携を実現します。
イベントやメッセージデータがHTTPサーバーに送信された後は、以下のような柔軟な処理が可能です:
- デバイス状態の更新やイベントログの実装により、デバイス管理システムの構築に活用。
- メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現。
- SQLルールでフィルタリングした異常データをHTTPサービスで直接アラート通知システムに連携し、デバイス異常監視を実現。
特徴とメリット
EMQXのHTTPサーバー統合を利用することで、以下のようなメリットがあります:
- より多くの下流システムへのデータ配信拡張:HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携でき、複数システム間でのデータ分配を容易にします。
- リアルタイム応答と業務プロセスのトリガー:HTTPサービスを介して外部システムがMQTTデータをリアルタイムに受け取り、業務プロセスをトリガー可能。例えばアラートデータの受信で業務フローを即座に開始できます。
- カスタムデータ処理:外部システム側で受信データの二次処理が可能で、EMQXの機能に制約されない複雑な業務ロジックを実装できます。
- 疎結合な連携:HTTPサービスはシンプルなHTTPインターフェースを使うため、システム連携を疎結合に実現します。
まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様なアプリケーション開発ニーズに応えます。
はじめる前に
このセクションでは、HTTPサーバーデータ統合を作成する前に必要な準備について説明します。簡単なHTTPサーバーのセットアップも含みます。
前提条件
簡単なHTTPサーバーのセットアップ
- Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスは
POST /
リクエストを受け取り、リクエスト内容を表示した後に200 OK
を返します:
from flask import Flask, json, request
api = Flask(__name__)
@api.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
api.run()
- 上記コードを
http_server.py
というファイル名で保存し、以下のコマンドでサーバーを起動します:
pip install flask
python3 http_server.py
コネクターの作成
このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。
EMQXダッシュボードにアクセスし、Integration -> Connectorをクリックします。
ページ右上のCreateをクリックし、HTTP Serverを選択してNextをクリックします。
コネクター名を入力します。名前は英数字の組み合わせとし、例として
my_httpserver
などが適切です。URLに
http://localhost:5000
を設定します。その他の項目はデフォルトのままで問題ありません。詳細設定(任意):詳細はSinkの機能を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてコネクターがHTTPサーバーに接続できるか確認できます。
Createをクリックしてコネクターの作成を完了します。
これでHTTPサーバーコネクターが作成されました。次に、ルールとSinkを作成してHTTPサーバーに書き込むデータを指定します。
HTTPサーバーSink付きルールの作成
このセクションでは、HTTPサーバーSinkを追加したルールの作成方法を説明します。
EMQXダッシュボードにアクセスし、Integration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDに
my_rule
を入力し、SQL Editorでルールを設定します。以下のSQL文を例としてSQL Editorに入力します。これはトピック
t/#
配下のMQTTメッセージをHTTPサーバーに保存することを意味します。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT
句に含めていることを確認してください。bashSELECT * FROM "t/#"
+ Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。Type of Actionのドロップダウンリストから
HTTP Server
を選択し、EMQXがルールで処理したデータをHTTPサーバーに送信するようにします。Actionのドロップダウンは
Create Action
のままにするか、既存のHTTPサーバーアクションを選択しても構いません。この例では新しいSinkを作成してルールに追加します。NameとDescriptionテキストボックスにSinkの名前と説明を入力します。
Connectorのドロップダウンから先ほど作成した
my-httpserver
を選択します。新しいコネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメータの詳細はコネクターの作成を参照してください。URLに
http://localhost:5000
を設定し、MethodドロップダウンからPOST
を選択します。その他の項目はデフォルトのままで問題ありません。フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。
CreateボタンをクリックしてSinkの設定を完了します。Create Ruleページに戻ると、Action Outputsタブに新しいSinkが表示されます。
Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。
これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいHTTPサーバーSinkが確認できます。
また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#
配下のメッセージがルールmy_rule
で解析されHTTPサーバーに送信・保存されていることが確認できます。
ルールのテスト
MQTTXを使ってトピックt/1
にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'
Ruleページでルール名をクリックし、統計情報を確認します。Sinkの稼働状況をチェックし、新しい受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。
メッセージがHTTPサーバーに送信されているかを確認します:
python3 http_server.py
* Serving Flask app 'http_server' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:5000 (Press CTRL+C to quit)
got post request: b'hello HTTP Server'