Apache NiFiとの接続
Apache NiFiは、異なるシステム間でのデータの信頼性の高い効率的な転送、変換、処理を目的としたビジュアルなデータフロー管理ツールです。リアルタイムデータフロー、ドラッグアンドドロップによるプロセス設計、データプロビナンス、セキュリティ制御などの機能をサポートしています。
本ページでは主に、Apache NiFiをEMQX Cloudのデプロイメントに接続し、Apache NiFiを使った基本的なデータフロー処理タスクの実行方法について説明します。
前提条件
Apache NiFiをEMQX Cloudに接続する前に、以下の準備が完了していることを確認してください。
- MQTTサーバーのデプロイ
- JDKのインストール
- Apache NiFiのデプロイ
MQTTサーバーのデプロイ
アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。
サーバレスデプロイメント
EMQX Cloudコンソールでサーバレスデプロイメントを作成します。
デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。
ブローカーアドレス
ポート番号(サーバレスではTLSポートのみ対応)
サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート
8883を使用してください。デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細はサーバレスポートガイドを参照してください。
Dedicated Flex または BYOC デプロイメント
EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。
作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。
ブローカーアドレス
MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)
デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。
JDKのインストール
Apache NiFi 2.6.0をデプロイするには、Apache NiFiを正常に動作させるためにJDK 21(またはそれ以降のバージョン)をインストールする必要があります。
Debian / Ubuntu
sudo apt update
sudo apt install openjdk-21-jdk
java -versionCentOS 8+ / Fedora 8+ / RHEL
sudo dnf install temurin-21-jdk
java -versionArch Linux / Manjaro
sudo pacman -S jdk-openjdkApache NiFiのデプロイ
Apache NiFiのダウンロードと起動
Apache公式サイトからパッケージをダウンロードし、解凍します。例としてApache NiFi 2.6.0をデプロイする場合:
bash# apache.orgからApache NiFi 2.6.0をダウンロード wget https://dlcdn.apache.org/nifi/2.6.0/nifi-2.6.0-bin.zip # ファイルを解凍 unzip nifi-2.6.0-bin.zip # 解凍後、zipファイルを削除 rm nifi-2.6.0-bin.zipbinディレクトリに移動し、ユーザー名とパスワードを設定してApache NiFiを起動します。bashcd nifi-2.6.0/bin # ユーザー名とパスワードを設定(パスワードは12文字以上が必要) ./nifi.sh set-single-user-credentials <YOUR_USERNAME> <YOUR_PASSWORD> # バックグラウンドでNiFiサービスを起動 ./nifi.sh start # フォアグラウンドでNiFiを実行する場合は以下を使用 # ./nifi.sh run
Apache NiFiへのアクセス
Apache NiFi 2.xはデフォルトでHTTPSを使用してアクセスし、組み込み証明書はローカルアクセスのみをサポートしています。
- ローカルマシンにApache NiFiをデプロイした場合は、ブラウザでhttps://localhost:8443/nifiにアクセスできます。
- リモートサーバーにデプロイした場合は、本チュートリアルで紹介する3つの方法でアクセスエラーを解決できます。
方法1:HTTPアクセスを有効化(開発環境のみ)
設定ファイルを編集しHTTPアクセスを有効にします(本番環境ではHTTPSを推奨します)。
bash# 設定ディレクトリへ移動 cd ~/nifi-2.6.0/conf # お好みのテキストエディタでnifi.propertiesを開く(例:Vim) vim nifi.properties以下の設定を探して変更します:
nifi.remote.input.secure=falsenifi.web.http.host=192.168.31.9(実際の環境に合わせて調整)nifi.web.http.port=8080nifi.web.https.host=nifi.web.https.port=
Apache NiFiを再起動し、ブラウザで
http://<serverIP>:8080/nifiにアクセスします。
方法2:リモートアクセス用HTTPS証明書の設定
Stackoverflow: Apache NIFI 2+ HTTP ERROR 400 Invalid SNI を参照し、証明書と内部ネットワークアクセスの設定を行います。
方法3:SSHトンネル経由でアクセス(一時的なデバッグ用)
SSHトンネル経由でアクセスします(毎回手動でトンネルを設定する必要があり、一時的なデバッグに適します)。
ターミナルを開き、以下のコマンドを入力します:
bashssh -L 8443:localhost:8443 <your-username>@<your-server-IP>認証が成功したら、ブラウザで
https://localhost:8443/nifiにアクセスします。
以下の画面が表示されれば、Apache NiFiのデプロイは完了です。設定したユーザー名とパスワードでログインしてください。

Apache NiFiをEMQX Cloudに接続する
Apache NiFiでは、さまざまなプロセッサを使ってMQTT経由でEMQX Cloudと通信できます。代表的なプロセッサは以下の通りです。
- PublishMQTT:データフローをEMQX Cloudにパブリッシュするために使用します。
- ConsumeMQTT:EMQX Cloudからデータフローを受信するために使用します。
前提条件
Apache NiFiの設定前に、EMQX Cloudで必要な認証情報と権限が設定されていることを確認してください。
- アクセス制御 → 認証でApache NiFi接続用のユーザーを作成する。
- 認可でホワイトリストモードが有効な場合は、アクセス制御 → 認可で該当ユーザーにパブリッシュおよびサブスクライブの権限を付与する。
データフローの例
以下はApache NiFiでのシンプルなログデータ処理フローの例です。
- GenerateFlowFileがシミュレートされたログデータを生成し、PublishMQTTプロセッサに送信します。
- PublishMQTTがログデータをEMQX Cloudにパブリッシュします。
- ConsumeMQTTが同じトピックをサブスクライブし、EMQX Cloudからログデータを受信します。
- LogAttributeがデータフローの属性をローカルのNiFiログに記録し、動作を確認します。

MQTTプロセッサの設定
PublishMQTTとConsumeMQTTの両方でMQTT接続設定が必要です。主な設定項目は以下の通りです。
1. ブローカーURI
ブローカーURIは以下の形式で指定します。
<protocol: 'tcp' | 'ssl' | 'ws' | 'wss'>://<broker-address>:<port>例:
ssl://test.emqxcloud.cn:8883本番環境では暗号化通信を確保するため、SSLまたはWSSの使用を強く推奨します。暗号化プロトコルを使用する場合は、NiFiでSSL Context Serviceの設定が必要です。
SSL Context Serviceの設定
カスタム証明書またはEMQX Cloudが提供するCA証明書のいずれかを使用できます。ここではEMQX CloudのCA証明書を使用する例を示します。
Deployment Overview→MQTT Connection Informationセクションで、CA証明書ファイルemqxcloud-ca.crtをダウンロードします。ダウンロードしたcrtファイルをApache NiFiをデプロイしたサーバーにアップロードします。
以下のコマンドを実行します。
bashkeytool -importcert \ -alias myca \ -file emqxcloud-ca.crt \ -keystore truststore.jks \ -storepass <ReplaceWithYourStorepass>生成された
truststore.jksを特定のディレクトリに配置します。SSL Context Serviceの横にある
...をクリックし、Create new serviceを選択、StandardRestrictedSSLContextServiceを選んでAddをクリックします。SSL Context Serviceの
...をクリックし、Go to serviceを選択します。新しく作成したサービスを選択し、「Edit」をクリックします。
Truststore Filenameに
truststore.jksを配置したディレクトリのパスを設定し、Truststore Passwordにストアパスワードを入力、Truststore TypeはJKSを選択します。設定を保存して画面を閉じ、
...をクリックしてサービスをEnableします。
有効化すると、このSSL Context Serviceは他のプロセッサでも再利用可能です。
2. MQTT仕様バージョン
要件に応じてMQTTプロトコルバージョンを選択します。新規デプロイではMQTT v5.0を推奨します。
3. 認証情報
EMQX Cloudで作成したユーザーのUsernameとPasswordを設定します。
4. その他の設定
必要に応じて、追加の必須または任意の項目を設定してください。
データフローの開始
設定が完了したら:
- 各プロセッサの Verify (✅) ボタンをクリックして設定を検証します。
- プロセッサの状態を Stopped から Start に変更します。
- フロー内のすべてのプロセッサを起動します。
すべてのプロセッサが起動すると、Apache NiFiのログデータ処理パイプラインが完全に構成され、稼働状態になります。
Apache NiFiとEMQX Cloud間のMQTTデータフローの検証
設定完了後、MQTTクライアントを使ってデータフローを検証します。デバッグにはMQTTXの使用を推奨します。
PublishMQTTの出力を検証
MQTTXでPublishMQTTプロセッサで設定したトピックをサブスクライブします。GenerateFlowFileが継続的にパブリッシュするシミュレートされたログメッセージが受信できるはずです。

ConsumeMQTTの入力を検証
MQTTXを使い、ConsumeMQTTプロセッサで設定したトピックに手動でログメッセージをパブリッシュします。メッセージ受信に伴い、ConsumeMQTTの出力カウントが増加することを確認します。

NiFiログを検証
Apache NiFiのアプリケーションログ(デフォルトは
logs/nifi-app.log)を確認します。LogAttributeによって以下のログエントリが生成されているはずです。- GenerateFlowFileが生成したシミュレートログ
- MQTTX経由で手動パブリッシュしたログ

すべての手順が期待通りに動作すれば、Apache NiFiとEMQX Cloudの連携は正常に機能しています。
次のステップ
本ガイドでは、Apache NiFiのデプロイ、EMQX Cloudへの接続、MQTTベースの通信を検証するためのシミュレートログデータ処理フローの構築を行いました。
基本設定が完了したら、ビジネス要件に応じてフロー構造を柔軟に構成できます。言語別のデモ例はGitHubで多数公開されています。