Rule SQL リファレンス
EMQX のルールでは、データの抽出、フィルタリング、エンリッチメント、および変換のために SQL ベースの構文を使用します。この SQL ライクな構文には、SELECT と FOREACH の2種類のステートメントがあります。
| ステートメント | 説明 |
|---|---|
SELECT | SQL ステートメントの結果が単一のメッセージとなる場合に使用します。 |
FOREACH | 単一の入力メッセージからゼロ個以上のメッセージを生成する場合に使用します。 |
各ルールは正確に1つのステートメントを持つことができます。SQL ステートメントは豊富な組み込み関数を提供しており、簡単な変換やタイムスタンプの作成などが可能です。
また、SQL ステートメントは式の中に jq プログラム を埋め込むことをサポートしており、必要に応じて複雑なデータ変換を行うことができます。式は SELECT および FOREACH ステートメント内に埋め込むことができます。SELECT および FOREACH ステートメントで参照可能なフィールドについては、データソースとフィールドを参照してください。

SELECT ステートメント
SELECT ステートメントは、入力メッセージから特定のフィールドを選択し、フィールド名の変更、データの変換、条件に基づくメッセージのフィルタリングを行います。
ルールエンジンの SQL における SELECT ステートメントの基本形式は以下の通りです。
SELECT <fields_expressions> FROM <topic> [WHERE <conditions>]SELECT 句で出力に含めるフィールド(メッセージのペイロードおよびメタデータの両方)を指定し、WHERE 句で特定の条件に基づいてメッセージをフィルタリングできます。
FROM 句
FROM 句はクエリのデータソースを指定するために使用します。特定のトピックや条件に合致するイベントからデータを選択できます。
トピックによる選択
例えば、トピックパターン t/# と my/other/topic にパブリッシュされたすべてのメッセージに適用されるルールを定義したい場合、以下のように記述します。
SELECT clientid, payload.clientid as myclientid FROM "t/#", "my/other/topic"ここで、
SELECT句は出力に含めるフィールドを指定します。clientid: メタデータ内のクライアントIDpayload.clientid: メッセージペイロード内のクライアントID。ペイロード内のすべてのフィールドはpayloadの下に格納されています。as構文はpayload.clientidフィールドの名前をmyclientidに変更します。
イベントによる選択
ルールをイベントに紐付けることも可能です。例えば、クライアント c1 が EMQX への接続を開始したときに IP アドレスとポート番号を取得したい場合、以下のように記述します。
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'TIP
利用可能なすべてのイベントは EMQX ダッシュボードのルール編集画面の Events タブで確認できます。
WHERE 句
WHERE 句はオプションで、FROM 句で指定したトピックやイベントのフィルタに加えて、メッセージが満たすべき追加条件を指定し、メッセージの絞り込みを行います。
例えば、トピック t/# のメッセージのうち、ユーザー名が eric のものだけをフィルタリングする場合は以下のように記述します。
SELECT * FROM "t/#" WHERE username = 'eric'TIP
WHERE 句で使用するフィールドは、メッセージのメタデータまたはペイロード内に存在するフィールドでなければなりません。存在しない場合はエラーになります。
式の利用
式は SELECT および WHERE 句でデータ変換に使用できます。例えば、以下の SQL 文は clientid フィールドの値を大文字に変換し、サフィックスを追加して、出力メッセージのフィールド名を cid としています。
SELECT (upper(clientid) + '_UPPERCASE_LETTERS') as cid FROM "t/#"次の例は括弧付きの算術式を使ったデータ変換の例です。
SELECT (payload.integer_field + 2) * 2 as num FROM "t/#"複雑な構造を持つペイロードのフィールドにドット記法でアクセスすることも可能です(ペイロードが JSON 形式であることが前提です)。
SELECT payload.a.b.c.deep as my_field FROM "t/#"以下の例は、WHERE 句で等価演算子(=)を使ってフィールドの値を判定する例です。SELECT * はすべてのメタデータとペイロードを出力メッセージに転送します。
SELECT * FROM "t/#" WHERE payload.x.y = 1WHERE 句では and および or 演算子を使って複雑なブール式を作成できます。
SELECT * FROM "t/#" WHERE payload.name = "sensor_1" and payload.temprature > 39FOREACH ステートメント
FOREACH ステートメントは SELECT ステートメントのより一般的な形と考えられます。各入力メッセージに対してゼロ個以上の出力メッセージを生成できます。特定の条件に基づいてデータをフィルタリングし、結果を MQTT トピックやデータブリッジに出力する際に使用します。
ルールエンジン SQL における FOREACH ステートメントの基本形式は以下の通りです。
FOREACH <expression_that_evaluates_to_array> [as <name>]
[DO <fields_expressions>]
[INCASE <condition>]
FROM <topic>
[WHERE <condition>]FOREACH ステートメントは、入力メッセージから配列を作成する FOREACH 句で始まります。FROM および WHERE 句は SELECT ステートメントの同名句と同様の目的で機能します。FOREACH ステートメントには FOREACH、FROM、WHERE 句のほかに、以下の2つのオプション句があります。
| 句 | 必須/任意 | 説明 |
|---|---|---|
DO | 任意 | FOREACH で選択した配列の各要素を変換します。SELECT ステートメントの SELECT 句に対応し、同じ式を受け入れます。 |
INCASE | 任意 | 指定した条件に合致しない配列要素をフィルタリングします。WHERE 句と同じ式を受け入れます。 |
TIP
FOREACH 句以外はすべて SELECT ステートメントの対応する句と同じ意味を持つため、FOREACH ステートメントは SELECT ステートメントの一般化とみなせます。以下の2つのステートメントは等価です(jq('.', payload) はペイロードを配列でラップしています)。
FOREACH jq('.', payload) as it
DO it.field_1, it.field_2
FROM "t/#"SELECT payload.field_1, payload.field_2
FROM "t/#"ネストした FOREACH ループはサポートされていません。この制限を回避するには、異なるレベルやデータセットを処理する2つの Republish アクションを設定し、ネストされた FOREACH ループの問題を回避できます。
FOREACH 句の as 構文は配列要素に名前を付けるために使われ、DO 句内で「現在の」要素を参照しやすくします。as name 部分を省略した場合、デフォルトで item という名前が使われます。
以下は、FOREACH ステートメントで2つの値を出力する例です。両方の値は value という1つのフィールドのみを含み、その値はそれぞれメッセージの field_1 と field_2 の値です。
FOREACH jq('[.field_1, .field_2]', payload)
DO item as value
FROM "t/#"FOREACH ステートメントは入力データが配列形式であることを要求します。入力メッセージがすでに配列を含む場合は、直接 FOREACH ステートメントを適用できます。
例えば、トピック t/# にパブリッシュされたメッセージで、センサーの idx が1以上の場合にタイムスタンプ、クライアントID、センサー名、インデックスを出力したい場合、以下のように記述します。
FOREACH
payload.sensors as sensor
DO
timestamp,
clientid,
upper(sensor.name) as name,
sensor.idx as idx
INCASE
sensor.idx >= 1
FROM "t/#"ここで、
FOREACH句は入力メッセージのペイロード内のsensorsフィールドを配列として指定し、配列要素にsensorという名前を付けています。DO句は出力に含めるフィールドを指定しています。timestampは入力メッセージのメタデータからのタイムスタンプです。clientidは入力メッセージのメタデータからのクライアントIDです。sensor.nameは組み込み関数upperで大文字化され、as nameで名前を変更しています。ここでのsensorはFOREACH句で指定された配列の現在の要素を指します。sensor.idxはas idxで名前を変更しています。
INCASE句は追加のフィルタ条件を指定し、idxフィールドの値が1以上のセンサーのみを対象とします。FROM句はトピックパターンt/#にマッチするメッセージを対象としています。
ルール作成後は、本番環境に投入する前に必ずテストすることを推奨します。ダッシュボードの UI にはサンプルメッセージでルールをテストできる機能があります。SQL ステートメントのテスト方法の詳細は ルールのテスト を参照してください。上記のルールは以下の JSON 形式のペイロードを入力としてテストできます。
{"sensors": [
{"idx":0, "name":"t0"},
{"idx":1, "name":"t1"},
{"idx":2, "name":"t2"}
]
}入力メッセージに配列が含まれていない場合は、jq 関数を使ってペイロードを配列でラップできます。例えば以下のように記述します。
FOREACH jq('.', payload)
DO item.field_1, item.field_2
FROM "t/#"EMQX は高度な変換のために jq 関数の使用をサポートしています。詳細なコード例は 組み込み jq 関数 を参照してください。
式と演算
EMQX のルール構文では、データ変換やメッセージのフィルタリングのために式を使用できます。これらの式は SELECT、FOREACH、DO、INCASE、WHERE などの句で利用可能です。このセクションでは式の使用方法について説明します。以下は式を構成するために使用できる演算子の一覧です。また、組み込み関数も多数利用可能です。
算術演算
| 演算子 | 用途 | 戻り値 |
|---|---|---|
+ | 加算、または文字列の連結 | 合計、または連結された文字列 |
- | 減算 | 差分 |
* | 乗算 | 積 |
/ | 除算 | 商 |
div | 整数除算 | 整数の商 |
mod | 剰余 | 剰余 |
論理演算
| 演算子 | 用途 | 戻り値 |
|---|---|---|
> | より大きい | true/false |
< | より小さい | true/false |
<= | 以下 | true/false |
>= | 以上 | true/false |
<> | 等しくない | true/false |
!= | 等しくない | true/false |
= | 2つのオペランドが完全に等しいかを判定。値の比較に使用可能 | true/false |
=~ | トピックがトピックフィルターにマッチするか判定。トピックマッチング専用 | true/false |
and | 論理積(AND) | true/false |
or | 論理和(OR) | true/false |
CASE 式
CASE 式は条件付きの処理を行うために使用できます。CASE 式は他の言語の if-then-else 文に相当します。以下の例で使い方を示します。
SELECT
CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/#"メッセージが以下の場合、
{"x": 8}出力は以下のようになります。
{"x": 7}さらに例
SELECT ステートメントの例
トピック "t/a" のメッセージからすべてのフィールドを抽出する。
sqlSELECT * FROM "t/a"トピック "t/a" または "t/b" のメッセージからすべてのフィールドを抽出する。
sqlSELECT * FROM "t/a","t/b"トピックが 't/#' にマッチするメッセージからすべてのフィールドを抽出する。
sqlSELECT * FROM "t/#"トピックが 't/#' にマッチするメッセージから
qos、username、clientidフィールドを抽出する(出力メッセージのペイロードにこれらのフィールドが含まれます)。sqlSELECT qos, username, clientid FROM "t/#"ペイロードに
usernameフィールドがあり、その値が 'Steven' のメッセージからusernameフィールドを抽出する(FROM句で#を使うのは全メッセージを対象にルールを評価するため推奨されません)。sqlSELECT username FROM "#" WHERE username='Steven'入力メッセージのペイロードから
xフィールドを抽出し、出力メッセージでyに名前を変更する。yはWHERE句でも使用可能。この SQL 文はペイロードが{"x": 1}のメッセージにマッチし、{"x": 2}にはマッチしません。sqlSELECT payload.x as x FROM "tests/test_topic_1" WHERE y = 1ペイロードが
{"x": {"y": 1}}のメッセージ(例:{"x": {"y": 1}, "other": "field"}も含む)にマッチする。sqlSELECT * FROM "#" WHERE payload.x.y = 1クライアントIDが 'c1' の MQTT クライアントが接続した場合、そのソース IP アドレスとポート番号を抽出する。
sqlSELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'トピックが 't/topic' にマッチし、QoS レベルが 1 のすべてのサブスクリプションにマッチし、
clientidを抽出する。sqlSELECT clientid FROM "$events/session_subscribed" WHERE topic = 'my/topic' and qos = 1上記の例と似ていますが、トピックマッチ演算子
=~を使ってトピックフィルター 't/#' にマッチさせる。sqlSELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1キーが "foo" のユーザープロパティを抽出する(ユーザープロパティは MQTT 5.0 の新機能で、古い MQTT バージョンには該当しません)。
sqlSELECT pub_props.'User-Property'.foo as foo FROM "t/#"
TIP
FROM句のトピックはダブルクォーテーション("")またはシングルクォーテーション('')で囲む必要があります。WHERE句の条件で文字列を使う場合はシングルクォーテーション('')で囲みます。FROM句に複数のトピックがある場合はカンマ(,)で区切ります。例:SELECT * FROM "t/1", "t/2"。- ペイロードの内部フィールドにはドット記法(
.)でアクセス可能です。例:ネストされた JSON ならpayload.outer_field.inner_fieldのように指定します。 - ペイロードに対してエイリアスを作成するのはパフォーマンスに影響するため避けてください。例:
SELECT payload as pは推奨されません。 - 一部のエスケープシーケンスは使用時にアンエスケープが必要です。詳細は unescape 関数 を参照してください。
FOREACH ステートメントの例
クライアントID c_steve のメッセージがトピック t/1 に届き、メッセージ本文は JSON 形式で、sensors フィールドが複数のオブジェクトを含む配列であるとします。例:
{
"date": "2020-04-24",
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}例1
sensors 配列内の各オブジェクトを、オブジェクトの idx を使ったトピック sensors/${idx} に、内容を name として再パブリッシュしたい場合。上記の入力例では、ルールエンジンは以下の3つのメッセージを発行します。
- トピック: sensors/0
内容: a - トピック: sensors/1
内容: b - トピック: sensors/2
内容: c
このルールのアクション設定は以下の通りです。
- アクションタイプ: メッセージ再パブリッシュ
- ターゲットトピック:
sensors/${idx} - ターゲット QoS: 2
- メッセージ内容テンプレート:
${name}
SQL ステートメントは以下の通りです。
FOREACH
payload.sensors
FROM "t/#"この SQL 文の FOREACH 句は配列 sensors を指定しています。FOREACH ステートメントは結果配列の各オブジェクトに対して「メッセージ再パブリッシュ」アクションを実行するため、3回の再パブリッシュが行われます。
例2
sensors 配列のうち、idx フィールドの値が1以上のオブジェクトだけを対象に、トピック sensors/${idx} に以下の内容で再パブリッシュしたい場合。
内容: clientid=${clientid},name=${name},date=${date}
上記の入力例では、idx が0の要素は除外されるため、2つのメッセージが発行されます。
- トピック: sensors/1
内容: clientid=c_steve,name=b,date=2023-04-24 - トピック: sensors/2
内容: clientid=c_steve,name=c,date=2023-04-24
このルールのアクション設定は以下の通りです。
- アクションタイプ: メッセージ再パブリッシュ
- ターゲットトピック:
sensors/${idx} - ターゲット QoS: 2
- メッセージ内容テンプレート:
clientid=${clientid},name=${name},date=${date}
SQL ステートメントは以下の通りです。
FOREACH
payload.sensors
DO
clientid,
item.name as name,
item.idx as idx
INCASE
item.idx >= 1
FROM "t/#"この SQL 文の説明:
FOREACH句は配列sensorsを指定しています。DO句は各要素に必要なフィールドを選択しています。clientidはメッセージのメタデータから、nameとidxは現在のセンサーオブジェクトから取得します。itemはsensors配列の現在の要素を表します。INCASE句は配列要素のフィルタ条件を指定し、条件に合わない要素は無視されます。
DO および INCASE 句では、現在のオブジェクトを item で参照できますが、FOREACH 句の as 構文で変数名をカスタマイズすることもできます。したがって、上記の SQL は以下のようにも書けます。
FOREACH
payload.sensors as s
DO
clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"例3
例2を拡張し、clientid フィールドの c_steve から c_ プレフィックスを取り除きたい場合。
ルールエンジンには FOREACH、DO、INCASE 句内で呼び出せる組み込み関数が多数あります。c_steve を steve に変換するには、例2の SQL を以下のように変更します。
FOREACH
payload.sensors as s
DO
nth(2, tokens(clientid,'_')) as clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"複数の式を FOREACH 句に記述できますが、最後の式は配列を指定する必要があります。
例えば、入力メッセージのペイロードが以下のように構造化されている場合:
{
"date": "2020-04-24",
"data": {
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
}FOREACH 句でペイロードの data に別名を付けてから配列を選択できます。
FOREACH
payload.data as d
d.sensors as s
...これは以下と同等です。
FOREACH
payload.data.sensors as s
...この機能は複雑な構造のペイロードを扱う際に便利です。