Skip to content

MQTTデータをHTTPサーバーに取り込む

EMQX CloudのHTTPサービスデータ統合は、EMQXと外部HTTPサービスを迅速に連携させる方法を提供します。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も備えています。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知、アラート通知、データ統合などのシナリオを実現します。

本ページでは、HTTPサービスデータ統合の機能的特徴を詳しく紹介し、HTTPサーバーコネクターの作成、ルールの作成、ルールのテストなど、データ統合の作成に関する実践的なガイダンスを提供します。

動作の仕組み

HTTPサービスデータ統合はEMQX Cloudに標準搭載された機能で、外部サービスとの連携を簡単に設定できます。HTTPサービスを使うことで、ユーザーは好みのプログラミング言語やフレームワークでコードを書き、カスタムで柔軟かつ複雑なデータ処理ロジックを実装できます。

http frame

EMQX Cloudは、設定されたデータ統合を通じてデバイスのイベントやデータをHTTPサービスに転送し、以下のワークフローで動作します。

  1. デバイスがEMQX Cloudに接続:IoTデバイスが正常に接続すると、デバイスID、送信元IPアドレスなどの情報を含むオンラインイベントが発生します。
  2. デバイスがメッセージをパブリッシュ:デバイスはテレメトリや状態データをEMQX Cloudに報告し、MQTTプロトコルを通じて特定のトピックにメッセージをパブリッシュしてルールをトリガーします。
  3. ルールエンジンがメッセージを処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
  4. HTTPサービスへ送信:ルールのトリガーによりメッセージがHTTPサービスイベントに送信されます。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダー、ボディ、URLを動的に構築することで、外部サービスとの柔軟なデータ連携を実現します。

イベントやメッセージデータがHTTPサービスに送信された後は、以下のように柔軟に処理できます。

  • デバイスの状態更新やイベント記録を実装し、データに基づくデバイス管理システムを開発する。
  • メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現する。
  • ルールSQLでフィルタリングされた異常データに対しては、HTTPサービス経由でアラート通知システムを直接呼び出し、デバイス異常監視を行う。

特徴と利点

EMQX CloudのHTTPサービス統合を利用することで、以下のようなビジネス上の利点があります。

  • より多くの下流システムへデータを連携:HTTPサービスはMQTTデータを分析プラットフォームやクラウドサービスなど、より多くの外部システムに簡単に統合でき、多システム間のデータ配信を実現します。
  • リアルタイムな応答と業務プロセスのトリガー:HTTPサービスを通じて外部システムがMQTTデータをリアルタイムに受信し、業務プロセスを迅速に起動できます。例えばアラームデータ受信による業務ワークフローのトリガーなどです。
  • カスタムデータ処理:外部システムは受信データを必要に応じて二次処理でき、より複雑な業務ロジックを実装可能で、EMQXの機能に制約されません。
  • 疎結合な連携:HTTPサービスはシンプルなHTTPインターフェースを利用し、疎結合なシステム連携方式を提供します。

まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様で豊かなアプリケーション開発のニーズに応えます。

はじめる前に

このセクションでは、EMQX CloudでHTTPサービスデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワークの設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

簡単なHTTPサービスのセットアップ

以下の例を使って簡単なWebサーバーを作成します。

python
from http.server import HTTPServer, BaseHTTPRequestHandler

class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
       self.send_response(200)
       self.end_headers()
       self.wfile.write(b'Hello, world!')

    def do_POST(self):
       content_length = int(self.headers['Content-Length'])
       body = self.rfile.read(content_length)
       print("Received POST request with body: " + str(body))
       self.send_response(201)
       self.end_headers()

httpd = HTTPServer(('0.0.0.0', 8080), SimpleHTTPRequestHandler)
httpd.serve_forever()

HTTPサーバーコネクターの作成

データ統合ルールを作成する前に、HTTPサービスにアクセスするためのHTTPサーバーコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
  2. 初めてコネクターを作成する場合は、Webサービスカテゴリの下にあるHTTPサーバーを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてWebサービスカテゴリの下のHTTPサーバーを選択します。
  3. 新規コネクターページで以下のオプションを設定します。
    • コネクター名:システムが自動でコネクター名を生成しますが、自分で名前を付けることも可能です。本例ではmy_httpserverを使用できます。
    • URL:Webサービスがネットワーク経由で正常にアクセス可能なURLを入力します。
    • その他の設定はデフォルト値を使用します。必要に応じてHTTPリクエストヘッダーのキーと値を設定できます。
  4. テストボタンをクリックして接続をテストします。Webサービスにアクセス可能であれば成功のメッセージが返されます。
  5. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをHTTPサーバーに転送するためのアクションをルールに追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールマッチング用のSQL文を入力します。以下のSQL例は、temp_hum/emqxトピックに送信されたメッセージから、メッセージの報告時間、クライアントID、メッセージ本文(ペイロード)の温度と湿度を読み取ります。

    sql
    SELECT 
    
    timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum
    
    FROM
    
    "temp_hum/emqx"

    Try It Out機能でデータ入力をシミュレートし、結果をテストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します。

    • アクション名:システムが自動でアクション名を生成しますが、自分で名前を付けることも可能です。

    • メソッド:HTTPリクエストメソッドとしてPOSTを選択します。

    • URLパス:アクション専用のリクエストパスを設定できます。これはコネクターのURL設定に追加され、完全なURLを構成します。このオプションでは変数を含むテンプレートが利用可能です。まずルールSQLで関連変数を定義します(例:select clientid as device_id)。その後、HTTPアクションのURLパスに${device_id}のように変数を組み込めます。

    • ヘッダー:アクション固有のリクエストヘッダーを設定するか、コネクターで設定したヘッダーをそのまま使用できます。

    • ボディ:リクエストボディとして以下のメッセージ内容テンプレートを設定し、ルールから出力されたフィールドをアクションのリクエストボディに追加します。

      json
      {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
  6. 確認ボタンをクリックしてルール作成を完了します。

  7. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがHTTPサービスに転送されているかを確認します。

    bash
    py server.py
    
    Received POST request with body: b'[\n "temp": "27.5",\n "hum": "41.8"\n)127.0.0.1 - -[18/Dec/2023 14:50:44]"POST  HTTP/1.1" 201 -
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計情報を閲覧できます。