Skip to content

Rule SQL リファレンス

EMQX のルールでは、データの抽出、フィルタリング、エンリッチメント、および変換のために SQL ベースの構文を使用します。この SQL ライクな構文には、SELECTFOREACH の2種類のステートメントがあります。

ステートメント説明
SELECTSQL ステートメントの結果が単一のメッセージとなる場合に使用します。
FOREACH単一の入力メッセージからゼロ個以上のメッセージを生成する場合に使用します。

各ルールは正確に1つのステートメントを持つことができます。SQL ステートメントは豊富な組み込み関数を提供しており、簡単な変換やタイムスタンプの作成などが可能です。

また、SQL ステートメントは式の中に jq プログラム を埋め込むことをサポートしており、必要に応じて複雑なデータ変換を行うことができます。式は SELECT および FOREACH ステートメント内に埋め込むことができます。SELECT および FOREACH ステートメントで参照可能なフィールドについては、データソースとフィールドを参照してください。

データ統合アーキテクチャ

SELECT ステートメント

SELECT ステートメントは、入力メッセージから特定のフィールドを選択し、フィールド名の変更、データの変換、条件に基づくメッセージのフィルタリングを行います。

ルールエンジンの SQL における SELECT ステートメントの基本形式は以下の通りです。

sql
SELECT <fields_expressions> FROM <topic> [WHERE <conditions>]

SELECT 句で出力に含めるフィールド(メッセージのペイロードおよびメタデータの両方)を指定し、WHERE 句で特定の条件に基づいてメッセージをフィルタリングできます。

FROM

FROM 句はクエリのデータソースを指定するために使用します。特定のトピックや条件に合致するイベントからデータを選択できます。

トピックによる選択

例えば、トピックパターン t/#my/other/topic にパブリッシュされたすべてのメッセージに適用されるルールを定義したい場合、以下のように記述します。

sql
SELECT clientid, payload.clientid as myclientid FROM "t/#", "my/other/topic"

ここで、

  • SELECT 句は出力に含めるフィールドを指定します。

    • clientid: メタデータ内のクライアントID

    • payload.clientid: メッセージペイロード内のクライアントID。ペイロード内のすべてのフィールドは payload の下に格納されています。

      • as 構文は payload.clientid フィールドの名前を myclientid に変更します。

イベントによる選択

ルールをイベントに紐付けることも可能です。例えば、クライアント c1 が EMQX への接続を開始したときに IP アドレスとポート番号を取得したい場合、以下のように記述します。

sql
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'

TIP

利用可能なすべてのイベントは EMQX ダッシュボードのルール編集画面の Events タブで確認できます。

WHERE

WHERE 句はオプションで、FROM 句で指定したトピックやイベントのフィルタに加えて、メッセージが満たすべき追加条件を指定し、メッセージの絞り込みを行います。

例えば、トピック t/# のメッセージのうち、ユーザー名が eric のものだけをフィルタリングする場合は以下のように記述します。

sql
SELECT * FROM "t/#" WHERE username = 'eric'

TIP

WHERE 句で使用するフィールドは、メッセージのメタデータまたはペイロード内に存在するフィールドでなければなりません。存在しない場合はエラーになります。

式の利用

SELECT および WHERE 句でデータ変換に使用できます。例えば、以下の SQL 文は clientid フィールドの値を大文字に変換し、サフィックスを追加して、出力メッセージのフィールド名を cid としています。

sql
SELECT (upper(clientid) + '_UPPERCASE_LETTERS') as cid FROM "t/#"

次の例は括弧付きの算術式を使ったデータ変換の例です。

sql
SELECT (payload.integer_field + 2) * 2 as num FROM "t/#"

複雑な構造を持つペイロードのフィールドにドット記法でアクセスすることも可能です(ペイロードが JSON 形式であることが前提です)。

sql
SELECT payload.a.b.c.deep as my_field FROM "t/#"

以下の例は、WHERE 句で等価演算子(=)を使ってフィールドの値を判定する例です。SELECT * はすべてのメタデータとペイロードを出力メッセージに転送します。

sql
SELECT * FROM "t/#" WHERE payload.x.y = 1

WHERE 句では and および or 演算子を使って複雑なブール式を作成できます。

sql
SELECT * FROM "t/#" WHERE payload.name = "sensor_1" and payload.temprature > 39

FOREACH ステートメント

FOREACH ステートメントは SELECT ステートメントのより一般的な形と考えられます。各入力メッセージに対してゼロ個以上の出力メッセージを生成できます。特定の条件に基づいてデータをフィルタリングし、結果を MQTT トピックやデータブリッジに出力する際に使用します。

ルールエンジン SQL における FOREACH ステートメントの基本形式は以下の通りです。

sql
FOREACH <expression_that_evaluates_to_array> [as <name>]
[DO <fields_expressions>]
[INCASE <condition>]
FROM <topic>
[WHERE <condition>]

FOREACH ステートメントは、入力メッセージから配列を作成する FOREACH 句で始まります。FROM および WHERE 句は SELECT ステートメントの同名句と同様の目的で機能します。FOREACH ステートメントには FOREACHFROMWHERE 句のほかに、以下の2つのオプション句があります。

必須/任意説明
DO任意FOREACH で選択した配列の各要素を変換します。

SELECT ステートメントの SELECT 句に対応し、同じ式を受け入れます。
INCASE任意指定した条件に合致しない配列要素をフィルタリングします。

WHERE 句と同じ式を受け入れます。

TIP

FOREACH 句以外はすべて SELECT ステートメントの対応する句と同じ意味を持つため、FOREACH ステートメントは SELECT ステートメントの一般化とみなせます。以下の2つのステートメントは等価です(jq('.', payload) はペイロードを配列でラップしています)。

sql
FOREACH jq('.', payload) as it
DO it.field_1, it.field_2 
FROM "t/#"
sql
SELECT payload.field_1, payload.field_2
FROM "t/#"

ネストした FOREACH ループはサポートされていません。この制限を回避するには、異なるレベルやデータセットを処理する2つの Republish アクションを設定し、ネストされた FOREACH ループの問題を回避できます。

FOREACH 句の as 構文は配列要素に名前を付けるために使われ、DO 句内で「現在の」要素を参照しやすくします。as name 部分を省略した場合、デフォルトで item という名前が使われます。

以下は、FOREACH ステートメントで2つの値を出力する例です。両方の値は value という1つのフィールドのみを含み、その値はそれぞれメッセージの field_1field_2 の値です。

sql
FOREACH jq('[.field_1, .field_2]', payload) 
DO item as value
FROM "t/#"

FOREACH ステートメントは入力データが配列形式であることを要求します。入力メッセージがすでに配列を含む場合は、直接 FOREACH ステートメントを適用できます。

例えば、トピック t/# にパブリッシュされたメッセージで、センサーの idx が1以上の場合にタイムスタンプ、クライアントID、センサー名、インデックスを出力したい場合、以下のように記述します。

sql
FOREACH
    payload.sensors as sensor  
DO
    timestamp,
    clientid,
    upper(sensor.name) as name,
    sensor.idx as idx
INCASE
    sensor.idx >= 1
FROM "t/#"

ここで、

  • FOREACH 句は入力メッセージのペイロード内の sensors フィールドを配列として指定し、配列要素に sensor という名前を付けています。
  • DO 句は出力に含めるフィールドを指定しています。
    • timestamp は入力メッセージのメタデータからのタイムスタンプです。
    • clientid は入力メッセージのメタデータからのクライアントIDです。
    • sensor.name は組み込み関数 upper で大文字化され、as name で名前を変更しています。ここでの sensorFOREACH 句で指定された配列の現在の要素を指します。
    • sensor.idxas idx で名前を変更しています。
  • INCASE 句は追加のフィルタ条件を指定し、idx フィールドの値が1以上のセンサーのみを対象とします。
  • FROM 句はトピックパターン t/# にマッチするメッセージを対象としています。

ルール作成後は、本番環境に投入する前に必ずテストすることを推奨します。ダッシュボードの UI にはサンプルメッセージでルールをテストできる機能があります。SQL ステートメントのテスト方法の詳細は ルールのテスト を参照してください。上記のルールは以下の JSON 形式のペイロードを入力としてテストできます。

json
{"sensors": [
    {"idx":0, "name":"t0"},
    {"idx":1, "name":"t1"},
    {"idx":2, "name":"t2"}
  ]
}

入力メッセージに配列が含まれていない場合は、jq 関数を使ってペイロードを配列でラップできます。例えば以下のように記述します。

sql
FOREACH jq('.', payload) 
DO item.field_1, item.field_2 
FROM "t/#"

EMQX は高度な変換のために jq 関数の使用をサポートしています。詳細なコード例は 組み込み jq 関数 を参照してください。

式と演算

EMQX のルール構文では、データ変換やメッセージのフィルタリングのために式を使用できます。これらの式は SELECTFOREACHDOINCASEWHERE などの句で利用可能です。このセクションでは式の使用方法について説明します。以下は式を構成するために使用できる演算子の一覧です。また、組み込み関数も多数利用可能です。

算術演算

演算子用途戻り値
+加算、または文字列の連結合計、または連結された文字列
-減算差分
*乗算
/除算
div整数除算整数の商
mod剰余剰余

論理演算

演算子用途戻り値
>より大きいtrue/false
<より小さいtrue/false
<=以下true/false
>=以上true/false
<>等しくないtrue/false
!=等しくないtrue/false
=2つのオペランドが完全に等しいかを判定。値の比較に使用可能true/false
=~トピックがトピックフィルターにマッチするか判定。トピックマッチング専用true/false
and論理積(AND)true/false
or論理和(OR)true/false

CASE 式

CASE 式は条件付きの処理を行うために使用できます。CASE 式は他の言語の if-then-else 文に相当します。以下の例で使い方を示します。

sql
SELECT
  CASE WHEN payload.x < 0 THEN 0
       WHEN payload.x > 7 THEN 7
       ELSE payload.x
  END as x
FROM "t/#"

メッセージが以下の場合、

json
{"x": 8}

出力は以下のようになります。

json
{"x": 7}

さらに例

SELECT ステートメントの例

  • トピック "t/a" のメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/a"
  • トピック "t/a" または "t/b" のメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/a","t/b"
  • トピックが 't/#' にマッチするメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/#"
  • トピックが 't/#' にマッチするメッセージから qosusernameclientid フィールドを抽出する(出力メッセージのペイロードにこれらのフィールドが含まれます)。

    sql
    SELECT qos, username, clientid FROM "t/#"
  • ペイロードに username フィールドがあり、その値が 'Steven' のメッセージから username フィールドを抽出する(FROM 句で # を使うのは全メッセージを対象にルールを評価するため推奨されません)。

    sql
    SELECT username FROM "#" WHERE username='Steven'
  • 入力メッセージのペイロードから x フィールドを抽出し、出力メッセージで y に名前を変更する。yWHERE 句でも使用可能。この SQL 文はペイロードが {"x": 1} のメッセージにマッチし、{"x": 2} にはマッチしません。

    sql
    SELECT payload.x as x FROM "tests/test_topic_1" WHERE y = 1
  • ペイロードが {"x": {"y": 1}} のメッセージ(例:{"x": {"y": 1}, "other": "field"} も含む)にマッチする。

    sql
    SELECT * FROM "#" WHERE payload.x.y = 1
  • クライアントIDが 'c1' の MQTT クライアントが接続した場合、そのソース IP アドレスとポート番号を抽出する。

    sql
    SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
  • トピックが 't/topic' にマッチし、QoS レベルが 1 のすべてのサブスクリプションにマッチし、clientid を抽出する。

    sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic = 'my/topic' and qos = 1
  • 上記の例と似ていますが、トピックマッチ演算子 =~ を使ってトピックフィルター 't/#' にマッチさせる。

    sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
  • キーが "foo" のユーザープロパティを抽出する(ユーザープロパティは MQTT 5.0 の新機能で、古い MQTT バージョンには該当しません)。

    sql
    SELECT pub_props.'User-Property'.foo as foo FROM "t/#"

TIP

  • FROM 句のトピックはダブルクォーテーション("")またはシングルクォーテーション('')で囲む必要があります。
  • WHERE 句の条件で文字列を使う場合はシングルクォーテーション('')で囲みます。
  • FROM 句に複数のトピックがある場合はカンマ(,)で区切ります。例:SELECT * FROM "t/1", "t/2"
  • ペイロードの内部フィールドにはドット記法(.)でアクセス可能です。例:ネストされた JSON なら payload.outer_field.inner_field のように指定します。
  • ペイロードに対してエイリアスを作成するのはパフォーマンスに影響するため避けてください。例:SELECT payload as p は推奨されません。
  • 一部のエスケープシーケンスは使用時にアンエスケープが必要です。詳細は unescape 関数 を参照してください。

FOREACH ステートメントの例

クライアントID c_steve のメッセージがトピック t/1 に届き、メッセージ本文は JSON 形式で、sensors フィールドが複数のオブジェクトを含む配列であるとします。例:

json
{
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx":0},
        {"name": "b", "idx":1},
        {"name": "c", "idx":2}
    ]
}

例1

sensors 配列内の各オブジェクトを、オブジェクトの idx を使ったトピック sensors/${idx} に、内容を name として再パブリッシュしたい場合。上記の入力例では、ルールエンジンは以下の3つのメッセージを発行します。

  1. トピック: sensors/0
    内容: a
  2. トピック: sensors/1
    内容: b
  3. トピック: sensors/2
    内容: c

このルールのアクション設定は以下の通りです。

  • アクションタイプ: メッセージ再パブリッシュ
  • ターゲットトピック: sensors/${idx}
  • ターゲット QoS: 2
  • メッセージ内容テンプレート: ${name}

SQL ステートメントは以下の通りです。

sql
FOREACH
    payload.sensors
FROM "t/#"

この SQL 文の FOREACH 句は配列 sensors を指定しています。FOREACH ステートメントは結果配列の各オブジェクトに対して「メッセージ再パブリッシュ」アクションを実行するため、3回の再パブリッシュが行われます。

例2

sensors 配列のうち、idx フィールドの値が1以上のオブジェクトだけを対象に、トピック sensors/${idx} に以下の内容で再パブリッシュしたい場合。

内容: clientid=${clientid},name=${name},date=${date}

上記の入力例では、idx が0の要素は除外されるため、2つのメッセージが発行されます。

  1. トピック: sensors/1
    内容: clientid=c_steve,name=b,date=2023-04-24
  2. トピック: sensors/2
    内容: clientid=c_steve,name=c,date=2023-04-24

このルールのアクション設定は以下の通りです。

  • アクションタイプ: メッセージ再パブリッシュ
  • ターゲットトピック: sensors/${idx}
  • ターゲット QoS: 2
  • メッセージ内容テンプレート: clientid=${clientid},name=${name},date=${date}

SQL ステートメントは以下の通りです。

sql
FOREACH
    payload.sensors
DO
    clientid,
    item.name as name,
    item.idx as idx
INCASE
    item.idx >= 1
FROM "t/#"

この SQL 文の説明:

  • FOREACH 句は配列 sensors を指定しています。
  • DO 句は各要素に必要なフィールドを選択しています。clientid はメッセージのメタデータから、nameidx は現在のセンサーオブジェクトから取得します。itemsensors 配列の現在の要素を表します。
  • INCASE 句は配列要素のフィルタ条件を指定し、条件に合わない要素は無視されます。

DO および INCASE 句では、現在のオブジェクトを item で参照できますが、FOREACH 句の as 構文で変数名をカスタマイズすることもできます。したがって、上記の SQL は以下のようにも書けます。

sql
FOREACH
    payload.sensors as s
DO
    clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

例3

例2を拡張し、clientid フィールドの c_steve から c_ プレフィックスを取り除きたい場合。

ルールエンジンには FOREACHDOINCASE 句内で呼び出せる組み込み関数が多数あります。c_stevesteve に変換するには、例2の SQL を以下のように変更します。

sql
FOREACH
    payload.sensors as s
DO
    nth(2, tokens(clientid,'_')) as clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

複数の式を FOREACH 句に記述できますが、最後の式は配列を指定する必要があります。

例えば、入力メッセージのペイロードが以下のように構造化されている場合:

json
{
    "date": "2020-04-24",
    "data": {
        "sensors": [
            {"name": "a", "idx":0},
            {"name": "b", "idx":1},
            {"name": "c", "idx":2}
        ]
    }
}

FOREACH 句でペイロードの data に別名を付けてから配列を選択できます。

sql
FOREACH
    payload.data as d
    d.sensors as s
...

これは以下と同等です。

sql
FOREACH
    payload.data.sensors as s
...

この機能は複雑な構造のペイロードを扱う際に便利です。