Skip to content

MQTTデータをHTTPサーバーに取り込む

HTTPサーバーデータ統合は、EMQXを外部のHTTPサービスと迅速に連携させる方法を提供します。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も備えています。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知やアラート通知、データ連携などのシナリオを実現します。

本ページでは、HTTPサーバーとのデータ統合の機能と特徴を詳しく解説し、HTTPサーバーデータ統合の設定方法について実践的なガイドを提供します。

TIP

HTTPサービスとの連携が必要で、ルールによるデータ処理が不要な場合は、よりシンプルで使いやすいWebhookの利用を推奨します。

動作の仕組み

HTTPサーバーデータ統合はEMQXの標準機能で、簡単な設定で外部HTTPサービスと連携できます。HTTPサービス側では、好みのプログラミング言語やフレームワークでコードを書き、柔軟かつ複雑なデータ処理ロジックを実装可能です。

HTTPサーバーとのEMQX統合

EMQXはルールエンジンとSinkを通じてデバイスのイベントやデータをHTTPサーバーに転送します。ワークフローは以下の通りです:

  1. デバイスがEMQXに接続:IoTデバイスが正常に接続されると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
  2. デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
  3. ルールエンジンがメッセージを処理:組み込みのルールエンジンがトピックマッチングに基づき、特定のソースからのMQTTメッセージやイベントを処理します。ルールは対応する処理を行い、データ形式の変換や特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを実施します。
  4. HTTPサーバーへの転送:設定されたルールが処理済みのメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダーやボディ、URLを動的に構築でき、外部サービスとの柔軟な連携を実現します。

イベントやメッセージデータがHTTPサーバーに送信された後は、以下のような柔軟な処理が可能です:

  • デバイス状態の更新やイベントログの実装により、デバイス管理システムの構築に活用。
  • メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現。
  • SQLルールでフィルタリングした異常データをHTTPサービスで直接アラート通知システムに連携し、デバイス異常監視を実現。

特徴とメリット

EMQXのHTTPサーバー統合を利用することで、以下のようなメリットがあります:

  • より多くの下流システムへのデータ配信拡張:HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携でき、複数システム間でのデータ分配を容易にします。
  • リアルタイム応答と業務プロセスのトリガー:HTTPサービスを介して外部システムがMQTTデータをリアルタイムに受け取り、業務プロセスをトリガー可能。例えばアラートデータの受信で業務フローを即座に開始できます。
  • カスタムデータ処理:外部システム側で受信データの二次処理が可能で、EMQXの機能に制約されない複雑な業務ロジックを実装できます。
  • 疎結合な連携:HTTPサービスはシンプルなHTTPインターフェースを使うため、システム連携を疎結合に実現します。

まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様なアプリケーション開発ニーズに応えます。

はじめる前に

このセクションでは、HTTPサーバーデータ統合を作成する前に必要な準備について説明します。簡単なHTTPサーバーのセットアップも含みます。

前提条件

簡単なHTTPサーバーのセットアップ

  1. Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスはPOST /リクエストを受け取り、リクエスト内容を表示した後に200 OKを返します:
bash
from flask import Flask, json, request

api = Flask(__name__)

@api.route('/', methods=['POST'])
def print_messages():
  reply= {"result": "ok", "message": "success"}
  print("got post request: ", request.get_data())
  return json.dumps(reply), 200

if __name__ == '__main__':
  api.run()
  1. 上記コードをhttp_server.pyというファイル名で保存し、以下のコマンドでサーバーを起動します:
shell
pip install flask

python3 http_server.py

コネクターの作成

このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。

  1. EMQXダッシュボードにアクセスし、Integration -> Connectorをクリックします。

  2. ページ右上のCreateをクリックし、HTTP Serverを選択してNextをクリックします。

  3. コネクター名を入力します。名前は英数字の組み合わせとし、例としてmy_httpserverなどが適切です。

  4. URLhttp://localhost:5000を設定します。その他の項目はデフォルトのままで問題ありません。

  5. 詳細設定(任意):詳細はSinkの機能を参照してください。

  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがHTTPサーバーに接続できるか確認できます。

  7. Createをクリックしてコネクターの作成を完了します。

これでHTTPサーバーコネクターが作成されました。次に、ルールとSinkを作成してHTTPサーバーに書き込むデータを指定します。

HTTPサーバーSink付きルールの作成

このセクションでは、HTTPサーバーSinkを追加したルールの作成方法を説明します。

  1. EMQXダッシュボードにアクセスし、Integration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力し、SQL Editorでルールを設定します。

  4. 以下のSQL文を例としてSQL Editorに入力します。これはトピックt/#配下のMQTTメッセージをHTTPサーバーに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT句に含めていることを確認してください。

    bash
    SELECT
      *
    FROM
      "t/#"
  5. + Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。Type of ActionのドロップダウンリストからHTTP Serverを選択し、EMQXがルールで処理したデータをHTTPサーバーに送信するようにします。

    ActionのドロップダウンはCreate Actionのままにするか、既存のHTTPサーバーアクションを選択しても構いません。この例では新しいSinkを作成してルールに追加します。

  6. NameDescriptionテキストボックスにSinkの名前と説明を入力します。

  7. Connectorのドロップダウンから先ほど作成したmy-httpserverを選択します。新しいコネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメータの詳細はコネクターの作成を参照してください。

  8. URLhttp://localhost:5000を設定し、MethodドロップダウンからPOSTを選択します。その他の項目はデフォルトのままで問題ありません。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。

  10. CreateボタンをクリックしてSinkの設定を完了します。Create Ruleページに戻ると、Action Outputsタブに新しいSinkが表示されます。

  11. Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。

これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいHTTPサーバーSinkが確認できます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleで解析されHTTPサーバーに送信・保存されていることが確認できます。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'

Ruleページでルール名をクリックし、統計情報を確認します。Sinkの稼働状況をチェックし、新しい受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

メッセージがHTTPサーバーに送信されているかを確認します:

python3 http_server.py
 * Serving Flask app 'http_server' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:5000 (Press CTRL+C to quit)

got post request:  b'hello HTTP Server'