Confluent CloudへのMQTTデータストリーム
Confluent Cloudは、Apache Kafkaをベースにしたフルマネージドのイベントストリーミングプラットフォームであり、完全に管理されたサービスとして提供されています。本チュートリアルでは、EMQX CloudでConfluent Cloudとのデータ統合を作成し、MQTTデータをConfluent Cloudにストリームする方法を紹介します。データ統合を通じて、クライアントはMQTTプロトコルを使用して温度と湿度のデータをEMQX Cloudに報告し、そのデータをConfluent Cloudにストリームできます。また、MQTTXを使用してデータ統合をテストする方法も示します。
本ページでは、Confluentデータ統合の機能的特徴を詳しく紹介し、作成手順について実践的なガイダンスを提供します。内容はKafkaコネクターの作成、ルールの作成、ルールのテストを含みます。MQTTプロトコルを介してシミュレートされた温度と湿度のデータをEMQX Cloudに報告し、設定したデータ統合を通じてConfluentに保存する方法を示します。
動作の仕組み
Confluentデータ統合はEMQXのすぐに使える機能であり、MQTTベースのIoTデータとConfluentの強力なデータ処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを使用することで、両プラットフォーム間のデータフローと処理を簡素化し、複雑なコーディングを不要にします。
メッセージデータをConfluentに転送する基本的なワークフローは以下の通りです:
- メッセージのパブリッシュと受信:車両に接続されたIoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的に状態データを含むメッセージをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:これらのMQTTメッセージは、組み込みのルールエンジンとメッセージサーバーの協調動作により、トピックマッチングルールに従って処理されます。メッセージが到着してルールエンジンを通過すると、事前定義された処理ルールが評価されます。ペイロード変換を指定するルールがあれば、データフォーマットの変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
- Confluentへのブリッジ:ルールエンジンで定義されたルールは、メッセージをConfluentに転送するアクションをトリガーします。Confluent機能を利用して、MQTTトピックはConfluent内の事前定義されたKafkaトピックにマッピングされ、すべての処理済みメッセージとデータがこれらのトピックに書き込まれます。
特徴と利点
Confluentとのデータ統合は、以下の特徴とメリットをビジネスにもたらします:
- 大規模メッセージ伝送の信頼性:EMQXとConfluent Cloudはどちらも高信頼のクラスター機構を採用し、安定かつ信頼性の高いメッセージ伝送チャネルを構築します。大規模IoTデバイスからのメッセージの損失ゼロを保証します。両者ともノードの追加による水平スケールが可能で、突発的な大規模メッセージにも動的にリソースを調整し、メッセージ伝送の可用性を確保します。
- 強力なデータ処理能力:EMQXのローカルルールエンジンとConfluent Cloudは、デバイスからアプリケーションまでの異なる段階で信頼性の高いストリーミングデータ処理機能を提供します。リアルタイムのデータフィルタリング、フォーマット変換、集約分析など、シナリオに応じた複雑なIoTメッセージ処理ワークフローを実現し、データ分析アプリケーションのニーズに応えます。
- 強力な統合機能:Confluent Cloudが提供する多様なコネクターを通じて、EMQXは他のデータベース、データウェアハウス、データストリーム処理システムなどと容易に統合でき、柔軟なデータ分析アプリケーションのための完全なIoTデータワークフローを構築します。
- 高スループット処理能力:同期・非同期の両方の書き込みモードをサポートし、リアルタイム優先とパフォーマンス優先のデータ書き込み戦略を区別できます。異なるシナリオでレイテンシとスループットのバランスを柔軟に調整可能です。
- 効果的なトピックマッピング:ブリッジ設定を通じて、多数のIoTビジネストピックをKafkaトピックにマッピングできます。EMQXはMQTTユーザープロパティをKafkaヘッダーにマッピングすることをサポートし、1対1、1対多、多対多の柔軟なトピックマッピング方式を採用し、MQTTトピックフィルター(ワイルドカード)もサポートします。
これらの特徴により、統合機能と柔軟性が向上し、効果的かつ堅牢なIoTプラットフォームアーキテクチャの構築を支援します。増大するIoTデータは安定したネットワーク接続で伝送され、さらに効果的に保存・管理されます。
はじめる前に
このセクションでは、EMQX CloudコンソールでConfluentデータ統合を設定するための準備作業について説明します。
前提条件
Confluent Cloudクラスターのセットアップ
Confluentデータ統合を作成する前に、Confluent CloudコンソールでConfluentクラスターを作成し、Confluent Cloud CLIを使ってトピックとAPIキーを作成する必要があります。
クラスターは4種類あり、それぞれ異なるネットワークアクセスを持ちます。詳細はConfluentドキュメントをご参照ください。クラスターの種類により、デプロイに必要なネットワーク環境が異なります。
| 機能 | Basic | Standard | Enterprise | Dedicated |
|---|---|---|---|---|
| パブリックネットワーク | YES | YES | NO | YES |
| プライベートネットワーク | NO | NO | YES | YES |
以下のセクションでは、パブリックネットワークおよびプライベートネットワークでのConfluent Cloudクラスターのセットアップ方法を示します。
パブリックネットワークでのクラスターセットアップ
Confluent Basic / Standard / Dedicatedクラスターをお持ち、または作成したい場合は、パブリックネットワークの設定を行います。
クラスターの作成
- Confluent Cloudコンソールにログインし、クラスターを作成します。このデモではBasicクラスターを例に選択し、Begin Configurationをクリックします。
- リージョン/ゾーンを選択します。レイテンシを考慮し、EMQX CloudのデプロイリージョンとConfluent Cloudのリージョンを一致させることを推奨します。Continueをクリックします。
- クラスター名を入力し、Launch clusterをクリックします。これでクラウド上に稼働中のクラスターが作成されます。
APIキーの作成
- ナビゲーションメニューからAPI Keysを選択し、Add keyをクリックします。
- APIキーのスコープを選択します。
Global accessを選択し、Nextをクリックします。 - APIキーを作成し、後の設定のためにキーをダウンロードします。
EMQX CloudでのNATゲートウェイ有効化
- EMQX Cloudコンソールにログインし、デプロイの概要ページに入ります。
- ページ下部のNAT Gatewayタブをクリックし、Subscribe Nowをクリックします。NAT Gatewayについて詳しくはこちらをご覧ください。
以上の手順でパブリックネットワークの前提設定は完了です。
プライベートネットワークでのクラスターセットアップ
Confluent Enterprise / Dedicatedクラスターをお持ち、または作成したい場合は、プライベートネットワークの設定を行います。EnterpriseクラスターはPrivate Linkのみ対応し、DedicatedクラスターはPrivate LinkとVPC Peeringの両方をサポートします。
Confluent Cloudコンソールにログインし、クラスターを作成します。このデモではDedicatedクラスターを例に選択し、Begin Configurationをクリックします。
リージョン/ゾーンを選択します。EMQX CloudのデプロイリージョンとConfluent Cloudのリージョンが一致していることを確認し、Continueをクリックします。
ネットワーク設定でVPC Peeringを選択し、このクラスターがVPCピアリング接続のみでアクセス可能になるようにします。
クラスターのCIDRブロックを指定し、Continueをクリックします。3つのCIDR範囲(各アベイラビリティゾーン用)を指定した場合は、3つすべての範囲を含むサポートチケットを提出する必要があります。SREチームが接続に必要な追加設定を行います。
ニーズに応じて暗号化キーの管理方法を選択し、Continueをクリックします。
カードのバインド後、クラスターを起動する準備が整います。
Confluent Cloud CLIによるクラスター管理
Confluent Cloudでクラスターが稼働中になったら、Confluent Cloud CLIを使って管理できます。以下は基本的なコマンド例です。
Confluent Cloud CLIのインストール
curl -sL --http1.1 https://cnfl.io/cli | sh -s -- -b /usr/local/bin既にCLIをインストール済みの場合は、以下でアップデート可能です。
confluent updateアカウントにログイン
confluent login --save環境を選択
confluent environment use env-xxxxxクラスターを選択
confluent kafka cluster use lkc-xxxxxAPIキーとシークレットの使用
既存のAPIキーをCLIに追加する場合:
confluent api-key store --resource lkc-xxxxx
Key: <API_KEY>
Secret: <API_SECRET>APIキーとシークレットをまだ持っていない場合は作成可能です:
confluent api-key create --resource lkc-xxxxxCLIに追加後、以下でAPIキーを使用できます:
confluent api-key use "API_Key" --resource lkc-xxxxxトピックの作成
confluent kafka topic create <topic-name>トピック一覧の確認:
confluent kafka topic listトピックへのメッセージ送信
confluent kafka topic produce <topic-name>トピックからのメッセージ受信
confluent kafka topic consume -b <topic-name>EMQX CloudデプロイメントとのVPCピアリング接続の確立
クラスター作成後、Confluent Cloudコンソールでピアリングを追加する必要があります。
Confluent Cloud環境のNetwork Managementタブに移動し、For dedicated clusterタブを選択します。接続したい特定のネットワークを選び、Ingress connectionsタブの下のボタンをクリックしてピアリングを追加します。
EMQX CloudからのVPC情報を入力します。(この情報はデプロイメントコンソールのVPC Peeringセクションから取得可能です)
Confluentでピアリングが追加されたら、Network Overviewページに移動し、
Confluent Cloud VPC IDをコピーします。EMQX Cloudコンソールに戻り、コピーしたConfluent Cloud VPC IDを入力してConfirmをクリックします。ステータスが
Runningに変われば、VPCピアリング接続は正常に確立されています。
以上でプライベートネットワークの前提設定は完了です。
コネクターの作成
データ統合ルールを作成する前に、KafkaサーバーにアクセスするためのConfluentコネクターを作成する必要があります。
- デプロイメントに移動し、左ナビゲーションメニューからData Integrationをクリックします。
- 初めてコネクターを作成する場合は、Data Forwardカテゴリの中からConfluentを選択します。既にコネクターを作成済みの場合は、New Connectorを選択し、続いてData Forwardカテゴリの中からConfluentを選択します。
- New Connectorページで以下のオプションを設定します:
- Connector Name:システムが自動的にコネクター名を生成します。
- Bootstrap Hosts:ホストリストを入力します。Kafkaサービスにネットワーク経由で正常にアクセス可能であることを確認してください。
- Username and Password:先にConfluent Cloud CLIで作成したAPIキーとシークレットを入力します。
- その他のオプションはデフォルトのままにするか、ビジネスニーズに応じて設定してください。
- Testボタンをクリックします。Confluentサービスにアクセスできれば成功メッセージが表示されます。
- Newボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをConfluentに転送するアクションをルールに追加するためのルールを作成します。
新しいルールを作成し、SQL入力欄に以下のSQL文を入力します。このデモで使用するルールは、
temp_hum/emqxトピックからメッセージを読み取り、JSONオブジェクトにclient_id、topic、timestamp情報を追加して強化します。up_timestamp:メッセージが報告された時間client_id:メッセージをパブリッシュしたクライアントのIDtemp:メッセージペイロード内の温度データhum:メッセージペイロード内の湿度データ
sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"テストペイロード、トピック、クライアント情報を入力し、SQL TestをクリックしてSQLルールをテストします。以下のような結果が表示されれば、SQLテストは成功です。
Nextをクリックしてルールにアクションを追加します。ビジネスニーズに応じて以下の設定項目を入力します:
- Kafka Topic:
testtopic-inを入力します。注意:ここでは変数はサポートされません。 - Kafka Headers:Kafkaメッセージに関連するメタデータやコンテキスト情報を入力します(任意)。プレースホルダーの値はオブジェクトである必要があります。Kafka Header Value Encoding Typeドロップダウンリストからヘッダー値のエンコードタイプを選択できます。Addをクリックしてさらにキー・バリューのペアを追加可能です。
- Message Key:Kafkaメッセージのキーを入力します。純粋な文字列またはプレースホルダー(${var})を含む文字列が可能です。
- Message Value:Kafkaメッセージの値を入力します。純粋な文字列またはプレースホルダー(${var})を含む文字列が可能です。
- Partition Strategy:プロデューサーがKafkaパーティションにメッセージを分配する方法を選択します。
- Compression:Kafkaメッセージ内のレコードを圧縮/解凍するために圧縮アルゴリズムを使用するかどうかを指定します。
- Kafka Topic:
詳細設定(任意):高度な設定を参照してください。
Confirmボタンをクリックして作成を完了します。作成後、ページはデータ統合の初期ページに戻ります。
これでルールが正常に作成され、Integration -> Rulesページで新規作成したルールを確認でき、Actionsタブで新規作成されたaction-xxxも確認できます。
ルールのテスト
Confluentプロデューサールールが期待通りに動作するかテストするため、MQTTXを使ってクライアントがEMQXにMQTTメッセージをパブリッシュする動作をシミュレートできます。
MQTTXを使って
temp_hum/emqxトピックにメッセージを送信します。bashmqttx pub -i emqx_c -t t/1 -m '{"temp":"23.5","hum":"32.6"}'データ統合ページで
rule_idをクリックし、ルールとアクションの統計情報を確認します。マッチしたメッセージが1件、新たに通過したメッセージが1件あるはずです。Confluentコンソールでデータを確認するか、以下のConfluentコマンドを使って
emqxトピックにメッセージが書き込まれているか確認してください:bashconfluent kafka topic consume -b emqx