# Apache NiFiとの接続

Apache NiFiは、異なるシステム間でのデータの信頼性の高い効率的な転送、変換、処理を目的としたビジュアルなデータフロー管理ツールです。リアルタイムデータフロー、ドラッグアンドドロップによるプロセス設計、データプロビナンス、セキュリティ制御などの機能をサポートしています。

本ページでは主に、Apache NiFiをEMQX Cloudのデプロイメントに接続し、Apache NiFiを使った基本的なデータフロー処理タスクの実行方法について説明します。

## 前提条件

Apache NiFiをEMQX Cloudに接続する前に、以下の準備が完了していることを確認してください。

* MQTTサーバーのデプロイ
* JDKのインストール
* Apache NiFiのデプロイ

### MQTTサーバーのデプロイ

<!-- @include: ./deploy_mqtt_broker.md -->

### JDKのインストール

Apache NiFi 2.6.0をデプロイするには、Apache NiFiを正常に動作させるためにJDK 21（またはそれ以降のバージョン）をインストールする必要があります。

#### Debian / Ubuntu

```bash
sudo apt update
sudo apt install openjdk-21-jdk
java -version
```

#### CentOS 8+ / Fedora 8+ / RHEL

```bash
sudo dnf install temurin-21-jdk
java -version
```

#### Arch Linux / Manjaro

```bash
sudo pacman -S jdk-openjdk
```

### Apache NiFiのデプロイ

#### Apache NiFiのダウンロードと起動

1. [Apache公式サイト](https://nifi.apache.org/download/)からパッケージをダウンロードし、解凍します。例としてApache NiFi 2.6.0をデプロイする場合：

   ```bash
   # apache.orgからApache NiFi 2.6.0をダウンロード
   wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip

   # ファイルを解凍
   unzip nifi-2.6.0-bin.zip

   # 解凍後、zipファイルを削除
   rm nifi-2.6.0-bin.zip
   ```

2. `bin`ディレクトリに移動し、ユーザー名とパスワードを設定してApache NiFiを起動します。

   ```bash
   cd nifi-2.6.0/bin
   
   # ユーザー名とパスワードを設定（パスワードは12文字以上が必要）
   ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD>
   
   # バックグラウンドでNiFiサービスを起動
   ./nifi.sh start
   
   # フォアグラウンドでNiFiを実行する場合は以下を使用
   # ./nifi.sh run
   ```

#### Apache NiFiへのアクセス

Apache NiFi 2.xはデフォルトでHTTPSを使用してアクセスし、組み込み証明書はローカルアクセスのみをサポートしています。

* ローカルマシンにApache NiFiをデプロイした場合は、ブラウザで[https://localhost:8443/nifi](https://localhost:8443/nifi)にアクセスできます。
* リモートサーバーにデプロイした場合は、本チュートリアルで紹介する3つの方法でアクセスエラーを解決できます。

##### 方法1：HTTPアクセスを有効化（開発環境のみ）

1. 設定ファイルを編集しHTTPアクセスを有効にします（本番環境ではHTTPSを推奨します）。

   ```bash
   # 設定ディレクトリへ移動
   cd ~/nifi-2.6.0/conf
   # お好みのテキストエディタでnifi.propertiesを開く（例：Vim）
   vim nifi.properties
   ```

2. 以下の設定を探して変更します：

   * `nifi.remote.input.secure=false`

   * `nifi.web.http.host=192.168.31.9`（実際の環境に合わせて調整）

   * `nifi.web.http.port=8080`

   * `nifi.web.https.host=`

   * `nifi.web.https.port=`

3. Apache NiFiを再起動し、ブラウザで `http://<serverIP>:8080/nifi` にアクセスします。

##### 方法2：リモートアクセス用HTTPS証明書の設定

[Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI](https://stackoverflow.com/questions/78985347/apache-nifi-2-http-error-400-invalid-sni) を参照し、証明書と内部ネットワークアクセスの設定を行います。

##### 方法3：SSHトンネル経由でアクセス（一時的なデバッグ用）

1. SSHトンネル経由でアクセスします（毎回手動でトンネルを設定する必要があり、一時的なデバッグに適します）。

2. ターミナルを開き、以下のコマンドを入力します：

   ```bash
   ssh -L 8443:localhost:8443 <your-username>@<your-server-IP>
   ```

3. 認証が成功したら、ブラウザで `https://localhost:8443/nifi` にアクセスします。

以下の画面が表示されれば、Apache NiFiのデプロイは完了です。設定したユーザー名とパスワードでログインしてください。

![Apache NiFiログイン画面](_assets/apache_nifi_login.png)

## Apache NiFiをEMQX Cloudに接続する

Apache NiFiでは、さまざまなプロセッサを使ってMQTT経由でEMQX Cloudと通信できます。代表的なプロセッサは以下の通りです。

* **PublishMQTT**：データフローをEMQX Cloudにパブリッシュするために使用します。
* **ConsumeMQTT**：EMQX Cloudからデータフローを受信するために使用します。

### 前提条件

Apache NiFiの設定前に、EMQX Cloudで必要な認証情報と権限が設定されていることを確認してください。

- **アクセス制御** → **認証**でApache NiFi接続用のユーザーを作成する。
- **認可**でホワイトリストモードが有効な場合は、**アクセス制御** → **認可**で該当ユーザーにパブリッシュおよびサブスクライブの権限を付与する。

### データフローの例

以下はApache NiFiでのシンプルなログデータ処理フローの例です。

- **GenerateFlowFile**がシミュレートされたログデータを生成し、**PublishMQTT**プロセッサに送信します。
- **PublishMQTT**がログデータをEMQX Cloudにパブリッシュします。
- **ConsumeMQTT**が同じトピックをサブスクライブし、EMQX Cloudからログデータを受信します。
- **LogAttribute**がデータフローの属性をローカルのNiFiログに記録し、動作を確認します。

![Apache NiFiログフローデータ処理例](_assets/apache_nifi_example.png)

### MQTTプロセッサの設定

**PublishMQTT**と**ConsumeMQTT**の両方でMQTT接続設定が必要です。主な設定項目は以下の通りです。

#### 1. ブローカーURI

ブローカーURIは以下の形式で指定します。

```
<protocol: 'tcp' | 'ssl' | 'ws' | 'wss'>://<broker-address>:<port>
```

例：

```
ssl://test.emqxcloud.cn:8883
```

本番環境では暗号化通信を確保するため、SSLまたはWSSの使用を強く推奨します。暗号化プロトコルを使用する場合は、NiFiで**SSL Context Service**の設定が必要です。

##### SSL Context Serviceの設定

カスタム証明書またはEMQX Cloudが提供するCA証明書のいずれかを使用できます。ここではEMQX CloudのCA証明書を使用する例を示します。

1. `Deployment Overview` → `MQTT Connection Information`セクションで、**CA証明書ファイル** `emqxcloud-ca.crt` をダウンロードします。

2. ダウンロードしたcrtファイルをApache NiFiをデプロイしたサーバーにアップロードします。

3. 以下のコマンドを実行します。

   ```bash
   keytool -importcert \
   -alias myca \
   -file emqxcloud-ca.crt \
   -keystore truststore.jks \ 
   -storepass <ReplaceWithYourStorepass>
   ```

4. 生成された `truststore.jks` を特定のディレクトリに配置します。

5. SSL Context Serviceの横にある `...` をクリックし、`Create new service` を選択、`StandardRestrictedSSLContextService` を選んで `Add` をクリックします。

6. SSL Context Serviceの `...` をクリックし、`Go to service` を選択します。

7. 新しく作成したサービスを選択し、「Edit」をクリックします。

8. Truststore Filenameに `truststore.jks` を配置したディレクトリのパスを設定し、Truststore Passwordにストアパスワードを入力、Truststore TypeはJKSを選択します。

9. 設定を保存して画面を閉じ、`...` をクリックしてサービスを `Enable` します。

有効化すると、このSSL Context Serviceは他のプロセッサでも再利用可能です。

#### 2. MQTT仕様バージョン

要件に応じてMQTTプロトコルバージョンを選択します。新規デプロイではMQTT v5.0を推奨します。

#### 3. 認証情報

EMQX Cloudで作成したユーザーの**Username**と**Password**を設定します。

#### 4. その他の設定

必要に応じて、追加の必須または任意の項目を設定してください。

### データフローの開始

設定が完了したら：

1. 各プロセッサの **Verify (✅)** ボタンをクリックして設定を検証します。
2. プロセッサの状態を **Stopped** から **Start** に変更します。
3. フロー内のすべてのプロセッサを起動します。

すべてのプロセッサが起動すると、Apache NiFiのログデータ処理パイプラインが完全に構成され、稼働状態になります。

## Apache NiFiとEMQX Cloud間のMQTTデータフローの検証

設定完了後、MQTTクライアントを使ってデータフローを検証します。デバッグには[MQTTX](https://github.com/emqx/MQTTX)の使用を推奨します。

1. PublishMQTTの出力を検証

   MQTTXで**PublishMQTT**プロセッサで設定したトピックをサブスクライブします。**GenerateFlowFile**が継続的にパブリッシュするシミュレートされたログメッセージが受信できるはずです。

   ![Apache NiFiテスト1](_assets/apache_nifi_test_1.png)

2. ConsumeMQTTの入力を検証

   MQTTXを使い、**ConsumeMQTT**プロセッサで設定したトピックに手動でログメッセージをパブリッシュします。メッセージ受信に伴い、**ConsumeMQTT**の出力カウントが増加することを確認します。

   ![Apache NiFiテスト2](_assets/apache_nifi_test_2.png)

3. NiFiログを検証

   Apache NiFiのアプリケーションログ（デフォルトは `logs/nifi-app.log`）を確認します。**LogAttribute**によって以下のログエントリが生成されているはずです。

   - **GenerateFlowFile**が生成したシミュレートログ
   - MQTTX経由で手動パブリッシュしたログ

   ![Apache NiFiテスト3](_assets/apache_nifi_test_3.png)

すべての手順が期待通りに動作すれば、Apache NiFiとEMQX Cloudの連携は正常に機能しています。

## 次のステップ

本ガイドでは、Apache NiFiのデプロイ、EMQX Cloudへの接続、MQTTベースの通信を検証するためのシミュレートログデータ処理フローの構築を行いました。

基本設定が完了したら、ビジネス要件に応じてフロー構造を柔軟に構成できます。言語別のデモ例は[GitHub](https://github.com/emqx/MQTT-Client-Examples)で多数公開されています。

## 参考資料

- [Getting Started with MQTT in Apache NiFi](https://medium.com/cloudera-inc/getting-started-with-mqtt-in-apache-nifi-64e8cde1de91)
