メッセージ変換
メッセージ変換は、ユーザー定義のルールに基づいてメッセージを修正およびフォーマットし、その後の処理やサブスクライバーへの配信前に適用する機能です。この機能は高度にカスタマイズ可能で、複数のエンコーディングや高度な変換をサポートしています。
ワークフロー
メッセージがパブリッシュされると、以下のワークフローを経ます。
スキーマ検証:メッセージがパブリッシュされ認可を通過すると、まずスキーマ検証が行われます。メッセージが検証に合格すると次のステップに進みます。
メッセージ変換パイプライン:
- 変換マッチング:メッセージのトピックに基づいて、ユーザー定義の変換リストと照合されます。異なるトピックやトピックフィルターに対して複数の変換を設定可能です。
- 変換実行:マッチした変換が設定順に実行されます。パイプラインはJSON、Protobuf、Avroなどの各種エンコーダー・デコーダーをサポートし、Variform式を用いてメッセージの拡張や修正が可能です。
- 変換後処理:メッセージが変換パイプラインを正常に通過すると、ルールエンジンのトリガーやサブスクライバーへのメッセージ配信などの次の処理に進みます。
失敗時の処理:変換が失敗した場合、ユーザー設定に応じたアクションが実行されます。
- メッセージ破棄:パブリッシュを終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで特定の理由コード(131 - 実装固有エラー)を返します。
- 切断してメッセージ破棄:メッセージを破棄し、パブリッシュしたクライアントを切断します。
- 無視:追加の処理は行いません。
変換失敗時には、設定に関わらずログエントリが生成される場合があります。ログの出力レベルはユーザーが設定可能で、デフォルトは
warningです。さらに、変換失敗はルールエンジンのイベント($events/message_transformation/failed)をトリガーでき、誤ったメッセージを別トピックに再パブリッシュしたり、Kafkaへ送信して解析するなどのカスタム処理が可能です。
ユーザーガイド
このセクションでは、メッセージ変換機能の設定方法とテスト方法を説明します。
ダッシュボードでのメッセージ変換設定
ダッシュボードでメッセージ変換を作成・設定する手順を示します。
ダッシュボードにアクセスし、左側のナビゲーションメニューから Smart Data Hub -> Message Transform をクリックします。
Message Transform ページ右上の Create をクリックします。
「Create Message Transform」ページで以下の情報を設定します。
Name:変換の名前を入力します。
Message Source Topic:変換対象のメッセージが送られるトピックを設定します。複数のトピックやトピックフィルターを設定可能です。
Note(任意):メモを入力します。
Message Format Transformation:
Source Format:変換パイプラインに入るメッセージに適用するペイロードデコーダーを指定します。選択肢は以下の通りです。
None(デコードなし)JSONAvroProtobufCustom (External HTTP)
これらのデコーダーはバイナリの入力ペイロードを構造化されたマップに変換します。
Avro、Protobuf、Custom (External HTTP)を選択する場合は、あらかじめスキーマレジストリで作成済みである必要があります。複数の変換が連なるパイプラインでは、各ステップでのデコードは必須ではありません。例えば、変換
T1で既にペイロードをデコードしていれば、後続の変換T2はデコードをスキップし、正しい形式のペイロードを利用できます。Target Format:変換パイプラインの最後に最終的なメッセージペイロードをバイナリ値としてエンコードするためのペイロードエンコーダーを指定します。エンコーダーの選択肢はSource Formatと同じです。
パイプラインの最後の変換のみがバイナリエンコードを行う必要があり、中間の変換はバイナリエンコードを処理する必要はありません。
Message Properties Transformation:
- Properties:式の結果として得られた変換後の値を書き込む先を指定します。有効な書き込み先は
payload、topic、qos、retain(対応するフラグを設定)、およびuser_property(MQTTのUser-Propertyプロパティ用)です。user_propertyを使用する場合は、必ず1つのキーを指定してください(例:user_property.my_custom_prop)。payloadはそのまま使ってメッセージペイロード全体を上書きするか、特定のネストされたキーのパスを指定してJSONオブジェクトのように扱うことも可能です(例:payload.x.y)。 - Target Value:設定したプロパティに書き込む値を定義します。この値は
qos、retain、topic、payload、payload.x.yなどのフィールドからコピーするか、variform式を使って生成できます。
- Properties:式の結果として得られた変換後の値を書き込む先を指定します。有効な書き込み先は
Transformation Failure Operation:
Action After Failure:変換失敗時に実行するアクションを選択します。
- Drop Message:パブリッシュ処理を終了しメッセージを破棄します。QoS 1およびQoS 2メッセージの場合はPUBACKで特定の理由コードを返します。
- Disconnect and Drop Message:メッセージを破棄し、パブリッシュしたクライアントを切断します。
- Ignore:追加の処理は行いません。
Output Logs:変換失敗時にログを生成するか選択します。ログはデフォルトで有効です。
Logs Level:ログの出力レベルを設定します。デフォルトは
warningです。
設定が完了したら Create をクリックします。
作成前に変換をテストするには、Preview をクリックします。新しいペインが開き、QoS、ペイロード、retainフラグの有無、パブリッシャーのユーザー名やクライアントIDなどのメッセージコンテキストを入力できます。必要な情報を入力後、Execute Transformation をクリックすると指定したコンテキストで変換を実行し、結果を確認できます。
作成した変換は、メッセージ変換ページのリストにデフォルトで有効状態で表示されます。必要に応じて無効化したり、Actions列のSettingsをクリックして設定を更新できます。削除や順序変更はMoreから行えます。
設定ファイルでのメッセージ変換設定
Avro形式でエンコードされたメッセージを受信し、それをJSONにデコードしたいとします。デコード後、パブリッシュしたクライアントのクライアント属性から取得したtenant属性をトピックの先頭に付加してからルールエンジンで処理したい場合、以下の設定で実現可能です。
message_transformation {
transformations = [
{
name = mytransformation
topics = ["t"]
failure_action = drop
payload_decoder = {type = avro, schema = myschema}
payload_encoder = {type = json}
operations = [
{key = "topic", value = "concat([client_attrs.tenant, '/', topic])"}
]
}
]
}この設定は、mytransformationという名前の変換を指定し、
- 指定したスキーマを使ってAvro形式のペイロードをデコードし、
- ペイロードをJSON形式にエンコードし、
- クライアント属性の
tenantと元のトピックを連結してトピックを変更しています。
詳細な設定方法は設定マニュアルをご参照ください。
REST API
REST APIを通じたメッセージ変換の利用方法の詳細は、EMQX Enterprise APIをご覧ください。
デコード/エンコード用スキーマの作成
デコーダーおよびエンコーダースキーマの作成方法については、スキーマレジストリのセクションを参照してください。
統計と指標
メッセージ変換を有効にすると、ダッシュボードで統計情報や指標を確認できます。メッセージ変換ページで変換名をクリックすると、以下の情報が表示されます。
統計情報:
- Total:システム起動以降のトリガー総数
- Success:成功したデータ変換の数
- Failed:失敗したデータ変換の数
レート指標:
- 現在の変換速度
- 過去5分間の速度
- 過去の最大速度
統計はリセット可能で、Prometheusの/prometheus/message_transformationからも取得可能です。