MQTTデータをHTTPサーバーに取り込む
EMQX CloudのHTTPサービスデータ統合は、EMQXと外部HTTPサービスを迅速に連携させる方法を提供します。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も備えています。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知、アラート通知、データ統合などのシナリオを実現します。
本ページでは、HTTPサービスデータ統合の機能的特徴を詳しく紹介し、HTTPサーバーコネクターの作成、ルールの作成、ルールのテストなど、データ統合の作成に関する実践的なガイダンスを提供します。
動作の仕組み
HTTPサービスデータ統合はEMQX Cloudに標準搭載された機能で、外部サービスとの連携を簡単に設定できます。HTTPサービスを使うことで、ユーザーは好みのプログラミング言語やフレームワークでコードを書き、カスタムで柔軟かつ複雑なデータ処理ロジックを実装できます。

EMQX Cloudは、設定されたデータ統合を通じてデバイスのイベントやデータをHTTPサービスに転送し、以下のワークフローで動作します。
- デバイスがEMQX Cloudに接続:IoTデバイスが正常に接続すると、デバイスID、送信元IPアドレスなどの情報を含むオンラインイベントが発生します。
- デバイスがメッセージをパブリッシュ:デバイスはテレメトリや状態データをEMQX Cloudに報告し、MQTTプロトコルを通じて特定のトピックにメッセージをパブリッシュしてルールをトリガーします。
- ルールエンジンがメッセージを処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
- HTTPサービスへ送信:ルールのトリガーによりメッセージがHTTPサービスイベントに送信されます。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダー、ボディ、URLを動的に構築することで、外部サービスとの柔軟なデータ連携を実現します。
イベントやメッセージデータがHTTPサービスに送信された後は、以下のように柔軟に処理できます。
- デバイスの状態更新やイベント記録を実装し、データに基づくデバイス管理システムを開発する。
- メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現する。
- ルールSQLでフィルタリングされた異常データに対しては、HTTPサービス経由でアラート通知システムを直接呼び出し、デバイス異常監視を行う。
特徴と利点
EMQX CloudのHTTPサービス統合を利用することで、以下のようなビジネス上の利点があります。
- より多くの下流システムへデータを連携:HTTPサービスはMQTTデータを分析プラットフォームやクラウドサービスなど、より多くの外部システムに簡単に統合でき、多システム間のデータ配信を実現します。
- リアルタイムな応答と業務プロセスのトリガー:HTTPサービスを通じて外部システムがMQTTデータをリアルタイムに受信し、業務プロセスを迅速に起動できます。例えばアラームデータ受信による業務ワークフローのトリガーなどです。
- カスタムデータ処理:外部システムは受信データを必要に応じて二次処理でき、より複雑な業務ロジックを実装可能で、EMQXの機能に制約されません。
- 疎結合な連携:HTTPサービスはシンプルなHTTPインターフェースを利用し、疎結合なシステム連携方式を提供します。
まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様で豊かなアプリケーション開発のニーズに応えます。
はじめる前に
このセクションでは、EMQX CloudでHTTPサービスデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワークの設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
簡単なHTTPサービスのセットアップ
以下の例を使って簡単なWebサーバーを作成します。
from http.server import HTTPServer, BaseHTTPRequestHandler
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b'Hello, world!')
def do_POST(self):
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
print("Received POST request with body: " + str(body))
self.send_response(201)
self.end_headers()
httpd = HTTPServer(('0.0.0.0', 8080), SimpleHTTPRequestHandler)
httpd.serve_forever()HTTPサーバーコネクターの作成
データ統合ルールを作成する前に、HTTPサービスにアクセスするためのHTTPサーバーコネクターを作成する必要があります。
- デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
- 初めてコネクターを作成する場合は、Webサービスカテゴリの下にあるHTTPサーバーを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてWebサービスカテゴリの下のHTTPサーバーを選択します。
- 新規コネクターページで以下のオプションを設定します。
- コネクター名:システムが自動でコネクター名を生成しますが、自分で名前を付けることも可能です。本例では
my_httpserverを使用できます。 - URL:Webサービスがネットワーク経由で正常にアクセス可能なURLを入力します。
- その他の設定はデフォルト値を使用します。必要に応じてHTTPリクエストヘッダーのキーと値を設定できます。
- コネクター名:システムが自動でコネクター名を生成しますが、自分で名前を付けることも可能です。本例では
- テストボタンをクリックして接続をテストします。Webサービスにアクセス可能であれば成功のメッセージが返されます。
- 新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをHTTPサーバーに転送するためのアクションをルールに追加します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールマッチング用のSQL文を入力します。以下のSQL例は、
temp_hum/emqxトピックに送信されたメッセージから、メッセージの報告時間、クライアントID、メッセージ本文(ペイロード)の温度と湿度を読み取ります。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"Try It Out機能でデータ入力をシミュレートし、結果をテストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。
以下の情報を設定します。
アクション名:システムが自動でアクション名を生成しますが、自分で名前を付けることも可能です。
メソッド:HTTPリクエストメソッドとして
POSTを選択します。URLパス:アクション専用のリクエストパスを設定できます。これはコネクターのURL設定に追加され、完全なURLを構成します。このオプションでは変数を含むテンプレートが利用可能です。まずルールSQLで関連変数を定義します(例:
select clientid as device_id)。その後、HTTPアクションのURLパスに${device_id}のように変数を組み込めます。ヘッダー:アクション固有のリクエストヘッダーを設定するか、コネクターで設定したヘッダーをそのまま使用できます。
ボディ:リクエストボディとして以下のメッセージ内容テンプレートを設定し、ルールから出力されたフィールドをアクションのリクエストボディに追加します。
json{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがHTTPサービスに転送されているかを確認します。
bashpy server.py Received POST request with body: b'[\n "temp": "27.5",\n "hum": "41.8"\n)127.0.0.1 - -[18/Dec/2023 14:50:44]"POST HTTP/1.1" 201 -コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計情報を閲覧できます。