Skip to content

将 MQTT 数据写入到 QuasarDB

QuasarDB 是一款高性能的列式时序数据库,专为存储和查询大规模时间戳数据而设计。EMQX 支持与 QuasarDB 集成,使您能够将 MQTT 消息和客户端事件保存到 QuasarDB,以便构建物联网遥测数据管理和分析的数据管道。

本页详细介绍了 EMQX 与 QuasarDB 的数据集成,并提供了实用的连接器和规则创建指导。

工作原理

QuasarDB 数据集成是 EMQX 的开箱即用功能,结合了 EMQX 的设备接入、消息传输能力与 QuasarDB 高性能的时序存储能力。通过内置的规则引擎组件和 Sink,您可以将 MQTT 消息和客户端事件存储到 QuasarDB 中。该集成简化了从 EMQX 到 QuasarDB 的数据摄取过程,无需复杂的编码。

下图展示了 EMQX 和 QuasarDB 之间数据集成的典型架构:

quasardb_integration

将 MQTT 数据摄取到 QuasarDB 的工作流程如下:

  1. 消息发布和接收:物联网设备通过 MQTT 协议成功连接到 EMQX,并将实时 MQTT 数据发布到 EMQX。当 EMQX 接收到这些消息时,它将在其规则引擎中启动匹配过程。
  2. 消息数据处理:当消息到达时,它会通过规则引擎进行处理,然后由 EMQX 中定义的规则处理。规则根据预定义的标准确定哪些消息需要路由到 QuasarDB。如果任何规则指定了载荷转换,那些转换将被应用,例如转换数据格式、过滤出特定信息,或用额外的上下文丰富载荷。
  3. 数据写入到 QuasarDB:规则触发将消息写入 QuasarDB 的操作。借助 SQL 模板,用户可以从规则处理结果中提取数据来构造 INSERT 语句并发送到 QuasarDB 执行,从而将消息的特定字段写入对应的数据表中。
  4. 数据存储和利用:数据现存储在 QuasarDB 中,企业可以利用其时序查询能力进行分析、监控和运营。

特性与优势

与 QuasarDB 的数据集成提供了一系列特性和优势:

  • 实时数据流:EMQX 专为处理实时数据流而构建,确保了从源系统到 QuasarDB 的数据传输的高效性和可靠性,非常适合需要立即洞察和行动的用例。
  • 高性能时序存储:QuasarDB 的列式引擎针对时序工作负载进行了优化,提供快速的写入吞吐量,并能对大量时间戳数据进行高效的范围查询。
  • 数据转换的灵活性:EMQX 提供了强大的基于 SQL 的规则引擎,允许组织在将数据存储到 QuasarDB 之前进行预处理,支持过滤、路由、聚合和丰富等多种数据转换机制。
  • 批量写入支持:QuasarDB Sink 支持批量写入,减少网络往返次数,提升整体写入吞吐量。

准备工作

本节介绍了在 EMQX 中创建 QuasarDB 数据集成之前需要做的准备工作,包括如何安装并配置 ODBC 驱动程序以及安装 QuasarDB。

前置准备

安装并配置 ODBC 驱动程序

QuasarDB 连接器通过 ODBC 连接数据库。在创建连接器之前,您需要在运行 EMQX 的主机上安装并配置 QuasarDB ODBC 驱动程序。

完整安装说明请参考 QuasarDB ODBC 文档。以下步骤展示了在基于 Debian 的系统上使用驱动程序 3.14.1 版本的典型配置流程。

  1. 下载并安装 QuasarDB C API 包和 ODBC 驱动程序:

    bash
    curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/c/qdb-api_3.14.1.deb
    curl -fsSL -O https://download.quasar.ai/quasardb/3.14/3.14.1/api/odbc/qdb-3.14.1-linux-64bit-odbc-driver.tar.gz
    apt-get install -yqq ./qdb-api_3.14.1.deb
    tar -C /tmp/qdb_odbc_driver -xf qdb-3.14.1-linux-64bit-odbc-driver.tar.gz
  2. /etc/odbcinst.ini 中注册驱动程序:

    ini
    [qdb_odbc_driver]
    Description=Quasardb ODBC Driver
    Driver=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so
    Setup=/tmp/qdb_odbc_driver/lib/libqdb_odbc_driver.so
  3. /etc/odbc.ini 中创建数据源名称(DSN)条目:

    ini
    [qdb]
    Driver = qdb_odbc_driver
    Description = QuasarDB ODBC Data Source
    #URI = qdb://172.100.239.30:2836
    #UID = user_name
    #PWD = user_key
    #KEY = cluster_public_key

此处配置的 DSN 名称(如 qdb)即为创建连接器时在数据源名称字段中填写的值。

安装并连接到 QuasarDB

本节介绍如何使用 Docker 启动 QuasarDB 实例。

  1. 拉取并启动 QuasarDB Docker 镜像:

    bash
    docker run -d --name qdb \
      -p 2836:2836 \
      bureau14/qdb:3.14.1

    TIP

    QuasarDB 要求使用 IP 地址而非主机名进行连接。请在 URI 中使用 127.0.0.1 或实际主机 IP,不支持基于主机名的连接。

  2. 使用 QuasarDB Shell 验证实例是否正常运行:

    bash
    docker run -it --rm bureau14/qdbsh --cluster qdb://127.0.0.1:2836

如需启用用户认证或集群密钥认证,请参考 QuasarDB 安全文档

创建数据表

在 QuasarDB 中创建用于接收写入数据的表。以下示例创建一张存储温湿度数据的表:

sql
CREATE TABLE temp_hum (temp DOUBLE, hum DOUBLE);

TIP

QuasarDB 数据表始终包含一个隐式的 $timestamp 索引列,创建表时无需声明,但可以在 INSERT 语句中引用它。

创建连接器

本节介绍如何创建连接器,将 EMQX 连接到 QuasarDB。

  1. 进入 EMQX Dashboard,点击集成 -> 连接器

  2. 点击页面右上角的创建

  3. 创建连接器页面,选择 QuasarDB,然后点击下一步

  4. 输入连接器名称,必须为大小写字母和数字的组合,例如 my_quasardb

  5. 填写连接信息:

    • URI:输入 QuasarDB 集群的 URI,需使用 IP 地址,例如 qdb://127.0.0.1:2836。默认端口为 2836
    • 数据源名称:输入在 /etc/odbc.ini 中定义的 DSN 名称,例如 qdb
    • 用户名:输入用户名(如有)。
    • 密码:输入用户密钥(如有)。
    • 集群公钥:输入集群公钥(如有)。
  6. 高级设置(可选):详情请参考高级设置

  7. 点击创建之前,可先点击测试连接,验证 EMQX 是否能成功连接到 QuasarDB。

  8. 点击创建按钮完成连接器创建。页面将弹出创建成功对话框,询问是否立即创建规则。点击创建规则可直接进入规则创建页面且连接器已预先选中;点击返回连接器列表则返回列表,稍后再创建规则。

创建 QuasarDB Sink 规则

本节介绍如何在 Dashboard 中创建一条规则,处理来自 MQTT 主题 t/# 的消息,并通过配置的 Sink 将处理后的数据保存到 QuasarDB 表 temp_hum 中。

  1. 如果您在上一步点击了创建规则添加动作面板将自动打开,且动作类型已设置为 QuasarDB、连接器已预先选中。可直接跳至第 5 步。

    否则,请进入 EMQX Dashboard,点击集成 -> 规则,点击右上角的创建,然后点击 + 添加动作

  2. 在左侧 SQL 编辑器中输入规则 ID 及以下 SQL,匹配主题 t/# 下的消息:

    注意:如需自定义 SQL 语法,请确保 SELECT 部分包含了 Sink 所需的所有字段。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    如果您是初学者,可点击 SQL 示例启用调试来学习和测试 SQL 规则。

  3. 在右侧添加动作面板中,从动作类型下拉列表选择 QuasarDB动作下拉框保持默认的创建动作

  4. 连接器下拉框中选择刚创建的 my_quasardb 连接器。也可以点击下拉框旁的按钮创建新连接器,配置参数请参考创建连接器

  5. 输入 Sink 的名称和可选描述。

  6. 配置 SQL 模板,定义数据写入 QuasarDB 的方式。

    注意

    SQL 模板仅接受 INSERT 语句,不支持 UPDATE、DELETE 等其他语句类型。

    SQL 模板支持占位符变量,如 ${clientid}。QuasarDB 使用 $timestamp 作为隐式时间戳索引列,可使用 now() 插入当前服务器时间。

    注意

    QuasarDB ODBC 驱动程序不支持预处理语句。在 SQL 模板中,所有解析为 STRINGBLOB 类型的值都必须手动用单引号(')包裹。

    sql
    insert into temp_hum($timestamp, temp, hum)
    values (now(), ${.temp}, ${.hum})
  7. 备选动作(可选):定义一个或多个备选动作,以提高消息投递失败时的可靠性。详情请参考备选动作

  8. 高级设置(可选):详情请参考 Sink 高级设置

  9. 点击创建之前,可先点击测试连接,验证 Sink 能否连接到 QuasarDB。

  10. 点击创建按钮完成 Sink 配置,新 Sink 将添加到动作输出中。

  11. 返回创建规则页面,确认配置信息无误后,点击保存生成规则。

至此,您已成功创建规则。您可以在集成 -> 规则页面看到新创建的规则,点击**动作(Sink)**标签可查看新的 QuasarDB Sink。

您还可以点击集成 -> 流程设计器查看拓扑图,确认主题 t/# 下的消息经规则 my_rule 处理后被转发并保存到 QuasarDB。

测试规则

使用 MQTTX 向主题 t/1 发送一条消息以触发规则:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "temp": "27.5", "hum": "41.8" }'

查看 QuasarDB Sink 的运行统计,应有 1 条新的匹配消息和 1 条新的发出消息。在 QuasarDB 中查询 temp_hum 表,验证数据是否已成功写入。

高级设置

本节介绍 QuasarDB 连接器和 Sink 的高级配置选项。在 Dashboard 中进行配置时,您可以展开高级设置,根据具体需求调整以下参数。

连接器高级设置

字段名称描述默认值
连接池大小连接池中维护的并发连接数。连接池过大可能导致资源耗尽,过小则可能限制吞吐量。8
连接超时与 QuasarDB 建立连接时的最大等待时长。5
启动超时时间连接器等待自动启动的资源进入健康状态的最长时间,超时后才响应资源创建请求。5
健康检查间隔连接器对 QuasarDB 连接执行自动健康检查的时间间隔。15
健康检查超时每次健康检查的最大允许时长。60

Sink 高级设置

字段名称描述默认值
缓存池大小负责处理 EMQX 与 QuasarDB 之间数据流的缓冲工作进程数量。高负载场景下可适当调大以提升吞吐量。16
请求超期请求在缓冲区中的最长有效时长。超过此时间仍未发送,或已发送但未收到 QuasarDB 确认的请求,将被丢弃。45
健康检查间隔Sink 对 QuasarDB 连接执行自动健康检查的时间间隔。15
健康检查间隔抖动在健康检查间隔上叠加的随机延迟,用于避免多个节点同时触发健康检查。多个 Action 或 Source 共享同一连接器时尤为有用。0 毫秒
健康检查超时Sink 每次健康检查的最大允许时长。60
缓存队列最大长度每个缓冲工作进程可缓冲的最大字节数。负载存在突发峰值时,可适当调大此值。256 MB
最大批量请求大小单次向 QuasarDB 写入的最大记录数。设置为 1 时禁用批量写入,改为逐条发送。100
请求模式异步模式下,写入 QuasarDB 不阻塞 MQTT 消息发布,吞吐量更高,但可能造成乱序;同步模式下,每次写入等待确认后再继续。异步
请求飞行队列窗口同时允许的最大在途请求数。请求模式异步 时,如需保证同一 MQTT 客户端的消息严格有序,应将此值设为 1100