Skip to content

ルールの作成

このページでは、EMQX ダッシュボードを使用してデータ処理のルールを作成し、ルールにアクションを追加する方法について説明します。また、ルールのテスト方法や作成後のルールの確認方法も紹介します。

本ページのデモでは、再パブリッシュアクションを例に取り、トピック t/# で受信したメッセージを処理し、トピック a/1 に再パブリッシュするルールの作成方法を説明します。ただし、アクションの追加では「コンソールへの結果出力」や「Sinks を使った転送」も紹介しています。

ルール SQL の定義

EMQX ダッシュボードにログインし、左側のナビゲーションメニューから Integration -> Rules をクリックします。

Rules ページで Create ボタンをクリックすると、Create Rule ページに遷移します。ここでルールのデータソースを定義し、フィルターされたメッセージに対して実行するアクションを決定します。

ルール名を入力し、管理しやすいようにメモを追加します。SQL Editor では、ビジネスニーズに合わせてデータソースを追加するためのステートメントをカスタマイズできます。本チュートリアルでは、デフォルト設定のままにしておきます。これは "t/#" パターンに一致するトピック(例:t/a, t/a/b, t/a/b/c など)配下のすべてのメッセージを選択して返すものです。

TIP

本チュートリアルではメッセージのペイロードが JSON 形式であることを想定しています。ペイロードが他の形式の場合は、スキーマレジストリを使ってデータ型を変換できます。

EMQX には豊富な SQL ステートメントのサンプルが組み込まれており、SQL Editor 下の SQL Examples ボタンから参照可能です。SQL の構文や使い方の詳細は SQL Syntax をご覧ください。

ルール作成画面

SQL ジェネレーター

EMQX 5.10.0 以降、SQL Editor は AI による自然言語からのルール SQL 生成機能(SQL ジェネレーター)をサポートしています。この機能では、自然言語で意図を記述すると、それに応じた適切な SQL ステートメントを自動生成します。

SQL ジェネレーターはデフォルトで有効になっています。ダッシュボード右上の Settings メニューからトグルスイッチで無効化可能です。

利用方法は以下の通りです:

  1. Create Rule ページの SQL Editor セクションに移動します。

  2. エディター下の SQL Generator ボタンをクリックし、Generate SQL with AI ダイアログを開きます。以下の項目を指定します:

    • Task Description(必須):SQL に実行させたい内容を自然言語で記述します。

      例: 「MQTT メッセージのメタデータから clientid を抽出し、ペイロードから device_idtemperature を抽出する。温度が 30 を超える場合のみ、トピック sensors/temperature のメッセージに適用する。」

    • Related Topics(任意):sensors/temperature のようなトピックフィルターを指定します。

    • Input Example (JSON)(任意・推奨):AI にデータ構造を理解させるためのサンプル MQTT メッセージペイロードを入力します。 例:

      json
      {
        "device_id": "sensor001",
        "temperature": 32.5,
        "unit": "C"
      }
    • Output Example (JSON)(任意):期待する結果のフォーマットを指定します。 例:

      json
      {
        "clientid": "client_a1b2c3",
        "device_id": "sensor001",
        "temperature": 32.5
      }

      TIP

      入力例・出力例を含めることで生成される SQL の精度が向上します。

  3. Generate をクリックして生成された SQL をプレビューします。

  4. プレビュー画面では、

    • 生成された SQL の確認および手動編集が可能です。
    • Apply SQL をクリックすると SQL Editor に挿入されます。
    • Back to Form をクリックすると入力画面に戻り、内容を修正して再生成できます。
  5. SQL Editor に挿入すると、SQL Editor 内に自動的に表示され、確認・編集が可能です。

出力例

上記サンプルのタスクと入力例から生成される SQL は以下のようになります。

sql
SELECT
  clientid,
  payload.device_id AS device_id,
  payload.temperature AS temperature
FROM
  "sensors/temperature"
WHERE
  payload.temperature > 30

このルールは、sensors/temperature トピックのメッセージから clientiddevice_idtemperature フィールドを抽出し、温度が 30 を超える場合に適用されます。

SQL ジェネレーターを使うべき場合

  • EMQX SQL 構文に不慣れな場合
  • ルールを素早くプロトタイプしたい場合
  • JSON 形式の構造化ペイロードを扱う場合

より詳細なカスタマイズや構文オプションは Rule SQL Syntax を参照してください。

SQL ステートメントのテスト

SQL ステートメントはシミュレーションデータを使って実行テストできます。アクションを追加してルールを作成する前に、SQL の実行結果が期待通りかどうかを検証できます。これは任意のステップですが、EMQX ルールに不慣れな場合は推奨されます。ルール全体の実行テストを行いたい場合は、ルールのテストを参照してください。

SQL ステートメントのテスト手順は以下の通りです:

  1. Create Rule ページで Try It Out トグルスイッチをオンにして、SQL ステートメントのテストを有効にします。
  2. SQL に合致する Data Source を選択し、ルールの指定ソース(FROM 句)と一致していることを確認します。
  3. テストデータを入力します。データソースを選択すると、EMQX は Client IDUsernameTopicQoSPayload などのシミュレーションデータのデフォルト値を提供します。必要に応じて適切な値に修正してください。
  4. Run Test ボタンをクリックしてテストを実行します。正常に動作すれば Test Passed のメッセージが表示されます。

test-sql

SQL の処理結果は Output Result セクションに JSON 形式で表示されます。SQL 処理結果のすべてのフィールドは、後続のアクション(組み込みアクションや Sink)で ${key} の形式で参照可能です。フィールドの詳細は SQL Data Sources and Fields を参照してください。

本デモはペイロードが JSON 形式であることを想定していますが、実際には スキーマレジストリ を使って他の形式のメッセージも扱えます。

次に、Create Rule ページ右側の Add Action ボタンをクリックして、ルールにさまざまなタイプのアクションを追加できます。

アクションの追加

Create Rule ページで右側の Add Action ボタンをクリックすると、Add Action ページが表示されます。Action ドロップダウンリストから、再パブリッシュ、コンソール出力、データブリッジによる転送の3種類のアクションを選択できます。

アクション追加画面

再パブリッシュアクションの追加

このセクションでは、トピック t/# で受信した元のメッセージを別のトピック a/1 に再パブリッシュするアクションの追加方法を説明します。

Add Action ページで、Type of Action ドロップダウンから Republish を選択し、以下の設定を行ってから Add ボタンをクリックして確定します:

  • Topic:転送先トピックを設定します。ここでは例として "a/1" とします。

  • QoS:再パブリッシュするメッセージの QoS を設定します。例では "0"

  • Retain:このメッセージをリテインメッセージとして転送するかどうかを設定します。本チュートリアルではデフォルトの false のままにします。

  • Payload${payload} と入力し、再パブリッシュされるメッセージのペイロードが元のメッセージと同じであることを示します。

  • MQTT 5.0 メッセージプロパティ:トグルスイッチをクリックして必要に応じてユーザープロパティや MQTT プロパティを設定します。これにより、再パブリッシュされるメッセージに豊富なメタデータを付加できます。

    • Payload Format Indicator:メッセージのペイロードが特定の形式かどうかを示す値を入力します。false の場合は未定義のバイト列、true の場合は UTF-8 エンコードされた文字データとみなされます。これにより MQTT クライアントやサーバーがメッセージ内容を効率的に解析できます。

    • Message Expiry Interval:メッセージが配信されなかった場合に無効とみなすまでの有効期限(秒単位)を指定します。

    • Content Type:再パブリッシュメッセージのペイロード内容のタイプやフォーマット(MIME タイプ)を指定します。例:text/plain(テキストファイル)、audio/aac(音声ファイル)、application/json(JSON 形式のアプリケーションメッセージ)など。

    • Response Topic:レスポンスメッセージをパブリッシュする特定の MQTT トピックを指定します。例:response/my_device

    • Correlation Data:レスポンスメッセージを元のリクエストメッセージと関連付けるための一意識別子やデータを入力します。例:リクエストIDやトランザクションIDなど。

  • Direct Dispatch:トグルスイッチで直接ディスパッチを有効・無効にします。有効にすると、メッセージがサブスクライバーに直接ディスパッチされ、追加のルール発動や同一ルールの再帰的な起動を防止します。

Create Rule ページで下部の Create ボタンをクリックしてルール作成を完了します。このルールは Rule ページに新規エントリーとして追加されます。

TIP

再パブリッシュアクションは元のメッセージの配信を妨げません。例えば、トピック "t/1" のメッセージはルールにより "a/1" に再パブリッシュされますが、同時に "t/1" のメッセージは "t/1" をサブスクライブしているクライアントにも配信されます。

コンソール出力アクションの追加

TIP

コンソール出力アクションはデバッグ目的でのみ使用してください。本番環境で使用するとパフォーマンス問題を引き起こす可能性があります。

コンソール出力アクションは、ルールの出力結果をコンソールやログファイルに表示するために使用します。

  • EMQX が console または foreground モードで起動している場合(Docker 環境ではデフォルトで foreground)、出力はコンソールに向けられます。
  • systemd 経由で起動している場合は、出力はジャーナルシステムにキャプチャされ、journalctl コマンドで確認可能です。

出力は以下の形式になります:

bash
[rule action] rule_id1
    Action Data: #{key1 => val1}
    Envs: #{key1 => val1, key2 => val2}

ここで、

  • [rule action] は再パブリッシュアクションが発動したルール ID です。
  • Action Data はルールの出力結果で、アクション実行時に渡されるデータやパラメータ(再パブリッシュアクション設定時のペイロード部分)を示します。
  • Envs は再パブリッシュ時に設定される環境変数で、データソースやアクション実行に関連する内部情報を含みます。

Sinks を使った転送アクションの追加

処理結果を Sinks を使って転送するアクションも追加可能です。ダッシュボードの Type of Action ドロップダウンリストから対象の Sink を選択するだけです。EMQX における各 Sink の詳細は データ統合 を参照してください。

ルールのテスト

ルールエンジンにはルールのテスト機能があり、シミュレーションデータや実際のクライアントデータを使ってルールをトリガーし、ルール SQL を実行し、ルールに追加されたすべてのアクションを実行して、各ステップの実行結果を取得できます。

ルールをテストすることで、期待通りに動作するか検証でき、問題があれば迅速に特定・解決できます。これにより開発効率が向上し、本番環境での障害発生を防止できます。

テスト手順

  1. Try It Out スイッチをオンにし、テスト対象として Rule を選択します。テスト開始前にルールを保存しておく必要があります。
  2. Start Test ボタンをクリックしてテストを開始します。ブラウザはルールがトリガーされるのを待ち、テスト結果を取得します。
  3. ルールをトリガーしてテストします。以下の2つの方法が利用可能です:
    • シミュレーションデータを使うInput Simulated Data ボタンをクリックし、ポップアップで SQL に合致する Data Source を選択し、ルールの指定ソース(FROM 句)と一致していることを確認します。EMQX はすべてのフィールド(Client IDUsernameTopicQoSPayload など)にデフォルト値を提供します。必要に応じて修正し、Submit Test ボタンをクリックして一度だけルールをトリガーします。
    • 実際のデバイスデータを使う:ページを開いたままにして、実際のクライアントや MQTT クライアントツールで EMQX に接続し、対応するイベントを発生させてテストします。
  4. テスト結果を確認します。ルールがトリガーされると、ダッシュボードに実行結果が出力され、各ステップの詳細な実行結果が表示されます。

テスト例

MQTTX を使って再パブリッシュアクション付きのルールをテストできます。クライアントを1つ作成し、そのクライアントでトピック a/1 をサブスクライブし、トピック t/1 にメッセージを送信します。ダイアログボックスに、このメッセージがトピック a/1 に再パブリッシュされたことが表示されます。

MQTTX クライアントツールと EMQX の接続方法の詳細は MQTTX - Get Started を参照してください。

MQTTX テスト例

対応して、ダッシュボードのテストインターフェースにはルール全体の実行結果が表示され、以下の内容が含まれます:

  • 左側にはルールの実行記録が表示されます。ルールがトリガーされるたびに記録が生成され、クリックすると対応するメッセージやイベントの詳細に切り替わります。
  • 右側には選択したルールのアクション記録リストが表示され、クリックするとアクションの実行結果やログを展開して確認できます。

ルール SQL またはいずれかのアクションの実行に失敗すると、そのルールの記録全体が失敗としてマークされます。記録を選択すると該当アクションのエラー情報を確認でき、トラブルシューティングに役立ちます。

ルールテスト結果

上記例では、ルールは4回トリガーされ、3回は完全に成功しています。4回目は HTTP Server アクションの実行失敗により失敗し、エラー理由は 302 ステータスコードのレスポンスでした。

ルールテストの詳細な使い方はブログ Enhancing Data Integration Stability: A Guide on EMQX Platform E2E Rule Testing を参照してください。

ルールの確認

Rules ページには作成済みのすべてのルールが一覧表示されます。

一覧の各エントリーには、ルール ID、関連するソース、有効状態、アクション数などの基本情報が表示されます。ソースにカーソルを合わせると対応する SQL ステートメントの詳細が表示されます。ルールの設定を変更するには、Actions 列の Settings をクリックします。More ボタンからはルールの複製や削除も可能です。

view_rules

また、FlowデザイナーIntegration -> Flow Designer からもルールを確認できます。Rules ページで作成したルールと Flowデザイナーで作成したルールは完全に相互運用可能です。

ルールの統計情報やアクションの実行情報を確認するには、Rules ページのルール ID または Flows ページのルール名をクリックします。

view_rules_flows

TIP

ルールアクションを更新したりデータソースを再定義した場合、以下のページに表示される統計情報はリセットされて再集計が開始されます。

Rule Statistics

ルールの検索

ルールが多数ある場合は、フィルターを使って表示したいルールを絞り込めます。ルール ID、受信メッセージのトピックまたはワイルドカード、有効状態、ルールメモ、ルールに関連付けられたアクションやソースでフィルター可能です。

search_rules

アクション(Sink)およびソースの確認

Rule ページの Actions (Sink)Sources タブには、作成済みのすべてのアクション(Sink)とソースが表示されます。名前、接続状態、関連ルール、有効状態、作成日時、最終更新日時などの重要な情報が含まれます。列名横の矢印をクリックすると並べ替えも可能です。

Enable 列のトグルスイッチをクリックすると、Sink またはソースの有効・無効を切り替えられます。Associated Rules 列の View Rules をクリックすると、その Sink またはソースを含むルール一覧が表示され、データ統合設定の管理が容易になります。

Action 列からは Sink またはソースの再接続や設定変更が可能です。More からは Sink またはソースの削除や、それを使った新規ルール作成が行えます。

多数の Sink またはソースがある場合は、名前、状態、有効状態でフィルターして表示を絞り込めます。

view_sink_source

Sink またはソースの統計情報やレート指標を確認するには、名前をクリックします。

action_statistics