ルールの作成
このページでは、EMQXダッシュボードを使ってデータ処理のルールを作成し、ルールにアクションを紐付ける方法について説明します。また、ルールのテスト方法や作成後のルールの確認方法についても紹介します。
本ページのデモでは、リパブリッシュアクションを例に取り、トピック t/#
で受信したメッセージを処理し、トピック a/1
にリパブリッシュするルールの作成方法を説明します。ただし、「コンソールへの結果出力」や「Sinksによる転送」も アクションの追加 にて言及しています。
データソースの定義
EMQXダッシュボードにログインし、左側のナビゲーションメニューから Integration -> Rules をクリックします。
Rules ページで Create ボタンをクリックすると、Create Rule ページに遷移します。ここでルールのデータソースを定義し、フィルタリングされたメッセージに対して後続のアクションを設定します。
ルールの名前を入力し、将来の管理に役立つようにメモを追加します。SQL Editor では、ビジネスニーズに合わせてデータソースを追加するSQL文をカスタマイズできます。本チュートリアルでは、デフォルト設定のままにしておきます。これは、"t/#"
パターンにマッチするトピック(例:t/a
、t/a/b
、t/a/b/c
など)配下のすべてのメッセージを選択して返すものです。
TIP
本チュートリアルではメッセージのペイロードがJSON形式であることを前提としています。もしペイロードが別の形式の場合は、Schema Registryなどでデータ型変換が可能です。
EMQXには豊富なSQL文のサンプルが組み込まれており、SQL Editor 下の SQL Example ボタンから参照できます。SQLの構文や使い方の詳細は SQL Syntax をご覧ください。

SQL文のテスト
SQL文はシミュレートデータを使って実行テストできます。アクションの追加やルール作成の前に、SQLの実行結果が期待通りかどうかを検証できます。これは任意ですが、EMQXルールに慣れていない場合は推奨されます。ルール全体の実行テストを行いたい場合は、ルールのテストを参照してください。
SQL文をテストする手順は以下の通りです:
- Create Rule ページで Try It Out のトグルスイッチをオンにして、SQL文のテストを有効化します。
- SQLに合致する Data Source を選択し、ルール内の指定ソース(FROM句)と一致していることを確認します。
- テストデータを入力します。データソースを選択すると、EMQXはすべてのシミュレートデータフィールド(Client ID、Username、Topic、QoS、Payloadなど)にデフォルト値を提供します。必要に応じて適切な値に変更してください。
- Run Test ボタンをクリックしてテストを実行します。正常であれば Test Passed のメッセージが表示されます。
SQLの処理結果は Output Result セクションにJSON形式で表示されます。SQL処理結果のすべてのフィールドは、後続のアクション(組み込みアクションやSink)で ${key}
の形式で参照可能です。フィールドの詳細については SQL Data Sources and Fields をご覧ください。
本デモはペイロードがJSON形式であることを想定していますが、実際には Schema Registry を使って他の形式のメッセージも扱えます。
続いて、Create Rule ページ右側の Add Action ボタンをクリックし、ルールに異なる種類のアクションを追加できます。
アクションの追加
Create Rule ページで右側の Add Action ボタンをクリックすると、Add Action ページが表示されます。Action ドロップダウンリストから、Republish、Console Output、Data Bridgeによる転送の3種類のアクションを選択できます。

リパブリッシュアクションの追加
ここでは、トピック t/#
で受信した元のメッセージを別のトピック a/1
にリパブリッシュするアクションの追加方法を説明します。
Add Action ページで、Type of Action ドロップダウンメニューから Republish を選択し、以下の設定を行ってから Add ボタンをクリックして確定します:
Topic:転送先のトピックを設定します。本例では
"a/1"
。QoS:リパブリッシュするメッセージのQoSを設定します。本例では
"0"
。Retain:このメッセージをリテインメッセージとして転送するか設定します。本チュートリアルではデフォルトの false のままにします。
Payload:
${payload}
と入力し、リパブリッシュするメッセージのペイロードを元のメッセージと同じにします(変更なし)。MQTT 5.0 メッセージプロパティ:トグルスイッチをクリックして、必要に応じてユーザープロパティやMQTTプロパティを設定します。これによりリパブリッシュメッセージにリッチなメタデータを付与できます。
- Payload Format Indicator:メッセージのペイロードが特定の形式かどうかを示す値を入力します。
false
の場合は不特定バイト列とみなされ、true
の場合はUTF-8エンコードされた文字データとみなされます。これによりMQTTクライアントやサーバーがメッセージ内容を効率的に解析できます。 - Message Expiry Interval:メッセージが配信されなかった場合に無効とみなすまでの有効期限(秒)を指定します。
- Content Type:リパブリッシュメッセージのペイロードの種類や形式(MIMEタイプ)を指定します。例:
text/plain
(テキストファイル)、audio/aac
(音声ファイル)、application/json
(JSON形式のアプリケーションメッセージ)など。 - Response Topic:レスポンスメッセージをパブリッシュする特定のMQTTトピックを指定します。例:
response/my_device
。 - Correlation Data:レスポンスメッセージと元のリクエストメッセージを関連付けるための一意の識別子やデータを入力します。例:リクエストIDやトランザクションIDなど。
- Payload Format Indicator:メッセージのペイロードが特定の形式かどうかを示す値を入力します。
Direct Dispatch:トグルスイッチを切り替えて直接ディスパッチを有効または無効にします。有効にすると、メッセージがサブスクライバーに直接配信され、意図しない動作(追加ルールのトリガーや同一ルールの再帰的発火など)を防ぎます。
Create Rule ページで下部の Create ボタンをクリックしてルール作成を完了します。このルールは Rule ページに新しいエントリとして追加されます。
TIP
リパブリッシュアクションは元のメッセージの配信を妨げません。例えば、トピック "t/1"
配下のメッセージはルールにより "a/1"
にリパブリッシュされますが、同時に "t/1"
のメッセージは "t/1"
をサブスクライブしているクライアントにも配信され続けます。
コンソール出力アクションの追加
TIP
コンソール出力アクションはデバッグ用途のみに使用してください。本番環境で使用するとパフォーマンス問題を引き起こす可能性があります。
コンソール出力アクションはルールの出力結果を確認するために使います。結果メッセージはコンソールまたはログファイルに出力されます。
- EMQXが
console
またはforeground
モードで起動している場合(Docker環境のデフォルトはforeground
)、出力はコンソールに向けられます。 - systemd経由で起動している場合は、出力はジャーナルシステムにキャプチャされ、
journalctl
コマンドで確認できます。
出力は以下の形式です:
[rule action] rule_id1
Action Data: #{key1 => val1}
Envs: #{key1 => val1, key2 => val2}
ここで
[rule action]
はリパブリッシュアクションが発動したルールIDです。Action Data
はルールの出力結果で、アクション実行時に渡すべきデータやパラメータ(リパブリッシュアクション設定時のペイロード部分)を示します。Envs
はリパブリッシュ時に設定される環境変数で、データソースやこのアクション実行に関連する内部情報を含みます。
Sinksによる転送アクションの追加
処理結果をSinkを使って転送するアクションも追加可能です。Dashboardの Type of Action ドロップダウンリストから対象のSinkを選択するだけです。EMQXの各Sinkの詳細は Data Integration を参照してください。
ルールのテスト
ルールエンジンはルールテスト機能を提供しており、シミュレートデータや実際のクライアントデータを使ってルールをトリガーし、SQLの実行やルールに追加されたすべてのアクションを実行し、各ステップの実行結果を取得できます。
ルールをテストすることで、ルールが期待通りに動作するか検証でき、問題の早期発見と解決が可能です。これにより開発効率が向上し、本番環境での失敗を防止できます。
テスト手順
- Try It Out スイッチをオンにし、テスト対象として Rule を選択します。テスト開始前にルールを保存しておく必要があります。
- Start Test ボタンをクリックしてテストを開始します。ブラウザは現在のルールがトリガーされてテスト結果が生成されるのを待機します。
- ルールをトリガーしてテストします。以下の2つの方法が利用可能です:
- シミュレートデータを使う:Input Simulated Data ボタンをクリックし、ポップアップでSQLに合致する Data Source を選択し、ルール内の指定ソース(FROM句)と一致していることを確認します。EMQXはすべてのフィールド(Client ID、Username、Topic、QoS、Payloadなど)にデフォルト値を提供します。必要に応じて修正し、Submit Test ボタンをクリックして一度だけルールをトリガーします。
- 実際のデバイスデータを使う:現在のページを開いたままにし、実際のクライアントやMQTTクライアントツールでEMQXに接続し、対応するイベントをトリガーしてテストを行います。
- テスト結果の確認:ルールがトリガーされると、実行結果がダッシュボードに出力され、各ステップの詳細な実行結果が表示されます。
テスト例
MQTTX を使ってリパブリッシュアクション付きのルールをテストできます。1つのクライアントを作成し、そのクライアントでトピック a/1
をサブスクライブし、トピック t/1
にメッセージを送信します。ダイアログボックスに、このメッセージがトピック a/1
にリパブリッシュされたことが表示されます。
MQTTXクライアントツールとEMQXの接続方法の詳細は MQTTX - Get Started をご覧ください。

対応して、ダッシュボードのテスト画面にはルール全体の実行結果が表示され、以下の内容が含まれます:
- 左側にはルール実行の記録が表示されます。ルールがトリガーされるたびに記録が生成され、クリックすると該当メッセージやイベントの詳細に切り替わります。
- 右側には選択したルールのアクション実行記録が一覧表示され、クリックするとアクションの実行結果やログを展開して確認できます。
ルールSQLやアクションの実行に失敗した場合、該当ルールの記録は失敗としてマークされます。記録を選択すると該当アクションのエラー情報を確認でき、トラブルシューティングに役立ちます。

上記例では、ルールは4回トリガーされ、そのうち3回は完全に成功しました。4回目は HTTP Server アクションの実行失敗により失敗となり、エラー理由は302ステータスコードのレスポンスでした。
ルールテストの詳細な活用方法はブログ Enhancing Data Integration Stability: A Guide on EMQX Platform E2E Rule Testing をご参照ください。
ルールの確認
Rules ページでは、作成したすべてのルールの一覧を確認できます。
一覧の各エントリには、ルールID、関連するソース、有効状態、アクション数などの基本情報が表示されます。ソースにカーソルを合わせると対応するSQL文の詳細が表示されます。ルールの設定を変更したい場合は、Actions 列の Settings をクリックします。More ボタンからはルールの複製や削除も可能です。
また、Flow Designer の Integration -> Flow Designer からもルールを確認できます。Rules ページで作成したルールとFlow Designerで作成したルールは完全に相互運用可能です。
ルールの統計情報やアクション実行状況を確認したい場合は、Rules ページのルールIDまたは Flows ページのルール名をクリックしてください。
TIP
ルールアクションを更新したりデータソースを再定義した場合、以下のページに表示される統計情報はリセットされて再集計が開始されます。
ルールの検索
ルールが多数ある場合は、フィルター機能を使って表示を絞り込み、目的のルールを探せます。ルールID、受信メッセージのトピックやワイルドカード、有効状態、ルールメモ、ルールに紐づくアクションやソースで絞り込みが可能です。
アクション(Sink)およびソースの確認
Rule ページの Actions (Sink) と Sources タブでは、作成済みのすべてのアクション(Sink)とソースが一覧表示されます。名前、接続状態、関連ルール、有効状態、作成日時、最終更新日時などの重要情報が確認でき、カラム名横の矢印をクリックしてソートも可能です。
Enable 列のトグルスイッチをクリックすると、Sinkやソースの有効化・無効化が切り替えられます。Associated Rules 列の View Rules をクリックすると、そのSinkやソースを含むルール一覧が開き、データ連携設定の管理が容易になります。
Action 列からは、Sinkやソースの再接続や設定変更が可能です。More からは削除や、それらを使った新規ルール作成も行えます。
Sinkやソースが多数ある場合は、フィルターで名前や状態、有効状態で絞り込みが可能です。
Sinkやソースの統計情報やレート指標を確認したい場合は、名前をクリックしてください。