Sparkplug B
Sparkplug 是由 Eclipse Foundation 的 TAHU 项目开发的开源规范,旨在为 MQTT 提供一套明确定义的 payload 和状态管理体系。其主要目标是在工业物联网领域实现互操作性和一致性。
Sparkplug B 定义了用于监控控制和数据采集(SCADA)系统、实时控制系统和设备的 MQTT 命名空间。它通过封装结构化数据格式,包括指标、过程变量和设备状态信息,确保了标准化的数据传输,使其呈现为简洁易处理的格式。通过使用Sparkplug B,组织可以提高运营效率,避免数据孤岛,并在 MQTT 网络中实现设备间的无缝通信。
本页面将为您介绍 Sparkplug B 在 EMQX 中的实现,包括数据格式、功能和实用示例。
Sparkplug B 数据格式
Sparkplug B 采用明确定义的 payload 结构来标准化数据通信。它的核心是使用 Protocol Buffers(Protobuf)对 Sparkplug 消息进行结构化,从而实现轻量、高效和灵活的数据交换。
EMQX 通过 Schema Registry 功能提供对 Sparkplug B 的高级支持。使用 Schema Registry,您可以为多种数据格式(包括 Sparkplug B)创建自定义编码器和解码器。通过在 registry 中定义适当的 Sparkplug B schema,您可以在 EMQX 的规则引擎中使用 schema_decode 和 schema_encode 函数访问和处理符合指定格式的数据。
此外,EMQX 还提供对于 Sparkplug B 的内置支持,无需为该特定格式使用 schema registry。在 EMQX 中,spb_encode 和 spb_decode 函数已经可以直接使用,简化了在规则引擎内进行 Sparkplug B 消息的编码和解码。
TIP
先前的 sparkplug_encode 和 sparkplug_decode 函数已被弃用,因为它们对 bytes_value 的处理方式与 Sparkplug 规范不兼容。 请改用更新后的 spb_encode 和 spb_decode 函数。
Sparkplug B 函数
EMQX 提供了两个规则引擎 SQL 函数用于编码和解码 Sparkplug B 数据:spb_encode 和 spb_decode。实用示例部分将帮助您了解如何在不同场景中使用这些函数。
由于规则引擎及其 jq 函数的灵活性,Sparkplug B 编码和解码函数可以执行各种任务。要了解更多有关规则引擎及其 jq 函数的信息,请参阅以下页面:
spb_decode
spb_decode 函数用于解码用 Sparkplug B 格式编码的消息,例如,如果您希望根据 Sparkplug B 格式编码的消息内容将其转发到特定主题,或者以某种方式更改 Sparkplug B 编码的消息,该函数可以将原始 Sparkplug B 编码 payload 转换为更易于处理和分析的用户友好格式。
使用示例:
select
spb_decode(payload) as decoded
from t上面的示例中,payload 指的是要解码的原始 Sparkplug B 消息。
Sparkplug B Protobuf schema 可以进一步揭示消息结构的详细信息。
spb_encode
spb_encode 函数用于将消息编码为 Sparkplug B 数据格式。这在您需要向 MQTT 客户端或系统的其他组件发送 Sparkplug B 格式消息时特别有用。
使用示例:
select
spb_encode(json_decode(payload)) as encoded
from t上面的示例中,payload 指的是要编码为 Sparkplug B 格式的消息数据。
Sparkplug B Alias 映射
Sparkplug B 规范允许设备在上线(发送 NBIRTH / DBIRTH 消息)时为每个指标分配一个数字形式的 alias,并在后续数据上报(发送 NDATA / DDATA 消息)时仅发送 alias 而不再发送完整的指标名称(name),以减少带宽占用。这种机制要求接收方能够维护 Sparkplug B 的会话状态,以便将 alias 还原为对应的 metric 名称。
在实际使用中,EMQX 经常被用作 Sparkplug B 数据的转换与分发中心,并通过规则引擎将数据转发给非 Sparkplug B 客户端(普通 MQTT 客户端、数据平台等)。这些下游系统通常不具备 Sparkplug B 客户端的状态管理能力,因此仅包含 alias 的数据消息难以直接使用。
为了解决这一问题,EMQX 自 6.0.2 起对 spb_decode 进行了增强,支持 Sparkplug B alias 映射,使解码结果更易于下游系统使用。
Sparkplug B Alias 映射工作机制
当启用 alias 映射后,EMQX 按以下流程处理 Sparkplug B 消息:
解析 NBIRTH / DBIRTH
当客户端发布 NBIRTH 或 DBIRTH 消息时,EMQX 会解析其中的 metrics,并记录同时包含
name和alias的 metric 映射关系。按会话维护映射
alias 映射与 MQTT 客户端会话关联,并按 Sparkplug B 语义进行隔离:
- 节点(NBIRTH / NDATA)与设备(DBIRTH / DDATA)各自维护独立映射。
- 不同客户端之间的映射互不影响。
增强 spb_decode
当规则引擎对 NDATA / DDATA 消息调用
spb_decode时,如果 metric 中仅包含alias而不包含name,EMQX 会根据已记录的映射自动补充对应的name字段。解码后的数据始终包含清晰的 metric 名称,便于规则处理和数据转发。会话结束即清理
当客户端断开连接后,其对应的 alias 映射会被清理。EMQX 不会在会话结束后继续保留或恢复 Sparkplug B 的状态。
配置 Alias 映射
Alias 映射功能默认启用。如果您不希望 EMQX 跟踪并还原 Sparkplug B 指标的 alias,可以在配置文件中将其关闭:
schema_registry {
sparkplugb {
enable_alias_mapping = false
}
}注意:
- 只有在 alias mapping 启用期间接收到的 NBIRTH / DBIRTH 消息,才会用于创建 alias 映射。
- 如果客户端已经发送过 birth 消息,则需要先重新连接,并再次发布 NBIRTH / DBIRTH 消息,alias mapping 才会生效。
Alias 映射使用示例
本节使用 EMQX Dashboard 和 MQTTX 演示如何在启用 Sparkplug B alias 映射后,将仅包含 alias 的 DDATA 消息转换为包含完整 metric name 的 JSON 数据,并转发给非 Sparkplug B 客户端。
目标
- Sparkplug B 设备:在 DBIRTH 中声明
name + alias,在 DDATA 中只发送alias。 - EMQX:使用
spb_decode自动补全 metric name。 - 下游订阅者:接收到普通 JSON 消息,无需理解 Sparkplug B 协议。
前置条件
- EMQX 版本为 6.0.2 及以上并且已启用 Sparkplug B alias 映射:
enable_alias_mapping = true - MQTTX
Step 1:在 EMQX Dashboard 创建规则
点击 EMQX Dashboard 左侧菜单中的集成 -> 规则。
点击 + 创建以创建新规则。
配置 SQL。在 SQL 编辑器中输入以下内容:
sqlSELECT spb_decode(payload) AS decoded FROM "spBv1.0/+/DDATA/+/+"说明:
- 该规则匹配所有 Sparkplug B DDATA 消息。
spb_decode(payload):解码 Sparkplug B payload。在 alias mapping 启用的情况下,自动将alias还原为对应的name。
点击 + 添加动作,为规则添加触发的动作。
选择消息重发布作为动作类型。
填写以下配置:
主题:
decoded/sparkplug/dataPayload:
${decoded}
点击添加。
在创建规则页面点击保存。

Step 2:使用 MQTTX 准备订阅者
- 打开 MQTTX,新建连接并连接到 EMQX Broker。
- 订阅解码后的数据主题。在 MQTTX 中添加订阅主题:
decoded/sparkplug/data。
此订阅代表一个非 Sparkplug B 客户端,只期望接收普通 JSON 数据。
Step 3:使用 MQTTX 模拟 Sparkplug B 设备
以下示例中,payload 以逻辑 JSON 展示,实际发送时需使用 Sparkplug B protobuf 编码(Base64)。
发送 DBIRTH(声明 alias)主题:
spBv1.0/group1/DBIRTH/eon1/device1。逻辑 Payload(示意)
json{ "metrics": [ { "name": "Device/Temperature", "alias": 0, "datatype": 9, "value": 72.5 }, { "name": "Device/Pressure", "alias": 1, "datatype": 9, "value": 101.3 } ] }说明:
- 在 Sparkplug B 中,
datatype被定义为一个无符号整数。根据 Sparkplug B 规范,数值9表示 Float 数据类型。 - EMQX 会在此时记录 alias -> name 映射。
- 该步骤必须先于 DDATA 执行。
- 在 Sparkplug B 中,
发送 DDATA(仅 alias)主题:
spBv1.0/group1/DDATA/eon1/device1。逻辑 Payload(示意):
json{ "metrics": [ { "alias": 0, "value": 73.1 }, { "alias": 1, "value": 100.9 } ] }
Step 4:验证解码结果
在 MQTTX 中,你会在订阅的主题 decoded/sparkplug/data 下收到如下 JSON 消息:
{
"metrics": [
{
"alias": 0,
"name": "Device/Temperature",
"value": 73.1
},
{
"alias": 1,
"name": "Device/Pressure",
"value": 100.9
}
]
}可以看到:
- 原始 DDATA 中没有
name字段。 spb_decode自动补全了:"Device/Temperature""Device/Pressure"
- 下游订阅者不需要维护 Sparkplug B 状态,也不需要解析
alias。
实用示例
本节提供使用 spb_decode 和 spb_encode 函数处理 Sparkplug B 消息的实用示例。请注意,所给示例仅展示了一小部分可以执行的操作,并非全部。
试想您有一个 Sparkplug B 编码的消息,具有以下结构:
{
"timestamp": 1678094561521,
"seq": 88,
"metrics": [
{
"timestamp": 1678094561525,
"name": "counter_group1/counter1_1sec",
"int_value": 424,
"datatype": 2
},
{
"timestamp": 1678094561525,
"name": "counter_group1/counter1_5sec",
"int_value": 84,
"datatype": 2
},
{
"timestamp": 1678094561525,
"name": "counter_group1/counter1_10sec",
"int_value": 42,
"datatype": 2
},
{
"timestamp": 1678094561525,
"name": "counter_group1/counter1_run",
"int_value": 1,
"datatype": 5
},
{
"timestamp": 1678094561525,
"name": "counter_group1/counter1_reset",
"int_value": 0,
"datatype": 5
}
]
}提取数据
假设您从一台设备获取了发送到主题 my/sparkplug/topic 上的消息,并希望仅将 counter_group1/counter1_run 指标转发到另一个名为 interesting_counters/counter1_run_updates 的主题,且以 JSON 格式发送。以下步骤演示了如何通过在 EMQX Dashboard 中创建规则并使用 MQTTX 客户端工具测试规则来实现此任务。
在 Dashboard 中创建规则
打开EMQX Dashboard。从左侧导航菜单中点击集成 -> 规则。点击 + 创建进入创建规则页面。
在 SQL 编辑器中输入以下 SQL 语句:
sqlFOREACH jq(' .metrics[] | select(.name == "counter_group1/counter1_run") ', spb_decode(payload)) AS item DO item FROM "my/sparkplug/topic"这里,
jq函数用于遍历指标数组并过滤出名为"counter_group1/counter1_run"的指标。TIP
Sparkplug B 规范建议仅在数据发生更改时发送数据,使 payload 仅呈现指标的子集。如果数组中没有指定名称的条目,则此规则不会输出任何内容。
在页面右侧点击添加动作。从动作下拉列表中选择
消息重发布。将interesting_counters/counter1_run_updates作为重新发布主题输入,并在 Payload 字段中输入${item}。点击添加。在创建规则页面上,点击创建。您可以在规则列表中看到已创建的规则。
测试规则
您可以使用 MQTTX 客户端工具模拟一个 MQTT 客户端,将 Sparkplug B 消息发布到主题 my/sparkplug/topic。然后,您可以验证该消息是否以 JSON 格式转换并转发到主题 interesting_counters/counter1_run_updates:
打开 MQTTX 客户端桌面版并连接到 EMQX。有关使用 MQTTX 的详细信息,请参阅 MQTTX。
创建新的订阅并订阅主题
interesting_counters/counter1_run_updates。在右下角的消息发送区域,将
my/sparkplug/topic作为主题输入。选择Base64作为 payload 类型。复制以下 Base64 编码的 Sparkplug B 消息并粘贴到负载字段。此消息对应于之前给出的 Sparkplug 消息编码示例。
CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA点击发送按钮发送消息。
如果一切按预期工作,您应该会收到以下 JSON 格式的消息:
json{ "timestamp":1678094561525, "name":"counter_group1/counter1_run", "int_value":1, "datatype":5 }
更新数据
试想您发现了一个名为 counter_group1/counter1_run的错误指标,并希望在转发消息之前将其从 Sparkplug B 编码的 payload 中移除。
与提取数据中的演示类似,您可以在 EMQX Dashboard 中创建以下包含消息重发布动作的规则。
FOREACH
jq('
# Save payload
. as $payload |
# Save name of metric to delete
"counter_group1/counter1_run" as $to_delete |
# Filter out metric with name $to_delete
[ .metrics[] | select(.name != $to_delete) ] as $updated_metrics |
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
spb_decode(payload)) AS item
DO spb_encode(item) AS updated_payload
FROM "my/sparkplug/topic"在此规则中,spb_decode 用于解码消息,然后使用 jq 过滤出名为 counter_group1/counter1_run 的指标。然后,DO 子句中的 spb_encode 用于再次对消息进行编码。
在消息重发布动作中,将主题名设置为 ${updated_payload},因为它是在 DO 子句中分配给更新后的 Sparkplug B 编码消息的名称。
同样,您还可以使用 spb_decode 和 spb_encode 来更新指标的值。假设您想将名为 counter_group1/counter1_run 的指标值更新为 0。您可以通过以下规则实现:
FOREACH
jq('
# Save payload
. as $payload |
# Save name of metric to update
"counter_group1/counter1_run" as $to_update |
# Update value of metric with name $to_update
[
.metrics[] |
if .name == $to_update
then .int_value = 0
else .
end
] as $updated_metrics |
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"或者试想您希望添加一个名为 counter_group1/counter1_new 且值为 42 的新指标。您可以通过以下规则实现:
FOREACH
jq('
# Save payload
. as $payload |
# Save old metrics
$payload | .metrics as $old_metrics |
# New value
{
"name": "counter_group1/counter1_new",
"int_value": 42,
"datatype": 5
} as $new_value |
# Create new metrics array
($old_metrics + [ $new_value ]) as $updated_metrics |
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"过滤消息
试想您只希望转发那些名称为 counter_group1/counter1_run 且值大于 0 的指标的消息。您可以通过以下规则实现:
FOREACH
jq('
# Save payload
. as $payload |
# Save name of metric to filter on
"counter_group1/counter1_run" as $to_filter |
.metrics[] | select(.name == $to_filter) | .int_value as $value |
# Filter out messages where value of metric with name $to_filter is 0 or smaller
if $value > 0 then $payload else empty end
',
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"在上述规则中,jq 函数在指标的名称为 counter_group1/counter1_run 且值小于 0 时输出一个空数组。这意味着如果值为 0 或更小,则该消息不会被转发到与规则连接的任何动作。
拆分消息
试想您希望将一个 Sparkplug B 编码的消息拆分为多个消息,其中指标数组中的每个指标都作为单独的 Sparkplug B 编码的消息重新发布。您可以通过以下规则实现:
FOREACH
jq('
# Save payload
. as $payload |
# Output one message for each metric
.metrics[] |
. as $metric |
# Let the current metric be the only one in the metrics array
$payload | .metrics = [ $metric ]
',
spb_decode(payload)) AS item
DO spb_encode(item) AS output_payload
FROM "my/sparkplug/topic"在上述规则中,jq 函数输出包含多个条目的数组(假设指标数组中有多个条目)。与数组中的每个条目相关联的所有动作都会触发。 使用上述规则,您需要将重新发布动作的 payload 设置为 ${output_payload},因为 output_payload 是在 DO 子句中分配给 Sparkplug B 编码消息的名称。
拆分消息并根据内容发送到不同主题
试想您希望拆分 Sparkplug B 编码的消息,但还希望将每个消息根据指标的名称发送到不同的主题,例如,应该通过将指标的名称与字符串 "my_metrics/" 进行拼接来构建输出主题名。您可以通过以下稍作修改的代码实现:
FOREACH
jq('
# Save payload
. as $payload |
# Output one message for each metric
.metrics[] |
. as $metric |
# Let the current metric be the only one in the metrics array
$payload | .metrics = [ $metric ]
',
spb_decode(payload)) AS item
DO
spb_encode(item) AS output_payload,
first(jq('"my_metrics/" + .metrics[0].name', item)) AS output_topic
FROM "my/sparkplug/topic"要配置重新发布动作,请将主题名称设置为 ${output_topic},因为它是在 DO 子句中分配给输出主题的名称,并将 payload 设置为 ${output_payload}。
jq 函数调用被包裹在 DO 子句中,并使用 first 函数来获取第一个且唯一的输出对象。