Skip to content

メッセージ変換

メッセージ変換は、ユーザー定義のルールに基づいてメッセージを変更およびフォーマットし、その後の処理やサブスクライバーへの配信を行う前に適用されます。この機能はスキーマを用いた動的なメッセージ変換に基づいており、高度にカスタマイズ可能で、複数のエンコーディング形式や高度な変換をサポートし、複雑な下流のデータ処理ニーズに対応します。

ワークフロー

メッセージがパブリッシュされると、以下のワークフローが実行されます。

  1. スキーマ検証:メッセージがパブリッシュされ認可を通過すると、まずスキーマ検証が行われます。メッセージが検証を通過した場合、次のステップに進みます。

  2. メッセージ変換パイプライン

    • 変換マッチング:メッセージは、そのトピックに基づいてユーザー定義の変換リストと照合されます。異なるトピックやトピックフィルターに対して複数の変換を設定可能です。
    • 変換実行:マッチした変換は設定された順序で実行されます。パイプラインはJSON、Protobuf、Avroなどの各種エンコーダー・デコーダーをサポートし、variform式を用いてメッセージの拡張や変更が可能です。
    • 変換後処理:メッセージが変換パイプラインを正常に通過すると、ルールエンジンのトリガーやサブスクライバーへのメッセージ配信など、次の処理に進みます。
  3. 失敗時の処理:変換が失敗した場合、ユーザー設定のアクションが実行されます。

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合は、PUBACKで特定の理由コード(131 - Implementation Specific Error)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
    • 無視:追加のアクションは行いません。

    変換失敗時には、設定されたアクションに関わらずログエントリが生成されます。ログの出力レベルはユーザーが設定可能で、デフォルトはwarningです。

    さらに、変換失敗はルールエンジンイベント($events/message_transformation_failed)をトリガーでき、ユーザーは誤ったメッセージを別のトピックに再パブリッシュしたり、Kafkaへ送信して詳細分析を行うなどのカスタム処理を実装可能です。

メッセージ変換ルールの作成

Smart Data Hubをご契約の場合、左メニューのSmart Data Hub -> Message Transformをクリックしてメッセージ変換ページにアクセスできます。このページでメッセージ変換ルールを作成可能です。

  1. メッセージ変換ページで右上の**+ New**ボタンをクリックし、新規メッセージ変換ページに入り、以下の設定を行います。

    • 名前:メッセージ変換ルールを識別するための名前を入力します。

    • 説明:任意。メッセージ変換ルールの説明を追加します。

    • メッセージソースタイプ:変換対象のメッセージのトピックを設定します。複数のトピックやトピックフィルターを設定可能です。

    • メッセージフォーマット変換

      • ソースフォーマット:変換パイプラインに入る際に適用される有効なペイロードのデコード形式を指定します。None(デコードなし)、JSONAvroProtobufのいずれかを選択可能です。これらの形式はバイナリの入力ペイロードを構造化マッピングに変換します。AvroProtobufを選択した場合は、事前にスキーマレジストリでスキーマを定義しておく必要があります。複数の変換ルールを含むパイプラインでは、各ステップでデコードを繰り返す必要はありません。例えば、変換T1で既にペイロードをデコードしていれば、後続の変換T2はデコードをスキップして正しくフォーマットされたペイロードを直接利用できます。
      • ターゲットフォーマット:変換パイプライン完了時に最終的なメッセージペイロードをバイナリ値に変換するエンコード形式を指定します。エンコードの選択肢はソースフォーマットと同じく、None(エンコードなし)、JSONAvroProtobufです。パイプラインの最後の変換のみがペイロードをバイナリとしてエンコードすればよく、中間の変換はバイナリエンコードを行う必要はありません。

      TIP

      ソースフォーマットとターゲットフォーマットが同じ場合、メッセージフォーマット変換は行われません。Avro/Protobuf形式とNoneは相互に変換できません。フォーマット変換の詳細ルールはメッセージフォーマット変換設定ルールを参照してください。

    • メッセージプロパティ変換

      • プロパティ:変換後の値(式で生成)を書き込む対象を指定します。対象はpayloadtopicqosretain(対応するフラグを設定)、およびuser_property(MQTTのUser-Property属性用)が有効です。

        user_propertyを使用する場合は、具体的なキーを指定する必要があります(例:user_property.my_custom_prop)。

        payloadはそのままメッセージペイロード全体を上書きするか、ネストしたJSONオブジェクトとして扱うためにネストされたキーのパスを指定できます(例:payload.x.y)。

      • ターゲット値:指定したプロパティに書き込む値を定義します。この値はqosretaintopicpayloadpayload.x.yなど他のフィールドからコピーするか、variform式を使って生成可能です。

      TIP

      設定例はメッセージプロパティ変換設定ルールを参照してください。

    • 変換失敗時の操作

      • 失敗後のアクション:変換失敗時に実行するアクションを選択します。
        • メッセージ破棄:パブリッシュ処理を終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返します。
        • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
        • 無視:追加のアクションは行いません。
      • ログレベル:デプロイログにおける検証失敗時のログ出力レベルを設定します。warningまたはerrorが選択可能です。
  2. (任意)作成前にプレビューボタンをクリックすると、変換をプレビューできます。ダイアログが開き、QoS、ペイロード、リテインフラグの有無、パブリッシャーのユーザー名やクライアントIDなどのコンテキスト情報を入力可能です。必要事項を入力後、Run Transformationをクリックすると指定したコンテキストで変換を実行し、結果を確認できます。

  3. Confirmをクリックして設定を完了します。

作成した変換ルールはデフォルトで有効化され、メッセージ変換リストに表示されます。作成済みのメッセージ変換はメッセージ変換ルールの管理に従って操作可能です。

メッセージフォーマット変換設定ルール

(設定可能)、(設定不可)

ソースフォーマット \ ターゲットフォーマットJSONAvroProtobufNone
JSON
Avro
Protobuf
None

メッセージプロパティ変換設定ルール

(すべてのプロパティ設定をサポート)、(設定不可)

user_propertyは全体として設定できませんが、個別のuser_propertyサブプロパティは操作可能です。

ソースフォーマット \ ターゲットフォーマットJSONAvroProtobufNone
JSON
Avropayloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能
Protobufpayloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能
Nonepayloadを直接設定可能、サブプロパティの追加は不可payloadを直接設定可能、サブプロパティの追加は不可

メッセージ変換ルールの管理

メッセージ変換ルール一覧ページで以下の管理操作が可能です。

  • メッセージ変換ルールの有効化/無効化:Enabled列のトグルボタンをクリックします。
  • メッセージ変換ルールの編集:Actions列の「編集」アイコンをクリックし、編集ページで修正後保存します。
  • メッセージ変換ルールの削除:Actions列の「その他」アイコンをクリックし、ドロップダウンメニューから削除を選択して削除を確定します。
  • メッセージ変換ルールの順序変更:リストの行をドラッグして並び替えるか、Actions列のその他オプションメニューにあるクイック移動ボタンを使用します。

統計メトリクスの表示

メッセージ変換ルール一覧で名前をクリックすると詳細ページに入り、メッセージ変換実行に関する統計情報を確認できます。

  • 統計
    • マッチ数:ルール有効化以降にメッセージ変換ルールがトリガーされた合計回数。
    • 成功数:成功したメッセージ変換の回数。
    • 失敗数:失敗したメッセージ変換の回数。
  • レート指標
    • 現在のレート:直近1分間の変換レート。
    • 直近5分のレート:直近5分間の変換レート。
    • 最大レート:過去の最高変換レート。