スキーマレジストリの例 - 外部HTTPサーバー
このページでは、スキーマレジストリとルールエンジンが、カスタムロジックを持つ外部HTTPサーバーを利用してメッセージのエンコードおよびデコードをサポートする方法を示します。
特定のシナリオでは、EMQXプラットフォームがネイティブにサポートしていないカスタムのエンコードやデコードロジックを適用する必要がある場合があります。EMQXプラットフォームでは、ルール内の schema_encode
および schema_decode
関数を通じて外部HTTPサービスを呼び出し、この処理を委譲できます。
外部HTTP API仕様
EMQXプラットフォームの schema_encode
および schema_decode
関数と連携するカスタム外部HTTP APIを実装するには、外部HTTPサーバーはEMQXプラットフォームからのエンコードまたはデコード要求を処理する単一の POST
エンドポイントを提供する必要があります。
リクエスト形式
リクエストボディは以下のフィールドを持つJSONオブジェクトです:
payload
:Base64エンコードされた文字列。ルールエンジン内のschema_encode
またはschema_decode
関数に渡される値です。type
:評価される関数に応じて、encode
またはdecode
の文字列。schema_name
:EMQXプラットフォームで設定されたこの外部HTTPスキーマの名前を識別する文字列。opts
:EMQXプラットフォームで設定可能な任意の文字列で、追加オプションとしてHTTPサーバーにそのまま渡されます。
レスポンス形式
- サーバーはHTTPステータスコード
200
で応答する必要があります。 - レスポンスボディは結果を表すBase64エンコードされた文字列を含む必要があります。このBase64値はEMQXに返す際にさらにJSONエンコードしてはいけません。
利用例
例えば、デバイスがバイナリメッセージをパブリッシュし、そのペイロードをカスタムのXOR演算でエンコードまたはデコードしたい場合を考えます。このセクションでは、簡単な外部HTTPサービスを構築し、EMQXプラットフォームにカスタムのエンコード・デコードロジックを統合する方法を示します。
外部HTTPサービスの構築
以下の例は、PythonとFlaskを使ってシンプルなHTTPサーバーを作成・実行する方法を示します。このサーバーはBase64エンコードされたデータを受け取り、デコードしたペイロードにXOR演算を適用します。
外部HTTPサーバーのサンプルコード
Flaskがインストールされていることを確認してください:
pip install Flask==3.1.0
サンプルコード:
from flask import Flask, request
import base64
app = Flask(__name__)
@app.route("/serde", methods=['POST'])
def serde():
# 入力ペイロードはBase64エンコードされています
body = request.get_json(force=True)
print("incoming request:", body)
payload64 = body.get("payload")
payload = base64.b64decode(payload64)
secret = 122
response = bytes(b ^ secret for b in payload)
# レスポンスもBase64エンコードする必要があります
response64 = base64.b64encode(response)
return response64
サーバーの起動方法:
# サーバーが `myapp.py` というファイルと同じディレクトリにあることを想定しています
flask --app myapp --debug run -h 0.0.0.0 -p 9500
EMQXプラットフォームでの外部HTTPスキーマ作成
デプロイメントにアクセスし、左メニューから Smart Data Hub -> Schema Registry を選択します。
Internal タブページで New をクリックします。
以下のパラメータで外部HTTPサーバースキーマを作成します:
- Name:
myhttp
- Type:
External HTTP
- URL:サーバーが稼働している完全なURI。例:
http://server:9500/serde
- Name:
Confirm をクリックします。
スキーマを適用するルールの作成
EMQXプラットフォームのルールエンジンを使い、メッセージのエンコード・デコードにスキーマを適用するルールを作成します。
デプロイメントで左メニューから Integration -> Rules を選択します。
すでにコネクターやルールを作成している場合は、Rules エリアの New Rule をクリックして New Rule ページに入ります。未作成の場合は、Date Forward カテゴリから Republish を選択します。
以下のSQL文を使って先ほど作成したスキーマを利用します:
sqlSELECT schema_encode('myhttp', payload) as encoded, schema_decode('myhttp', encoded) as decoded FROM "t/external_http"
schema_encode('myhttp', payload)
とschema_decode('myhttp', encoded)
はどちらも設定した外部HTTPサーバーを呼び出し、指定されたペイロードのエンコード/デコードを行います。Next をクリックして Add Action (Sink) ページに進み、Connector ドロップダウンから
Republish
を選択します。TIP
ステップ2で Republish を選択している場合は、再度コネクターを選択する必要はありません。
Topic フィールドに送信先トピックとして
external_http/out
を入力します。Payload フィールドにメッセージコンテンツのテンプレートとして
${.}
を入力します。Confirm をクリックしてアクションをルールに追加し、Back to Rules をクリックします。
このアクションにより、デコードされたメッセージがJSON形式でトピック
external_http/out
に送信されます。${.}
はルールの出力全体の値に実行時に置き換わる変数プレースホルダーです。
ルール実行結果の確認
デプロイメントの左メニューから Online Test を選択します。
MQTTクライアントとしてデプロイメントに接続するための接続情報を設定します。
- 認証を設定している場合は、Username と Password を入力し、Connect をクリックします。
- 認証を設定していない場合は、Connect with Auto-Generated Authentication を選択します。
Subscriptions エリアの Topic フィールドに
external_http/out
と入力し、Subscribe をクリックします。Messages エリアの Topic フィールドに
t/external_http
と入力し、任意のペイロードを記入して Publish をクリックします。トピック
external_http/out
のメッセージが受信されることを確認します。例えば、ペイロードがhello
の場合:json{"encoded":"\u0012\u001F\u0016\u0016\u0015","decoded":"hello"}