Skip to content

データソースとフィールド

EMQXのルールは、MQTTメッセージMQTTイベント、またはデータブリッジなど、さまざまなデータソースからデータを処理できます。

ルールエンジン構文のセクションで説明したように、FROM句を使用してデータソースを指定し、対応するフィールドをSELECT句やWHERE句で参照できます。本セクションでは、MQTTメッセージMQTTイベント、およびデータブリッジのフィールドについて紹介します。

MQTTメッセージ

EMQXルールを使用してメッセージのパブリッシュを処理する場合、FROM句でメッセージのトピックを指定する必要があります。

例えば、以下のステートメントでは、トピックパターンt/#にパブリッシュされたメッセージのpayload.msgAS句でmsgにリネーム)、clientidusernamepayloadtopicqosのフィールドを選択しています。

例:

sql
SELECT
  payload.msg as msg,
  clientid,
  username,
  payload,
  topic,
  qos
FROM
  "t/#"

出力例:

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "payload": "{\"msg\":\"hello\"}",
  "msg": "hello",
  "clientid": "c_emqx"
}

受信したMQTTメッセージから選択可能なフィールドは以下の通りです:

フィールド説明
idMQTTメッセージID
clientidパブリッシャーのクライアントID
usernameパブリッシャーのユーザー名
payloadMQTTペイロード
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
flagsフラグ
headersメッセージ処理に関連する内部データ
pub_propsPUBLISHプロパティ(MQTT 5.0クライアントのみ)
timestampタイムスタンプ(単位:ミリ秒)
publish_received_atPUBLISHメッセージがEMQXに到達した時間(単位:ミリ秒)
nodeイベントが発生したノード
client_attrsクライアント属性

MQTTイベント

EMQXルールを使用してイベントトピックからデータを抽出し、クライアントのオンライン・オフライン、サブスクリプションなどのイベント通知を取得できます。イベントトピックは"$events/"で始まり、例えば"$events/client/connected"などがあり、ルールのFROM句で指定可能です。

TIP

デフォルトでは、クライアントはMQTTイベントメッセージを直接サブスクライブできません。このセクションではルールを使ってこれらのメッセージをサブスクライブする方法を説明します。MQTTイベントメッセージのデータは、システムトピックをサブスクライブすることでも取得可能です。

以下はサポートされているイベントトピックの一覧です。

イベントトピック一覧

イベントトピック名説明
$events/message/deliveredメッセージ配信
$events/message/ackedメッセージ受領確認
$events/message/droppedルーティング時のメッセージ破棄
$events/message/delivery_dropped配信時のメッセージ破棄
$events/client/connected接続完了
$events/client/disconnected切断
$events/client/connack接続応答
$events/auth/check_authz_complete認可チェック完了
$events/auth/check_authn_complete認証チェック完了
$events/session/subscribedサブスクライブ成功
$events/session/unsubscribedサブスクライブ解除
$events/sys/alarm_activatedシステムアラーム発動
$events/sys/alarm_deactivatedシステムアラーム解除

メッセージ配信イベント ("$events/message_delivered")

EMQX 5.10.0以降、イベントトピックにネームスペースが導入され、論理的かつ階層的な構造に再編成されました。この改善により、イベントトピックの明確化、フィルタリング、管理が向上しています。

後方互換性のため、旧イベントトピックも引き続きサポートされていますが、新規設定ではネームスペース付きの新しいトピックの使用を推奨します。以下の表は旧トピックと新トピックの対応表です。

旧イベントトピック新イベントトピック
$events/client_connected$events/client/connected
$events/client_disconnected$events/client/disconnected
$events/client_connack$events/client/connack
$events/client_check_authz_complete$events/auth/check_authz_complete
$events/client_check_authn_complete$events/auth/check_authn_complete
$events/session_subscribed$events/session/subscribed
$events/session_unsubscribed$events/session/unsubscribed
$events/message_delivered$events/message/delivered
$events/message_acked$events/message/acked
$events/message_dropped$events/message/dropped
$events/delivery_dropped$events/message/delivery_dropped
$events/message_transformation_failed$events/message_transformation/failed
$events/schema_validation_failed$events/schema_validation/failed

:::

メッセージ配信完了イベント ("$events/message/delivered")

このイベントトピックは、メッセージがクライアントに配信された際にルールをトリガーできます。

例えば、"$events/message/delivered"イベントトピックから、パブリッシャーのIDとユーザー名、メッセージトピック、メッセージのQoS、イベントが発生したEMQXノード、イベント発生時刻のフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message/delivered"

出力例:

json
{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

各フィールドの詳細は以下の通りです。

フィールド説明
idMQTTメッセージID
from_clientidパブリッシャーのクライアントID
from_usernameパブリッシャーのユーザー名
clientidサブスクライバーのクライアントID
usernameサブスクライバーのユーザー名
payloadMQTTペイロード
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
flagsフラグ
pub_propsPUBLISHプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
publish_received_atPUBLISHメッセージがEMQXに到達した時間(単位:ミリ秒)
nodeイベントが発生したEMQXノード

メッセージ受領確認イベント ("$events/message/acked")

このイベントトピックは、メッセージ配信がアック(ACK)された際にルールをトリガーできます。

TIP

QoS 1およびQoS 2のメッセージのみ対応しています。

例えば、"$events/message/acked"イベントトピックから、パブリッシャーのIDとユーザー名、メッセージトピック、メッセージのQoS、イベントが発生したEMQXノード、イベント発生時刻のフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message/acked"

出力例:

json
{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

各フィールドの詳細は以下の通りです。

フィールド説明
idMQTTメッセージID
from_clientidパブリッシャーのクライアントID
from_usernameパブリッシャーのユーザー名
clientidサブスクライバーのクライアントID
usernameサブスクライバーのユーザー名
payloadMQTTペイロード
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
flagsフラグ
pub_propsPUBLISHプロパティ(MQTT 5.0のみ)
puback_propsPUBACKプロパティ(MQTT 5.0のみ)
timestampイベント発生時刻(単位:ミリ秒)
publish_received_atPUBLISHメッセージがEMQXに到達した時間(単位:ミリ秒)
nodeイベントが発生したEMQXノード

ルーティング時のメッセージ破棄イベント ("$events/message_dropped")

このイベントトピックは、メッセージがルーティング中に破棄された際にルールをトリガーできます。

例えば、"$events/message/dropped"イベントトピックから、破棄理由、メッセージトピック、メッセージQoS、イベントが発生したEMQXノード、イベント発生時刻のフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message/dropped"

出力例:

json
{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}
フィールド説明
idMQTTメッセージID
reason破棄理由:

no_subscribers: トピックにサブスクライブしているクライアントがいない

receive_maximum_exceeded: awaiting_relキューが満杯

packet_identifier_inuse: 未解放のパケットIDを持つQoS 2メッセージを受信した
clientidパブリッシャーのクライアントID
usernameパブリッシャーのユーザー名
payloadMQTTペイロード
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
flagsフラグ
pub_propsPUBLISHプロパティ(MQTT 5.0のみ)
timestampイベント発生時刻(単位:ミリ秒)
publish_received_atPUBLISHメッセージがEMQXに到達した時間(単位:ミリ秒)
nodeイベントが発生したノード

配信時のメッセージ破棄イベント ("$events/delivery_dropped")

このイベントトピックは、メッセージが配信中に破棄された際にルールをトリガーできます。

例えば、"$events/message/delivery_dropped"イベントトピックから、パブリッシャーのIDとユーザー名、破棄理由、メッセージトピック、QoSのフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/message/delivery_dropped"

出力例:

json
{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}

各フィールドの詳細は以下の通りです。

フィールド説明
idMQTTメッセージID
reason破棄理由:

queue_full: QoS>0のメッセージキューが満杯

no_local: クライアントが自身のパブリッシュしたメッセージを受信しない設定

expired: メッセージまたはセッションの有効期限切れ

qos0_msg: QoS 0メッセージキューが満杯
from_clientidパブリッシャーのクライアントID
from_usernameパブリッシャーのユーザー名
clientidサブスクライバーのクライアントID
usernameサブスクライバーのユーザー名
payloadMQTTペイロード
peerhostクライアントのIPアドレス
topicMQTTトピック
qosメッセージのQoS
flagsフラグ
pub_propsPUBLISHプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
publish_received_atPUBLISHメッセージがEMQXに到達した時間(単位:ミリ秒)
nodeイベントが発生したEMQXノード

接続完了イベント ("$events/client/connected")

このイベントトピックは、クライアントが正常に接続した際にルールをトリガーできます。

例えば、"$events/client/connected"イベントトピックから、クライアントID、ユーザー名、キープアライブ間隔、接続したMQTTクライアントがブリッジとして動作しているかどうかのフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client/connected"

出力例:

json
{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}

以下の表は、受信したMQTTメッセージから選択可能なフィールドを示しています。

フィールド説明
clientidクライアントID
usernameクライアントのユーザー名
mountpointブリッジメッセージのマウントポイント
peernameクライアントのIPアドレスとポート
socknameEMQXがリッスンしているIPアドレスとポート
proto_nameプロトコル名
proto_verプロトコルバージョン
keepaliveMQTTキープアライブ間隔
clean_startMQTTのclean_startフラグ
expiry_intervalMQTTセッションの有効期限
is_bridgeクライアントがブリッジとして動作しているかどうか
connected_atクライアント接続完了時刻(単位:ミリ秒)
conn_propsCONNECTプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
nodeイベントが発生したEMQXノード
client_attrsクライアント属性

切断イベント ("$events/client/disconnected")

このイベントトピックは、クライアントが切断された際にルールをトリガーできます。

例えば、"$events/client/disconnected"イベントトピックから、クライアントID、ユーザー名、切断理由、接続時間、切断時間、イベントが発生したEMQXノードのフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  clientid,
  username,
  reason,
  connected_at,
  disconnected_at,
  node
FROM
  "$events/client/disconnected"

出力例:

json
{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578036,
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}
フィールド説明
reason切断理由:

normal: クライアントが意図的に切断

kicked: REST API経由でEMQXが強制切断

keepalive_timeout: 指定されたキープアライブ時間切れ

not_authorized: 認可失敗

tcp_closed: ピアがネットワーク接続を閉じた

discarded: clean_starttrueの別クライアントが同じClientIDで接続し、前の接続を切断

takenover: clean_startfalseの別クライアントが同じClientIDで接続し、前の接続を引き継ぎ

internal_error: 不正なメッセージ形式やその他不明なエラー発生
clientidクライアントID
usernameクライアントのユーザー名
peernameIPアドレスとポート番号
socknameEMQXがリッスンしているIPアドレスとポート
connected_atクライアント接続開始時刻(単位:ミリ秒)。このタイムスタンプは現在のセッションが確立された時刻を示し、切断イベントがどの接続セッションに属するか識別するために使用されます。遅延した切断イベントが新しい接続状態を上書きしないようにします。
disconnected_atクライアント切断完了時刻(単位:ミリ秒)
disconn_propsDISCONNECTプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
nodeイベントが発生したEMQXノード
client_attrsクライアント属性

接続応答イベント ("$events/client/connack")

このイベントトピックは、EMQXがクライアントにCONNACKパケットを送信した際にルールをトリガーできます。

例:

sql
SELECT
  clientid,
  username,
  reason_code,
  node
FROM
  "$events/client/connack"

出力例:

json
{
  "username": "u_emqx",
  "reason_code": "success",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578536,
  "clientid": "c_emqx"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
reason_code理由コード*
clientidパブリッシャーのクライアントID
usernameパブリッシャーのユーザー名
peernameIPアドレスとポート
socknameEMQXがリッスンしているIPアドレスとポート
proto_nameプロトコル名
proto_verプロトコルバージョン
keepaliveMQTTキープアライブ間隔
clean_startMQTTのclean_startフラグ
expiry_intervalMQTTセッションの有効期限
conn_propsCONNECTプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
nodeイベントが発生したEMQXノード

[^*]: MQTT v5.0プロトコルでは、リターンコードが理由コードに名称変更され、より多くのエラータイプを示す理由コードが追加されています(Reason code and ACK - MQTT 5.0 new features)。

以下はMQTT v3.1.1とMQTT v5.0の理由コード一覧です。

認可チェック完了イベント ("$events/auth/check_authz_complete")

このイベントトピックは、クライアントの認可チェックが完了した際にルールをトリガーできます。

例:

sql
SELECT
  clientid,
  username,
  topic,
  action,
  result,
  authz_source,
  node
FROM
  "$events/auth/check_authz_complete"

出力例:

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "action": "publish",
  "result": "allow",
  "authz_source": "cache",
  "node": "emqx@127.0.0.1",
  "clientid": "c_emqx"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
clientidクライアントID
usernameユーザー名
peerhostクライアントのIPアドレス
topicMQTTトピック
actionパブリッシュまたはサブスクライブのアクション
resultアクセス制御チェックの結果
authz_source認可のソース
timestampタイムスタンプ(単位:ミリ秒)
nodeイベントが発生したEMQXノード
client_attrsクライアント属性

認証チェック完了イベント ("$events/auth/check_authn_complete")

このイベントトピックは、クライアントの認証チェックが完了した際にルールをトリガーできます。

例:

sql
SELECT
  clientid,
  username,
  reason_code,
  is_superuser,
  is_anonymous
FROM
  "$events/auth/check_authn_complete"

出力例:

json
{
  "clientid": "c_emqx",
  "username": "u_emqx",
  "reason_code": "success",
  "is_superuser": true,
  "is_anonymous": false
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
clientidクライアントID
usernameユーザー名
peernameクライアントのIPアドレス
reason_code認証結果
is_superuserこのクライアントがスーパーユーザーかどうか
is_anonymousこのクライアントが匿名ユーザーかどうか
client_attrsクライアント属性

サブスクライバーイベント ("$events/session_subscribed")

このイベントトピックは、クライアントが正常にサブスクライブした際にルールをトリガーできます。

例:

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session/subscribed"

出力例:

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
clientidクライアントID
usernameクライアントのユーザー名
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
sub_propsSUBSCRIBEプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
nodeイベントが発生したEMQXノード
client_attrsクライアント属性

サブスクリプション解除イベント ("$events/session_unsubscribed")

このルールは、クライアントのサブスクリプション解除が正常に完了した際にトリガーされます。

例:

sql
SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session/unsubscribed"

出力例:

json
{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
clientidクライアントID
usernameクライアントのユーザー名
peerhostクライアントのIPアドレス
topicMQTTトピック
qosQoSレベル
unsub_propsUNSUBSCRIBEプロパティ(MQTT 5.0クライアントのみ)
timestampイベント発生時刻(単位:ミリ秒)
nodeイベントが発生したEMQXノード
client_attrsクライアント属性

システムアラーム発動イベント ("$events/sys/alarm_activated")

このイベントトピックは、EMQXのシステムアラームが発動した際にルールをトリガーできます。

例えば、"$events/sys/alarm_activated"イベントトピックから、アラーム名、詳細、説明メッセージ、発動時刻のフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  name,
  details,
  message,
  activated_at,
  node
FROM
  "$events/sys/alarm_activated"

出力例:

json
{
  "name": "too_many_processes",
  "details": {
    "usage": "99%",
    "high_watermark": "80%"
  },
  "message": "99% process usage",
  "activated_at": 1645003578536000,
  "node": "emqx@127.0.0.1"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
nameアラームの短い識別子(例:"too_many_processes"
detailsアラームに関する追加詳細を含むJSONオブジェクト(スキーマは固定ではない)(例:{"usage": "99%", "high_watermark": "80%"}
messageアラームの説明メッセージ(例:"99% process usage"
activated_atアラーム発動時のUnixタイムスタンプ(マイクロ秒単位)
nodeイベントが発生したEMQXノード

システムアラーム解除イベント ("$events/sys/alarm_deactivated")

このルールは、EMQXのシステムアラームが解除された際にトリガーされます。

例えば、"$events/sys/alarm_deactivated"イベントトピックから、アラーム名、詳細、説明メッセージ、発動時刻、解除時刻のフィールドを抽出するには、以下のステートメントを使用します。

例:

sql
SELECT
  name,
  details,
  message,
  activated_at,
  deactivated_at,
  node
FROM
  "$events/sys/alarm_deactivated"

出力例:

json
{
  "name": "too_many_processes",
  "details": {
    "usage": "99%",
    "high_watermark": "80%"
  },
  "message": "99% process usage",
  "activated_at": 1645003578536000,
  "deactivated_at": 1645004000000000,
  "node": "emqx@127.0.0.1"
}

以下の表は抽出可能なフィールドを示しています。

フィールド説明
nameアラームの短い識別子(例:"too_many_processes"
detailsアラームに関する追加詳細を含むJSONオブジェクト(スキーマは固定ではない)(例:{"usage": "99%", "high_watermark": "80%"}
messageアラームの説明メッセージ(例:"99% process usage"
activated_atアラーム発動時のUnixタイムスタンプ(マイクロ秒単位)
deactivated_atアラーム解除時のUnixタイムスタンプ(マイクロ秒単位)
nodeイベントが発生したEMQXノード

データブリッジ

ルールは、データブリッジによってトリガーされたメッセージやイベントを、$bridges/で始まるトピックで扱います。フォーマットは以下の通りです。

$bridges/<type>:<name>

ここで、

  • <type>:<name>はブリッジID、
  • <type>はブリッジの種類、
  • <name>はブリッジ名です。

例えば、MQTTブリッジのイベントは"$bridges/mqtt:*"の形式で参照できます。MQTTデータブリッジmy_mqtt_bridgeが送信するすべてのメッセージに対してルールを設定するには、以下のステートメントを使用します。

例:

sql
SELECT
  *
FROM
  "$bridges/mqtt:my_mqtt_bridge"

出力例:

json
{
  "id": "0005E27C1D24E44FF440000017520000",
  "server": "broker.emqx.io:1883",
  "payload": "hello",
  "topic": "t/a",
  "qos": 1,
  "dup": false,
  "retain": false,
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "message_received_at": 1645002753259
}

返される各フィールドの説明は以下の通りです。

フィールド説明
idMQTTメッセージID
serverリモートMQTTブローカーのサーバー名(例:"broker.emqx.io:1883")
payloadMQTTペイロード
topicMQTTトピック
qosMQTTのQoS
dupMQTTのDUPフラグ
retainMQTTのRetainフラグ
pub_propsPUBLISHプロパティ(MQTT 5.0クライアントのみ)
message_received_atメッセージ受信時のタイムスタンプ(単位:ミリ秒)