# Confluent CloudへのMQTTデータストリーミング

Confluent Cloudは、Apache Kafkaをベースにしたフルマネージドのイベントストリーミングプラットフォームであり、フルマネージドサービスとして提供されています。本チュートリアルでは、EMQX PlatformでConfluent Cloudとのデータ統合を作成し、MQTTデータをConfluent Cloudにストリーミングする方法を紹介します。データ統合を通じて、クライアントはMQTTプロトコルを使用して温度および湿度データをEMQX Platformに報告し、そのデータをConfluent Cloudにストリーミングできます。また、[MQTTX](https://www.emqx.com/en/products/mqttx)を使ったデータ統合のテスト方法も示します。

本ページでは、Confluentデータ統合の機能について詳細に解説し、作成手順を実践的に案内します。内容にはKafkaコネクターの作成、ルールの作成、ルールのテストが含まれます。MQTTプロトコルを通じてシミュレートされた温度・湿度データをEMQX Platformに報告し、設定したデータ統合を介してConfluentに保存する方法を示します。

## 動作概要

Confluentデータ統合はEMQXのすぐに使える機能であり、MQTTベースのIoTデータとConfluentの強力なデータ処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを使用することで、両プラットフォーム間のデータフローと処理を簡素化し、複雑なコーディングを不要にします。

Confluentへのメッセージデータ転送の基本的なワークフローは以下の通りです：

1. **メッセージのパブリッシュと受信**：車両に接続されたIoTデバイスはMQTTプロトコルでEMQXに正常に接続し、定期的に状態データを含むメッセージをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：これらのMQTTメッセージは、組み込みのルールエンジンとメッセージサーバーの連携により、トピックマッチングルールに従って処理されます。メッセージが到着してルールエンジンを通過すると、事前定義された処理ルールが評価されます。ペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの変換が適用されます。
3. **Confluentへのブリッジ**：ルールエンジンで定義されたルールは、メッセージをConfluentに転送するアクションをトリガーします。Confluent機能を使って、MQTTトピックはConfluent内の事前定義されたKafkaトピックにマッピングされ、すべての処理済みメッセージとデータがこれらのトピックに書き込まれます。

## 特長と利点

Confluentとのデータ統合は、以下の特長とメリットをビジネスにもたらします：

- 大規模メッセージ伝送の信頼性：EMQXとConfluent Cloudはどちらも高信頼なクラスター機構を用いて安定かつ信頼性の高いメッセージ伝送チャネルを構築し、大規模IoTデバイスからのメッセージのロスをゼロにします。両者はノード追加による水平スケールが可能で、リソースを動的に調整して突発的な大規模メッセージにも対応し、メッセージ伝送の可用性を確保します。
- 強力なデータ処理能力：EMQXのローカルルールエンジンとConfluent Cloudは、デバイスからアプリケーションまでの異なる段階で信頼性の高いストリーミングデータ処理機能を提供します。リアルタイムのデータフィルタリング、形式変換、集約分析などをシナリオに応じて実現し、より複雑なIoTメッセージ処理ワークフローを可能にし、データ分析アプリケーションのニーズに応えます。
- 強力な統合機能：Confluent Cloudが提供する多様なコネクターを通じて、EMQXは他のデータベース、データウェアハウス、データストリーム処理システムなどと容易に統合でき、柔軟なデータ分析アプリケーションのための完全なIoTデータワークフローを構築します。
- 高スループット処理能力：同期・非同期の両書き込みモードをサポートし、リアルタイム優先とパフォーマンス優先のデータ書き込み戦略を区別可能で、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。
- 効果的なトピックマッピング：ブリッジ設定を通じて多数のIoTビジネストピックをKafkaトピックにマッピング可能です。EMQXはMQTTユーザープロパティをKafkaヘッダーにマッピングすることをサポートし、1対1、1対多、多対多など多様なトピックマッピング方式を採用し、MQTTトピックフィルター（ワイルドカード）もサポートします。

これらの機能は統合能力と柔軟性を高め、効果的かつ堅牢なIoTプラットフォームアーキテクチャの構築を支援します。増大するIoTデータは安定したネットワーク接続で伝送され、さらに効果的に保存・管理されます。

## はじめる前に

本節では、EMQX PlatformコンソールでConfluentデータ統合を設定するための準備作業について説明します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### Confluent Cloudクラスターのセットアップ

Confluentデータ統合を作成する前に、Confluent Cloudコンソールでクラスターを作成し、Confluent Cloud CLIを使ってトピックとAPIキーを作成する必要があります。

クラスターは4種類あり、それぞれネットワークアクセスが異なります。詳細は[Confluentドキュメント](https://docs.confluent.io/cloud/current/networking/overview.html#cloud-networking-support-public)をご参照ください。クラスターの種類により、デプロイに必要なネットワーク環境が異なります。

| 特長               | Basic | Standard | Enterprise | Dedicated |
| ------------------ | ----- | -------- | ---------- | --------- |
| パブリックネットワーク | あり  | あり     | なし       | あり      |
| プライベートネットワーク | なし  | なし     | あり       | あり      |

以下では、パブリックネットワークおよびプライベートネットワークでのConfluent Cloudクラスターのセットアップ方法を示します。

#### パブリックネットワークでのクラスターセットアップ

Basic / Standard / Dedicatedクラスターをお持ち、または作成したい場合は、パブリックネットワーク構成を設定できます。

##### クラスターの作成

1. Confluent Cloudコンソールにログインし、クラスターを作成します。本デモではBasicクラスターを例に選択し、**Begin Configuration**をクリックします。
2. リージョン/ゾーンを選択します。レイテンシを考慮し、EMQX PlatformのデプロイリージョンがConfluent Cloudのリージョンと一致することを推奨します。**Continue**をクリックします。
3. クラスター名を入力し、**Launch cluster**をクリックします。これでクラウド上に稼働中のクラスターが作成されます。

##### トピックの作成

1. ナビゲーションメニューから**Topics**をクリックし、**Create topic**をクリックします。
2. **Topic name**欄に`emqx`と入力し、**Create with defaults**を選択します。

##### APIキーの作成

1. ナビゲーションメニューから**API Keys**をクリックし、**Add key**をクリックします。
2. APIキーのスコープを選択します。`Global access`を選択し、**Next**をクリックします。
3. APIキーを作成し、後で設定に使用するためキーをダウンロードします。

##### EMQX PlatformでNATゲートウェイを有効化

1. EMQX Platformコンソールにログインし、デプロイの概要ページに入ります。
2. ページ下部の**NAT Gateway**タブをクリックし、**Subscribe Now**をクリックします。[NAT Gateway](../vas/nat-gateway.md)について詳しくはこちら。

以上の手順でパブリックネットワークの前提設定が完了します。

#### プライベートネットワークでのクラスターセットアップ

Enterprise / Dedicatedクラスターをお持ち、または作成したい場合は、プライベートネットワーク構成を設定できます。

1. Confluent Cloudコンソールにログインし、クラスターを作成します。本デモではDedicatedクラスターを例に選択し、**Begin Configuration**をクリックします。

2. リージョン/ゾーンを選択します。EMQX PlatformのデプロイリージョンがConfluent Cloudのリージョンと一致していることを確認し、**Continue**をクリックします。

3. ネットワーク設定で**VPC Peering**を選択し、このクラスターがVPCピアリング接続のみでアクセス可能になるようにします。クラスター用のCIDRブロックを指定し、**Continue**をクリックします。

4. ニーズに応じて暗号化キーの管理方法を選択し、**Continue**をクリックします。

5. カードのバインド後、クラスターを起動する準備が整います。

#### Confluent Cloud CLIによるクラスター管理

Confluent Cloudでクラスターが稼働したら、Confluent Cloud CLIを使って管理できます。以下は基本的なCLIコマンド例です。

**Confluent Cloud CLIのインストール**

```bash
curl -sL --http1.1 https://cnfl.io/cli | sh -s -- -b /usr/local/bin
```

すでにCLIをインストール済みの場合は、以下で更新可能です：

```bash
confluent update
```

**アカウントにログイン**

```bash
confluent login --save
```

**環境を選択**

```bash
confluent environment use env-xxxxx
```

**クラスターを選択**

```bash
confluent kafka cluster use lkc-xxxxx
```

**APIキーとシークレットの使用**

既存のAPIキーをCLIに追加する場合：

```bash
confluent api-key store --resource lkc-xxxxx
Key: <API_KEY>
Secret: <API_SECRET>
```

APIキーとシークレットを持っていない場合は、作成可能です：

```bash
confluent api-key create --resource lkc-xxxxx
```

CLIに追加後、以下でAPIキーとシークレットを使用できます：

```bash
confluent api-key use "API_Key" --resource lkc-xxxxx
```

**トピックの作成**

```bash
confluent kafka topic create <topic-name>
```

トピック一覧の確認：

```bash
confluent kafka topic list
```

**トピックへのメッセージ送信**

```bash
confluent kafka topic produce <topic-name>
```

**トピックからのメッセージ受信**

```bash
confluent kafka topic consume -b <topic-name>
```

##### EMQX PlatformデプロイとのVPCピアリング接続の確立

クラスター作成後、Confluent Cloudコンソールでピアリングを追加する必要があります。

1. ナビゲーションメニューから**Cluster settings**をクリックし、**Networking**タブを開きます。**Add Peering**ボタンをクリックします。

2. VPC情報を入力します。これはEMQX Platformコンソールのデプロイ概要ページの**VPC Peering Connection**セクションから取得可能です。入力後、**Save**ボタンをクリックします。

3. VPCピアリング接続の状態を確認します。
   - 状態が`Inactive`の場合、Cloudデプロイに移動してピアリングリクエストを承認します。Confluent CloudクラスターのVPC情報を入力し、**Confirm**をクリックします。
   - 状態が`running`に変われば、VPCピアリング接続の作成に成功しています。

以上の手順でプライベートネットワークの前提設定が完了します。

## コネクターの作成

データ統合ルールを作成する前に、KafkaサーバーにアクセスするためのConfluentコネクターを作成する必要があります。

1. デプロイメントに移動し、左ナビゲーションメニューから**Data Integration**をクリックします。
2. 初めてコネクターを作成する場合は、**Data Forward**カテゴリの下にある**Confluent**を選択します。すでにコネクターを作成済みの場合は、**New Connector**を選択し、続いて**Data Forward**カテゴリの**Confluent**を選択します。
3. **New Connector**ページで以下のオプションを設定します：
   - **Connector Name**：システムが自動でコネクター名を生成します。
   - **Bootstrap Hosts**：ホストリストを入力し、Kafkaサービスにネットワーク経由で正常にアクセスできることを確認します。
   - **Username and Password**：先ほどConfluent Cloud CLIで作成したAPIキーとシークレットを入力します。
   - その他のオプションはデフォルトのままか、ビジネスニーズに応じて設定してください。
4. **Test**ボタンをクリックし、Confluentサービスにアクセス可能であれば成功メッセージが表示されます。
5. **New**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータをConfluentに転送するアクションをルールに追加するため、新しいルールを作成します。

1. 新しいルールを作成し、**SQL**入力欄に以下のSQL文を入力します。このデモで使用するルールは、`temp_hum/emqx`トピックからメッセージを読み取り、JSONオブジェクトに`client_id`、`topic`、`timestamp`情報を付加して拡充します。

   - `up_timestamp`：メッセージが報告された時刻
   - `client_id`：メッセージをパブリッシュしたクライアントのID
   - `temp`：メッセージペイロード内の温度データ
   - `hum`：メッセージペイロード内の湿度データ

   ```sql
   SELECT 
      timestamp as up_timestamp, 
      clientid as client_id, 
      payload.temp as temp,
      payload.hum as hum
      FROM
      "temp_hum/emqx"
   ```

2. テストペイロード、トピック、クライアント情報を入力し、**SQL Test**をクリックしてSQLルールをテストします。以下のような結果が表示されれば、SQLテストは成功です。

3. **Next**をクリックしてルールにアクションを追加します。以下の設定項目をビジネスニーズに応じて入力します：
   - Kafka Topic：`testtopic-in`と入力します。変数はここではサポートされません。
   - Kafka Headers：Kafkaメッセージに関連するメタデータやコンテキスト情報を入力します（任意）。プレースホルダーの値はオブジェクトである必要があります。Kafka Header Value Encod Typeドロップダウンリストからヘッダー値のエンコードタイプを選択できます。**Add**をクリックしてキー・バリューのペアを追加可能です。
   - Message Key：Kafkaメッセージのキーです。ここに文字列を入力します。純粋な文字列か、プレースホルダー（${var}）を含む文字列が利用できます。
   - Message Value：Kafkaメッセージの値です。ここに文字列を入力します。純粋な文字列か、プレースホルダー（${var}）を含む文字列が利用できます。
   - Partition Strategy：プロデューサーがKafkaのパーティションにメッセージを分配する方法を選択します。
   - Compression：Kafkaメッセージ内のレコードを圧縮／解凍するために圧縮アルゴリズムを使用するかどうかを指定します。

4. 高度な設定（任意）：[高度な設定](https://docs.emqx.com/en/enterprise/v5.5/data-integration/confluent-sink.html#advanced-configuration)を参照してください。

5. **Confirm**ボタンをクリックして作成を完了します。作成後、ページはデータ統合の初期ページに戻ります。

これでルールが正常に作成され、Integration -> Rulesページで新規作成したルールを確認でき、Actionsタブには新規作成された`action-xxx`が表示されます。

## ルールのテスト

Confluentプロデューサールールが期待通りに動作するかテストするため、[MQTTX](https://mqttx.app/)を使ってクライアントがEMQXにMQTTメッセージをパブリッシュする動作をシミュレートできます。

1. MQTTXを使って`temp_hum/emqx`トピックにメッセージを送信します。

   ```bash
   mqttx pub -i emqx_c -t t/1 -m '{"temp":"23.5","hum":"32.6"}'
   ```

2. データ統合ページで`rule_id`をクリックしてルールとアクションの統計を確認します。マッチしたメッセージが1件、新たに通過したメッセージが1件あるはずです。

3. Confluentコンソールでデータを確認するか、以下のConfluentコマンドを使って`emqx`トピックにメッセージが書き込まれているかチェックしてください：

   ```bash
   confluent kafka topic consume -b emqx
   ```
