Skip to content

メッセージ変換

メッセージ変換は、ユーザー定義のルールに基づいてメッセージを修正およびフォーマットし、その後の処理やサブスクライバーへの配信前に適用する機能です。この機能はスキーマを用いた動的なメッセージ変換に基づいており、高度にカスタマイズ可能で、複数のエンコーディング形式や高度な変換をサポートし、複雑な下流データ処理のニーズに対応します。

ワークフロー

メッセージがパブリッシュされると、以下のワークフローを経ます:

  1. スキーマ検証:メッセージがパブリッシュされ認可を通過すると、まずスキーマ検証が行われます。検証に合格した場合、次のステップに進みます。

  2. メッセージ変換パイプライン

    • 変換マッチング:メッセージはトピックに基づいてユーザー定義の変換リストと照合されます。異なるトピックやトピックフィルターに対して複数の変換を設定可能です。
    • 変換実行:マッチした変換は設定された順序で実行されます。パイプラインはJSON、Protobuf、Avroなどの各種エンコーダー・デコーダーをサポートし、variform式を用いてメッセージの拡張や修正が可能です。
    • 変換後処理:メッセージが変換パイプラインを正常に通過すると、ルールエンジンのトリガーやサブスクライバーへのメッセージ配信などの次の処理に進みます。
  3. 失敗時の処理:変換が失敗した場合、ユーザー設定のアクションが実行されます:

    • メッセージ破棄:パブリッシュを終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コード(131 - 実装固有エラー)を返します。
    • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
    • 無視:追加のアクションは行いません。

    変換失敗時には、設定されたアクションにかかわらずログエントリが生成されます。ログの出力レベルはユーザーが設定可能で、デフォルトはwarningです。

    さらに、変換失敗はルールエンジンイベント($events/message_transformation_failed)をトリガーでき、ユーザーは誤ったメッセージを別のトピックに再パブリッシュしたり、Kafkaに送信して詳細解析を行うなどのカスタム処理を実装可能です。

メッセージ変換ルールの作成

Smart Data Hubをご契約の場合、左メニューの Smart Data Hub -> Message Transform をクリックしてメッセージ変換ページにアクセスできます。このページでメッセージ変換ルールを作成できます。

  1. メッセージ変換ページで、右上の + 新規作成 ボタンをクリックし、新規メッセージ変換ページに入り、以下の設定を行います:

    • 名前:メッセージ変換ルールを識別するための名前を入力します。

    • 説明:任意。メッセージ変換ルールの説明を追加します。

    • メッセージソースタイプ:変換対象のメッセージのトピックを設定します。複数のトピックやトピックフィルターを設定可能です。

    • メッセージフォーマット変換

      • ソースフォーマット:変換パイプラインに入る際に適用する有効なペイロードのデコード形式を指定します。選択肢はNone(デコードなし)、JSONAvroProtobufです。これらの形式はバイナリ入力ペイロードを構造化マッピングに変換します。AvroまたはProtobufを選択した場合は、事前にスキーマレジストリでスキーマを定義しておく必要があります。複数の変換ルールを含むパイプラインでは、各ステップでデコードを行う必要はありません。例えば、変換T1で既にペイロードをデコード済みであれば、後続のT2はデコードをスキップし、正しい形式のペイロードを直接使用できます。
      • ターゲットフォーマット:変換パイプライン完了時に最終的なメッセージペイロードをバイナリ値に変換するエンコード形式を指定します。エンコードの選択肢はソースフォーマットと同様で、None(エンコードなし)、JSONAvroProtobufです。パイプラインの最後の変換のみがペイロードをバイナリにエンコードする必要があり、中間の変換はバイナリエンコードを行う必要はありません。

      TIP

      ソースフォーマットとターゲットフォーマットが同じ場合、メッセージフォーマット変換は行われません。Avro/Protobuf形式とNoneは相互に変換できません。フォーマット変換の詳細ルールはメッセージフォーマット変換設定ルールを参照してください。

    • メッセージプロパティ変換

      • プロパティ:変換後の値(式で生成)を書き込む対象の場所を指定します。対象にはpayloadtopicqosretain(対応するフラグを設定)、およびuser_property(MQTTのUser-Property属性用)が含まれます。

        user_propertyを使用する場合は、具体的なキーを指定する必要があります(例:user_property.my_custom_prop)。

        payloadはメッセージペイロード全体を上書きするためにそのまま使用するか、ネストされたJSONオブジェクトとして扱うためにネストされたキーのパスを指定できます(例:payload.x.y)。

      • ターゲット値:指定したプロパティに書き込む値を定義します。この値はqosretaintopicpayloadpayload.x.yなど他のフィールドからコピーするか、variform式を使って生成できます。

      TIP

      設定方法の詳細はメッセージプロパティ変換設定ルールを参照してください。

    • 変換失敗時の操作

      • 失敗時のアクション:変換失敗時に実行するアクションを選択します:
        • メッセージ破棄:パブリッシュ処理を終了しメッセージを破棄、QoS 1およびQoS 2メッセージにはPUBACKで特定の理由コードを返します。
        • 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
        • 無視:追加のアクションは行いません。
      • ログレベル:検証失敗時のデプロイログ出力レベルを定義します。warningまたはerrorが選択可能です。
  2. (任意)作成前にプレビューをクリックして変換を試せます。プレビューでは、QoS、ペイロード、リテインフラグの有無、パブリッシャーのユーザー名やクライアントIDなど、入力メッセージのコンテキスト情報を入力できます。必要事項を入力後、変換を実行をクリックすると指定したコンテキストで変換が実行され、出力結果を確認できます。

  3. 確定をクリックして設定を完了します。

変換が作成されるとデフォルトで有効化され、メッセージ変換リストに表示されます。作成済みのメッセージ変換はメッセージ変換ルールの管理に従って管理できます。

メッセージフォーマット変換設定ルール

(設定可能);(設定不可)

ソースフォーマット \ ターゲットフォーマットJSONAvroProtobufNone
JSON
Avro
Protobuf
None

メッセージプロパティ変換設定ルール

(すべてのプロパティ設定をサポート);(設定不可)

user_propertyは全体として設定できませんが、個別のuser_propertyサブプロパティは操作可能です。

ソースフォーマット \ ターゲットフォーマットJSONAvroProtobufNone
JSON
Avropayloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能
Protobufpayloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能payloadを直接設定不可だが、payloadのサブプロパティ追加は可能
Nonepayloadを直接設定可能、サブプロパティ追加不可payloadを直接設定可能、サブプロパティ追加不可

メッセージ変換ルールの管理

メッセージ変換ルール一覧ページで以下の管理操作が可能です:

  • メッセージ変換ルールの有効化/無効化:有効列のトグルボタンをクリックします。
  • メッセージ変換ルールの編集:操作列の「編集」アイコンをクリックし、編集ページに入り修正後保存します。
  • メッセージ変換ルールの削除:操作列の「その他」アイコンをクリックし、ドロップダウンメニューから削除を選択して削除を確定します。
  • メッセージ変換ルールの順序調整:リストの行をマウスでドラッグして並び替えるか、操作列の「その他」メニュー内のクイック移動ボタンを使用します。

統計情報の閲覧

メッセージ変換ルール一覧で名前をクリックすると詳細ページに入り、メッセージ変換実行に関する統計情報を閲覧できます:

  • 統計情報
    • マッチ数:ルール有効化以降にメッセージ変換ルールがトリガーされた総回数。
    • 成功数:成功したメッセージ変換の回数。
    • 失敗数:失敗したメッセージ変換の回数。
  • レート指標
    • 現在のレート:直近1分間の変換レート。
    • 直近5分のレート:直近5分間の変換レート。
    • 最大レート:過去最高の変換レート。