Skip to content

jq 関数

jq は、主に JSON でエンコードされたデータの変換やクエリに特化した強力なコマンドラインツール兼プログラミング言語です。

EMQX のルールでは、SQL ライクなルールを定義してメッセージを処理・ルーティングできます。これらのルールには、ブローカーを通過する JSON ペイロードに対して複雑な変換を行うために jq 関数を含めることが可能です。

jq 関数が初めての方は、リファレンス セクションを参照して素早く学習を始めることができます。

TIP

jq 関数は、ルールの SQL 言語だけでは難しい変換を行う際に便利です。

ただし、効率的なメッセージ処理を維持するために、ルール内での長時間実行される計算は避けることを推奨します。また、バグのある jq プログラムを防ぐために、タイムアウト機能(設定項目 rule_engine.jq_function_default_timeout)の利用を推奨します。

jq 関数

ルールエンジンの SQL における jq 文の基本フォーマットは以下の通りです。

jq('<JQ_program>', '<JSON_input>', <timeout_value>)

ここで、

  1. <JQ_program>:有効な jq プログラムを含む文字列
  2. <JSON_input>:jq プログラムの入力となる JSON エンコードされた文字列またはオブジェクト
  3. <timeout_value>:省略可能な整数型のタイムアウト値(ミリ秒単位)、デフォルトは 10 秒

jq 関数は、指定された入力に対して jq プログラムを実行し、生成されたオブジェクトのリストを返します。実行がタイムアウト前に終了しなかった場合や jq プログラムで例外が発生した場合は、エラーをスローします。

利用例

以下は簡単な jq 関数の呼び出し例とその結果です。

JSON データの操作

この例では、JSON データのアクセス、変換、値の計算など、jq を使った様々な操作方法を示しています。

コード例:

SQL
jq('.', '{"temperature": 10}') =
[json_decode('{"temperature": 10}')]

jq('.', json_decode('{"temperature": 10}')) =
[json_decode('{"temperature": 10}')]

jq('.temperature', '{"temperature": 10}') =
[10]

jq('{temperature_C:.temperature,
     temperature_F: (.temperature * 1.8 + 32)}',
   '{"temperature": 10}') =
[json_decode('{"temperature_C": 10, "temperature_F": 50}')]

jq('.temperature,(.temperature * 1.8 + 32)', '{"temperature": 10}') =
[10, 50]

外れ値を除いた平均値の計算

例えば、以下の JSON オブジェクトは日付と複数のセンサーを含み、それぞれのセンサーには名前と特定の日のセンサー読み取り値のデータポイントの配列があります。

json
{
  "date": "2020-04-24",
  "sensors": [
    {
      "name": "a",
      "data": [3, 1, 2, 4, 5, 5]
    },
    {
      "name": "b",
      "data": [1, -100, 2, 3, 4, 5, 2000]
    },
    {
      "name": "c",
      "data": [3, 7, 9]
    }
  ]
}

jq 関数と FOREACH 文を組み合わせて、jq の出力オブジェクトを複数のメッセージに分割し、それぞれに日付フィールドと外れ値を除いたセンサーのデータフィールドの平均値を含めることができます。

sql
FOREACH   jq('def rem_first:
                 if length > 2 then del(.[0]) else . end;
              def rem_last:
                 if length > 1 then del(.[-1]) else . end;
              .date as $date |
              .sensors[] |
                (.data | sort | rem_first | rem_last | add / length) as $average |
                {$average, $date}',
             payload)
FROM    "jq_demo/complex_rule/jq/#"

すると、3つの出力メッセージは以下のペイロードになります。

メッセージ 1:

json
{
  "average": 3.5,
  "date": "2020-04-24"
}

メッセージ 2:

json
{
  "average": 3,
  "date": "2020-04-24"
}

メッセージ 3:

json
{
  "average": 7,
  "date": "2020-04-24"
}

1つのメッセージを複数のメッセージに分割

この例では、複数のセンサー測定値を含む入力メッセージを処理し、センサータイプごとに別々のメッセージに分割します。動作は以下の通りです。

  • FOREACH は jq 関数を使って、入力メッセージを sensor_typevalue フィールドを持つオブジェクトの配列に変換します。
  • DO 節は出力メッセージに必要なフィールドを選択します。
  • FROM 節は、トピックフィルター car/measurements にマッチするメッセージにルールを適用します。
sql
FOREACH
    ## データは配列である必要があります
    jq('
       [{
         sensor_type: "temperature",
         value: .temperature
        },
        {
         sensor_type: "humidity",
         value: .humidity
        },
        {
         sensor_type: "pressure",
         value: .pressure
        },
        {
         sensor_type: "light",
         value: .light
        },
        {
         sensor_type: "battery",
         value: .battery
        },
        {
         sensor_type: "speed",
         value: .speed
        }]',
        payload) as sensor  
DO
    payload.client_id,
    payload.timestamp,
    sensor.sensor_type,
    sensor.value
FROM "car/measurements"

メッセージ分割の別の方法

この例は、複数のセンサー測定値を含む入力メッセージをセンサータイプごとに別々のメッセージに分割する別のアプローチを示しています。

FOREACH 節内の jq 関数は入力とすべてのセンサータイプを保存し、各センサータイプごとに関連フィールドを持つオブジェクトを出力します。

sql
FOREACH
    jq('
       # 入力を保存
       . as $payload |
       
       # すべてのセンサータイプ
       [ 
         "temperature",
         "humidity",
         "pressure",
         "light",
         "battery",
         "speed" 
       ] as $sensor_types |
       
       # 各センサータイプのオブジェクトを出力
       $sensor_types[] |
       {
         client_id: $payload.client_id,
         timestamp: $payload.timestamp,
         sensor_type: .,
         value: $payload[.] 
       }
       ',
       payload) as sensor  
FROM "car/measurements"

リファレンス

jq 関数が初めての方には、以下の資料をおすすめします。