Skip to content

スキーマレジストリの例 - 外部HTTPサーバー

このページでは、スキーマレジストリとルールエンジンが、カスタムロジックを持つ外部HTTPサーバーを利用してメッセージのエンコードおよびデコードをサポートする方法を示します。

特定のシナリオでは、EMQXプラットフォームがネイティブにサポートしていないカスタムのエンコードやデコードロジックを適用する必要がある場合があります。EMQXプラットフォームでは、ルール内の schema_encode および schema_decode 関数を通じて外部HTTPサービスを呼び出し、この処理を委譲できます。

外部HTTP API仕様

EMQXプラットフォームの schema_encode および schema_decode 関数と連携するカスタム外部HTTP APIを実装するには、外部HTTPサーバーはEMQXプラットフォームからのエンコードまたはデコード要求を処理する単一の POST エンドポイントを提供する必要があります。

リクエスト形式

リクエストボディは以下のフィールドを持つJSONオブジェクトです:

  • payload:Base64エンコードされた文字列。ルールエンジン内の schema_encode または schema_decode 関数に渡される値です。
  • type:評価される関数に応じて、encode または decode の文字列。
  • schema_name:EMQXプラットフォームで設定されたこの外部HTTPスキーマの名前を識別する文字列。
  • opts:EMQXプラットフォームで設定可能な任意の文字列で、追加オプションとしてHTTPサーバーにそのまま渡されます。

レスポンス形式

  • サーバーはHTTPステータスコード 200 で応答する必要があります。
  • レスポンスボディは結果を表すBase64エンコードされた文字列を含む必要があります。このBase64値はEMQXに返す際にさらにJSONエンコードしてはいけません。

利用例

例えば、デバイスがバイナリメッセージをパブリッシュし、そのペイロードをカスタムのXOR演算でエンコードまたはデコードしたい場合を考えます。このセクションでは、簡単な外部HTTPサービスを構築し、EMQXプラットフォームにカスタムのエンコード・デコードロジックを統合する方法を示します。

外部HTTPサービスの構築

以下の例は、PythonとFlaskを使ってシンプルなHTTPサーバーを作成・実行する方法を示します。このサーバーはBase64エンコードされたデータを受け取り、デコードしたペイロードにXOR演算を適用します。

外部HTTPサーバーのサンプルコード

Flaskがインストールされていることを確認してください:

sh
pip install Flask==3.1.0

サンプルコード:

python
from flask import Flask, request
import base64

app = Flask(__name__)

@app.route("/serde", methods=['POST'])
def serde():
    # 入力ペイロードはBase64エンコードされています
    body = request.get_json(force=True)
    print("incoming request:", body)
    payload64 = body.get("payload")
    payload = base64.b64decode(payload64)
    secret = 122
    response = bytes(b ^ secret for b in payload)
    # レスポンスもBase64エンコードする必要があります
    response64 = base64.b64encode(response)
    return response64

サーバーの起動方法:

sh
# サーバーが `myapp.py` というファイルと同じディレクトリにあることを想定しています
flask --app myapp --debug run -h 0.0.0.0 -p 9500

EMQXプラットフォームでの外部HTTPスキーマ作成

  1. デプロイメントにアクセスし、左メニューから Smart Data Hub -> Schema Registry を選択します。

  2. Internal タブページで New をクリックします。

  3. 以下のパラメータで外部HTTPサーバースキーマを作成します:

    • Namemyhttp
    • TypeExternal HTTP
    • URL:サーバーが稼働している完全なURI。例:http://server:9500/serde
  4. Confirm をクリックします。

スキーマを適用するルールの作成

EMQXプラットフォームのルールエンジンを使い、メッセージのエンコード・デコードにスキーマを適用するルールを作成します。

  1. デプロイメントで左メニューから Integration -> Rules を選択します。

  2. すでにコネクターやルールを作成している場合は、Rules エリアの New Rule をクリックして New Rule ページに入ります。未作成の場合は、Date Forward カテゴリから Republish を選択します。

  3. 以下のSQL文を使って先ほど作成したスキーマを利用します:

    sql
    SELECT
      schema_encode('myhttp', payload) as encoded,
      schema_decode('myhttp', encoded) as decoded
    FROM
      "t/external_http"

    schema_encode('myhttp', payload)schema_decode('myhttp', encoded) はどちらも設定した外部HTTPサーバーを呼び出し、指定されたペイロードのエンコード/デコードを行います。

  4. Next をクリックして Add Action (Sink) ページに進み、Connector ドロップダウンから Republish を選択します。

    TIP

    ステップ2で Republish を選択している場合は、再度コネクターを選択する必要はありません。

  5. Topic フィールドに送信先トピックとして external_http/out を入力します。

  6. Payload フィールドにメッセージコンテンツのテンプレートとして ${.} を入力します。

  7. Confirm をクリックしてアクションをルールに追加し、Back to Rules をクリックします。

    このアクションにより、デコードされたメッセージがJSON形式でトピック external_http/out に送信されます。${.} はルールの出力全体の値に実行時に置き換わる変数プレースホルダーです。

ルール実行結果の確認

  1. デプロイメントの左メニューから Online Test を選択します。

  2. MQTTクライアントとしてデプロイメントに接続するための接続情報を設定します。

    • 認証を設定している場合は、UsernamePassword を入力し、Connect をクリックします。
    • 認証を設定していない場合は、Connect with Auto-Generated Authentication を選択します。
  3. Subscriptions エリアの Topic フィールドに external_http/out と入力し、Subscribe をクリックします。

  4. Messages エリアの Topic フィールドに t/external_http と入力し、任意のペイロードを記入して Publish をクリックします。

  5. トピック external_http/out のメッセージが受信されることを確認します。例えば、ペイロードが hello の場合:

    json
    {"encoded":"\u0012\u001F\u0016\u0016\u0015","decoded":"hello"}