メッセージの再パブリッシュ
EMQX Platformのデータ統合を利用すると、特定の条件を満たすメッセージをコードを書かずに他のトピックへ再パブリッシュすることが可能です。EMQX Platformでは、ルールを作成し、ルールSQLでソースメッセージからのデータをフィルタリング・処理し、「メッセージの再パブリッシュ」アクションをルールに追加して、処理結果をメッセージパブリッシュで転送できます。
本ページでは、任意のメッセージのmsg
に文字列hello
が含まれる場合に、そのメッセージをgreet
トピックへ再パブリッシュするデータ統合の作成方法をデモンストレーションします。主な手順は以下の通りです。
- フィルタリング条件を設定するルールの作成
- ルールにメッセージ再パブリッシュのアクションを追加
- データ統合の作成完了および動作テスト
データ統合によるメッセージ再パブリッシュの設定は、コネクターの追加を必要としません。以下のセクションで具体的な設定手順を説明します。
ルールの作成
データ統合ページで、データ転送サービスカテゴリの下にある再パブリッシュをクリックします。すでに他のコネクターを作成している場合は、新規コネクターをクリックし、データ転送サービスカテゴリの中から再パブリッシュを選択します。
SQLエディターでルールSQLを定義し、任意のメッセージの
msg
に文字列hello
が含まれる場合にエンジンがトリガーされるようにします。- FROM句でメッセージデータのソースを指定します。このデモではすべてのトピック、すなわち
#
を対象とします。 - WHERE句でペイロード内の
msg
に対して正規表現マッチングを行い、hello
を含む場合にデータ統合を実行します。
上記の内容に基づくSQL例は以下の通りです。
sqlSELECT payload.msg as msg FROM "#" WHERE regex_match(msg, 'hello')
- FROM句でメッセージデータのソースを指定します。このデモではすべてのトピック、すなわち
SQL入力欄下のテスト有効化をクリックし、以下のデータを入力してテストできます。
- トピック: t/a
- ペイロード:
json{ "msg": "hello test" }
テストをクリックし、出力結果を確認します。正しく設定されていれば、フィールドと値に以下のように完全なJSONデータが表示されます。
json{ "msg": "hello test" }
テスト出力が期待通りであれば、次のステップに進んでください。
注意: テストが失敗する場合は、SQLの記述が正しいかご確認ください。
アクションの追加
新規ルールステップページで次へをクリックし、アクションを追加します。
新規アクションステップページで以下の情報を設定します。
- コネクター: デフォルトの
Republish
のままにします。 - トピック: 送信先トピックを
greet
に設定します。 - ペイロード: メッセージ内容のテンプレートとして
${msg} -- forward from EMQX Platform
を入力します。 - QoSはデフォルト値のままにします。
- コネクター: デフォルトの
必要に応じて、トグルスイッチをクリックしてMQTT 5.0メッセージプロパティのオプションを設定できます。詳細は再パブリッシュアクションの追加をご参照ください。
直接ディスパッチを有効にする場合はトグルスイッチをクリックします。有効化するとメッセージはサブスクライバーに直接配信され、追加のルールトリガーや同一ルールの再帰的な起動を防止します。
確認をクリックしてアクションとルールの作成を完了します。
新規ルール作成成功のポップアップでルールへ戻るをクリックし、メッセージ再パブリッシュのテストの手順に従ってルールをテストします。あるいは、ルールのテストをクリックしてページ上でシミュレーションデータを入力しテストすることも可能です。詳細はルールのテストをご覧ください。
メッセージの再パブリッシュのテスト
MQTTX(https://mqttx.app/)を使ってメッセージ送信をシミュレーションすることを推奨しますが、他の任意のクライアントでも構いません。
MQTTXを使ってデプロイ先に接続し、
test
トピックに以下のメッセージを送信します。json{ "msg": "hello" }
ルール一覧からメッセージ再パブリッシュルールを見つけ、ルールIDをクリックしてルール統計ページに入ります。関連する統計指標がページ上に表示されます。
クライアントで
greet
トピックをサブスクライブします。msg
にhello
が含まれる場合はメッセージが転送され、含まれない場合は転送されないことを確認できます。