メッセージの再パブリッシュ
データ統合を通じて、特定の条件を満たすメッセージをコードを書かずに他のトピックへ再パブリッシュすることが可能です。EMQX ブローカーでは、ルールを作成し、ルールSQLでソースメッセージのデータをフィルタリング・処理し、「メッセージの再パブリッシュ」アクションをルールに追加することで、処理結果をメッセージパブリッシュで転送できます。
本ページでは、任意のメッセージの msg に文字列 hello が含まれる場合に、greet トピックへメッセージを再パブリッシュするデータ統合の作成方法をデモンストレーションします。主な手順は以下の通りです。
- フィルタリング条件を設定するルールの作成
- メッセージ再パブリッシュのアクションをルールに追加
- データ統合の作成完了および動作テストの実施
データ統合によるメッセージ再パブリッシュの設定は、コネクターの追加を必要としません。以下のセクションで具体的な設定手順を説明します。
ルールの作成
データ統合 ページの データ転送 サービスカテゴリで 再パブリッシュ をクリックします。すでに他のコネクターを作成している場合は、新規コネクター をクリックし、データ転送 サービスカテゴリの中から 再パブリッシュ を選択してください。
SQLエディター にて、任意のメッセージの
msgに文字列helloが含まれる場合にエンジンをトリガーするルールSQLを定義します。- FROM句でメッセージデータのソースを指定します。本デモでは全トピック(
#)のメッセージを対象とします。 - WHERE句でメッセージペイロード内の
msgに対して正規表現マッチングを行い、helloを含む場合にデータ統合を実行します。
上記の方針に基づくSQL例は以下の通りです。
sqlSELECT payload.msg as msg FROM "#" WHERE regex_match(msg, 'hello')- FROM句でメッセージデータのソースを指定します。本デモでは全トピック(
SQL入力欄下の Try It Out をクリックし、以下のデータを入力してテストできます。
- トピック: t/a
- ペイロード:
json{ "msg": "hello test" }テスト をクリックし、出力結果を確認してください。正しく設定されていれば、フィールドと値に完全なJSONデータが表示されます。
json{ "msg": "hello test" }テスト出力が期待通りであれば、次のステップに進んでください。
注意: テストが失敗する場合は、SQLの記述が正しいかご確認ください。
アクションの追加
新規ルール ステップページで 次へ をクリックし、アクションを追加します。
新規アクション ステップページで以下のパラメータを設定します。
- コネクター: デフォルトの
Republishのままにします。 - トピック: 転送先トピックとして
greetを設定します。 - ペイロード: メッセージ内容テンプレートとして
${msg} -- forward from EMQX Cloudを入力します。 - QoSはデフォルト値のままにします。
- コネクター: デフォルトの
必要に応じて、MQTT 5.0 メッセージプロパティ のオプションをトグルスイッチで開いて設定します。詳細は MQTT 5.0 メッセージプロパティ を参照してください。
ダイレクトディスパッチ を有効にする場合はトグルスイッチをクリックします。有効化すると、メッセージはサブスクライバーに直接配信され、追加のルールのトリガーや同一ルールの再帰的な発動を防止します。
確認 をクリックしてアクションとルールの作成を完了します。
ルール作成成功 のポップアップで ルールに戻る をクリックし、メッセージ再パブリッシュのテスト の指示に従ってルールをテストします。あるいは、ルールをテスト をクリックし、ページ上でシミュレーションデータを入力してルールを検証できます。詳細は ルールをテスト を参照してください。
メッセージの再パブリッシュのテスト
MQTTX を使ってメッセージ送信をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、
testトピックに以下のメッセージを送信します。json{ "msg": "hello" }ルール一覧からメッセージ再パブリッシュルールを見つけ、ルールIDをクリックしてルール統計ページに入ります。関連する統計指標が表示されます。リセットボタンをクリックするとメトリクスデータをリセットできます。
TIP
サーバレスデプロイメントではメトリクスのリセットはサポートされていません。

クライアントで
greetトピックをサブスクライブします。msgにhelloが含まれる場合はメッセージが転送され、含まれない場合は転送されないことが確認できます。
アクションパラメータ
| パラメータ | 説明 |
|---|---|
| トピック | 再パブリッシュするメッセージの転送先トピックを設定します。${field} 構文を使った動的トピック構築に対応しています。入力欄に $ を入力すると利用可能な変数のドロップダウンリストが表示されます。 |
| QoS | 再パブリッシュするメッセージのQoSレベルを設定します。 |
| リテイン | このメッセージをリテインメッセージとして転送するかどうかを設定します。 |
| ペイロード | 再パブリッシュするメッセージの本文内容を設定します。ルールSQLの出力フィールドを参照する ${field} 構文に対応しています。エディターで $ を入力すると利用可能な変数のドロップダウンリストが表示されます。${payload} と入力すると、元のメッセージと同じペイロードをそのまま再パブリッシュします。 |
| MQTT 5.0 メッセージプロパティ | トグルスイッチをクリックして必要に応じてメッセージプロパティを設定します。再パブリッシュメッセージにリッチなメッセージメタデータを付加できます。詳細は以下を参照してください。 |
| ダイレクトディスパッチ | 有効にすると、メッセージはサブスクライバーに直接配信され、追加のルールのトリガーや同一ルールの再帰的発動を防止します。 |
MQTT 5.0 メッセージプロパティ
| パラメータ | 説明 |
|---|---|
| ペイロードフォーマットインジケーター | ペイロードのフォーマットを示します。false の場合は不特定のバイト列として扱われ、true の場合はUTF-8エンコードされた文字データとして扱われ、クライアントが内容を効率的に解析できます。 |
| メッセージ有効期限間隔 | 配信先に届かない場合にメッセージが失効するまでの秒数を指定します。 |
| コンテンツタイプ | 再パブリッシュメッセージのペイロード内容の種類やフォーマット(MIMEタイプ)を指定します。例: text/plain、audio/aac、application/json。 |
| レスポンストピック | 応答メッセージをパブリッシュする特定のMQTTトピックを入力します。例: response/my_device。 |
| 相関データ | 応答を元のリクエストに関連付けるための一意の識別子を入力します。トランザクションIDやリクエスト識別子などが該当します。 |