Skip to content

使用 Apache NiFi 连接到部署

Apache NiFi 是一个可视化的数据流管理工具,用于在不同系统之间可靠、高效地传输、转换和处理数据。它支持实时数据流、拖拽式流程设计、数据溯源、安全控制等特性。

本文主要介绍如何使用 Apache NiFi 连接到您的 EMQX Cloud 部署,并使用 Apache NiFi 进行一些简单的数据流处理操作。

前置准备

在将 Apache NiFi 连接到 EMQX Cloud 之前,请确保已完成以下准备:

  • 部署 MQTT 服务器
  • 安装 JDK
  • 部署 Apache NiFi

部署 MQTT 服务器

要将应用程序连接到 EMQX Cloud,您需要创建并配置一个部署。

Serverless 部署

  1. 在 EMQX Cloud 控制台中创建一个 Serverless 部署
  2. 部署创建并运行后,进入部署的概览 页面,获取 MQTT 连接信息,包括:
    • Broker 地址
    • 端口号(Serverless 部署仅支持 TLS 端口)
  3. Serverless 部署仅支持 TLS 连接。请务必从概览页面下载 CA 证书,并使用 8883 端口进行 TLS 连接。
  4. 在部署中通过访问控制 -> 客户端认证配置默认认证信息(用户名/密码)。

更多详情请参考:Serverless 连接指引

专有版或 BYOC 部署

  1. 您可以在 EMQX Cloud 控制台中创建一个专有版BYOC 部署。
  2. 创建完成后,进入部署的概览页面,获取 MQTT 连接信息,包括:
    • Broker 地址
    • MQTT 和 WebSocket 的 TCP 与 TLS 端口(支持 TCP 和 TLS 连接)
  3. 在部署中通过访问控制 -> 客户端认证 配置默认认证信息(用户名/密码)。

详细端口配置请参考:专有版 / BYOC 连接指引

安装 JDK

以部署 Apache NiFi 2.6.0 为例,您需要安装 JDK 21(或更高版本)才可以正常运行 Apache NiFi 。

Debian / Ubuntu

bash
sudo apt update
sudo apt install openjdk-21-jdk
java -version

CentOS 8+ / Fedora 8+ / RHEL

bash
sudo dnf install temurin-21-jdk
java -version

Arch Linux / Manjaro

bash
sudo pacman -S jdk-openjdk

部署 Apache NiFi

  1. Apache官方网站获取程序包并进行解压。以部署 Apache NiFi 2.6.0 为例。

    bash
    # 从 apache.org 下载 Apache NiFi 2.6.0 安装包
    wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip
    
    # 解压
    unzip nifi-2.6.0-bin.zip
    
    # 解压完成后删除压缩包
    rm nifi-2.6.0-bin.zip
  2. 进入bin目录下,配置账号密码后运行 Apache NiFi。

    bash
    cd nifi-2.6.0/bin
    
    # 配置您的账号密码,其中密码最少12个字符
    ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD>
    
    # 启动NiFi服务并后台运行
    ./nifi.sh start
    
    # 如需要前台运行NiFi服务,请替换为以下命令
    #./nifi.sh run
  3. Apache NiFi 2.x 默认使用 HTTPS 方式进行访问,且其自带的证书仅支持本机访问

  • 如您将 Apache NiFi 部署在本机,使用浏览器访问 https://localhost:8443/nifi 即可。

  • 如非本机部署,本教程提供三种方法解决访问报错问题。

    • 修改配置文件,通过 HTTP 访问(仅限开发环境,在生产环境中强烈建议使用HTTPS

      bash
      # 进入配置文件夹
      cd ~/nifi-2.6.0/conf
      # 使用您熟悉的文本编辑器打开nifi.properties,以 Vim 为例
      vim nifi.properties

      查找以下关键字进行修改

      • nifi.remote.input.secure=false
      • nifi.web.http.host=192.168.31.9请根据您的实际情况调整
      • nifi.web.http.port=8080
      • nifi.web.https.host=
      • nifi.web.https.port=

      然后重启 Apache NiFi 部署,在浏览器使用 http://<服务器IP>:8080/nifi 地址进行访问

    • 参照 Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI 进行证书与内网访问的配置

    • 通过 SSH 隧道访问(每次连接都需手动建立隧道,不方便,适合临时调试

      打开您的终端,输入以下命令

      bash
      ssh -L 8443:localhost:8443 <您的用户>@<您的服务器IP>

      验证通过后,打开浏览器访问 https://localhost:8443/nifi 即可。

  1. 当您看到以下界面,证明您的 Apache NiFi 部署已经完成。输入您设置的 Username 与 Password 进行登录。

    Apache NiFi登录界面

将 Apache NiFi 连接至您的 EMQX Cloud 部署

在 Apache NiFi 中,您可以使用一些 Processer 与 EMQX Cloud 进行 MQTT 通信。常用的 Processer 包括:

  • PublishMQTT,用于将数据流发送到 EMQX Cloud。
  • ConsumeMQTT,用于从 EMQX Cloud 接收数据流。

在连接部署之前,您需要在 EMQX Cloud 的访问控制客户端认证中添加用户以供 Apache NiFi 连接。如您在客户端授权中开启了白名单模式,您还需要在访问控制客户端授权中为用户授予必要的权限。

以下是一个 Apache NiFi 日志流数据处理程序示例,其结构如下:

  • GenerateFlowFile 用于生成模拟日志数据流,并输入至 PublishMQTT 处理器,将日志发送到 EMQX Cloud。
  • ConsumeMQTT 用于从 EMQX Cloud 接收日志数据流。
  • 两端的日志数据流最终输入到 LogAttribute 处理器,该处理器用于将日志数据流的属性写入本地日志。

Apache NiFi日志流数据处理程序示例

在 PublishMQTT 和 ConsumeMQTT 两个处理器中,您需要进行连接配置。配置要点如下。

  1. Broker URI 输入格式如下:

    <连接协议: 'tcp' | 'ssl' | 'ws' | 'wss'>://<连接地址>:<端口>

    例如:

    ssl://test.emqxcloud.cn:8883

    生产环境中,建议使用 SSL/WSS 进行加密数据传输。如选择加密方式,则必须配置 SSL Context Service。您可按以下步骤进行配置。

    您可能为 EMQX 配置了自己的证书,或者使用 EMQX Cloud 提供的 CA 证书。本教程以使用 EMQX Cloud 提供的证书为例。

    1. 部署概览 -> MQTT 连接信息部分,点击下载CA证书文件,得到 emqxcloud-ca.crt
    2. 将证书文件上传到部署 Apache NiFi 的服务器。
    3. 执行
      bash
      keytool -importcert \
      -alias myca \
      -file emqxcloud-ca.crt \
      -keystore truststore.jks \ 
      -storepass <替换成您的Storepass>
    4. 将生成的 truststore.jks 放到一个特定的目录下。
    5. 点击 SSL Context Service 选项后面的...,选择Create new service,选中StandardRestrictedSSLContextService,点击Add
    6. 点击 SSL Context Service 选项后面的...,选择Go to service
    7. 选择刚刚创建的 Service,选择 Edit
    8. 将 Truststore Filename 设置为您存放 truststore.jks 的目录路径,Truststore Password 设置为您的 Storepass,Truststore Type 选择 JKS
    9. 退出,并在选项后面的...中选择 Enable,启用该 Service。

    经过以上的步骤,您配置的 SSL Context Service 可在其他 Processer 中直接使用,无需重复配置。

  2. MQTT Specification Version 根据您的需求选择,建议选择v5.0

  3. Username 和 Password 设置为您配置的用户登录凭证。

  4. 根据您的实际需求设置其他必填项。

以上为必填项,其他可选项根据您的需求设置。设置完成后,点击 Verification 右侧的"✅",检测通过后,将左下角的Stopped设置为Start,Processer 开始运行。

将其他 Processer 根据测试需求进行配置后,全部设置为Start状态,这样日志流数据处理程序就配置完成了。

测试

使用MQTT调试工具(推荐使用MQTTX)对PublishMQTT指定的主题进行订阅。 可以看到,GenerateFlowFile 生成的模拟日志数据已经发送到主题下。 Apache NiFi Test 1

对 ConsumeMQTT 指定的主题手动发送日志信息,可以看到,程序中 ConsumeMQTT 的输出开始增加。 Apache NiFi Test 2

查看 Apache NiFi 生成的日志,可以看到,无论是 GenerateFlowFile 生成的模拟日志,还是您手动发送的模拟日志,都已经通过 LogAttribute 将数据流属性输出到 NiFi 自身日志(默认位于logs/nifi-app.log)。 Apache NiFi Test 3

更多内容

综上所述,我们实现了部署 Apache NiFi 连接到 EMQX Cloud 并构建一个模拟日志流数据处理程序进行测试的全过程。部署完成后,您可根据自己的业务需求灵活配置流程结构。您可以在 GitHub 上找到更多其他语言的 Demo 示例。

参考文献

Getting started with MQTT in Apache NiFi