使用 Apache NiFi 连接到部署
Apache NiFi 是一个可视化的数据流管理工具,用于在不同系统之间可靠、高效地传输、转换和处理数据。它支持实时数据流、拖拽式流程设计、数据溯源、安全控制等特性。
本文主要介绍如何使用 Apache NiFi 连接到您的 EMQX Cloud 部署,并使用 Apache NiFi 进行一些简单的数据流处理操作。
前置准备
在将 Apache NiFi 连接到 EMQX Cloud 之前,请确保已完成以下准备:
- 部署 MQTT 服务器
- 安装 JDK
- 部署 Apache NiFi
部署 MQTT 服务器
要将应用程序连接到 EMQX Cloud,您需要创建并配置一个部署。
Serverless 部署
- 在 EMQX Cloud 控制台中创建一个 Serverless 部署。
- 部署创建并运行后,进入部署的概览 页面,获取 MQTT 连接信息,包括:
- Broker 地址
- 端口号(Serverless 部署仅支持 TLS 端口)
- Serverless 部署仅支持 TLS 连接。请务必从概览页面下载 CA 证书,并使用
8883端口进行 TLS 连接。 - 在部署中通过访问控制 -> 客户端认证配置默认认证信息(用户名/密码)。
更多详情请参考:Serverless 连接指引。
专有版或 BYOC 部署
- 您可以在 EMQX Cloud 控制台中创建一个专有版或 BYOC 部署。
- 创建完成后,进入部署的概览页面,获取 MQTT 连接信息,包括:
- Broker 地址
- MQTT 和 WebSocket 的 TCP 与 TLS 端口(支持 TCP 和 TLS 连接)
- 在部署中通过访问控制 -> 客户端认证 配置默认认证信息(用户名/密码)。
详细端口配置请参考:专有版 / BYOC 连接指引。
安装 JDK
以部署 Apache NiFi 2.6.0 为例,您需要安装 JDK 21(或更高版本)才可以正常运行 Apache NiFi 。
Debian / Ubuntu
sudo apt update
sudo apt install openjdk-21-jdk
java -versionCentOS 8+ / Fedora 8+ / RHEL
sudo dnf install temurin-21-jdk
java -versionArch Linux / Manjaro
sudo pacman -S jdk-openjdk部署 Apache NiFi
从Apache官方网站获取程序包并进行解压。以部署 Apache NiFi 2.6.0 为例。
bash# 从 apache.org 下载 Apache NiFi 2.6.0 安装包 wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip # 解压 unzip nifi-2.6.0-bin.zip # 解压完成后删除压缩包 rm nifi-2.6.0-bin.zip进入
bin目录下,配置账号密码后运行 Apache NiFi。bashcd nifi-2.6.0/bin # 配置您的账号密码,其中密码最少12个字符 ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD> # 启动NiFi服务并后台运行 ./nifi.sh start # 如需要前台运行NiFi服务,请替换为以下命令 #./nifi.sh runApache NiFi 2.x 默认使用 HTTPS 方式进行访问,且其自带的证书仅支持本机访问。
如您将 Apache NiFi 部署在本机,使用浏览器访问 https://localhost:8443/nifi 即可。
如非本机部署,本教程提供三种方法解决访问报错问题。
修改配置文件,通过 HTTP 访问(仅限开发环境,在生产环境中强烈建议使用HTTPS)
bash# 进入配置文件夹 cd ~/nifi-2.6.0/conf # 使用您熟悉的文本编辑器打开nifi.properties,以 Vim 为例 vim nifi.properties查找以下关键字进行修改
nifi.remote.input.secure=falsenifi.web.http.host=192.168.31.9(请根据您的实际情况调整)nifi.web.http.port=8080nifi.web.https.host=nifi.web.https.port=
然后重启 Apache NiFi 部署,在浏览器使用
http://<服务器IP>:8080/nifi地址进行访问参照 Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI 进行证书与内网访问的配置
通过 SSH 隧道访问(每次连接都需手动建立隧道,不方便,适合临时调试)
打开您的终端,输入以下命令
bashssh -L 8443:localhost:8443 <您的用户名>@<您的服务器IP>验证通过后,打开浏览器访问
https://localhost:8443/nifi即可。
当您看到以下界面,证明您的 Apache NiFi 部署已经完成。输入您设置的 Username 与 Password 进行登录。

将 Apache NiFi 连接至您的 EMQX Cloud 部署
在 Apache NiFi 中,您可以使用一些 Processer 与 EMQX Cloud 进行 MQTT 通信。常用的 Processer 包括:
- PublishMQTT,用于将数据流发送到 EMQX Cloud。
- ConsumeMQTT,用于从 EMQX Cloud 接收数据流。
在连接部署之前,您需要在 EMQX Cloud 的访问控制 → 客户端认证中添加用户以供 Apache NiFi 连接。如您在客户端授权中开启了白名单模式,您还需要在访问控制 → 客户端授权中为用户授予必要的权限。
以下是一个 Apache NiFi 日志流数据处理程序示例,其结构如下:
- GenerateFlowFile 用于生成模拟日志数据流,并输入至 PublishMQTT 处理器,将日志发送到 EMQX Cloud。
- ConsumeMQTT 用于从 EMQX Cloud 接收日志数据流。
- 两端的日志数据流最终输入到 LogAttribute 处理器,该处理器用于将日志数据流的属性写入本地日志。

在 PublishMQTT 和 ConsumeMQTT 两个处理器中,您需要进行连接配置。配置要点如下。
Broker URI 输入格式如下:
<连接协议: 'tcp' | 'ssl' | 'ws' | 'wss'>://<连接地址>:<端口>例如:
ssl://test.emqxcloud.cn:8883生产环境中,建议使用 SSL/WSS 进行加密数据传输。如选择加密方式,则必须配置 SSL Context Service。您可按以下步骤进行配置。
您可能为 EMQX 配置了自己的证书,或者使用 EMQX Cloud 提供的 CA 证书。本教程以使用 EMQX Cloud 提供的证书为例。
- 在
部署概览->MQTT 连接信息部分,点击下载CA证书文件,得到emqxcloud-ca.crt。 - 将证书文件上传到部署 Apache NiFi 的服务器。
- 执行bash
keytool -importcert \ -alias myca \ -file emqxcloud-ca.crt \ -keystore truststore.jks \ -storepass <替换成您的Storepass> - 将生成的
truststore.jks放到一个特定的目录下。 - 点击
SSL Context Service选项后面的...,选择Create new service,选中StandardRestrictedSSLContextService,点击Add。 - 点击
SSL Context Service选项后面的...,选择Go to service。 - 选择刚刚创建的 Service,选择
Edit。 - 将 Truststore Filename 设置为您存放
truststore.jks的目录路径,Truststore Password 设置为您的 Storepass,Truststore Type 选择JKS。 - 退出,并在选项后面的
...中选择Enable,启用该 Service。
经过以上的步骤,您配置的 SSL Context Service 可在其他 Processer 中直接使用,无需重复配置。
- 在
MQTT Specification Version 根据您的需求选择,建议选择
v5.0。Username 和 Password 设置为您配置的用户登录凭证。
根据您的实际需求设置其他必填项。
以上为必填项,其他可选项根据您的需求设置。设置完成后,点击 Verification 右侧的"✅",检测通过后,将左下角的Stopped设置为Start,Processer 开始运行。
将其他 Processer 根据测试需求进行配置后,全部设置为Start状态,这样日志流数据处理程序就配置完成了。
测试
使用MQTT调试工具(推荐使用MQTTX)对PublishMQTT指定的主题进行订阅。 可以看到,GenerateFlowFile 生成的模拟日志数据已经发送到主题下。 
对 ConsumeMQTT 指定的主题手动发送日志信息,可以看到,程序中 ConsumeMQTT 的输出开始增加。 
查看 Apache NiFi 生成的日志,可以看到,无论是 GenerateFlowFile 生成的模拟日志,还是您手动发送的模拟日志,都已经通过 LogAttribute 将数据流属性输出到 NiFi 自身日志(默认位于logs/nifi-app.log)。 
更多内容
综上所述,我们实现了部署 Apache NiFi 连接到 EMQX Cloud 并构建一个模拟日志流数据处理程序进行测试的全过程。部署完成后,您可根据自己的业务需求灵活配置流程结构。您可以在 GitHub 上找到更多其他语言的 Demo 示例。