编解码举例 - 外部 HTTP
本文介绍了如何通过具有自定义逻辑的外部 HTTP 服务,实现 EMQX Platform 的 Schema Registry 与规则引擎对消息的编码与解码处理。
在某些场景下,您可能需要对消息应用自定义的编码或解码逻辑,而这些逻辑并非 EMQX Platform 原生支持。EMQX Platform 支持通过规则中的 schema_encode
和 schema_decode
函数,将编码/解码处理委托给外部 HTTP 服务来完成。
外部 HTTP API 规范
要实现与 EMQX Platform 的 schema_encode
和 schema_decode
函数配套的外部 HTTP API,服务端需提供一个 POST
接口用于接收 EMQX Platform 发起的编码或解码请求。
请求格式
请求体是一个 JSON 对象,包含以下字段:
payload
:规则引擎中传入schema_encode
或schema_decode
函数的值,已进行 Base64 编码,类型为字符串。type
:字符串,值为encode
或decode
,用于区分当前执行的是schema_encode
还是schema_decode
。schema_name
:当前在 EMQX Platform 中配置的外部 HTTP Schema 名称。opts
:可选字符串,由 EMQX Platform 配置传入,可用于携带额外参数,原样传递给 HTTP 服务。
响应格式
- 服务端必须返回 HTTP 状态码
200
。 - 响应体应为一个 Base64 编码后的字符串,表示最终结果。
- 注意:此 Base64 字符串应为纯文本,不应嵌套在 JSON 对象中返回。
使用示例
假设某设备发布了一条二进制消息,您希望使用自定义的 XOR 操作来对该消息进行编码或解码。本节通过构建一个带有 XOR 编码逻辑的简单 HTTP 服务,展示如何将用户自定义的编解码逻辑通过外部服务接入 EMQX Platform 规则引擎中。
构建外部 HTTP 服务
以下示例展示了如何使用 Python + Flask 编写并运行一个简单的 HTTP 服务,用于将接收到的 Base64 消息进行 XOR 编码处理。
示例:外部 HTTP 服务
确保已安装 Flask:
pip install Flask==3.1.0
示例代码:
from flask import Flask, request
import base64
app = Flask(__name__)
@app.route("/serde", methods=['POST'])
def serde():
# 接收并解码 base64 编码的输入
body = request.get_json(force=True)
print("incoming request:", body)
payload64 = body.get("payload")
payload = base64.b64decode(payload64)
secret = 122
response = bytes(b ^ secret for b in payload)
# 返回的结果也需进行 base64 编码
response64 = base64.b64encode(response)
return response64
运行服务:
# 假设服务保存在当前目录的 `myapp.py` 文件中
flask --app myapp --debug run -h 0.0.0.0 -p 9500
在 EMQX Platform 中创建 External HTTP Schema
- 进入您的部署,点击左侧导航栏的数据智能中心 -> Schema Registry。
- 在内部 Schema 标签页中,点击新建。
- 使用以下参数创建外部 HTTP Schema:
- 名称:
myhttp
- 类型:
External HTTP
- URL:您的 HTTP 服务运行地址,例如
http://server:9500/serde
。
- 名称:
- 点击确认完成创建。
创建规则应用 Schema
通过规则引擎创建一条规则,使用该 Schema 对消息进行编码和解码。
在部署菜单中选择数据集成。
如果您已经创建过连接器或规则,在规则列表区域点击新建规则进入规则创建页面。如果您从未创建过连接器或规则,在数据转发分类中选择消息重新发布。
在新建规则页面中,使用您之前创建的 schema 编写 SQL 语句:
sqlSELECT schema_encode('myhttp', payload) as encoded, schema_decode('myhttp', encoded) as decoded FROM "t/external_http"
语句中的
schema_encode
和schema_decode
会调用配置的外部 HTTP 服务,对 payload 进行处理。点击下一步进入新建输出动作页面,在使用连接器下拉中选择
消息重新发布
。TIP
如果您在第二步中选择消息重新发布,则无需再选择连接器。
在主题字段中填写目标主题:
external_http/out
。在 Payload 字段中填写消息模板:
${.}
。点击确认将动作添加到规则,并点击返回规则列表。
该动作会将解码后的消息以 JSON 格式发布到主题
external_http/out
。${.}
是变量模板,运行时会替换为规则输出的完整内容。
验证规则执行结果
在部署左侧菜单中选择在线调试。
完成连接配置,以 MQTT 客户端身份连接到部署。
- 如果部署的访问控制中添加了认证信息,输入用户名和密码,并点击连接。
- 如果部署没有添加认证信息,选择未添加认证并点击生成认证并连接。
在订阅区域的主题字段中输入
external_http/out
,点击订阅。在发布区域输入主题
t/external_http
,填入任意 payload,点击发布。在消息接收区域检查是否收到响应。例如,发送内容为
hello
时,可能会收到如下消息:json{"encoded":"\u0012\u001F\u0016\u0016\u0015","decoded":"hello"}