# Apache Pulsar への MQTT データストリーム

[Apache Pulsar](https://pulsar.apache.org/) は、アプリケーションやシステム間でリアルタイムデータストリームを効率的に送信するために設計された、人気のあるオープンソースの分散イベントストリーミングプラットフォームです。Apache Pulsar は優れたスケーラビリティ、高速なスループット、低レイテンシを提供します。IoT アプリケーションにおいては、デバイスが生成するデータは通常、軽量な MQTT プロトコルを用いて送信されます。Apache Pulsar と EMQX 間のデータ統合により、ユーザーは MQTT データを簡単に Apache Pulsar にストリームし、IoT デバイスが生成するデータのリアルタイム処理、保存、分析のために他のデータシステムと接続できます。

本ページでは、EMQX プラットフォームと Pulsar 間のデータ統合について詳細に解説し、データ統合の作成および検証に関する実践的な手順を提供します。

## 動作概要

Apache Pulsar データ統合は、EMQX プラットフォームの標準機能であり、EMQX プラットフォームのデバイス接続およびメッセージ送信機能と、Pulsar の強力なデータ処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理が簡素化されます。これにより、複雑なコーディングなしで MQTT データを Pulsar に送信し、Pulsar の強力なデータ処理機能を活用できるため、IoT データの管理と活用がより効率的かつ便利になります。

![EMQX プラットフォームと Pulsar のデータ統合](./_assets/data_integration_pulsar.jpg)

EMQX プラットフォームは、ルールエンジンと設定されたアクションを通じて MQTT データを Apache Pulsar に転送し、全体の処理は以下の通りです：

1. **メッセージのパブリッシュと受信**：IoT デバイスは MQTT プロトコルを介して正常に接続を確立し、その後特定のトピックにテレメトリおよびステータスデータをパブリッシュします。EMQX プラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンを使い、特定のソースからの MQTT メッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールをマッチさせ、データフォーマットの変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
3. **Apache Pulsar へのデータストリーミング**：ルールがトリガーされると、メッセージを Pulsar に転送するアクションが実行されます。データは Pulsar のメッセージキーおよび値に簡単にマッピング可能です。MQTT トピックも Pulsar トピックにマッピングでき、データの整理や識別が容易になり、後続のデータ処理や分析を促進します。

MQTT メッセージデータが Apache Pulsar に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です：

- Pulsar のコンシューマーアプリケーションを作成し、これらのメッセージをサブスクライブして処理します。ビジネスニーズに応じて、MQTT データと他のデータソースを関連付けたり集約したり変換したりして、リアルタイムのデータ同期と統合を実現できます。
- 特定の MQTT メッセージを受信した際に、Pulsar のルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間およびアプリケーションのイベント駆動型機能を実現します。
- Pulsar 内で MQTT データストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知を行ったり、条件に応じた対応アクションを実行したりします。
- 複数の MQTT トピックからのデータを統合し、Pulsar の計算機能を活用してリアルタイムの集約、計算、分析を行い、より包括的なデータインサイトを得ることができます。

## 特徴と利点

Pulsar とのデータ統合により、以下の特徴とメリットがビジネスにもたらされます：

- **信頼性の高い IoT データメッセージ配信**：EMQX は MQTT メッセージをバッチ処理で安定的に Pulsar に送信でき、IoT デバイスと Pulsar およびアプリケーションシステムの統合を実現します。
- **MQTT メッセージの変換**：ルールエンジンを用いて、EMQX は MQTT メッセージのフィルタリングや変換が可能です。メッセージは Pulsar に送信される前にデータ抽出、フィルタリング、付加、変換などの処理を受けられます。
- **柔軟なトピックマッピング**：Pulsar アクションは MQTT トピックを Pulsar トピックに柔軟にマッピングでき、Pulsar メッセージのキー（Key）や値（Value）を簡単に設定できます。
- **柔軟なパーティション選択**：Pulsar アクションは MQTT トピックやクライアントに基づき、異なる戦略で Pulsar のパーティションを選択可能で、データの整理や識別に柔軟性を提供します。
- **高スループットシナリオでの処理能力**：Pulsar アクションは同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

## はじめる前に

このセクションでは、EMQX プラットフォームで Pulsar データ統合を作成する前に必要な準備について説明します。

### 前提条件

- [データ統合](./introduction.md)に関する知識
- データ統合の[ルール](./rules.md)に関する知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Pulsar のインストール

Docker 上で Pulsar を起動します。

```bash
docker run --rm -it -p 6650:6650 --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone -nfw -nss
```

詳細な操作手順は、[Pulsar ドキュメントのクイックスタート](https://pulsar.apache.org/docs/2.11.x/getting-started-home/)を参照してください。

### Pulsar トピックの作成

EMQX でのデータ統合作成前に、対象の Pulsar トピックを作成しておく必要があります。以下のコマンドで、`public` テナントの `default` ネームスペースに、1 パーティションの `my-topic` トピックを作成します。

```bash
docker exec -it pulsar bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic -p 1
```

## コネクターの作成

データ統合ルールを作成する前に、まず Pulsar サーバーにアクセスするための Pulsar コネクターを作成します。

1. デプロイメントに移動し、左ナビゲーションメニューから **データ統合** をクリックします。初めてコネクターを作成する場合は、**データ転送** カテゴリの下にある **Pulsar** を選択します。既にコネクターを作成済みの場合は、**新規コネクター** を選択し、続いて **データ転送** カテゴリの下の **Pulsar** を選択します。

2. **コネクター名**：システムが自動的にコネクター名を生成します。

3. 接続情報を入力します：

    - **Servers**：`pulsar://xxx:6650` を入力します。Pulsar と EMQX がリモートで稼働している場合は、`xxx` の部分を適宜変更してください。
    - **認証**：実際の状況に応じて認証方式を選択します。`none`、`Basic auth`、または `token` から選べます。
    - **TLS 有効化**：暗号化接続を確立する場合は、トグルスイッチをオンにします。
    - ビジネスニーズに応じて詳細設定を行います（任意）。

4. **テスト** ボタンをクリックします。Pulsar サービスにアクセス可能であれば、**connector available** のメッセージが返されます。

5. **新規作成** ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

## Pulsar プロデューサールールの作成

このセクションでは、EMQX プラットフォームコンソールを使って Pulsar プロデューサールールを作成し、ルールにアクションを追加する方法を説明します。

1. ルールエリアで **新規ルール** をクリックするか、作成したコネクターの **アクション** 列にある新規ルールアイコンをクリックします。

2. 利用したい機能に基づいて **SQL エディター** でルールを設定します。ここでは、クライアントが `temp_hum/emqx` トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目的としています。以下のような SQL を記述します：

   ```sql
    SELECT
     timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL Examples** をクリックし、**Enable Test** を有効にして SQL ルールの学習とテストを行うことをおすすめします。

   :::

3. **次へ** をクリックしてアクションを追加します。

4. **Connector** ドロップダウンから先ほど作成したコネクターを選択します。

5. EMQX プラットフォームから Pulsar サービスへメッセージをパブリッシュするための情報を設定します：

   - **Pulsar Topic Name**：事前に作成した `persistent://public/default/my-topic` を入力します。変数はサポートされていませんのでご注意ください。
   - **Partition Strategy**：プロデューサーがメッセージを Pulsar のパーティションに振り分ける方法を選択します。`random`、`roundrobin`、`key_dispatch` から選べます。
   - **Compression**：圧縮アルゴリズムを使用するかどうか、および Pulsar メッセージのレコードを圧縮／解凍するために使用するアルゴリズムを指定します。選択肢は `no_compression`、`snappy`、`zlib` です。
   - **Message Key**：Pulsar メッセージのキーを指定します。プレーン文字列か、プレースホルダー（${var}）を含む文字列を入力できます。
   - **Message Value**：Pulsar メッセージの値を指定します。プレーン文字列か、プレースホルダー（${var}）を含む文字列を入力できます。

6. 詳細設定（任意）：[高度な設定](#advanced-configurations)を参照してください。

7. **確定** ボタンをクリックしてルール作成を完了します。

8. **新規ルール成功** ポップアップで **ルールに戻る** をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/) を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。

1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - クライアントID：`test_client`
   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. 以下の Pulsar コマンドで、メッセージが `persistent://public/default/my-topic` トピックに書き込まれているか確認します。

   ```bash
   docker exec -it pulsar bin/pulsar-client consume -n 0 -s mysubscriptionid -p Earliest persistent://public/default/my-topic
   ```

## 高度な設定

このセクションでは、Pulsar アクションのパフォーマンスを最適化し、特定のシナリオに応じたカスタマイズを行うための高度な設定オプションについて説明します。アクション作成時に **詳細設定** を展開し、ビジネスニーズに応じて以下の設定を行えます。

| 項目名                     | 説明                                                         | 推奨値             |
| -------------------------- | ------------------------------------------------------------ | ------------------ |
| Max Inflight               | プロデューサーが各パーティションに送信できるメッセージバッチの最大数。<br/>この数を増やすとスループットが向上します。 | `10`               |
| Sync Publish Timeout       | 同期パブリッシュ操作で、メッセージが正常に配信されたことを確認するためにパブリッシャーが待機する最大時間。<br />このタイムアウトは、配信問題やネットワーク障害時にパブリッシャーが無限にブロックされるのを防ぎ、データ信頼性を確保します。 | `3` 秒             |
| Socket Send Buffer Size    | ソケットバッファのサイズを管理し、ネットワーク送信性能を最適化します。 | `1` MB             |
| Batch Size                 | 1 つの Pulsar メッセージ内にバッチングされる個別リクエストの最大数を指定します。 | `100`              |
| Max Batch Bytes            | Pulsar バッチ内でメッセージを収集する最大バイト数。通常、Pulsar ブローカーのデフォルトバッチサイズは 1 MB ですが、EMQX のデフォルト値はメッセージのエンコードオーバーヘッドを考慮し、1 MB よりやや小さく設定されています。個別メッセージがこの制限を超える場合は単独でバッチとして送信されます。 | `900` KB           |
| Connect Timeout            | TCP 接続確立の最大待機時間。認証が有効な場合は認証時間も含みます。 | `5` 秒             |
| Buffer Mode                | メッセージを送信前にバッファに保存するかどうかを定義します。メモリバッファリングは送信速度を向上させます。<br />`memory`：メッセージはメモリにバッファされます。EMQX ノード再起動時に失われます。<br />`disk`：メッセージはディスクにバッファされ、EMQX ノード再起動後も保持されます。<br />`hybrid`：メッセージは最初メモリにバッファされ、一定の制限（`segment_bytes` 設定参照）に達すると徐々にディスクにオフロードされます。メモリモード同様、EMQX ノード再起動時に失われます。 | `memory`           |
| Per-partition Buffer Limit | 各 Pulsar パーティションで許容される最大バッファサイズ（バイト単位）。この制限に達すると、古いメッセージが破棄されてバッファスペースを確保します。<br />メモリ使用量とパフォーマンスのバランス調整に役立ちます。 | `2` GB             |
| Segment File Bytes         | バッファモードが `disk` または `hybrid` の場合に適用される設定で、メッセージ保存用のセグメントファイルサイズを制御し、ディスクストレージの最適化レベルに影響します。 | `100` MB           |
| Memory Overload Protection | バッファモードが `memory` の場合に適用される設定で、高メモリ圧迫時に古いバッファメッセージを自動的に破棄し、システムの安定性を維持します。<br />**注意**：高メモリ使用の閾値は設定パラメータ `sysmon.os.sysmem_high_watermark` で定義されます。この設定は Linux システムのみ有効です。 | `disabled`         |
| Start Timeout              | コネクターが自動起動したリソースの正常状態到達を待機する最大時間（秒）。Polar などの接続リソースが完全に稼働し、データ処理準備が整うまで操作を進めないようにします。 | `5` 秒             |
| Health Check Interval      | アクションの稼働状態をチェックする間隔時間。 | `15` 秒            |
