Skip to content

ルールの作成

このページでは、EMQXダッシュボードを使ってデータ処理のルールを作成し、ルールにアクションを紐付ける方法について説明します。また、ルールのテスト方法や作成後のルールの確認方法についても紹介します。

本ページのデモでは、リパブリッシュアクションを例に取り、トピック t/# で受信したメッセージを処理し、トピック a/1 にリパブリッシュするルールの作成方法を説明します。ただし、「コンソールへの結果出力」や「Sinksによる転送」も アクションの追加 にて言及しています。

データソースの定義

EMQXダッシュボードにログインし、左側のナビゲーションメニューから Integration -> Rules をクリックします。

Rules ページで Create ボタンをクリックすると、Create Rule ページに遷移します。ここでルールのデータソースを定義し、フィルタリングされたメッセージに対して後続のアクションを設定します。

ルールの名前を入力し、将来の管理に役立つようにメモを追加します。SQL Editor では、ビジネスニーズに合わせてデータソースを追加するSQL文をカスタマイズできます。本チュートリアルでは、デフォルト設定のままにしておきます。これは、"t/#" パターンにマッチするトピック(例:t/at/a/bt/a/b/cなど)配下のすべてのメッセージを選択して返すものです。

TIP

本チュートリアルではメッセージのペイロードがJSON形式であることを前提としています。もしペイロードが別の形式の場合は、Schema Registryなどでデータ型変換が可能です。

EMQXには豊富なSQL文のサンプルが組み込まれており、SQL Editor 下の SQL Example ボタンから参照できます。SQLの構文や使い方の詳細は SQL Syntax をご覧ください。

ルール作成画面

SQL文のテスト

SQL文はシミュレートデータを使って実行テストできます。アクションの追加やルール作成の前に、SQLの実行結果が期待通りかどうかを検証できます。これは任意ですが、EMQXルールに慣れていない場合は推奨されます。ルール全体の実行テストを行いたい場合は、ルールのテストを参照してください。

SQL文をテストする手順は以下の通りです:

  1. Create Rule ページで Try It Out のトグルスイッチをオンにして、SQL文のテストを有効化します。
  2. SQLに合致する Data Source を選択し、ルール内の指定ソース(FROM句)と一致していることを確認します。
  3. テストデータを入力します。データソースを選択すると、EMQXはすべてのシミュレートデータフィールド(Client IDUsernameTopicQoSPayloadなど)にデフォルト値を提供します。必要に応じて適切な値に変更してください。
  4. Run Test ボタンをクリックしてテストを実行します。正常であれば Test Passed のメッセージが表示されます。

test-sql

SQLの処理結果は Output Result セクションにJSON形式で表示されます。SQL処理結果のすべてのフィールドは、後続のアクション(組み込みアクションやSink)で ${key} の形式で参照可能です。フィールドの詳細については SQL Data Sources and Fields をご覧ください。

本デモはペイロードがJSON形式であることを想定していますが、実際には Schema Registry を使って他の形式のメッセージも扱えます。

続いて、Create Rule ページ右側の Add Action ボタンをクリックし、ルールに異なる種類のアクションを追加できます。

アクションの追加

Create Rule ページで右側の Add Action ボタンをクリックすると、Add Action ページが表示されます。Action ドロップダウンリストから、Republish、Console Output、Data Bridgeによる転送の3種類のアクションを選択できます。

アクション追加画面

リパブリッシュアクションの追加

ここでは、トピック t/# で受信した元のメッセージを別のトピック a/1 にリパブリッシュするアクションの追加方法を説明します。

Add Action ページで、Type of Action ドロップダウンメニューから Republish を選択し、以下の設定を行ってから Add ボタンをクリックして確定します:

  • Topic:転送先のトピックを設定します。本例では "a/1"

  • QoS:リパブリッシュするメッセージのQoSを設定します。本例では "0"

  • Retain:このメッセージをリテインメッセージとして転送するか設定します。本チュートリアルではデフォルトの false のままにします。

  • Payload${payload} と入力し、リパブリッシュするメッセージのペイロードを元のメッセージと同じにします(変更なし)。

  • MQTT 5.0 メッセージプロパティ:トグルスイッチをクリックして、必要に応じてユーザープロパティやMQTTプロパティを設定します。これによりリパブリッシュメッセージにリッチなメタデータを付与できます。

    • Payload Format Indicator:メッセージのペイロードが特定の形式かどうかを示す値を入力します。false の場合は不特定バイト列とみなされ、true の場合はUTF-8エンコードされた文字データとみなされます。これによりMQTTクライアントやサーバーがメッセージ内容を効率的に解析できます。
    • Message Expiry Interval:メッセージが配信されなかった場合に無効とみなすまでの有効期限(秒)を指定します。
    • Content Type:リパブリッシュメッセージのペイロードの種類や形式(MIMEタイプ)を指定します。例:text/plain(テキストファイル)、audio/aac(音声ファイル)、application/json(JSON形式のアプリケーションメッセージ)など。
    • Response Topic:レスポンスメッセージをパブリッシュする特定のMQTTトピックを指定します。例:response/my_device
    • Correlation Data:レスポンスメッセージと元のリクエストメッセージを関連付けるための一意の識別子やデータを入力します。例:リクエストIDやトランザクションIDなど。
  • Direct Dispatch:トグルスイッチを切り替えて直接ディスパッチを有効または無効にします。有効にすると、メッセージがサブスクライバーに直接配信され、意図しない動作(追加ルールのトリガーや同一ルールの再帰的発火など)を防ぎます。

Create Rule ページで下部の Create ボタンをクリックしてルール作成を完了します。このルールは Rule ページに新しいエントリとして追加されます。

TIP

リパブリッシュアクションは元のメッセージの配信を妨げません。例えば、トピック "t/1" 配下のメッセージはルールにより "a/1" にリパブリッシュされますが、同時に "t/1" のメッセージは "t/1" をサブスクライブしているクライアントにも配信され続けます。

コンソール出力アクションの追加

TIP

コンソール出力アクションはデバッグ用途のみに使用してください。本番環境で使用するとパフォーマンス問題を引き起こす可能性があります。

コンソール出力アクションはルールの出力結果を確認するために使います。結果メッセージはコンソールまたはログファイルに出力されます。

  • EMQXが console または foreground モードで起動している場合(Docker環境のデフォルトは foreground)、出力はコンソールに向けられます。
  • systemd経由で起動している場合は、出力はジャーナルシステムにキャプチャされ、journalctl コマンドで確認できます。

出力は以下の形式です:

bash
[rule action] rule_id1
    Action Data: #{key1 => val1}
    Envs: #{key1 => val1, key2 => val2}

ここで

  • [rule action] はリパブリッシュアクションが発動したルールIDです。
  • Action Data はルールの出力結果で、アクション実行時に渡すべきデータやパラメータ(リパブリッシュアクション設定時のペイロード部分)を示します。
  • Envs はリパブリッシュ時に設定される環境変数で、データソースやこのアクション実行に関連する内部情報を含みます。

Sinksによる転送アクションの追加

処理結果をSinkを使って転送するアクションも追加可能です。Dashboardの Type of Action ドロップダウンリストから対象のSinkを選択するだけです。EMQXの各Sinkの詳細は Data Integration を参照してください。

ルールのテスト

ルールエンジンはルールテスト機能を提供しており、シミュレートデータや実際のクライアントデータを使ってルールをトリガーし、SQLの実行やルールに追加されたすべてのアクションを実行し、各ステップの実行結果を取得できます。

ルールをテストすることで、ルールが期待通りに動作するか検証でき、問題の早期発見と解決が可能です。これにより開発効率が向上し、本番環境での失敗を防止できます。

テスト手順

  1. Try It Out スイッチをオンにし、テスト対象として Rule を選択します。テスト開始前にルールを保存しておく必要があります。
  2. Start Test ボタンをクリックしてテストを開始します。ブラウザは現在のルールがトリガーされてテスト結果が生成されるのを待機します。
  3. ルールをトリガーしてテストします。以下の2つの方法が利用可能です:
    • シミュレートデータを使うInput Simulated Data ボタンをクリックし、ポップアップでSQLに合致する Data Source を選択し、ルール内の指定ソース(FROM句)と一致していることを確認します。EMQXはすべてのフィールド(Client IDUsernameTopicQoSPayloadなど)にデフォルト値を提供します。必要に応じて修正し、Submit Test ボタンをクリックして一度だけルールをトリガーします。
    • 実際のデバイスデータを使う:現在のページを開いたままにし、実際のクライアントやMQTTクライアントツールでEMQXに接続し、対応するイベントをトリガーしてテストを行います。
  4. テスト結果の確認:ルールがトリガーされると、実行結果がダッシュボードに出力され、各ステップの詳細な実行結果が表示されます。

テスト例

MQTTX を使ってリパブリッシュアクション付きのルールをテストできます。1つのクライアントを作成し、そのクライアントでトピック a/1 をサブスクライブし、トピック t/1 にメッセージを送信します。ダイアログボックスに、このメッセージがトピック a/1 にリパブリッシュされたことが表示されます。

MQTTXクライアントツールとEMQXの接続方法の詳細は MQTTX - Get Started をご覧ください。

MQTTXによるテスト例

対応して、ダッシュボードのテスト画面にはルール全体の実行結果が表示され、以下の内容が含まれます:

  • 左側にはルール実行の記録が表示されます。ルールがトリガーされるたびに記録が生成され、クリックすると該当メッセージやイベントの詳細に切り替わります。
  • 右側には選択したルールのアクション実行記録が一覧表示され、クリックするとアクションの実行結果やログを展開して確認できます。

ルールSQLやアクションの実行に失敗した場合、該当ルールの記録は失敗としてマークされます。記録を選択すると該当アクションのエラー情報を確認でき、トラブルシューティングに役立ちます。

ルールテスト結果

上記例では、ルールは4回トリガーされ、そのうち3回は完全に成功しました。4回目は HTTP Server アクションの実行失敗により失敗となり、エラー理由は302ステータスコードのレスポンスでした。

ルールテストの詳細な活用方法はブログ Enhancing Data Integration Stability: A Guide on EMQX Platform E2E Rule Testing をご参照ください。

ルールの確認

Rules ページでは、作成したすべてのルールの一覧を確認できます。

一覧の各エントリには、ルールID、関連するソース、有効状態、アクション数などの基本情報が表示されます。ソースにカーソルを合わせると対応するSQL文の詳細が表示されます。ルールの設定を変更したい場合は、Actions 列の Settings をクリックします。More ボタンからはルールの複製や削除も可能です。

view_rules

また、Flow DesignerIntegration -> Flow Designer からもルールを確認できます。Rules ページで作成したルールとFlow Designerで作成したルールは完全に相互運用可能です。

ルールの統計情報やアクション実行状況を確認したい場合は、Rules ページのルールIDまたは Flows ページのルール名をクリックしてください。

view_rules_flows

TIP

ルールアクションを更新したりデータソースを再定義した場合、以下のページに表示される統計情報はリセットされて再集計が開始されます。

Rule Statistics

ルールの検索

ルールが多数ある場合は、フィルター機能を使って表示を絞り込み、目的のルールを探せます。ルールID、受信メッセージのトピックやワイルドカード、有効状態、ルールメモ、ルールに紐づくアクションやソースで絞り込みが可能です。

search_rules

アクション(Sink)およびソースの確認

Rule ページの Actions (Sink)Sources タブでは、作成済みのすべてのアクション(Sink)とソースが一覧表示されます。名前、接続状態、関連ルール、有効状態、作成日時、最終更新日時などの重要情報が確認でき、カラム名横の矢印をクリックしてソートも可能です。

Enable 列のトグルスイッチをクリックすると、Sinkやソースの有効化・無効化が切り替えられます。Associated Rules 列の View Rules をクリックすると、そのSinkやソースを含むルール一覧が開き、データ連携設定の管理が容易になります。

Action 列からは、Sinkやソースの再接続や設定変更が可能です。More からは削除や、それらを使った新規ルール作成も行えます。

Sinkやソースが多数ある場合は、フィルターで名前や状態、有効状態で絞り込みが可能です。

view_sink_source

Sinkやソースの統計情報やレート指標を確認したい場合は、名前をクリックしてください。

action_statistics