MQTTデータをHTTPサーバーに取り込む
EMQXプラットフォームのHTTPサービスデータ統合は、EMQXと外部HTTPサービスを迅速に連携させるための機能です。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も提供します。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知やアラート通知、データ統合などのシナリオを実現します。
本ページでは、HTTPサービスデータ統合の機能的特徴を詳しく紹介し、HTTPサーバーコネクターの作成、ルールの作成、ルールのテストなど、データ統合作成の実践的な手順を案内します。
動作の仕組み
HTTPサービスデータ統合はEMQXプラットフォームに標準搭載された機能で、外部サービスとの連携を簡単に設定できます。HTTPサービスを利用することで、ユーザーは好みのプログラミング言語やフレームワークでコードを書き、カスタムかつ柔軟で複雑なデータ処理ロジックを実装可能です。
EMQXプラットフォームは、設定されたデータ統合を通じてデバイスのイベントやデータをHTTPサービスに転送します。ワークフローは以下の通りです。
- デバイスがEMQXプラットフォームに接続:IoTデバイスが正常に接続すると、オンラインイベントがトリガーされ、デバイスID、送信元IPアドレスなどの情報が含まれます。
- デバイスがメッセージをパブリッシュ:デバイスはテレメトリや状態データをEMQXプラットフォームに報告し、MQTTプロトコルを介して特定のトピックにメッセージをパブリッシュすることでルールをトリガーします。
- ルールエンジンがメッセージを処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
- HTTPサービスに送信:ルールのトリガーによりメッセージがHTTPサービスイベントに送信されます。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダーやボディ、URLを動的に構築することで、外部サービスとの柔軟なデータ連携を実現します。
イベントやメッセージデータがHTTPサービスに送信された後は、以下のように柔軟に処理可能です。
- デバイスの状態更新やイベント記録を実装し、データに基づくデバイス管理システムを開発する。
- メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現する。
- ルールSQLでフィルタリングした異常データをHTTPサービス経由で直接アラート通知システムに呼び出し、デバイス異常監視を行う。
特徴と利点
EMQXプラットフォームのHTTPサービス統合を利用することで、以下のようなビジネス上の利点があります。
- より多くの下流システムへデータを渡せる:HTTPサービスはMQTTデータを分析プラットフォームやクラウドサービスなど、より多くの外部システムに簡単に統合し、多システム間のデータ分配を実現します。
- リアルタイムな応答と業務プロセスのトリガー:HTTPサービスを通じて外部システムはMQTTデータをリアルタイムに受け取り、業務プロセスをトリガーして迅速な対応を可能にします。例えば、アラームデータを受けて業務ワークフローを起動するなどです。
- カスタムなデータ処理:外部システムは受信データを必要に応じて二次処理でき、より複雑な業務ロジックを実装可能で、EMQXの機能に制限されません。
- 疎結合な統合:HTTPサービスはシンプルなHTTPインターフェースを利用し、疎結合なシステム統合手法を提供します。
まとめると、HTTPサービスはリアルタイムで柔軟かつカスタマイズ可能なデータ統合機能を提供し、柔軟で豊富なアプリケーション開発のニーズに応えます。
はじめる前に
このセクションでは、EMQXプラットフォームでHTTPサービスデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
簡単なHTTPサービスのセットアップ
以下の例を使って、簡単なWebサーバーを作成します。
from http.server import HTTPServer, BaseHTTPRequestHandler
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b'Hello, world!')
def do_POST(self):
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
print("Received POST request with body: " + str(body))
self.send_response(201)
self.end_headers()
httpd = HTTPServer(('0.0.0.0', 8080), SimpleHTTPRequestHandler)
httpd.serve_forever()
HTTPサーバーコネクターの作成
データ統合ルールを作成する前に、HTTPサービスにアクセスするためのHTTPサーバーコネクターを作成する必要があります。
- デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
- 初めてコネクターを作成する場合は、Webサービスカテゴリの下にあるHTTPサーバーを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてWebサービスカテゴリの下にあるHTTPサービスを選択します。
- 新規コネクターページで以下のオプションを設定します。
- コネクター名:システムが自動的にコネクター名を生成しますが、自分で名前を付けることも可能です。本例では
my_httpserver
を使用できます。 - URL:Webサービスにネットワーク経由で正常にアクセスできるURLを入力します。
- その他の設定はデフォルト値を使用します。必要に応じてHTTPリクエストヘッダーのキーと値を設定できます。
- コネクター名:システムが自動的にコネクター名を生成しますが、自分で名前を付けることも可能です。本例では
- テストボタンをクリックして接続をテストします。Webサービスにアクセス可能であれば成功のメッセージが返されます。
- 新規ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをHTTPサーバーに転送するアクションをルールに追加するためのルールを作成します。
ルールエリアで新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールマッチング用のSQL文を入力します。以下のSQL例は、
temp_hum/emqx
トピックに送信されたメッセージから、報告時間、クライアントID、メッセージ本文(ペイロード)、および本文内の温度と湿度を読み取ります。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
テストを有効にする機能を使って、データ入力をシミュレートし結果をテストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。
以下の情報を設定します。
アクション名:システムが自動的にアクション名を生成しますが、自分で名前を付けることも可能です。
メソッド:HTTPリクエストメソッドとして
POST
を選択します。URLパス:アクション用に別のリクエストパスを設定可能で、コネクターのURL設定に追加されて完全なURLが作成されます。このオプションは変数を使ったテンプレートも利用可能です。まず、ルールSQLで関連変数を定義します(例:
select clientid as device_id
)。その後、HTTPアクションのURLパスに${device_id}
のように変数を組み込めます。ヘッダー:アクション固有のリクエストヘッダーを設定するか、コネクターで設定済みのヘッダーをそのまま使用できます。
ボディ:以下のメッセージ内容テンプレートのように、ルールから出力されたフィールドをアクションのリクエストボディに追加して設定します。
json{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
確認ボタンをクリックしてルールの作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
温度・湿度データの報告をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがHTTPサービスに転送されているか確認します。
bashpy server.py Received POST request with body: b'[\n "temp": "27.5",\n "hum": "41.8"\n)127.0.0.1 - -[18/Dec/2023 14:50:44]"POST HTTP/1.1" 201 -
コンソールで稼働データを確認します。ルール一覧でルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計情報を閲覧できます。