# Apache KafkaへのMQTTデータストリーミング

[Apache Kafka](https://kafka.apache.org/) は、アプリケーションやシステム間のリアルタイムデータフローを処理できる、広く利用されているオープンソースの分散イベントストリーミングプラットフォームです。しかし、KafkaはエッジIoT通信向けに設計されておらず、Kafkaクライアントは安定したネットワーク接続とより多くのハードウェアリソースを必要とします。IoT分野では、デバイスやアプリケーションが生成するデータは軽量なMQTTプロトコルで送信されます。EMQXプラットフォームとKafka/Confluentの統合により、ユーザーはMQTTデータをシームレスにKafkaへストリーミングできます。MQTTデータストリームはKafkaトピックに導入され、リアルタイムの処理、保存、分析が可能です。現在、EMQXプラットフォームはKafkaへのデータ転送のみをサポートしています。

![kafka_bridge](./_assets/kafka_bridge.jpg)

本ページでは、Kafkaデータ統合の機能について詳しく紹介し、データ統合の作成に関する実践的なガイダンスを提供します。

## 動作概要

Apache Kafkaデータ統合はEMQXプラットフォームに標準搭載された機能であり、MQTTベースのIoTデータとKafkaの強力なデータ処理能力を橋渡しします。組み込みのルールエンジンコンポーネントを通じて、複雑なコーディングなしに両プラットフォーム間のデータフローと処理を簡素化します。

![kafka_architecture](./_assets/kafka_architecture.png)

Kafkaへのメッセージデータ転送の基本的なワークフローは以下の通りです：

1. **メッセージパブリッシュ**：デバイスはMQTTプロトコルを介してEMQXプラットフォームのデプロイメントに正常に接続し、定期的に状態データを含むメッセージをパブリッシュします。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータ処理**：これらのMQTTメッセージは、組み込みのルールエンジンを通じてトピックマッチングルールに基づいて処理されます。メッセージが到着しルールエンジンを通過すると、エンジンはそのメッセージに対して事前定義された処理ルールを評価します。ペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
3. **Kafkaへの送信**：ルールエンジンで定義されたルールがメッセージをKafkaに転送するアクションをトリガーします。Kafkaデータ統合を使用して、MQTTトピックは事前定義されたKafkaトピックにマッピングされ、処理済みのすべてのメッセージとデータがKafkaトピックに書き込まれます。

Kafkaからのメッセージ消費の基本的なワークフローは以下の通りです：

1. **Kafkaトピックのサブスクライブ**：ルールエンジンの入力ソースでKafkaコンシューマーのアクションを定義し、消費するKafkaトピック名を設定します。EMQXデプロイメントがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータ処理**：組み込みのルールエンジンを通じて、メッセージが到着しルールエンジンを通過すると、ルールエンジンはそのメッセージに対して事前定義された処理ルールを評価します。メッセージペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
3. **MQTTクライアントへの転送**：ルールエンジンで定義されたルールがメッセージをMQTTトピックに転送するアクションをトリガーします。Kafkaデータ統合を使用して、Kafkaトピックは事前定義されたMQTTトピックにマッピングされ、処理済みのすべてのメッセージとデータがMQTTクライアントによって消費されます。

## 特徴と利点

Apache Kafkaとのデータ統合は、以下の特徴と利点をビジネスにもたらします：

- **ペイロード変換**：転送中にメッセージペイロードは定義されたSQLルールで処理可能です。例えば、総メッセージ数、成功/失敗配信数、メッセージレートなどのリアルタイムメトリクスを含むペイロードは、Kafkaに入力される前にデータ抽出、フィルタリング、強化、変換が行えます。
- **効果的なトピックマッピング**：設定されたKafkaデータ統合を通じて、多数のIoTビジネストピックをKafkaトピックにマッピングできます。EMQXはMQTTユーザープロパティをKafkaヘッダーにマッピングすることをサポートし、1対1、1対多、多対多の柔軟なトピックマッピング方法やMQTTトピックフィルター（ワイルドカード）にも対応しています。
- **柔軟なパーティション選択戦略**：MQTTトピックやクライアントに基づいて同一Kafkaパーティションへのメッセージ転送をサポートします。
- **高スループット下での処理能力**：EMQX Kafkaプロデューサーは同期および非同期書き込みモードをサポートし、リアルタイム優先とパフォーマンス優先のデータ書き込み戦略を柔軟に選択可能です。
- **ランタイムメトリクス**：各データブリッジの総メッセージ数、成功/失敗数、現在のレートなどのランタイムメトリクスを閲覧可能です。

これらの特徴により統合能力と柔軟性が向上し、効果的で堅牢なIoTプラットフォームアーキテクチャの構築に役立ちます。増大するIoTデータは安定したネットワーク接続下で送信され、さらに効果的に保存・管理されます。

## はじめる前に

このセクションでは、EMQXプラットフォームでKafkaデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)を理解していること。
- [データ統合](./introduction.md)を理解していること。

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Kafkaのインストールとトピック作成

1. Kafkaをインストールします。

   ```bash
   # Zookeeperのインストール
   docker run -d --restart=always \
       --name zookeeper \
       -p 2181:2181 \
       zookeeper
   
   # Kafkaのインストール（ポート9092を開放）
   docker run -d  --restart=always --name mykafka \
       -p 9092:9092 \
       -e HOST_IP=localhost \
       -e KAFKA_ADVERTISED_PORT=9092 \
       -e KAFKA_ADVERTISED_HOST_NAME=<Server IP> \
       -e KAFKA_BROKER_ID=1 \
       -e KAFKA_LOG_RETENTION_HOURS=12 \
       -e KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=100000 \
       -e KAFKA_ZOOKEEPER_CONNECT=<Server IP>:2181 \
       -e ZK=<Server IP> \
       wurstmeister/kafka
   ```
   
2. トピックを作成します。

   ```bash
   # Kafkaインスタンスに入り、emqxトピックを作成
   $ docker exec -it mykafka /opt/kafka/bin/kafka-topics.sh --zookeeper <broker IP>:2181 --replication-factor 1 --partitions 1 --topic emqx --create
   ```
   

"Created topic emqx." と表示されれば作成成功です。

## メッセージデータをKafkaにストリーミングする

このセクションでは、MQTTプロトコルを介してシミュレートされた温度・湿度データをEMQXデプロイメントに報告し、設定されたデータ統合を通じてKafkaに転送する方法を示します。内容はKafkaプロデューサーコネクターの作成、ルールの作成、ルールのテストを含みます。

### Kafkaプロデューサーコネクターの作成

データ統合ルールを作成する前に、KafkaサーバーにアクセスするためのKafkaプロデューサーコネクターを作成する必要があります。

1. デプロイメントにアクセスし、左ナビゲーションメニューから **データ統合** をクリックします。 
2. 初めてコネクターを作成する場合は、**データ転送** カテゴリの下にある **Kafka Producer** を選択します。すでにコネクターを作成済みの場合は、**新規コネクター** を選択し、続いて **データ転送** カテゴリの下の **Kafka Producer** を選択します。
3. **新規コネクター** ページで以下のオプションを設定します：
   - **コネクター名**：システムが自動的に名前を生成しますが、自分で命名することも可能です。本例では `my_kafkaserver` を使用できます。
   - **Bootstrap Hosts**：ホストリストを入力し、Kafkaサービスにネットワーク経由で正常にアクセスできることを確認します。
   - 他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
4. **テスト** ボタンをクリックします。Kafkaサービスにアクセス可能であれば成功メッセージが返されます。
5. **新規作成** ボタンをクリックして作成を完了します。

### ルールの作成

次に、書き込むデータを指定し、処理済みデータをKafkaに転送するアクションをルールに追加するためのルールを作成します。

1. ルールエリアの **新規ルール** をクリックするか、作成したコネクターの **アクション** 列にある新規ルールアイコンをクリックします。

2. **SQLエディター** にルールマッチングSQL文を入力します。以下のSQL例は、`temp_hum/emqx` トピックに送信されたメッセージから報告時刻 `timestamp`、クライアントID、メッセージ本文（ペイロード）を読み取り、温度と湿度を抽出します。

   ```sql
   SELECT 
   timestamp,
   clientid, 
   payload.temp as temp, 
   payload.hum as hum
   
   FROM
   "temp_hum/emqx"
   ```

   **テスト有効化** を使ってデータ入力をシミュレートし、結果をテストできます。

3. **次へ** をクリックしてアクションを追加します。

4. **コネクター** ドロップダウンから先ほど作成したコネクターを選択します。

5. 以下の情報を設定します：

   - **アクション名**：システムが自動生成しますが、任意で命名可能です。

   - **Kafkaトピック名**：先に作成したトピック `emqx` を入力します。

   - **Kafkaヘッダー**：ビジネス要件に応じてKafkaヘッダーを定義します。

   - メッセージ本文設定では、**メッセージキー** はルールから取得したクライアントIDがデフォルトですが、必要に応じて変更可能です。**メッセージ値** には転送する温度と湿度の値を入力します。

     ```bash
     # Kafkaメッセージ値
     {"temp": ${temp}, "hum": ${hum}}
     ```

   - 他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。

6. **確認** ボタンをクリックしてルール作成を完了します。

7. **新規ルール作成成功** ポップアップで **ルールに戻る** をクリックし、データ統合構成チェーン全体を完了します。

作成成功後、**新規ルール** ページに戻ります。**ルール** リストに新規作成したルールが表示され、**アクション** リストの **アクション（シンク）** にデータインポートアクションの一覧が表示されます。

### ルールのテスト

[MQTTX](https://mqttx.app/) を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック: `temp_hum/emqx`

   - ペイロード:

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```
   
2. メッセージがKafkaに転送されているか確認します。

   ```bash
   # Kafkaインスタンスに入り、emqxトピックを確認
   $ docker exec -it mykafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <broker IP>:9092  --topic emqx --from-beginning
   ```
   
3. コンソールで運用データを確認します。ルールリストのルールIDをクリックすると、ルールの統計情報とそのルール下のすべてのアクションの統計情報が表示されます。

## Kafkaからのメッセージ消費

このセクションでは、EMQXデプロイメントがKafkaからメッセージを消費し、設定されたデータ統合を通じてメッセージデータをMQTTトピックに再パブリッシュする方法を示します。内容はKafkaコンシューマーコネクターの作成、ルールの作成、ルールのテストを含みます。

### Kafkaコンシューマーコネクターの作成

Kafkaコンシューマーを追加する前に、EMQXデプロイメントとKafka間の接続を確立するための**Kafkaコンシューマーコネクター**を作成する必要があります。

1. デプロイメントメニューで **データ統合** を選択し、**データ取り込み** サービスカテゴリの下で **Kafka Consumer** を選択します。すでに他のコネクターを作成済みの場合は、**新規コネクター作成** をクリックし、続いて **データ取り込み** の下の **Kafka Consumer** を選択します。
2. コネクター作成ページで以下の情報を設定します：
   - **コネクター名**：システムが自動的に `connector-` で始まる名前を生成します。
   - **ホストリスト**：Kafkaサービスにネットワーク経由で正常にアクセスできるホストリストを入力します。
   - 認証：必要に応じて認証方式を入力します。ここではBasic認証を使用します。
     - **認証方式**：Kafkaサービスの認証要件に基づき、`plain`、`SHA256`、`SHA512`から選択します。
     - **ユーザー名**：Kafkaサービスのユーザー名、通常はAPIキーです。
     - **パスワード**：Kafkaサービスの認証パスワード、通常はAPIシークレットです。
   - 他の設定はデフォルト値を使用するか、ビジネス要件に応じて設定してください。
3. **接続テスト** ボタンをクリックし、Kafkaサービスに正常にアクセスできれば成功メッセージが返されます。
4. **作成** ボタンをクリックして設定を完了します。

### ルールの作成

次に、入力データとルールエンジン処理のルールを指定し、ルールで処理したデータを対応するトピックに転送する出力アクションをルールに追加します。

1. コネクターリストの **アクション** 列にある新規ルールアイコンをクリックするか、**ルール** リスト上部の **新規ルール** をクリックして **新規ルール** ステップページに入ります。

2. コンシューマールールでは、まず入力アクションを設定する必要があります。ルール編集ページで自動的に入力アクション設定がポップアップするか、**アクション（ソース）** -> **新規アクション（ソース）** を選択し、パネルで **Kafka Consumer** を選択して **次へ** をクリックします。

   - Kafkaコンシューマーコネクターを選択します。
   - **Kafkaトピック**：Kafkaから消費するトピック名を入力します。例：`temp_hum/emqx`。
   - **グループID**：このソースのコンシューマーグループ識別子を指定します。空欄の場合、ソース名に基づいて自動生成されます。文字、数字、アンダースコア、ドット、ハイフンのみ使用可能です。
   - **キーエンコードモード** と **値エンコードモード**：Kafkaメッセージのキーと値のエンコードモードを選択します。
   - **オフセットリセットポリシー**：コンシューマーがKafkaトピックパーティションの読み取りを開始する際のオフセットリセット戦略を選択します。コンシューマーオフセットが存在しないか無効な場合に適用されます。
     - 最新のオフセットから読み取り、コンシューマー開始前のメッセージをスキップしたい場合は `latest` を選択します。
     - パーティションの先頭から読み取り、コンシューマー開始前のメッセージも含めてすべての履歴データを読みたい場合は `earliest` を選択します。

   **確認** ボタンをクリックして設定を完了します。

3. SQLエディターがデータソースフィールドを更新し、`SELECT` で転送するフィールドを選択できます。

   ```sql
   SELECT
     key as key, value as value, topic as topic
   FROM
     "$bridges/kafka_consumer:source-812985f2"
   ```

4. **次へ** をクリックし、出力アクションを作成します。

5. 新規出力アクションで **再パブリッシュ** を選択します。

6. 出力アクション情報を設定します：

   - **トピック**：転送先のMQTTトピックを指定します。`${var}` 形式のプレースホルダーをサポートします。ここでは `sub/${topic}` と入力し、元のトピックに `sub/` プレフィックスを付加して転送します。例えば、元のメッセージトピックが `t/1` の場合、転送先は `sub/t/1` となります。
   - **QoS**：メッセージパブリッシュのQoSを `0`、`1`、`2`、`${qos}` から選択、または他のフィールドからQoSを設定するためのプレースホルダーを入力します。ここでは元のメッセージのQoSに従う `${qos}` を選択します。
   - **保持フラグ**：`true`、`false`、`${flags.retain}` から選択し、メッセージを保持メッセージとしてパブリッシュするかを指定します。プレースホルダーを使って他のフィールドから保持フラグを設定することも可能です。ここでは元のメッセージの保持フラグに従う `${flags.retain}` を選択します。
   - **メッセージテンプレート**：転送するメッセージペイロード生成用のテンプレートです。デフォルトは空欄でルール出力結果を転送します。ここでは `${.}` と入力し、ルールエンジン内の全フィールドを転送します。

7. 他の設定はデフォルト値を使用し、**確認** ボタンをクリックして出力アクションの作成を完了します。

作成成功後、**新規ルール** ページに戻ります。**ルール** リストに新規作成したルールが表示されます。再パブリッシュアクションは現在 **アクション（ソース）** に表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部に再パブリッシュ出力アクションが表示されます。

### ルールのテスト

Kafkaトピック `temp_hum/emqx` にメッセージを送信し、デプロイメント内の転送先トピックをサブスクライブしてルールをテストできます。メッセージが `temp_hum/emqx` トピックにパブリッシュされると、デプロイメント内の `sub/temp_hum/emqx` トピックに転送されます。

以下は、[MQTTX](https://mqttx.app/zh/) を使って転送されたMQTTトピックをサブスクライブし、メッセージを受信する手順です。

1. MQTTXで現在のデプロイメントのトピック `sub/#` をサブスクライブします。

2. Kafkaプロデューサーや他のツールを使って、`temp_hum/emqx` トピックにメッセージをパブリッシュします：

   ```json
   {
      "temp": 55,
      "hum": 32
   }
   ```

3. MQTTXは現在のデプロイメント内の `sub/temp_hum/emqx` トピックでメッセージを受信します：

   ```json
   {
       "key": "",
       "value": {
           "temp": 55,
           "hum": 32
       }
   }
   ```
