Skip to content

Rule SQL リファレンス

EMQX のルールでは、データの抽出、フィルタリング、拡張、変換のために SQL ベースの構文を使用します。この SQL ライクな構文には、SELECTFOREACH の2種類のステートメントがあります。

ステートメント説明
SELECTSQL ステートメントの結果が単一のメッセージとなる場合に使用します。
FOREACH1つの入力メッセージからゼロ個以上のメッセージを生成する場合に使用します。

各ルールはちょうど1つのステートメントを持つことができます。SQL ステートメントは豊富な組み込み関数を提供しており、簡単な変換やタイムスタンプの作成などが可能です。

また、SQL ステートメントは式の中に jq プログラム を埋め込むことをサポートしており、必要に応じて複雑なデータ変換を行うことができます。式は SELECT および FOREACH ステートメント内に埋め込むことが可能です。SELECT および FOREACH ステートメントで参照可能なフィールドについては、データソースとフィールドを参照してください。

データ統合アーキテクチャ

SELECT ステートメント

SELECT ステートメントは、入力メッセージから特定のフィールドを選択し、フィールド名の変更やデータ変換、条件に基づくメッセージのフィルタリングを行います。

ルールエンジンSQLにおける SELECT ステートメントの基本形式は以下の通りです。

sql
SELECT <fields_expressions> FROM <topic> [WHERE <conditions>]

SELECT 句で出力に含めるフィールド(メッセージのペイロードおよびメタデータの両方)を指定し、WHERE 句で特定の条件に基づいてメッセージをフィルタリングできます。

FROM

FROM 句はクエリのデータソースを指定するために使用します。特定のトピックや条件に一致するイベントからデータを選択できます。

トピックによる選択

例えば、トピックパターン t/#my/other/topic にパブリッシュされたすべてのメッセージに適用されるルールを定義する場合、以下のように記述します。

sql
SELECT clientid, payload.clientid as myclientid FROM "t/#", "my/other/topic"

ここで、

  • SELECT 句は出力に含めるフィールドを指定しています。

    • clientid: メタデータのクライアントID

    • payload.clientid: メッセージペイロード内のクライアントID。ペイロード内のすべてのフィールドは payload の下に格納されています。

      • as 構文は payload.clientid フィールドを myclientid に名前変更しています。

イベントによる選択

ルールをイベントに紐づけることも可能です。例えば、クライアント c1 が EMQX に接続を開始した際に IP アドレスとポート番号を取得したい場合、以下のように記述します。

sql
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'

TIP

利用可能なすべてのイベントは EMQX ダッシュボードのルール編集画面の Events タブで確認できます。

WHERE

WHERE 句は任意で、FROM 句で指定したトピックやイベントのフィルタに加えて、メッセージが満たすべき追加条件を指定し、メッセージの絞り込みを行います。

例えば、トピック t/# のメッセージのうち、ユーザー名が eric のものだけをフィルタリングする場合は以下のように記述します。

sql
SELECT * FROM "t/#" WHERE username = 'eric'

TIP

WHERE 句で使用するフィールドは、メッセージのメタデータまたはペイロード内に存在するフィールドでなければなりません。そうでない場合はエラーになります。

式の利用

SELECTWHERE 句内でデータ変換に利用できます。例えば、以下の SQL ステートメントは clientid フィールドの値を大文字に変換し、接尾辞を追加して、出力メッセージのフィールド名を cid としています。

sql
SELECT (upper(clientid) + '_UPPERCASE_LETTERS') as cid FROM "t/#"

次の例は括弧で囲まれた算術式を使ったデータ変換の例です。

sql
SELECT (payload.integer_field + 2) * 2 as num FROM "t/#"

複雑な構造を持つペイロードのフィールドにドット表記でアクセスすることも可能です(ペイロードが JSON 形式であることが前提です)。

sql
SELECT payload.a.b.c.deep as my_field FROM "t/#"

以下のステートメントは、WHERE 句で等価演算子(=)を使い、特定の値を持つフィールドをテストする例です。SELECT * はメタデータとペイロードのすべてを出力メッセージに転送します。

sql
SELECT * FROM "t/#" WHERE payload.x.y = 1

WHERE 句では andor 演算子を使って複雑な論理式を作成できます。

sql
SELECT * FROM "t/#" WHERE payload.name = "sensor_1" and payload.temprature > 39

FOREACH ステートメント

FOREACH ステートメントは SELECT ステートメントのより一般的な形と考えられます。1つの入力メッセージからゼロ個以上の出力メッセージを生成できます。特定の条件でデータをフィルタリングし、結果を MQTT トピックやデータブリッジに出力する際に利用します。

ルールエンジンSQLにおける FOREACH ステートメントの基本形式は以下の通りです。

sql
FOREACH <expression_that_evaluates_to_array> [as <name>]
[DO <fields_expressions>]
[INCASE <condition>]
FROM <topic>
[WHERE <condition>]

FOREACH ステートメントは、配列を作成するための FOREACH 句で始まります。FROMWHERE 句は SELECT ステートメントの対応する句と同じ目的で動作します。FOREACH ステートメントには、FOREACHFROMWHERE 句に加えて、以下の2つのオプション句があります。

必須/任意説明
DO任意FOREACH で選択した配列の各要素を変換します。

SELECT ステートメントの SELECT 句に対応し、同じ式を受け入れます。
INCASE任意指定した条件に合致しない配列要素をフィルタリングします。

WHERE 句と同じ式を受け入れます。

TIP

FOREACH 句以外はすべて SELECT ステートメントの対応句が存在するため、FOREACH ステートメントは SELECT ステートメントの一般化と見なせます。以下の2つのステートメントは等価です(jq('.', payload) はペイロードを配列でラップしています)。

sql
FOREACH jq('.', payload) as it
DO it.field_1, it.field_2 
FROM "t/#"
sql
SELECT payload.field_1, payload.field_2
FROM "t/#"

なお、ネストした FOREACH ループはサポートされていません。この制限を回避するには、メッセージの異なるレベルやセットを処理する2つの Republish アクションを設定し、ネストした FOREACH ループの問題を回避できます。

FOREACH 句の as 構文は配列の要素に名前を付けるために使われ、DO 句内で「現在の」要素を参照しやすくします。as name 部分を省略した場合、デフォルトで item という名前が使用されます。

以下は、FOREACH ステートメントを使って2つの値を出力する例です。両方の値は value という1つのフィールドのみを持ち、その値はそれぞれメッセージの field_1field_2 の値です。

sql
FOREACH jq('[.field_1, .field_2]', payload) 
DO item as value
FROM "t/#"

FOREACH ステートメントは入力データが配列形式であることを要求します。入力メッセージにすでに配列が含まれている場合は、直接 FOREACH ステートメントを適用できます。

例えば、トピック t/# にパブリッシュされたメッセージで、センサーの idx が1以上の場合にタイムスタンプ、クライアントID、センサー名、インデックスを出力したい場合、以下のように記述します。

sql
FOREACH
    payload.sensors as sensor  
DO
    timestamp,
    clientid,
    upper(sensor.name) as name,
    sensor.idx as idx
INCASE
    sensor.idx >= 1
FROM "t/#"

ここで、

  • FOREACH 句は入力メッセージのペイロードの sensors フィールドを配列として指定し、配列要素に sensor という名前を付けています。
  • DO 句は出力に含めるフィールドを指定しています。
    • timestamp は入力メッセージのメタデータのタイムスタンプです。
    • clientid は入力メッセージのメタデータのクライアントIDです。
    • sensor.name は組み込み関数 upper で大文字化され、as namename に名前変更されます。ここでの sensorFOREACH 句で指定された配列の現在の要素を指します。
    • sensor.idxas idxidx に名前変更されます。
  • INCASE 句はフィルタ条件を追加し、idx フィールドの値が1以上のセンサーのみを対象とします。
  • FROM 句はトピックパターン t/# のメッセージを対象としています。

ルール作成後は、必ず本番環境に投入する前にルールのテストを行うことを推奨します。ダッシュボードの UI にはサンプルメッセージでルールをテストできる機能があります。SQL ステートメントのテスト方法については、ルールのテストを参照してください。上記のルールは以下の JSON 形式のペイロードを入力としてテストできます。

json
{"sensors": [
    {"idx":0, "name":"t0"},
    {"idx":1, "name":"t1"},
    {"idx":2, "name":"t2"}
  ]
}

入力メッセージに配列が含まれていない場合は、jq 関数を使ってペイロードを配列でラップできます。例えば以下のように記述します。

sql
FOREACH jq('.', payload) 
DO item.field_1, item.field_2 
FROM "t/#"

EMQX は高度な変換のために jq 関数の利用をサポートしています。詳細なコード例は組み込みの jq 関数を参照してください。

式と演算

EMQX のルール構文では、データ変換やメッセージのフィルタリングのために式を使用できます。これらの式は SELECTFOREACHDOINCASEWHERE などの句で利用可能です。この節では式の利用方法について説明します。以下は式を構成するために使える演算子であり、さらに多くの組み込み関数も利用可能です。

算術演算

演算子用途戻り値
+加算、または文字列の連結合計、または連結した文字列
-減算
*乗算
/除算
div整数除算整数の商
mod剰余剰余

論理演算

演算子用途戻り値
>より大きいtrue/false
<より小さいtrue/false
<=以下true/false
>=以上true/false
<>等しくないtrue/false
!=等しくないtrue/false
=完全に等しいか判定true/false
=~トピックマッチ判定true/false
and論理積true/false
or論理和true/false

CASE 式

CASE 式は条件付きの処理を行うために使用します。CASE 式は他言語の if-then-else 文に相当します。以下の例で使い方を示します。

sql
SELECT
  CASE WHEN payload.x < 0 THEN 0
       WHEN payload.x > 7 THEN 7
       ELSE payload.x
  END as x
FROM "t/#"

メッセージが以下の場合、

json
{"x": 8}

出力は以下のようになります。

json
{"x": 7}

さらに例

SELECT ステートメントの例

  • トピック "t/a" のメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/a"
  • トピック "t/a" または "t/b" のメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/a","t/b"
  • トピックが 't/#' にマッチするメッセージからすべてのフィールドを抽出する。

    sql
    SELECT * FROM "t/#"
  • トピックが 't/#' にマッチするメッセージから qosusernameclientid フィールドを抽出する(出力メッセージのペイロードにこれらのフィールドが含まれます)。

    sql
    SELECT qos, username, clientid FROM "t/#"
  • ペイロードに username フィールドがあり、その値が 'Steven' のメッセージから username フィールドを抽出する(FROM 句で # を使うのは、すべてのメッセージに対してルールが評価されるため推奨されません)。

    sql
    SELECT username FROM "#" WHERE username='Steven'
  • 入力メッセージのペイロードから x フィールドを抽出し、出力メッセージでは y として名前変更する。WHERE 句でも新しいエイリアス y を使える。ペイロードが {"x": 1} のメッセージにマッチし、{"x": 2} にはマッチしない。

    sql
    SELECT payload.x as x FROM "tests/test_topic_1" WHERE y = 1
  • ペイロードが {"x": {"y": 1}} のメッセージ(例:{"x": {"y": 1}, "other": "field"} も含む)にマッチする。

    sql
    SELECT * FROM "#" WHERE payload.x.y = 1
  • クライアントIDが 'c1' の MQTT クライアントが接続した場合、そのソースIPアドレスとポート番号を抽出する。

    sql
    SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
  • トピックパターン 't/topic' にマッチし、QoS レベルが 1 のすべてのサブスクリプションにマッチし、clientid を出力メッセージに抽出する。

    sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic = 'my/topic' and qos = 1
  • 上記の例と似ていますが、トピックマッチ演算子 =~ を使ってトピックフィルター 't/#' にマッチさせる。

    sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
  • キー "foo" のユーザープロパティを抽出する(ユーザープロパティは MQTT 5.0 プロトコルで新規追加されたため、古い MQTT バージョンには該当しません)。

    sql
    SELECT pub_props.'User-Property'.foo as foo FROM "t/#"

TIP

  • FROM 句のトピックはダブルクォーテーション ("") またはシングルクォーテーション ('') で囲む必要があります。
  • WHERE 句の条件で文字列を使う場合はシングルクォーテーション ('') で囲みます。
  • FROM 句に複数のトピックがある場合はカンマ(,)で区切ります。例:SELECT * FROM "t/1", "t/2"
  • ペイロードの内部フィールドにはドット記法 (.) でアクセスできます。例:ネストした JSON 構造の場合、payload.outer_field.inner_field のように指定します。
  • ペイロードにエイリアスを付けるとパフォーマンスに影響するため、SELECT payload as p のような使い方は避けてください。
  • 一部のエスケープシーケンスは使用時にアンエスケープが必要です。詳細は unescape 関数を参照してください。

FOREACH ステートメントの例

クライアントID c_steve のメッセージがトピック t/1 に送られてくるとします。メッセージボディは JSON 形式で、sensors フィールドは複数のオブジェクトを含む配列です。例は以下の通りです。

json
{
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx":0},
        {"name": "b", "idx":1},
        {"name": "c", "idx":2}
    ]
}

例1

sensors 配列の各オブジェクトを、オブジェクトの idx を使ったトピック sensors/${idx} に再パブリッシュし、内容はオブジェクトの name とします。上記の入力例では、ルールエンジンは以下の3つのメッセージを発行します。

  1. トピック: sensors/0
    内容: a
  2. トピック: sensors/1
    内容: b
  3. トピック: sensors/2
    内容: c

このルールのアクション設定は以下の通りです。

  • アクションタイプ: メッセージ再パブリッシュ
  • 送信先トピック: sensors/${idx}
  • 送信先 QoS: 2
  • メッセージ内容テンプレート: ${name}

SQL ステートメントは以下のように記述します。

sql
FOREACH
    payload.sensors
FROM "t/#"

上記の SQL では、FOREACH 句で配列 sensors を指定しています。FOREACH ステートメントは結果配列の各オブジェクトに対して「メッセージ再パブリッシュ」アクションを実行するため、3回実行されます。

例2

sensors 配列のうち、id フィールドの値が1以上のオブジェクトだけを、トピック sensors/${idx} に再パブリッシュし、内容は clientid=${clientid},name=${name},date=${date} とします。上記の入力例では、id が0の要素はフィルタリングされるため、2つのメッセージが発行されます。

  1. トピック: sensors/1
    内容: clientid=c_steve,name=b,date=2023-04-24
  2. トピック: sensors/2
    内容: clientid=c_steve,name=c,date=2023-04-24

このルールのアクション設定は以下の通りです。

  • アクションタイプ: メッセージ再パブリッシュ
  • 送信先トピック: sensors/${idx}
  • 送信先 QoS: 2
  • メッセージ内容テンプレート: clientid=${clientid},name=${name},date=${date}

SQL ステートメントは以下のように記述します。

sql
FOREACH
    payload.sensors
DO
    clientid,
    item.name as name,
    item.idx as idx
INCASE
    item.idx >= 1
FROM "t/#"

上記 SQL では、FOREACH 句で配列 sensors を指定し、DO 句で各操作に必要なフィールドを選択しています。clientid はメッセージのメタデータから、nameidx は現在のセンサーオブジェクトから取得します。itemsensors 配列の現在のオブジェクトを表します。INCASE 句は配列オブジェクトのフィルタ条件を指定し、条件に合わないオブジェクトは無視されます。

DO および INCASE 句では、item を使って現在のオブジェクトにアクセスできますが、FOREACH 句の as 構文で変数名をカスタマイズすることも可能です。したがって、上記の SQL は以下のようにも書けます。

sql
FOREACH
    payload.sensors as s
DO
    clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

例3

例2を拡張し、clientid フィールドの c_stevec_ プレフィックスを削除します。

ルールエンジンには FOREACHDOINCASE 句内で呼び出せる組み込み関数が多数あります。c_stevesteve に変換したい場合、例2の SQL を以下のように変更します。

sql
FOREACH
    payload.sensors as s
DO
    nth(2, tokens(clientid,'_')) as clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

複数の式を FOREACH 句に配置できますが、最後の式は必ず走査する配列を指定する必要があります。

例えば、入力メッセージのペイロードが以下のように構造化されている場合、

json
{
    "date": "2020-04-24",
    "data": {
        "sensors": [
            {"name": "a", "idx":0},
            {"name": "b", "idx":1},
            {"name": "c", "idx":2}
        ]
    }
}

FOREACH 句でペイロードのデータに別名を付けてから配列を選択できます。

sql
FOREACH
    payload.data as d
    d.sensors as s
...

これは以下と同等です。

sql
FOREACH
    payload.data.sensors as s
...

この機能は、複雑な構造のペイロードを扱う際に便利です。