将 MQTT 数据写入到 Couchbase
TIP
Couchbase 数据集成是 EMQX 企业版功能。
Couchbase 是一个多用途的分布式数据库,它结合了关系型数据库的优势(如 SQL 和 ACID 事务)与 JSON 的多功能性。Couchbase 的基础架构性能极高且可扩展,广泛应用于各行业的用户画像、动态产品目录、生成式 AI 应用、向量搜索、高速缓存等场景。
工作原理
Couchbase 数据集成是 EMQX 中开箱即用的功能,旨在将 MQTT 的实时数据捕获和传输能力与 Couchbase 的强大数据处理功能结合在一起。通过内置的规则引擎组件,这种集成简化了将数据从 EMQX 导入 Couchbase 进行存储和分析的过程,无需复杂的编码。
下图展示了 EMQX 与 Couchbase 数据集成的典型架构。
将 MQTT 数据导入 Couchbase 的工作流程如下:
- 消息发布和接收:工业物联网设备通过 MQTT 协议成功连接到 EMQX,并基于其操作状态、读数或触发事件,将机器、传感器和生产线的实时 MQTT 数据发布到 EMQX。当 EMQX 接收到这些消息时,它会启动其规则引擎中的匹配过程。
- 消息数据处理:消息到达后,它会通过规则引擎并由 EMQX 中定义的规则进行处理。这些规则基于预定义的条件,决定哪些消息需要路由到 Couchbase。如果规则指定了负载转换,则会进行相应的转换,如数据格式转换、过滤特定信息或用额外的上下文丰富负载。
- 数据导入到 Couchbase:一旦规则引擎识别出某条消息需要存储到 Couchbase,它会触发一个操作,将消息转发到 Couchbase。处理后的数据将无缝地写入 Couchbase 数据库的数据集中。
- 数据存储和利用:数据存储在 Couchbase 中后,企业可以利用其强大的查询能力支持各种用例。例如,在动态产品目录的场景中,企业可以使用 Couchbase 高效管理和检索产品信息,支持实时库存更新,并向客户提供个性化推荐,从而提升购物体验并增加销售额。
功能和优势
与 Couchbase 的数据集成提供了一系列功能和优势,旨在确保数据传输、存储和利用的高效性:
- 实时数据流:EMQX 专为处理实时数据流而设计,确保从源系统到 Couchbase 的数据传输高效且可靠。它使组织能够实时捕获和分析数据,非常适合需要即时见解和行动的使用场景。
- 高性能和可扩展性:EMQX 的分布式架构和 Couchbase 的列式存储格式在数据量增加时实现无缝扩展。这确保了即使在处理大数据集时,性能和响应速度也能保持一致。
- 数据转换的灵活性:EMQX 提供了强大的基于 SQL 的规则引擎,允许组织在将数据存储到 Couchbase 之前进行预处理。它支持多种数据转换机制,如过滤、路由、聚合和丰富,帮助组织根据自身需求调整数据。
- 易于部署和管理:EMQX 提供了一个用户友好的界面,用于配置数据源、预处理数据规则和 Couchbase 存储设置。这简化了数据集成过程的设置和持续管理。
- 高级分析功能:Couchbase 强大的基于 SQL 的查询语言和对复杂分析功能的支持,使用户能够从物联网数据中获得有价值的见解,实现预测性分析、异常检测等功能。
准备工作
本节介绍在 EMQX Dashboard 中创建 Couchbase 数据集成之前需要完成的准备工作。
前置准备
启动 Couchbase 服务器
本节介绍如何使用 Docker 启动 Couchbase 服务器。
使用以下命令启动 Couchbase 服务器。
bashdocker run -t --name db -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase/server:enterprise-7.2.0
运行该命令时,Docker 会下载并安装 Couchbase 服务器。Couchbase 服务器在 Docker 虚拟环境中启动后,您应该会看到以下消息:
Starting Couchbase Server -- Web UI available at http://<ip>:8091 and logs available in /opt/couchbase/var/lib/couchbase/logs
在浏览器中打开 Couchbase Web 控制台,访问
http://localhost:8091
。
- 点击 Setup New Cluster 并为您的集群命名。为了便于入门,将完整管理员凭据设置为
admin
和password
。
接受条款和条件,然后点击 Finish with Defaults,使用默认值完成配置。
输入完配置信息后,点击右下角的 Save & Finish 按钮。这将根据配置设置服务器,并打开 Couchbase Web 控制台仪表板。在左侧导航面板中选择 Buckets,然后点击 ADD BUCKET 按钮。
输入 bucket 的名称,例如
emqx
,然后点击 Create 以创建 bucket。为默认集合创建主索引:
bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "create primary index on default:emqx_data._default._default;"
您可以在官方文档页面找到有关在 Docker 中运行 Couchbase 的更多信息。
创建连接器
本节演示如何创建一个连接器,将 Sink 连接到 Couchbase 服务器。
以下步骤假设您在本地机器上运行 EMQX 和 Couchbase。如果 Couchbase 和 EMQX 远程运行,请相应调整设置。
- 进入 EMQX Dashboard,点击集成 -> 连接器。
- 点击页面右上角的创建。
- 在连接器类型中,选择 Couchbase,然后点击 下一步。
- 在配置信息步骤中,配置以下信息:
- 连接器名称:输入连接器名称,名称应为大小写字母和数字的组合,例如:
my_couchbase
。 - 服务器地址:
127.0.0.1
- 用户名:
admin
- 密码:
password
- 连接器名称:输入连接器名称,名称应为大小写字母和数字的组合,例如:
- 高级设置(可选):请参阅高级配置。
- 在点击创建之前,您可以点击测试连接按钮,测试连接器是否能连接到 Couchbase 服务器。
- 点击页面底部的创建按钮,完成连接器的创建。在弹出对话框中,您可以点击返回连接器列表或点击创建规则继续创建规则和接收器,以指定要转发到 Couchbase 的数据。有关详细步骤,请参阅创建 Couchbase Sink 规则。
创建 Couchbase Sink 规则
本节演示如何在 Dashboard 中创建规则,以处理来自源 MQTT 主题 t/#
的消息,并通过配置的接收器将处理后的结果转发到 Couchbase。
进入 EMQX Dashboard,从左侧导航菜单点击集成 -> 规则。
点击页面右上角的创建。
输入规则 ID,例如
my_rule
。在 SQL 编辑器中输入规则,例如希望将
t/#
主题的 MQTT 消息转发至 Couchbase,可通过如下规则 SQL 实现:sqlSELECT * FROM "t/#"
点击 + 添加动作 按钮,定义规则触发时将执行的操作。通过此操作,EMQX 将规则处理后的数据发送到 Couchbase。
从动作类型下拉列表中选择
Couchbase
。保持动作下拉框为默认的创建动作
选项,您也可以选择一个之前已经创建好的 Couchbase Sink。此处将创建一个全新的 Sink 并添加到规则中。输入 Sink 名称。名称应为大小写字母和数字的组合。
从连接器下拉框中选择刚创建的
my_couchbase
。您也可以通过点击下拉框旁的按钮创建新的连接器。有关配置参数,请参阅创建连接器。在 SQL 模板中输入以下命令:
sqlinsert into emqx_data (key, value) values (${.id}, ${.payload})
其中,
${.id}
和${.payload}
分别表示 MQTT 消息的 id 和 payload。在转发消息之前,EMQX 会用相应的内容替换它们。高级设置(可选):请参阅高级配置。
在点击创建之前,您可以点击测试连接按钮,确保能够连接到 Couchbase 服务器。
点击创建按钮完成 Sink 配置。在创建规则页面,您将在动作输出选项卡下看到新的 Sink。
在创建规则页面,验证配置信息并点击创建按钮生成规则。您创建的规则将显示在规则列表中,状态应为已连接。
现在,您已成功创建规则,您可以在规则页面上看到新规则。点击 动作 (Sink) 选项卡,您会看到新的 Couchbase Sink。
您还可以点击 集成 -> Flow 设计器 查看拓扑。您可以看到主题 t/#
下的消息在被规则 my_rule
解析后,被发送并保存到 Couchbase。
测试规则
您可以使用 EMQX Dashboard 内置的 WebSocket 客户端来测试规则是否按预期工作。
点击 Dashboard 左侧导航菜单中的 问题分析 -> WebSocket 客户端,访问 WebSocket 客户端。按照以下步骤设置 WebSocket 客户端并向主题 t/test
发送消息:
填写当前 EMQX 实例的连接信息。如果您在本地运行 EMQX,您可以使用默认值,除非您更改了 EMQX 的默认配置(例如,您可能已配置了身份验证,这可能需要输入用户名和密码)。
点击连接连接客户端到 EMQX 实例。
向下滚动到发布区域并输入以下内容:
- 主题:
t/test
- Payload:
Hello World Couchbase from EMQX
- QoS:
2
- 主题:
点击发布发送消息。Couchbase 服务器中的
emqx_data
bucket 中应已插入一个条目。您可以通过在终端运行以下命令来检查:bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "SELECT * FROM emqx_data._default._default LIMIT 5;"
如果一切正常,上述命令应输出类似以下内容(
requestID
和指标可能有所不同):json{ "requestID": "88be238c-5b63-453d-ac16-c0368a5be2bc", "signature": { "*": "*" }, "results": [ { "_default": "Hello World Couchbase from EMQX" } ], "status": "success", "metrics": { "elapsedTime": "3.189125ms", "executionTime": "3.098709ms", "resultCount": 1, "resultSize": 61, "serviceLoad": 2 } }
高级配置
本节深入探讨 EMQX Couchbase 连接器可用的高级配置选项。在 Dashboard 中配置连接器时,导航到 高级设置,根据您的具体需求调整以下参数。
字段 | 描述 | 推荐值 |
---|---|---|
HTTP 流水线** | 指定可以在不等待单个响应的情况下连续发送到服务器的 HTTP 请求数量。此选项取一个正整数值,表示将被流水线化的 HTTP 请求的最大数量。 设置为 1 时,表示传统的请求-响应模型,其中每个 HTTP 请求都会发送,然后客户端会等待服务器响应,然后再发送下一个请求。较高的值可以通过允许批量发送多个请求来更有效地使用网络资源,从而减少往返时间。 | 100 |
连接池大小 | 指定与 Couchbase 服务对接时连接池中可以维持的并发连接数。此选项通过限制或增加 EMQX 和 Couchbase 之间的活动连接数,有助于管理应用程序的可扩展性和性能。 注意:设置适当的连接池大小取决于多种因素,如系统资源、网络延迟和应用程序的特定工作负载。过大的池大小可能导致资源耗尽,而过小的大小可能限制吞吐量。 | 8 |
连接超时 | 指定连接器在尝试与 Couchbase 服务器建立连接时将等待的最长时间(以秒为单位)。 注意:精心选择的超时设置对于平衡系统性能和资源利用率至关重要。建议在各种网络条件下测试系统,以找到适合您的特定用例的最佳超时值。 | 15 |
启动超时时间 | 确定连接器在响应资源创建请求之前,将等待自动启动的资源达到健康状态的最长时间间隔(以秒为单位)。此设置有助于确保连接器在验证连接资源(例如 Couchbase 中的数据库实例)完全运行并准备好处理数据事务之前,不会继续进行操作。 | 5 |
健康检查间隔 | 指定连接器对 Couchbase 连接执行自动健康检查的时间间隔(以秒为单位)。 | 15 |