Skip to content

Apache NiFiとの接続

Apache NiFiは、異なるシステム間でのデータの信頼性の高い効率的な転送、変換、処理を目的としたビジュアルなデータフロー管理ツールです。リアルタイムデータフロー、ドラッグアンドドロップによるプロセス設計、データプロビナンス、セキュリティ制御などの機能をサポートしています。

本ページでは主に、Apache NiFiをEMQX Cloudのデプロイメントに接続し、Apache NiFiを使った基本的なデータフロー処理タスクの実行方法について説明します。

前提条件

Apache NiFiをEMQX Cloudに接続する前に、以下の準備が完了していることを確認してください。

  • MQTTサーバーのデプロイ
  • JDKのインストール
  • Apache NiFiのデプロイ

MQTTサーバーのデプロイ

アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。

サーバレスデプロイメント

  1. EMQX Cloudコンソールでサーバレスデプロイメントを作成します。

  2. デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。

    • ブローカーアドレス

    • ポート番号(サーバレスではTLSポートのみ対応)

  3. サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート8883を使用してください。

  4. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細はサーバレスポートガイドを参照してください。

Dedicated Flex または BYOC デプロイメント

  1. EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。

  2. 作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。

    • ブローカーアドレス

    • MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)

  3. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。

JDKのインストール

Apache NiFi 2.6.0をデプロイするには、Apache NiFiを正常に動作させるためにJDK 21(またはそれ以降のバージョン)をインストールする必要があります。

Debian / Ubuntu

bash
sudo apt update
sudo apt install openjdk-21-jdk
java -version

CentOS 8+ / Fedora 8+ / RHEL

bash
sudo dnf install temurin-21-jdk
java -version

Arch Linux / Manjaro

bash
sudo pacman -S jdk-openjdk

Apache NiFiのデプロイ

Apache NiFiのダウンロードと起動

  1. Apache公式サイトからパッケージをダウンロードし、解凍します。例としてApache NiFi 2.6.0をデプロイする場合:

    bash
    # apache.orgからApache NiFi 2.6.0をダウンロード
    wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip
    
    # ファイルを解凍
    unzip nifi-2.6.0-bin.zip
    
    # 解凍後、zipファイルを削除
    rm nifi-2.6.0-bin.zip
  2. binディレクトリに移動し、ユーザー名とパスワードを設定してApache NiFiを起動します。

    bash
    cd nifi-2.6.0/bin
    
    # ユーザー名とパスワードを設定(パスワードは12文字以上が必要)
    ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD>
    
    # バックグラウンドでNiFiサービスを起動
    ./nifi.sh start
    
    # フォアグラウンドでNiFiを実行する場合は以下を使用
    # ./nifi.sh run

Apache NiFiへのアクセス

Apache NiFi 2.xはデフォルトでHTTPSを使用してアクセスし、組み込み証明書はローカルアクセスのみをサポートしています。

  • ローカルマシンにApache NiFiをデプロイした場合は、ブラウザでhttps://localhost:8443/nifiにアクセスできます。
  • リモートサーバーにデプロイした場合は、本チュートリアルで紹介する3つの方法でアクセスエラーを解決できます。
方法1:HTTPアクセスを有効化(開発環境のみ)
  1. 設定ファイルを編集しHTTPアクセスを有効にします(本番環境ではHTTPSを推奨します)。

    bash
    # 設定ディレクトリへ移動
    cd ~/nifi-2.6.0/conf
    # お好みのテキストエディタでnifi.propertiesを開く(例:Vim)
    vim nifi.properties
  2. 以下の設定を探して変更します:

    • nifi.remote.input.secure=false

    • nifi.web.http.host=192.168.31.9(実際の環境に合わせて調整)

    • nifi.web.http.port=8080

    • nifi.web.https.host=

    • nifi.web.https.port=

  3. Apache NiFiを再起動し、ブラウザで http://<serverIP>:8080/nifi にアクセスします。

方法2:リモートアクセス用HTTPS証明書の設定

Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI を参照し、証明書と内部ネットワークアクセスの設定を行います。

方法3:SSHトンネル経由でアクセス(一時的なデバッグ用)
  1. SSHトンネル経由でアクセスします(毎回手動でトンネルを設定する必要があり、一時的なデバッグに適します)。

  2. ターミナルを開き、以下のコマンドを入力します:

    bash
    ssh -L 8443:localhost:8443 <your-username>@<your-server-IP>
  3. 認証が成功したら、ブラウザで https://localhost:8443/nifi にアクセスします。

以下の画面が表示されれば、Apache NiFiのデプロイは完了です。設定したユーザー名とパスワードでログインしてください。

Apache NiFiログイン画面

Apache NiFiをEMQX Cloudに接続する

Apache NiFiでは、さまざまなプロセッサを使ってMQTT経由でEMQX Cloudと通信できます。代表的なプロセッサは以下の通りです。

  • PublishMQTT:データフローをEMQX Cloudにパブリッシュするために使用します。
  • ConsumeMQTT:EMQX Cloudからデータフローを受信するために使用します。

前提条件

Apache NiFiの設定前に、EMQX Cloudで必要な認証情報と権限が設定されていることを確認してください。

  • アクセス制御認証でApache NiFi接続用のユーザーを作成する。
  • 認可でホワイトリストモードが有効な場合は、アクセス制御認可で該当ユーザーにパブリッシュおよびサブスクライブの権限を付与する。

データフローの例

以下はApache NiFiでのシンプルなログデータ処理フローの例です。

  • GenerateFlowFileがシミュレートされたログデータを生成し、PublishMQTTプロセッサに送信します。
  • PublishMQTTがログデータをEMQX Cloudにパブリッシュします。
  • ConsumeMQTTが同じトピックをサブスクライブし、EMQX Cloudからログデータを受信します。
  • LogAttributeがデータフローの属性をローカルのNiFiログに記録し、動作を確認します。

Apache NiFiログフローデータ処理例

MQTTプロセッサの設定

PublishMQTTConsumeMQTTの両方でMQTT接続設定が必要です。主な設定項目は以下の通りです。

1. ブローカーURI

ブローカーURIは以下の形式で指定します。

<protocol: 'tcp' | 'ssl' | 'ws' | 'wss'>://<broker-address>:<port>

例:

ssl://test.emqxcloud.cn:8883

本番環境では暗号化通信を確保するため、SSLまたはWSSの使用を強く推奨します。暗号化プロトコルを使用する場合は、NiFiでSSL Context Serviceの設定が必要です。

SSL Context Serviceの設定

カスタム証明書またはEMQX Cloudが提供するCA証明書のいずれかを使用できます。ここではEMQX CloudのCA証明書を使用する例を示します。

  1. Deployment OverviewMQTT Connection Informationセクションで、CA証明書ファイル emqxcloud-ca.crt をダウンロードします。

  2. ダウンロードしたcrtファイルをApache NiFiをデプロイしたサーバーにアップロードします。

  3. 以下のコマンドを実行します。

    bash
    keytool -importcert \
    -alias myca \
    -file emqxcloud-ca.crt \
    -keystore truststore.jks \ 
    -storepass <ReplaceWithYourStorepass>
  4. 生成された truststore.jks を特定のディレクトリに配置します。

  5. SSL Context Serviceの横にある ... をクリックし、Create new service を選択、StandardRestrictedSSLContextService を選んで Add をクリックします。

  6. SSL Context Serviceの ... をクリックし、Go to service を選択します。

  7. 新しく作成したサービスを選択し、「Edit」をクリックします。

  8. Truststore Filenameに truststore.jks を配置したディレクトリのパスを設定し、Truststore Passwordにストアパスワードを入力、Truststore TypeはJKSを選択します。

  9. 設定を保存して画面を閉じ、... をクリックしてサービスを Enable します。

有効化すると、このSSL Context Serviceは他のプロセッサでも再利用可能です。

2. MQTT仕様バージョン

要件に応じてMQTTプロトコルバージョンを選択します。新規デプロイではMQTT v5.0を推奨します。

3. 認証情報

EMQX Cloudで作成したユーザーのUsernamePasswordを設定します。

4. その他の設定

必要に応じて、追加の必須または任意の項目を設定してください。

データフローの開始

設定が完了したら:

  1. 各プロセッサの Verify (✅) ボタンをクリックして設定を検証します。
  2. プロセッサの状態を Stopped から Start に変更します。
  3. フロー内のすべてのプロセッサを起動します。

すべてのプロセッサが起動すると、Apache NiFiのログデータ処理パイプラインが完全に構成され、稼働状態になります。

Apache NiFiとEMQX Cloud間のMQTTデータフローの検証

設定完了後、MQTTクライアントを使ってデータフローを検証します。デバッグにはMQTTXの使用を推奨します。

  1. PublishMQTTの出力を検証

    MQTTXでPublishMQTTプロセッサで設定したトピックをサブスクライブします。GenerateFlowFileが継続的にパブリッシュするシミュレートされたログメッセージが受信できるはずです。

    Apache NiFiテスト1

  2. ConsumeMQTTの入力を検証

    MQTTXを使い、ConsumeMQTTプロセッサで設定したトピックに手動でログメッセージをパブリッシュします。メッセージ受信に伴い、ConsumeMQTTの出力カウントが増加することを確認します。

    Apache NiFiテスト2

  3. NiFiログを検証

    Apache NiFiのアプリケーションログ(デフォルトは logs/nifi-app.log)を確認します。LogAttributeによって以下のログエントリが生成されているはずです。

    • GenerateFlowFileが生成したシミュレートログ
    • MQTTX経由で手動パブリッシュしたログ

    Apache NiFiテスト3

すべての手順が期待通りに動作すれば、Apache NiFiとEMQX Cloudの連携は正常に機能しています。

次のステップ

本ガイドでは、Apache NiFiのデプロイ、EMQX Cloudへの接続、MQTTベースの通信を検証するためのシミュレートログデータ処理フローの構築を行いました。

基本設定が完了したら、ビジネス要件に応じてフロー構造を柔軟に構成できます。言語別のデモ例はGitHubで多数公開されています。

参考資料