Skip to content

MongoDB

通过 MongoDB 数据桥接可以将 MQTT 消息和客户端事件存储到 MongoDB 中。

TIP

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用

前置准备

功能清单

快速开始

安装 MongoDB

通过 Docker 安装并启动 MongoDB:

bash
# 启动一个 MongoDB 容器并设置密码为 public
docker run -d --name mongodb -p 27017:27017 mongo

# 进入容器
docker exec -it mongodb bash

# 在容器中连接到 MongoDB 服务器,
mongo

# 创建用户
use admin
db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })

# 创建名为 emqx_data 的数据库
use emqx_data

# 创建名为 emqx_messages 的集合
db.createCollection('emqx_messages')

创建 MongoDB 数据桥接

创建 MongoDB 数据桥接完成对客户端发布消息的存储。

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。
  2. 点击页面右上角的创建
  3. 数据桥接类型中选择 MongoDB,点击下一步
  4. 输入数据桥接名称,要求是大小写英文字母或数字组合。
  5. 部署模式SRV 记录根据情况选择,此处选择 single 与不启用。
  6. 输入 MongoDB 连接信息,数据库名字填写 emqx_data服务器地址填写 127.0.0.1:27017用户名填写 admin密码填写 public集合存储数据的集合,支持通过占位符 ${var_name} 动态设置,本例中填入 emqx_messages。其他配置项保持默认配置即可。
  7. 配置 有效载荷模板,将 clientidtopicqostimestamppayload 字段存储到 MongoDB 中,该模板将通过 MongoDB insert 命令执行,对应模板如下:
json
{
  "clientid": "${clientid}",
  "topic": "${topic}",
  "qos": ${qos},
  "timestamp": ${timestamp},
  "payload": ${payload}
}

TIP

配置有效载荷模板时需注意以下几点:

  • 所有的需要使用双引号 " 包裹;

  • 不支持自动推导的数据类型:

    • 字符类型的字段需要使用 " 包裹,否则将报错;
    • 数值类型字段不需要包裹,否则将被识别为字符类型;
    • 时间戳、日期和时间类型,如不做特殊处理,则将被识别为数值或字符类型,如希望以日期和时间类型存储,需要在规则 SQL 中使用 mongo_date 函数对字段进行处理,参考 时间与日期函数
  • 允许嵌套对象,当 为 JSON 对象时:

    • 模板中禁止使用双引号嵌套该值,否则将导致执行错误;

    • 对象将按自身结构嵌套存储;

    • 如需将对象存储为 JSON 字符,可以在规则 SQL 中使用 json_encode 函数转换,模板中对应的仍然禁止使用双引号包裹。

  1. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考配置参数
  2. 点击创建按钮完成数据桥接创建。此时 EMQX 将提示您数据桥接已成功创建,并询问是否创建相关规则,可点击创建规则按钮或通过点击左侧导航栏的数据集成 -> 规则进行创建。

创建数据转发规则

  1. 点击页面右上角的创建
  2. 输入规则 ID my_rule,在 SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息存储至 MongoDB,请确规则选择出来的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。此处规则 SQL 如下:
sql
SELECT
  *
FROM
  "t/#"

您也可以使用以下 SQL 将 timestamp 保存为日期类型数据、将 JSON 格式的 payload 保存为 JSON 字符串:

sql
SELECT
  *,
  mongo_date(timestamp) as timestamp,
  json_encode(payload) as payload
FROM
  "t/#"
  1. 添加动作,在动作下拉框中选择 使用数据桥接转发 选项,选择先前创建好的 MongoDB 数据桥接。点击页面底部的添加
  2. 点击最下方创建按钮完成规则的创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 MongoDB 存储。

测试数据桥接与规则

使用 MQTTX 向 t/1 主题发布消息:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello MongoDB" }'

查看数据桥接运行统计,命中、发送成功次数均 +1。

查看数据是否已经写入emqx_messages 集合中:

bash
> db.emqx_messages.find().pretty()
{
    "_id" : ObjectId("63db7059df489d01ed000009"),
    "clientid" : "emqx_c",
    "payload" : {
      "msg" : "hello MongoDB"
    },
    "qos" : 0,
    "timestamp" : NumberLong("1675325529070"),
    "topic" : "t/1"
}

使用第二种规则 SQL 时,对应数据内容如下:

bash
> db.emqx_messages.find().pretty()
{
    "_id" : ObjectId("63db7535df489d01ed000013"),
    "clientid" : "emqx_c",
    "payload" : "{ \"msg\": \"hello MongoDB\" }",
    "qos" : 0,
    "timestamp" : ISODate("2023-02-02T08:33:36.715Z"),
    "topic" : "t/1"
}