Skip to content

MQTTデータをHTTPサーバーに取り込む

EMQXプラットフォームのHTTPサービスデータ統合は、EMQXと外部HTTPサービスを迅速に連携させるための機能です。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も提供します。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知やアラート通知、データ統合などのシナリオを実現します。

本ページでは、HTTPサービスデータ統合の機能的特徴を詳しく紹介し、HTTPサーバーコネクターの作成、ルールの作成、ルールのテストなど、データ統合作成の実践的な手順を案内します。

動作の仕組み

HTTPサービスデータ統合はEMQXプラットフォームに標準搭載された機能で、外部サービスとの連携を簡単に設定できます。HTTPサービスを利用することで、ユーザーは好みのプログラミング言語やフレームワークでコードを書き、カスタムかつ柔軟で複雑なデータ処理ロジックを実装可能です。

http frame

EMQXプラットフォームは、設定されたデータ統合を通じてデバイスのイベントやデータをHTTPサービスに転送します。ワークフローは以下の通りです。

  1. デバイスがEMQXプラットフォームに接続:IoTデバイスが正常に接続すると、オンラインイベントがトリガーされ、デバイスID、送信元IPアドレスなどの情報が含まれます。
  2. デバイスがメッセージをパブリッシュ:デバイスはテレメトリや状態データをEMQXプラットフォームに報告し、MQTTプロトコルを介して特定のトピックにメッセージをパブリッシュすることでルールをトリガーします。
  3. ルールエンジンがメッセージを処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
  4. HTTPサービスに送信:ルールのトリガーによりメッセージがHTTPサービスイベントに送信されます。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダーやボディ、URLを動的に構築することで、外部サービスとの柔軟なデータ連携を実現します。

イベントやメッセージデータがHTTPサービスに送信された後は、以下のように柔軟に処理可能です。

  • デバイスの状態更新やイベント記録を実装し、データに基づくデバイス管理システムを開発する。
  • メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現する。
  • ルールSQLでフィルタリングした異常データをHTTPサービス経由で直接アラート通知システムに呼び出し、デバイス異常監視を行う。

特徴と利点

EMQXプラットフォームのHTTPサービス統合を利用することで、以下のようなビジネス上の利点があります。

  • より多くの下流システムへデータを渡せる:HTTPサービスはMQTTデータを分析プラットフォームやクラウドサービスなど、より多くの外部システムに簡単に統合し、多システム間のデータ分配を実現します。
  • リアルタイムな応答と業務プロセスのトリガー:HTTPサービスを通じて外部システムはMQTTデータをリアルタイムに受け取り、業務プロセスをトリガーして迅速な対応を可能にします。例えば、アラームデータを受けて業務ワークフローを起動するなどです。
  • カスタムなデータ処理:外部システムは受信データを必要に応じて二次処理でき、より複雑な業務ロジックを実装可能で、EMQXの機能に制限されません。
  • 疎結合な統合:HTTPサービスはシンプルなHTTPインターフェースを利用し、疎結合なシステム統合手法を提供します。

まとめると、HTTPサービスはリアルタイムで柔軟かつカスタマイズ可能なデータ統合機能を提供し、柔軟で豊富なアプリケーション開発のニーズに応えます。

はじめる前に

このセクションでは、EMQXプラットフォームでHTTPサービスデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

簡単なHTTPサービスのセットアップ

以下の例を使って、簡単なWebサーバーを作成します。

python
from http.server import HTTPServer, BaseHTTPRequestHandler

class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
       self.send_response(200)
       self.end_headers()
       self.wfile.write(b'Hello, world!')

    def do_POST(self):
       content_length = int(self.headers['Content-Length'])
       body = self.rfile.read(content_length)
       print("Received POST request with body: " + str(body))
       self.send_response(201)
       self.end_headers()

httpd = HTTPServer(('0.0.0.0', 8080), SimpleHTTPRequestHandler)
httpd.serve_forever()

HTTPサーバーコネクターの作成

データ統合ルールを作成する前に、HTTPサービスにアクセスするためのHTTPサーバーコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
  2. 初めてコネクターを作成する場合は、Webサービスカテゴリの下にあるHTTPサーバーを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてWebサービスカテゴリの下にあるHTTPサービスを選択します。
  3. 新規コネクターページで以下のオプションを設定します。
    • コネクター名:システムが自動的にコネクター名を生成しますが、自分で名前を付けることも可能です。本例ではmy_httpserverを使用できます。
    • URL:Webサービスにネットワーク経由で正常にアクセスできるURLを入力します。
    • その他の設定はデフォルト値を使用します。必要に応じてHTTPリクエストヘッダーのキーと値を設定できます。
  4. テストボタンをクリックして接続をテストします。Webサービスにアクセス可能であれば成功のメッセージが返されます。
  5. 新規ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをHTTPサーバーに転送するアクションをルールに追加するためのルールを作成します。

  1. ルールエリアで新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールマッチング用のSQL文を入力します。以下のSQL例は、temp_hum/emqxトピックに送信されたメッセージから、報告時間、クライアントID、メッセージ本文(ペイロード)、および本文内の温度と湿度を読み取ります。

    sql
    SELECT 
    
    timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum
    
    FROM
    
    "temp_hum/emqx"

    テストを有効にする機能を使って、データ入力をシミュレートし結果をテストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します。

    • アクション名:システムが自動的にアクション名を生成しますが、自分で名前を付けることも可能です。

    • メソッド:HTTPリクエストメソッドとしてPOSTを選択します。

    • URLパス:アクション用に別のリクエストパスを設定可能で、コネクターのURL設定に追加されて完全なURLが作成されます。このオプションは変数を使ったテンプレートも利用可能です。まず、ルールSQLで関連変数を定義します(例:select clientid as device_id)。その後、HTTPアクションのURLパスに${device_id}のように変数を組み込めます。

    • ヘッダー:アクション固有のリクエストヘッダーを設定するか、コネクターで設定済みのヘッダーをそのまま使用できます。

    • ボディ:以下のメッセージ内容テンプレートのように、ルールから出力されたフィールドをアクションのリクエストボディに追加して設定します。

      json
      {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
  6. 確認ボタンをクリックしてルールの作成を完了します。

  7. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

温度・湿度データの報告をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがHTTPサービスに転送されているか確認します。

    bash
    py server.py
    
    Received POST request with body: b'[\n "temp": "27.5",\n "hum": "41.8"\n)127.0.0.1 - -[18/Dec/2023 14:50:44]"POST  HTTP/1.1" 201 -
  3. コンソールで稼働データを確認します。ルール一覧でルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計情報を閲覧できます。