# 使用 Apache NiFi 连接到部署

Apache NiFi 是一个可视化的数据流管理工具，用于在不同系统之间可靠、高效地传输、转换和处理数据。它支持实时数据流、拖拽式流程设计、数据溯源、安全控制等特性。

本文主要介绍如何使用 Apache NiFi 连接到您的 EMQX Cloud 部署，并使用 Apache NiFi 进行一些简单的数据流处理操作。

## 前置准备

在将 Apache NiFi 连接到 EMQX Cloud 之前，请确保已完成以下准备：

- 部署 MQTT 服务器
- 安装 JDK
- 部署 Apache NiFi

### 部署 MQTT 服务器

<!-- @include: ./deploy_mqtt_broker.md -->

### 安装 JDK

以部署 Apache NiFi 2.6.0 为例，您需要安装 JDK 21（或更高版本）才可以正常运行 Apache NiFi 。

#### Debian / Ubuntu
```bash
sudo apt update
sudo apt install openjdk-21-jdk
java -version
```

#### CentOS 8+ / Fedora 8+ / RHEL
```bash
sudo dnf install temurin-21-jdk
java -version
```

#### Arch Linux / Manjaro
```bash
sudo pacman -S jdk-openjdk
```
### 部署 Apache NiFi

1. 从[Apache官方网站](https://nifi.apache.org/download/)获取程序包并进行解压。以部署 Apache NiFi 2.6.0 为例。

    ```bash
    # 从 apache.org 下载 Apache NiFi 2.6.0 安装包
    wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip

    # 解压
    unzip nifi-2.6.0-bin.zip

    # 解压完成后删除压缩包
    rm nifi-2.6.0-bin.zip
    ```

2. 进入`bin`目录下，配置账号密码后运行 Apache NiFi。

    ```bash
    cd nifi-2.6.0/bin

    # 配置您的账号密码，其中密码最少12个字符
    ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD>

    # 启动NiFi服务并后台运行
    ./nifi.sh start

    # 如需要前台运行NiFi服务，请替换为以下命令
    #./nifi.sh run
    ```

3. Apache NiFi 2.x 默认使用 HTTPS 方式进行访问，且其自带的证书**仅支持本机访问**。
- 如您将 Apache NiFi 部署在本机，使用浏览器访问 https://localhost:8443/nifi 即可。
- 如非本机部署，本教程提供三种方法解决访问报错问题。

     - 修改配置文件，通过 HTTP 访问（**仅限开发环境，在生产环境中强烈建议使用HTTPS**）

        ```bash
        # 进入配置文件夹
        cd ~/nifi-2.6.0/conf
        # 使用您熟悉的文本编辑器打开nifi.properties，以 Vim 为例
        vim nifi.properties
        ```

        查找以下关键字进行修改

        - `nifi.remote.input.secure=false`
        - `nifi.web.http.host=192.168.31.9`（**请根据您的实际情况调整**）
        - `nifi.web.http.port=8080`
        - `nifi.web.https.host=`
        - `nifi.web.https.port=`
        
        然后重启 Apache NiFi 部署，在浏览器使用 `http://<服务器IP>:8080/nifi` 地址进行访问
     - 参照 [Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI](https://stackoverflow.com/questions/78985347/apache-nifi-2-http-error-400-invalid-sni) 进行证书与内网访问的配置

     - 通过 SSH 隧道访问（**每次连接都需手动建立隧道，不方便，适合临时调试**）
       
        打开您的终端，输入以下命令
        ```bash
        ssh -L 8443:localhost:8443 <您的用户名>@<您的服务器IP>
        ```
        验证通过后，打开浏览器访问 `https://localhost:8443/nifi` 即可。


4. 当您看到以下界面，证明您的 Apache NiFi 部署已经完成。输入您设置的 Username 与 Password 进行登录。

    ![Apache NiFi登录界面](_assets/apache_nifi_login.png)

## 将 Apache NiFi 连接至您的 EMQX Cloud 部署

在 Apache NiFi 中，您可以使用一些 Processer 与 EMQX Cloud 进行 MQTT 通信。常用的 Processer 包括：
- **PublishMQTT**，用于将数据流发送到 EMQX Cloud。
- **ConsumeMQTT**，用于从 EMQX Cloud 接收数据流。

在连接部署之前，您需要在 EMQX Cloud 的`访问控制` → `客户端认证`中添加用户以供 Apache NiFi 连接。如您在客户端授权中开启了白名单模式，您还需要在`访问控制` → `客户端授权`中为用户授予必要的权限。

以下是一个 Apache NiFi 日志流数据处理程序示例，其结构如下：
- GenerateFlowFile 用于生成模拟日志数据流，并输入至 PublishMQTT 处理器，将日志发送到 EMQX Cloud。
- ConsumeMQTT 用于从 EMQX Cloud 接收日志数据流。
- 两端的日志数据流最终输入到 LogAttribute 处理器，该处理器用于将日志数据流的属性写入本地日志。

![Apache NiFi日志流数据处理程序示例](_assets/apache_nifi_example.png)

在 PublishMQTT 和 ConsumeMQTT 两个处理器中，您需要进行**连接配置**。配置要点如下。

1. Broker URI 输入格式如下：
    ```
    <连接协议: 'tcp' | 'ssl' | 'ws' | 'wss'>://<连接地址>:<端口>
    ```
    例如：
    ```
    ssl://test.emqxcloud.cn:8883
    ```

    生产环境中，建议使用 SSL/WSS 进行加密数据传输。如选择加密方式，则**必须**配置 SSL Context Service。您可按以下步骤进行配置。

    > 您可能为 EMQX 配置了自己的证书，或者使用 EMQX Cloud 提供的 CA 证书。本教程以使用 EMQX Cloud 提供的证书为例。
    > 
    > 1. 在`部署概览` -> `MQTT 连接信息`部分，点击下载**CA证书文件**，得到 `emqxcloud-ca.crt`。
    > 2. 将证书文件上传到部署 Apache NiFi 的服务器。
    > 3. 执行
    >     ```bash
    >     keytool -importcert \
    >     -alias myca \
    >     -file emqxcloud-ca.crt \
    >     -keystore truststore.jks \ 
    >     -storepass <替换成您的Storepass>
    >     ```
    > 4. 将生成的 `truststore.jks` 放到一个特定的目录下。
    > 5. 点击 `SSL Context Service` 选项后面的`...`，选择`Create new service`，选中`StandardRestrictedSSLContextService`，点击`Add`。
    > 6. 点击 `SSL Context Service` 选项后面的`...`，选择`Go to service`。
    > 7. 选择刚刚创建的 Service，选择 `Edit`。
    > 8. 将 Truststore Filename 设置为您存放 `truststore.jks` 的目录路径，Truststore Password 设置为您的 Storepass，Truststore Type 选择 `JKS`。
    > 9. 退出，并在选项后面的`...`中选择 `Enable`，启用该 Service。

    经过以上的步骤，您配置的 SSL Context Service 可在其他 Processer 中直接使用，无需重复配置。

2. MQTT Specification Version 根据您的需求选择，建议选择`v5.0`。
3. Username 和 Password 设置为您配置的用户登录凭证。
4. 根据您的实际需求设置其他必填项。

以上为必填项，其他可选项根据您的需求设置。设置完成后，点击 Verification 右侧的"✅"，检测通过后，将左下角的`Stopped`设置为`Start`，Processer 开始运行。

将其他 Processer 根据测试需求进行配置后，全部设置为`Start`状态，这样日志流数据处理程序就配置完成了。

## 测试
使用MQTT调试工具（推荐使用[MQTTX](https://github.com/emqx/MQTTX)）对PublishMQTT指定的主题进行订阅。
可以看到，GenerateFlowFile 生成的模拟日志数据已经发送到主题下。
![Apache NiFi Test 1](_assets/apache_nifi_test_1.png)


对 ConsumeMQTT 指定的主题手动发送日志信息，可以看到，程序中 ConsumeMQTT 的输出开始增加。
![Apache NiFi Test 2](_assets/apache_nifi_test_2.png)


查看 Apache NiFi 生成的日志，可以看到，无论是 GenerateFlowFile 生成的模拟日志，还是您手动发送的模拟日志，都已经通过 LogAttribute 将数据流属性输出到 NiFi 自身日志（默认位于`logs/nifi-app.log`）。
![Apache NiFi Test 3](_assets/apache_nifi_test_3.png)


## 更多内容

综上所述，我们实现了部署 Apache NiFi 连接到 EMQX Cloud 并构建一个模拟日志流数据处理程序进行测试的全过程。部署完成后，您可根据自己的业务需求灵活配置流程结构。您可以在 [GitHub](https://github.com/emqx/MQTT-Client-Examples) 上找到更多其他语言的 Demo 示例。

## 参考文献

[Getting started with MQTT in Apache NiFi](https://medium.com/cloudera-inc/getting-started-with-mqtt-in-apache-nifi-64e8cde1de91)