Skip to content

MQTTデータをHTTPサーバーに取り込む

HTTPサーバーデータ統合は、EMQXを外部HTTPサービスと迅速に連携させるための機能です。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も提供します。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知、アラート通知、データ統合などのシナリオに対応可能です。

本ページでは、HTTPサーバーデータ統合の機能と特徴を詳しく解説し、HTTPサーバーデータ統合の設定方法について実践的な手順を紹介します。

TIP

ルールを使ったデータ処理が不要でHTTPサービスとの連携のみを行いたい場合は、より簡単で使いやすいWebhookの利用を推奨します。

動作の仕組み

HTTPサーバーデータ統合はEMQXに標準搭載された機能で、簡単な設定で外部HTTPサービスと連携できます。HTTPサービス側では、好みのプログラミング言語やフレームワークでコードを記述し、柔軟かつ複雑なデータ処理ロジックを実装可能です。

emqx-integration-http

EMQXはルールエンジンとSinkを介してデバイスのイベントやメッセージをHTTPサーバーに転送します。ワークフローは以下の通りです。

  1. デバイスがEMQXに接続: IoTデバイスが正常に接続すると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
  2. デバイスがメッセージをパブリッシュ: デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンがトリガーされます。
  3. ルールエンジンがメッセージを処理: ルールエンジンはトピックフィルターに基づいてメッセージをマッチングし、フィールドのフィルタリングやデータ形式の変換、追加コンテキストによるメッセージの拡充などの設定済みルールで処理します。
  4. HTTPサーバーへのブリッジング: ルールは処理済みメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。リクエストヘッダー、リクエストボディ、URLはルールの出力から動的に構築可能です。

イベントやメッセージデータがHTTPサーバーに送信された後は、以下のような柔軟な処理が行えます。

  • デバイス管理システムでのデバイス状態更新やイベント記録
  • メッセージデータのデータベースへの書き込みと保存
  • SQLルールで検出された異常データに基づくアラートや通知システムのトリガー

特徴とメリット

EMQXのHTTPサーバー統合を利用することで、以下のようなビジネス上の利点があります。

  • より多くの下流システムへのデータ配信拡張: HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携でき、複数システム間でのデータ分配が容易になります。
  • リアルタイム応答と業務プロセスのトリガー: HTTPサービスを介して外部システムはMQTTデータをリアルタイムに受信し、業務プロセスを迅速に起動可能です。例としてアラートデータ受信による業務フローのトリガーがあります。
  • カスタムデータ処理: 外部システム側で受信データに対し二次処理を行い、EMQXの機能に制限されない複雑なビジネスロジックを実装できます。
  • 疎結合な連携: HTTPサービスはシンプルなHTTPインターフェースを使用し、システム間の疎結合な連携を実現します。

まとめると、HTTPサービスはリアルタイムで柔軟かつカスタマイズ可能なデータ統合機能を提供し、多様で柔軟なアプリケーション開発ニーズに応えます。

はじめる前に

このセクションでは、HTTPサーバーデータ統合の作成を開始する前に必要な準備事項について説明します。簡単なHTTPサーバーのセットアップも含みます。

前提条件

簡単なHTTPサーバーのセットアップ

  1. Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスはPOST /リクエストを受け取り、リクエスト内容を表示した後に200 OKを返します。
bash
from flask import Flask, json, request

api = Flask(__name__)

@api.route('/', methods=['POST'])
def print_messages():
  reply= {"result": "ok", "message": "success"}
  print("got post request: ", request.get_data())
  return json.dumps(reply), 200

if __name__ == '__main__':
  api.run()
  1. 上記コードをhttp_server.pyとして保存し、以下のコマンドでサーバーを起動します。
shell
pip install flask

python3 http_server.py

コネクターの作成

このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。

  1. ダッシュボードの左メニューから Integration -> Connector をクリックします。
  2. ページ右上の Create をクリックします。
  3. コネクタータイプとして HTTP Server を選択し、Next をクリックします。
  4. コネクター名を入力します。名前は英数字の組み合わせとしてください。例: httpserver
  5. URL にHTTPサーバーのアドレスを設定します。例: http://localhost:5000
  6. その他の設定はデフォルトのままにします。
  7. 詳細設定(任意):詳細はSinkの特徴を参照してください。
  8. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがHTTPサーバーに接続できるか確認できます。
  9. Createをクリックしてコネクターの設定を完了します。

コネクター作成後、ルールをこのコネクターで作成するかどうかのダイアログが表示されます。

  • Create Rule をクリックするとルール作成ページに直接移動し、連携設定を続行できます。
  • Back To Connector List をクリックするとコネクター一覧に戻り、後で Integration -> Rules からルールを作成できます。

ここでは、Create Rule をクリックして続行します。

HTTPサーバーSinkを使ったルールの作成

このセクションでは、ルールを作成しHTTPサーバーSinkを設定してMQTTメッセージをHTTPサーバーに送信する方法を説明します。

Create Ruleをクリックすると自動的にCreate Ruleページに遷移し、HTTPサーバーSinkの設定用のAction paneが表示され、先ほど作成したコネクターが選択済みの状態になります。

  1. Type of ActionAction は自動で HTTP ServerCreate Action に設定され、新規Sinkが作成されます。

  2. Sinkの名前と説明を入力します。Connector は先ほど作成した httpserver が自動入力されます。

  3. HTTPリクエストを設定します。

    • URL Path: /
    • Method: POST

    最終的なリクエストURLはコネクターのURLとこのパスを組み合わせて構築されます。

  4. MQTTメッセージデータをHTTPサーバーに送信するためのRequest Bodyを設定します。

    json
    {
      "topic": "${topic}",
      "payload": ${payload},
      "clientid": "${clientid}",
      "qos": ${qos},
      "timestamp": ${timestamp}
    }

    テンプレート内の変数はルールSQLで選択されたフィールドから値が埋め込まれます。

  5. フォールバックアクション(任意): メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  6. Createをクリックする前に、Test ConnectivityをクリックしてSinkがHTTPサーバーに接続できるか確認できます。

  7. CreateをクリックしてSinkの設定を完了します。新しいSinkはCreate RuleページのAction Outputsセクションに表示されます。

  8. Rule IDを入力します。システムがランダム生成するか、任意に指定可能です(例: my_rule)。

  9. SQL Editorに以下のSQL文を入力します。

    bash
    SELECT
      *
    FROM
      "t/#"

    このルールは、t/#配下のトピックにパブリッシュされたすべてのMQTTメッセージにマッチします。

    TIP

    独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドをSELECT句に含めていることを確認してください。

  10. ルール設定を確認後、Saveをクリックしてルールを生成します。

ルール作成後、t/#配下のトピックにパブリッシュされたメッセージはルールで処理され、設定したHTTPサーバーに転送されます。

また、Integration -> Flow DesignerからルールとHTTPサーバーSinkのデータフロートポロジーを確認できます。

ルールのテスト

  1. MQTTXを使ってトピック t/1 にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

    bash
    mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'
  2. ダッシュボードのRuleページでルール名をクリックし、統計情報を確認します。メトリクスに新しい受信メッセージと送信メッセージが1件ずつ表示されていれば、HTTPサーバーSinkによるメッセージ処理と転送が成功しています。

  3. HTTPサーバーがリクエストを受信していることを確認します。

    Python HTTPサーバーが起動中であれば、ターミナルに以下のような出力が表示されます。

    text
    python3 http_server.py
     * Serving Flask app 'http_server'
     * Environment: production
       WARNING: This is a development server. Do not use it in a production deployment.
       Use a production WSGI server instead.
     * Debug mode: off
     * Running on http://127.0.0.1:5000 (Press CTRL+C to quit)
    
    got post request:  b'{"topic":"t/1","payload":{"msg":"hello HTTP Server"},"clientid":"emqx_c","qos":0,"timestamp":1700000000000}'

    表示された内容は、EMQXがMQTTメッセージをJSON形式でHTTPサーバーに転送したことを示しています。リクエストボディのフィールドはSinkのリクエストボディテンプレートで設定した変数に対応しています。