Skip to content

メッセージ変換

メッセージ変換は、ユーザー定義のルールに基づいてメッセージを修正およびフォーマットし、その後の処理やサブスクライバーへの配信前に適用する機能です。この機能は高度にカスタマイズ可能で、複数のエンコーディングや高度な変換をサポートしています。

ワークフロー

メッセージがパブリッシュされると、以下のワークフローを経ます。

  1. スキーマ検証:メッセージがパブリッシュされ認可を通過すると、まずスキーマ検証が行われます。メッセージが検証に合格すると次のステップに進みます。

  2. メッセージ変換パイプライン

    • 変換マッチング:メッセージのトピックに基づいて、ユーザー定義の変換リストと照合されます。異なるトピックやトピックフィルターに対して複数の変換を設定可能です。
    • 変換実行:マッチした変換が設定順に実行されます。パイプラインはJSON、Protobuf、Avroなどの各種エンコーダー・デコーダーをサポートし、Variform式を用いてメッセージの拡張や修正が可能です。
    • 変換後処理:メッセージが変換パイプラインを正常に通過すると、ルールエンジンのトリガーやサブスクライバーへのメッセージ配信などの次の処理に進みます。
  3. 失敗時の処理:変換が失敗した場合、ユーザー設定に応じたアクションが実行されます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで特定の理由コード(131 - 実装固有エラー)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
    • 無視:追加の処理は行いません。

    変換失敗時には、設定に関わらずログエントリが生成される場合があります。ログの出力レベルはユーザーが設定可能で、デフォルトはwarningです。さらに、変換失敗はルールエンジンのイベント($events/message_transformation/failed)をトリガーでき、誤ったメッセージを別トピックに再パブリッシュしたり、Kafkaへ送信して解析するなどのカスタム処理が可能です。

ユーザーガイド

このセクションでは、メッセージ変換機能の設定方法とテスト方法を説明します。

ダッシュボードでのメッセージ変換設定

ダッシュボードでメッセージ変換を作成・設定する手順を示します。

  1. ダッシュボードにアクセスし、左側のナビゲーションメニューから Smart Data Hub -> Message Transform をクリックします。

  2. Message Transform ページ右上の Create をクリックします。

  3. 「Create Message Transform」ページで以下の情報を設定します。

    • Name:変換の名前を入力します。

    • Message Source Topic:変換対象のメッセージが送られるトピックを設定します。複数のトピックやトピックフィルターを設定可能です。

    • Note(任意):メモを入力します。

    • Message Format Transformation

      • Source Format:変換パイプラインに入るメッセージに適用するペイロードデコーダーを指定します。選択肢は以下の通りです。

        • None(デコードなし)
        • JSON
        • Avro
        • Protobuf
        • Custom (External HTTP)

        これらのデコーダーはバイナリの入力ペイロードを構造化されたマップに変換します。AvroProtobufCustom (External HTTP)を選択する場合は、あらかじめスキーマレジストリで作成済みである必要があります。

        複数の変換が連なるパイプラインでは、各ステップでのデコードは必須ではありません。例えば、変換T1で既にペイロードをデコードしていれば、後続の変換T2はデコードをスキップし、正しい形式のペイロードを利用できます。

      • Target Format:変換パイプラインの最後に最終的なメッセージペイロードをバイナリ値としてエンコードするためのペイロードエンコーダーを指定します。エンコーダーの選択肢はSource Formatと同じです。

        パイプラインの最後の変換のみがバイナリエンコードを行う必要があり、中間の変換はバイナリエンコードを処理する必要はありません。

    • Message Properties Transformation

      • Properties:式の結果として得られた変換後の値を書き込む先を指定します。有効な書き込み先はpayloadtopicqosretain(対応するフラグを設定)、およびuser_property(MQTTのUser-Propertyプロパティ用)です。user_propertyを使用する場合は、必ず1つのキーを指定してください(例:user_property.my_custom_prop)。payloadはそのまま使ってメッセージペイロード全体を上書きするか、特定のネストされたキーのパスを指定してJSONオブジェクトのように扱うことも可能です(例:payload.x.y)。
      • Target Value:設定したプロパティに書き込む値を定義します。この値はqosretaintopicpayloadpayload.x.yなどのフィールドからコピーするか、variform式を使って生成できます。
    • Transformation Failure Operation

      • Action After Failure:変換失敗時に実行するアクションを選択します。

        • Drop Message:パブリッシュ処理を終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで特定の理由コードを返します。
        • Disconnect and Drop Message:メッセージを破棄し、パブリッシュしたクライアントを切断します。
        • Ignore:追加の処理は行いません。
    • Output Logs:変換失敗時にログを生成するか選択します。ログはデフォルトで有効です。

    • Logs Level:ログの出力レベルを設定します。デフォルトはwarningです。

  4. 設定が完了したら Create をクリックします。

作成前に変換をテストするには、Preview をクリックします。新しいペインが開き、QoS、ペイロード、retainフラグの有無、パブリッシャーのユーザー名やクライアントIDなどのメッセージコンテキストを入力できます。必要な情報を入力後、Execute Transformation をクリックすると指定したコンテキストで変換を実行し、結果を確認できます。

作成した変換は、メッセージ変換ページのリストにデフォルトで有効状態で表示されます。必要に応じて無効化したり、Actions列のSettingsをクリックして設定を更新できます。削除や順序変更はMoreから行えます。

設定ファイルでのメッセージ変換設定

Avro形式でエンコードされたメッセージを受信し、それをJSONにデコードしたいとします。デコード後、パブリッシュしたクライアントのクライアント属性から取得したtenant属性をトピックの先頭に付加してからルールエンジンで処理したい場合、以下の設定で実現可能です。

hocon
message_transformation {
  transformations = [
    {
      name = mytransformation
      topics = ["t"]
      failure_action = drop
      payload_decoder = {type = avro, schema = myschema}
      payload_encoder = {type = json}
      operations = [
        {key = "topic", value = "concat([client_attrs.tenant, '/', topic])"}
      ]
    }
  ]
}

この設定は、mytransformationという名前の変換を指定し、

  • 指定したスキーマを使ってAvro形式のペイロードをデコードし、
  • ペイロードをJSON形式にエンコードし、
  • クライアント属性のtenantと元のトピックを連結してトピックを変更しています。

詳細な設定方法は設定マニュアルをご参照ください。

REST API

REST APIを通じたメッセージ変換の利用方法の詳細は、EMQX Enterprise APIをご覧ください。

デコード/エンコード用スキーマの作成

デコーダーおよびエンコーダースキーマの作成方法については、スキーマレジストリのセクションを参照してください。

統計と指標

メッセージ変換を有効にすると、ダッシュボードで統計情報や指標を確認できます。メッセージ変換ページで変換名をクリックすると、以下の情報が表示されます。

統計情報

  • Total:システム起動以降のトリガー総数
  • Success:成功したデータ変換の数
  • Failed:失敗したデータ変換の数

レート指標

  • 現在の変換速度
  • 過去5分間の速度
  • 過去の最大速度

統計はリセット可能で、Prometheusの/prometheus/message_transformationからも取得可能です。