Skip to content

ルールの作成

このページでは、EMQXダッシュボードを使用してデータ処理のルールを作成し、ルールにアクションを追加する方法について説明します。また、ルールのテスト方法や作成後のルールの確認方法も紹介します。

本ページのデモでは、リパブリッシュアクションを例に、トピック t/# で受信したメッセージを処理し、トピック a/1 にリパブリッシュするルールの作成方法を説明します。ただし、アクションの追加では「結果をコンソールに出力する」や「Sinksによる転送」も紹介しています。

ルールSQLの定義

EMQXダッシュボードにログインし、左側のナビゲーションメニューから Integration -> Rules をクリックします。

Rules ページで Create ボタンをクリックすると、Create Rule ページに遷移します。ここでルールのデータソースを定義し、フィルタリングされたメッセージに対して実行するアクションを決定します。

ルール名を入力し、将来の管理を容易にするためにメモを追加します。SQL Editor では、ビジネスニーズに合わせてデータソースを追加するSQL文をカスタマイズできます。このチュートリアルでは、デフォルト設定のまま、トピック "t/#" にマッチするすべてのメッセージ(例:t/at/a/bt/a/b/c など)を選択して返すSQLを使用します。

TIP

本チュートリアルではメッセージのペイロードがJSON形式であることを前提としています。ペイロードが他の形式の場合は、スキーマレジストリを使ってデータ型を変換できます。

EMQXには豊富なSQL文のサンプルが組み込まれており、SQL Editor 下の SQL Examples ボタンから参照可能です。SQLの構文や使い方の詳細はSQL Syntaxを参照してください。

ルール作成画面

SQLジェネレーター

EMQX 5.10.0以降、SQL EditorはAIを活用した自然言語によるSQL生成機能(SQLジェネレーター)をサポートしています。この機能により、自然言語で意図を記述すると、システムが適切なSQL文を自動生成します。

SQLジェネレーターはデフォルトで有効です。ダッシュボード右上の Settings メニューからトグルスイッチで無効化できます。

利用手順は以下の通りです:

  1. Create Rule ページの SQL Editor セクションに移動します。

  2. エディタ下の SQL Generator ボタンをクリックして Generate SQL with AI ダイアログを開き、以下の項目を指定します:

    • Task Description(必須):SQLに実行させたい内容を自然言語で記述します。

      例: 「MQTTメッセージのメタデータから clientid を抽出し、ペイロードから device_idtemperature を抽出します。トピック sensors/temperature のメッセージで温度が30より大きい場合のみ適用します。」

    • Related Topics(任意):sensors/temperature のようなトピックフィルターを指定します。

    • Input Example (JSON)(任意推奨):AIがデータ構造を理解しやすいように、サンプルのMQTTメッセージペイロードを提供します。 例:

      json
      {
        "device_id": "sensor001",
        "temperature": 32.5,
        "unit": "C"
      }
    • Output Example (JSON)(任意):期待する出力形式を指定します。 例:

      json
      {
        "clientid": "client_a1b2c3",
        "device_id": "sensor001",
        "temperature": 32.5
      }

      TIP

      入力・出力例を含めると生成されるSQLの精度が向上します。

  3. Generate をクリックして生成されたSQLをプレビューします。

  4. プレビュー画面では、

    • 生成されたSQLを確認・手動編集できます。
    • Apply SQL をクリックするとSQL Editorに挿入されます。
    • Back to Form をクリックすると入力内容を修正して再生成できます。
  5. SQL Editorに挿入すると、SQLが自動的にエディタに表示され、確認・編集が可能です。

生成例

上記のタスクと入力例を用いると、生成されるSQLは以下のようになります:

sql
SELECT
  clientid,
  payload.device_id AS device_id,
  payload.temperature AS temperature
FROM
  "sensors/temperature"
WHERE
  payload.temperature > 30

このルールは、トピック sensors/temperature のメッセージから clientiddevice_idtemperature を抽出し、温度が30より大きい場合に適用されます。

SQLジェネレーターの利用シーン

SQLジェネレーターは以下のような場合に特に有用です:

  • EMQXのSQL構文に不慣れな場合
  • ルールのプロトタイプを素早く作成したい場合
  • 構造化されたJSONペイロードを扱う場合

より詳細なカスタマイズや構文オプションは、Rule SQL Syntaxを参照してください。

SQL文のテスト

SQL文はシミュレートデータを使って実行テストできます。アクションを追加してルールを作成する前に、SQLの実行結果が期待通りか確認することが可能です。このステップは任意ですが、EMQXルールに不慣れな場合は推奨されます。ルール全体の実行テストを行いたい場合は、ルールのテストを参照してください。

SQL文のテスト手順は以下の通りです:

  1. Create Rule ページで Try It Out トグルスイッチをオンにしてSQL文のテストを有効化します。
  2. SQLに対応する Data Source を選択し、ルールの指定ソース(FROM句)と一致していることを確認します。
  3. テストデータを入力します。データソースを選択すると、EMQXは Client IDUsernameTopicQoSPayload などのすべてのシミュレートデータフィールドにデフォルト値を提供します。必要に応じて適切な値に変更してください。
  4. Run Test ボタンをクリックしてテストを実行します。正常に動作すれば Test Passed のメッセージが表示されます。

test-sql

SQLの処理結果は Output Result セクションにJSON形式で表示されます。SQL処理結果のすべてのフィールドは、その後のアクション(組み込みアクションやSink)で ${key} の形式で参照可能です。フィールドの詳細はSQLデータソースとフィールドを参照してください。

このデモではペイロードがJSON形式であることを前提としていますが、実際の利用ではスキーマレジストリを用いて他形式のメッセージも処理可能です。

次に、Create Rule ページ右側の Add Action ボタンをクリックして、ルールにさまざまなタイプのアクションを追加できます。

アクションの追加

Create Rule ページで右側の Add Action ボタンをクリックすると、Add Action ページが表示されます。Action ドロップダウンリストから、リパブリッシュ、コンソール出力、データブリッジによる転送の3種類のアクションから選択可能です。

アクション追加画面

リパブリッシュアクションの追加

このセクションでは、トピック t/# で受信した元のメッセージを別のトピック a/1 にリパブリッシュするアクションの追加方法を説明します。

Add Action ページで Type of Action ドロップダウンメニューから Republish を選択し、以下の設定を行ってから Add ボタンをクリックして確定します:

  • Topic:転送先トピックを設定します。例では "a/1"

  • QoS:リパブリッシュするメッセージのQoSを設定します。例では "0"

  • Retain:このメッセージをリテインメッセージとして転送するか設定します。本チュートリアルではデフォルトの false のままにします。

  • Payload${payload} と入力し、リパブリッシュするメッセージのペイロードは元のメッセージと同じで変更しないことを示します。

  • MQTT 5.0 メッセージプロパティ:トグルスイッチをクリックして、必要に応じてユーザープロパティやMQTTプロパティを設定します。これにより、リパブリッシュするメッセージにリッチなメタデータを付加できます。

    • Payload Format Indicator:メッセージのペイロードが特定の形式かどうかを示す値を入力します。false の場合は未定義のバイト列とみなされ、true の場合はUTF-8エンコードされた文字データであることを示します。これによりMQTTクライアントやサーバーはメッセージ本文の解析を効率化できます。

    • Message Expiry Interval:メッセージが配信されなかった場合に無効とみなすまでの有効期限(秒)を指定します。

    • Content Type:リパブリッシュメッセージのペイロードの種類や形式(MIMEタイプ)を指定します。例:text/plain(テキストファイル)、audio/aac(音声ファイル)、application/json(JSON形式のアプリケーションメッセージ)など。

    • Response Topic:レスポンスメッセージをパブリッシュするMQTTトピックを指定します。例:レスポンスを "response/my_device" トピックに送る場合は response/my_device と入力します。

    • Correlation Data:レスポンスメッセージと元のリクエストメッセージを関連付けるための一意の識別子やデータを入力します。例:リクエストID、トランザクションIDなど。

  • Direct Dispatch:トグルスイッチで直接ディスパッチを有効または無効にします。有効にすると、メッセージはサブスクライバーに直接配信され、追加のルール発動や同一ルールの再帰的な発火を防止します。

Create Rule ページで下部の Create ボタンをクリックすると、ルールの作成が完了し、Rules ページに新しいエントリとして追加されます。

TIP

リパブリッシュアクションは元のメッセージの配信を妨げません。例えば、ルールによりトピック "t/1" のメッセージが "a/1" にリパブリッシュされても、元の "t/1" メッセージは "t/1" をサブスクライブしているクライアントに引き続き配信されます。

コンソール出力アクションの追加

TIP

コンソール出力アクションはデバッグ目的でのみ使用してください。本番環境で使用するとパフォーマンス問題を引き起こす可能性があります。

コンソール出力アクションはルールの出力結果をコンソールやログファイルに表示するために使用します。

  • EMQXが console または foreground モードで起動された場合(Docker環境のデフォルトは foreground)、出力はコンソールに送られます。
  • systemd経由で起動された場合は、出力はジャーナルシステムにキャプチャされ、journalctl コマンドで確認可能です。

出力は以下の形式になります:

bash
[rule action] rule_id1
    Action Data: #{key1 => val1}
    Envs: #{key1 => val1, key2 => val2}

ここで

  • [rule action] はリパブリッシュアクションが発動したルールIDです。
  • Action Data はルールの出力結果で、アクション実行時に渡されるデータやパラメータ、つまりリパブリッシュアクション設定時のペイロード部分を示します。
  • Envs はリパブリッシュ時に設定される環境変数で、データソースやアクション実行に関連する内部情報を含みます。

Sinksによる転送アクションの追加

処理結果をSinksを使って転送するアクションも追加可能です。ダッシュボードの Type of Action ドロップダウンリストから対象のSinkを選択するだけです。EMQXの各Sinkの詳細はデータ統合を参照してください。

ルールのテスト

ルールエンジンはルールテスト機能を提供しており、シミュレートデータや実際のクライアントデータを使ってルールをトリガーし、ルールSQLを実行し、ルールに追加されたすべてのアクションを実行して各ステップの結果を取得できます。

ルールをテストすることで、ルールが期待通りに動作するか検証し、問題を素早く特定・解決できます。これにより開発効率が向上し、本番環境での失敗を防止できます。

テスト手順

  1. Try It Out スイッチをオンにし、テスト対象として Rule を選択します。テスト開始前にルールを保存しておく必要があります。
  2. Start Test ボタンをクリックしてテストを開始します。ブラウザはルールがトリガーされるのを待ち、テスト結果を生成します。
  3. ルールをトリガーしてテストします。以下の2つの方法が利用可能です:
    • シミュレートデータを使うInput Simulated Data ボタンをクリックし、ポップアップでSQLに対応する Data Source を選択し、ルールの指定ソース(FROM句)と一致していることを確認します。EMQXはすべてのフィールド(Client IDUsernameTopicQoSPayload など)にデフォルト値を提供します。必要に応じて修正し、Submit Test ボタンを押して一度だけルールをトリガーします。
    • 実際のデバイスデータを使う:現在のページを開いたままにし、実際のクライアントやMQTTクライアントツールでEMQXに接続し、対応するイベントを発生させてテストします。
  4. テスト結果を確認します。ルールがトリガーされると、実行結果がダッシュボードに出力され、各ステップの詳細な実行結果が表示されます。

テスト例

MQTTXを使ってリパブリッシュアクションのルールをテストできます。1つのクライアントを作成し、そのクライアントで a/1 トピックをサブスクライブし、t/1 メッセージを送信します。ダイアログボックスにこのメッセージが a/1 トピックにリパブリッシュされていることが表示されます。

MQTTXクライアントツールとEMQXの接続方法の詳細は、MQTTX - はじめにを参照してください。

MQTTXによるテスト例

対応して、ダッシュボードのテストインターフェースにはルール全体の実行結果が表示され、以下の内容が含まれます:

  • 左側にはルールの実行記録が表示されます。ルールがトリガーされるたびに記録が生成され、クリックすると対応するメッセージやイベントの詳細に切り替えられます。
  • 右側には選択したルールのアクション実行記録が表示され、クリックするとアクションの実行結果やログを展開して確認できます。

ルールSQLやいずれかのアクションの実行に失敗した場合、該当のルール記録は失敗としてマークされます。記録を選択すると対応するアクションのエラー情報を確認でき、トラブルシューティングに役立ちます。

ルールテスト結果

上記の例では、ルールは4回トリガーされ、3回は完全に成功しています。4回目は HTTP Server アクションの実行失敗により失敗し、エラー理由は302ステータスコードのレスポンスでした。

ルールテストの詳細な利用ガイドはブログデータ統合の安定性向上:EMQXプラットフォームE2Eルールテストガイドを参照してください。

ルールの確認

Rules ページでは、作成したすべてのルールの一覧を包括的に確認できます。

一覧の各エントリには、ルールID、関連するソース、有効状態、アクション数などの基本情報が表示されます。ソースにマウスを重ねると対応するSQL文の詳細が表示されます。ルールの設定を変更するには、Actions 列の Settings をクリックします。また、More ボタンからルールの複製や削除も可能です。

view_rules

また、FlowデザイナーIntegration -> Flow Designer からもルールを確認できます。Rules ページで作成したルールとFlowデザイナーで作成したルールは完全に相互運用可能です。

ルールの統計情報やアクション実行情報を確認するには、Rules ページのルールIDまたは Flows ページのルール名をクリックします。

view_rules_flows

TIP

ルールアクションを更新したりデータソースを再定義すると、以下のページに表示される統計情報はリセットされて再集計が開始されます。

ルール統計

ルールの検索

ルールが多数ある場合は、フィルターを使って検索対象を絞り込み、表示したいルールだけを表示できます。ルールID、受信メッセージのトピックやワイルドカード、有効状態、ルールメモ、ルールに関連するアクションやソースでフィルター可能です。

search_rules

アクション(Sink)とソースの確認

Rule ページの Actions (Sink)Sources タブには、作成済みのすべてのアクション(Sink)とソースが表示されます。名前、接続状態、関連ルール、有効状態、作成日時、最終更新日時などの重要な情報が含まれます。列名横の矢印をクリックするとソート可能です。

Enable 列のトグルスイッチをクリックすると、Sinkやソースの有効・無効を切り替えられます。Associated Rules 列の View Rules をクリックすると、そのSinkやソースを含むルール一覧が表示され、データ統合設定の管理が容易になります。

Action 列からはSinkやソースの再接続や設定変更が可能です。More からはSinkやソースの削除や、それらを利用した新規ルール作成も行えます。

多数のSinkやソースがある場合は、フィルターで検索対象を絞り込み、表示したいエントリだけを表示可能です。名前、状態、有効状態でフィルターできます。

view_sink_source

Sinkやソースの統計情報やレート指標を確認するには、該当の名前をクリックします。

action_statistics