# Apache PulsarへMQTTデータをストリーミングする

[Apache Pulsar](https://pulsar.apache.org/)は、アプリケーションやシステム間でリアルタイムデータストリームを効率的に送信するために設計された、人気のあるオープンソースの分散イベントストリーミングプラットフォームです。Apache Pulsarは、より高いスケーラビリティ、より高速なスループット、そして低いレイテンシを提供します。IoTアプリケーションでは、デバイスが生成するデータは通常、軽量なMQTTプロトコルを使って送信されます。Apache PulsarとEMQXのデータ統合により、ユーザーはMQTTデータを簡単にApache Pulsarへストリーミングし、IoTデバイスから生成されるデータのリアルタイム処理、保存、分析のために他のデータシステムと接続できます。

本ページでは、EMQXとPulsar間のデータ統合の詳細な概要と、データ統合の作成および検証に関する実践的な手順を提供します。

## 動作の仕組み

Apache Pulsarとのデータ統合は、EMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送信機能とPulsarの強力なデータ処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理のプロセスが簡素化されています。これにより、複雑なコーディングなしでMQTTデータをPulsarに送信し、Pulsarの強力なデータ処理機能を活用でき、IoTデータの管理と活用がより効率的かつ便利になります。

![EMQX Data Integration - Apache Pulsar](./assets/emqx-integration-pulsar.jpg)

EMQXはルールエンジンと設定されたSinkを通じてMQTTデータをApache Pulsarに転送し、全体のプロセスは以下の通りです：

1. **メッセージのパブリッシュと受信**：IoTデバイスはMQTTプロトコルを介して正常に接続を確立し、その後特定のトピックにテレメトリおよびステータスデータをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンを使用して、特定のソースからのMQTTメッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
3. **Apache Pulsarへのデータストリーミング**：ルールがメッセージをPulsarへ転送するアクションをトリガーし、データはPulsarのメッセージキーおよび値に簡単に設定できます。MQTTトピックはPulsarトピックにマッピング可能で、データの整理や識別が容易になり、後続のデータ処理や分析が促進されます。

MQTTメッセージデータがApache Pulsarに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です：

- Pulsarのコンシューマーアプリケーションを作成してこれらのメッセージをサブスクライブおよび処理します。ビジネスニーズに応じて、MQTTデータを他のデータソースと関連付けたり集約・変換したりして、リアルタイムのデータ同期と統合を実現できます。
- 特定のMQTTメッセージを受信した際に、Pulsarのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現します。
- Pulsar内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知を行ったり、条件に応じた対応アクションを実行したりします。
- 複数のMQTTトピックからのデータを統合して単一のデータストリームに集約し、Pulsarの計算機能を活用してリアルタイムの集計、計算、分析を行い、より包括的なデータインサイトを得ます。

## 特長と利点

Pulsarとのデータ統合は、以下の特長と利点をビジネスにもたらします：

- **信頼性の高いIoTデータメッセージ配信**：EMQXはMQTTメッセージをバッチで確実にPulsarへ送信でき、IoTデバイスとPulsarおよびアプリケーションシステムの統合を実現します。
- **MQTTメッセージ変換**：ルールエンジンを活用し、EMQXはMQTTメッセージのフィルタリングや変換を行えます。メッセージはPulsarへ送信される前にデータ抽出、フィルタリング、付加、変換が可能です。
- **柔軟なトピックマッピング**：Pulsar SinkはMQTTトピックをPulsarトピックに柔軟にマッピングでき、Pulsarメッセージ内のキー（Key）や値（Value）の設定も簡単に行えます。
- **柔軟なパーティション選択**：Pulsar SinkはMQTTトピックやクライアントに基づいて異なる戦略でPulsarのパーティションを選択でき、データの整理や識別に柔軟性を提供します。
- **高スループットシナリオでの処理能力**：Pulsar Sinkは同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

## はじめる前に

このセクションでは、EMQXダッシュボードでPulsarデータ統合を作成する前に必要な準備について説明します。

### 前提条件

- EMQXデータ統合の[ルール](./rules.md)に関する知識
- [データ統合](./data-bridges.md)に関する知識

### Pulsarのインストール

DockerでPulsarを実行します。

```bash
docker run --rm -it -p 6650:6650 --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone -nfw -nss
```

詳細な操作手順は、[Pulsarドキュメントのクイックスタートセクション](https://pulsar.apache.org/docs/2.11.x/getting-started-home/)を参照してください。

### Pulsarトピックの作成

EMQXでデータ統合を作成する前に、関連するPulsarトピックを作成しておく必要があります。以下のコマンドで、`public`テナント、`default`ネームスペースに1パーティションのトピック`my-topic`を作成します。

```bash
docker exec -it pulsar bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic -p 1
```

## コネクターの作成

このセクションでは、SinkをPulsarサーバーに接続するためのコネクター作成手順を示します。

以下の手順は、EMQXとPulsarがローカルマシンで実行されていることを前提としています。リモートで実行している場合は設定を適宜調整してください。

1. EMQXダッシュボードに入り、**Integration** -> **Connectors**をクリックします。
2. ページ右上の**Create**をクリックします。
3. **Create Connector**ページで**Pulsar**を選択し、**Next**をクリックします。
4. **Configuration**ステップで以下を設定します：
   - コネクター名を入力します。英数字の組み合わせで、例：`my_pulsar`
   - **Bridge Role**はデフォルトで`Producer`が選択されています。
   - Pulsarサーバーへの接続およびメッセージ書き込み情報を設定します：
     - **Servers**：`pulsar://localhost:6650`を入力します。リモート環境の場合は適宜変更してください。
     - **Authentication**：認証方式を`none`、`Basic auth`、または`token`から選択します。`Basic auth`の場合、EMQXは`Username`と`Password`を`:`で連結して認証文字列を作成します。
     - **Enable TLS**：暗号化接続を確立したい場合はトグルスイッチをオンにします。TLS接続の詳細は[外部リソースアクセスのTLS](../network/overview.md/#tls-for-external-resource-access)を参照してください。
5. 詳細設定（任意）：[詳細設定](#advanced-configurations)を参照してください。
6. **Create**をクリックする前に、**Test Connectivity**をクリックしてコネクターがPulsarサーバーに接続できるかテストできます。
7. ページ下部の**Create**ボタンをクリックしてコネクター作成を完了します。ポップアップダイアログで**Back to Connector List**をクリックするか、**Create Rule**をクリックしてルールとSinkの作成に進めます。詳細は[Create a Rule with Pulsar Sink](#create-a-rule-with-pulsar-sink)を参照してください。

## Pulsar Sinkを使ったルールの作成

このセクションでは、ソースMQTTトピック`t/#`からのメッセージを処理し、処理済みデータを設定したSinkを介してPulsarトピック`my-topic`に保存するルール作成手順を示します。

1. EMQXダッシュボードで**Integration** -> **Rules**をクリックします。

2. ページ右上の**Create**をクリックします。

3. ルールIDを入力します。例：`my_rule`

4. **SQL Editor**に以下のステートメントを入力します。これはトピック`t/#`のMQTTメッセージをPulsarに保存する例です。

   注意：独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを`SELECT`部分に含めていることを確認してください。

   ```sql
   SELECT
     *
   FROM
     "t/#"
   ```

   注意：初心者の方は**SQL Examples**や**Enable Test**をクリックしてSQLルールを学習・テストできます。

5. **+ Add Action**ボタンをクリックして、ルールによりトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをPulsarに送信します。

6. **Action Type**ドロップダウンから`Pulsar`を選択します。

7. **Action**ドロップダウンはデフォルトの`Create Action`のままにします。既に作成済みのSinkを選択することも可能です。この例では新規にSinkを作成します。

8. Sinkの名前を入力します。英数字の組み合わせで指定してください。

9. **Connector**ドロップダウンから先ほど作成した`my_pulsar`を選択します。新規コネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメータは[Create a Connector](#create-a-connector)を参照してください。

10. Sinkの以下オプションを設定します：

    - **Pulsar Topic Name**：事前に作成した`persistent://public/default/my-topic`を入力します。変数はサポートされていません。
    - **Partition Strategy**：プロデューサーがPulsarのパーティションにメッセージを振り分ける方法を選択します。`random`、`roundrobin`、`key_dispatch`のいずれかを選択可能です。
    - **Compression**：圧縮アルゴリズムの使用有無と、Pulsarメッセージ内のレコード圧縮・解凍に使うアルゴリズムを指定します。選択肢は`no_compression`、`snappy`、`zlib`です。
    - **Retention Period**：Pulsarトピックにパブリッシュされたメッセージの保持期間を定義します。この設定により、メッセージがサブスクライバーに消費可能な期間を制御できます。デフォルトは`infinity`で、メッセージの自動期限切れはありません。秒数で数値を指定すると、その時間を超えたメッセージは自動的に期限切れとなりトピックから削除されます。
    - **Message Key**：Pulsarメッセージのキーを指定します。プレーン文字列またはプレースホルダー（${var}）を含む文字列を入力可能です。
    - **Message Value**：Pulsarメッセージの値を指定します。プレーン文字列またはプレースホルダー（${var}）を含む文字列を入力可能です。

11. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細は[Fallback Actions](./data-bridges.md#fallback-actions)を参照してください。

12. **詳細設定（任意）**：[詳細設定](#advanced-configurations)を参照してください。

13. **Create**をクリックする前に、**Test Connectivity**をクリックしてコネクターがPulsarサーバーに接続できるかテスト可能です。

14. **Create**ボタンをクリックしてSinkの設定を完了します。新しいSinkが**Action Outputs**に追加されます。

15. **Create Rule**ページに戻り、設定内容を確認してから**Create**ボタンをクリックしてルールを生成します。

これでルールの作成が完了しました。**Integration** -> **Rules**ページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいPulsar Sinkが表示されます。

また、**Integration** -> **Flow Designer**をクリックするとトポロジーが表示され、トピック`t/#`のメッセージがPulsarに送信・保存されていることが確認できます。

## ルールのテスト

MQTTXを使ってトピック`t/1`にメッセージを送信します：

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Pulsar" }'
```

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

以下のPulsarコマンドでトピック`persistent://public/default/my-topic`にメッセージが書き込まれているか確認します：

```bash
docker exec -it pulsar bin/pulsar-client consume -n 0 -s mysubscriptionid -p Earliest persistent://public/default/my-topic
```

## 詳細設定

このセクションでは、Pulsar Sinkのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションを説明します。Sink作成時に**Advanced Settings**を展開し、ビジネスニーズに合わせて以下の設定を行えます。

| フィールド                          | 説明                                                         | 推奨値             |
| --------------------------------- | ------------------------------------------------------------ | ------------------ |
| Max Inflight                      | プロデューサーが各パーティションに送信できるメッセージバッチの最大数。これを超えると応答を待つ必要があります。<br/>数値を大きくするとスループットが向上します。 | `10`               |
| Sync Publish Timeout              | 同期パブリッシュ操作で、メッセージが正常に配信されたことを確認するまでの最大待機時間（秒）。<br/>このタイムアウトは、配信問題やネットワーク障害時にパブリッシャーが無限にブロックされるのを防ぎ、データ信頼性を確保します。 | `3` 秒             |
| Socket Send Buffer Size           | ソケットバッファのサイズを管理し、ネットワーク送信性能を最適化します。 | `1` MB             |
| Batch Size                        | 1つのPulsarメッセージ内にバッチングされる個別リクエストの最大数を指定します。 | `100`              |
| Max Batch Bytes                   | Pulsarバッチ内でメッセージを収集する最大バイト数。通常、Pulsarブローカーのデフォルトバッチサイズは1MBですが、EMQXのデフォルト値はメッセージエンコードのオーバーヘッドを考慮し、1MBよりやや小さく設定されています。単一メッセージがこの制限を超える場合は別バッチとして送信されます。 | `900` KB           |
| Query Mode                        | メッセージ送信を最適化するために、`asynchronous`または`synchronous`のクエリモードを選択可能です。非同期モードではPulsarへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがPulsar到着前にメッセージを受信する可能性があります。 | `Async`            |
| Buffer Mode                       | メッセージを送信前にバッファリングするかどうかを定義します。メモリバッファリングは送信速度を向上させます。<br/>`memory`：メッセージはメモリにバッファされます。EMQXノード再起動時に失われます。<br/>`disk`：メッセージはディスクにバッファされ、EMQXノード再起動後も保持されます。<br/>`hybrid`：初めはメモリにバッファし、一定サイズ（`segment_bytes`設定参照）に達するとディスクに徐々にオフロードします。メモリモード同様、EMQXノード再起動時に失われます。 | `memory`           |
| Pulsar Per-partition Buffer Limit | 各Pulsarパーティションの最大バッファサイズ（バイト）。この制限に達すると、古いメッセージが破棄されてバッファスペースが確保されます。<br/>メモリ使用量とパフォーマンスのバランス調整に役立ちます。 | `2` GB             |
| Segment File Bytes                | バッファモードが`disk`または`hybrid`の場合に適用される設定で、メッセージ保存用のセグメントファイルのサイズを制御し、ディスクストレージの最適化に影響します。 | `100` MB           |
| Memory Overload Protection        | バッファモードが`memory`の場合に適用される設定で、高メモリ使用時に古いバッファメッセージを自動破棄し、システムの不安定化を防ぎ信頼性を確保します。<br/>**注**：Linuxシステムでのみ有効です。 | `disabled`         |
| Start Timeout                     | コネクターが自動起動したリソースの正常状態を確認するために待機する最大時間（秒）です。この設定により、Polarなどの接続リソースが完全に稼働しデータ処理可能になるまで操作を進めないようにします。 | `5` 秒             |
| Health Check Interval             | Sinkの稼働状態をチェックする間隔（秒）です。 | `1` 秒             |
