Apache PulsarへのMQTTデータストリーミング
Apache Pulsarは、アプリケーションやシステム間でリアルタイムデータストリームを効率的に送信するために設計された、人気のあるオープンソースの分散イベントストリーミングプラットフォームです。Apache Pulsarは、より高いスケーラビリティ、より高速なスループット、そして低いレイテンシを提供します。IoTアプリケーションでは、デバイスが生成するデータは通常、軽量なMQTTプロトコルを使用して送信されます。Apache PulsarとEMQXのデータ統合により、ユーザーはMQTTデータを簡単にApache Pulsarにストリーミングし、IoTデバイスから生成されたデータのリアルタイム処理、保存、分析のために他のデータシステムと接続できます。
本ページでは、EMQXとPulsar間のデータ統合について詳細に解説し、データ統合の作成および検証の実践的な手順を提供します。
動作概要
Apache Pulsarとのデータ統合は、EMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送信機能とPulsarの強力なデータ処理機能を組み合わせています。組み込みのルールエンジンコンポーネントを利用することで、両プラットフォーム間のデータストリーミングおよび処理プロセスが簡素化されます。これにより、複雑なコーディングなしにMQTTデータをPulsarに送信し、Pulsarの強力なデータ処理機能を活用できるため、IoTデータの管理と活用がより効率的かつ便利になります。

EMQXはルールエンジンと設定されたSinkを通じてMQTTデータをApache Pulsarに転送し、全体の流れは以下の通りです:
- メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを介して正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンを使い、特定のトピックに基づいてMQTTメッセージを処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- Apache Pulsarへのデータストリーミング:ルールがトリガーされると、メッセージをPulsarに転送するアクションが実行されます。データはPulsarのメッセージキーおよび値に簡単にマッピング可能です。MQTTトピックをPulsarトピックにマッピングすることもでき、データの整理や識別が容易になり、後続のデータ処理や分析を促進します。
MQTTメッセージデータがApache Pulsarに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:
- Pulsarのコンシューマーアプリケーションを作成してこれらのメッセージをサブスクライブおよび処理します。ビジネスニーズに応じて、MQTTデータを他のデータソースと関連付けたり、集約・変換したりして、リアルタイムのデータ同期と統合を実現できます。
- 特定のMQTTメッセージを受信した際に、Pulsarのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動型機能を実現します。
- Pulsar内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検知してアラート通知を行ったり、条件に応じた対応を実施したりします。
- 複数のMQTTトピックからのデータを統合し、Pulsarの計算能力を活用してリアルタイムの集計・計算・分析を行い、より包括的なデータインサイトを得ることができます。
特長とメリット
Pulsarとのデータ統合により、以下の特長と利点が得られます:
- 信頼性の高いIoTデータメッセージ配信:EMQXはMQTTメッセージをバッチで確実にPulsarに送信でき、IoTデバイスとPulsarおよびアプリケーションシステムの統合を実現します。
- MQTTメッセージの変換:ルールエンジンを利用して、EMQXはMQTTメッセージの抽出、フィルタリング、付加情報の追加、変換を行い、Pulsarに送信します。
- 柔軟なトピックマッピング:Pulsar SinkはMQTTトピックをPulsarトピックに柔軟にマッピングでき、Pulsarメッセージのキー(Key)や値(Value)を簡単に設定可能です。
- 柔軟なパーティション選択:Pulsar SinkはMQTTトピックやクライアントに基づき、異なる戦略でPulsarのパーティションを選択でき、データの整理や識別に柔軟性を提供します。
- 高スループットシナリオでの処理能力:Pulsar Sinkは同期・非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。
はじめる前に
このセクションでは、EMQXダッシュボードでPulsarデータ統合を作成する前に必要な準備について説明します。
前提条件
Pulsarのインストール
DockerでPulsarを起動します。
docker run --rm -it -p 6650:6650 --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone -nfw -nss詳細な操作手順は、Pulsarドキュメントのクイックスタートを参照してください。
Pulsarトピックの作成
EMQXでデータ統合を作成する前に、該当するPulsarトピックを作成しておく必要があります。以下のコマンドで、publicテナントのdefaultネームスペースに1パーティションのmy-topicトピックを作成します。
docker exec -it pulsar bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic -p 1コネクターの作成
このセクションでは、SinkをPulsarサーバーに接続するためのコネクターの作成方法を説明します。
以下の手順は、EMQXとPulsarをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create ConnectorページでPulsarを選択し、Nextをクリックします。
- Configurationステップで以下を設定します:
- コネクター名を入力します。英数字の組み合わせで、例:
my_pulsar。 - Bridge Roleはデフォルトで
Producerが選択されています。 - Pulsarサーバーへの接続およびメッセージ書き込みに関する情報を設定します:
- Servers:
pulsar://localhost:6650を入力します。リモート環境の場合は適宜変更してください。 - Authentication:認証方式を選択します。
none、Basic auth、tokenがあります。Basic authの場合、EMQXはUsernameとPasswordを:で結合して認証文字列を作成します。 - Enable TLS:暗号化接続を確立したい場合はトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
- Servers:
- コネクター名を入力します。英数字の組み合わせで、例:
- 高度な設定(任意):高度な設定を参照してください。
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがPulsarサーバーに接続できるかテストできます。
- ページ下部のCreateボタンをクリックしてコネクターを作成します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成を続行できます。詳細はCreate a Rule with Pulsar Sinkを参照してください。
Pulsar Sinkを使ったルールの作成
このセクションでは、ソースMQTTトピックt/#のメッセージを処理し、処理済みデータを設定済みのSinkを介してPulsarトピックmy-topicに保存するルールをダッシュボードで作成する方法を説明します。
EMQXダッシュボードで、Integration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDを入力します。例:
my_rule。SQL Editorに以下のステートメントを入力します。これはトピック
t/#のMQTTメッセージをPulsarに保存する例です。注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドが
SELECT部分に含まれていることを確認してください。sqlSELECT * FROM "t/#"注意:初心者の方はSQL ExamplesとEnable TestをクリックしてSQLルールを学習・テストできます。
+ Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをPulsarに送信します。
Action Typeドロップダウンから
Pulsarを選択します。Actionドロップダウンはデフォルトの
Create Actionのままにします。既存のSinkを選択することも可能です。この例では新しいSinkを作成します。Sinkの名前を入力します。英数字の組み合わせで指定してください。
Connectorドロップダウンから先ほど作成した
my_pulsarを選択します。隣のボタンから新しいコネクターを作成することも可能です。設定パラメーターはコネクターの作成を参照してください。Sinkの以下のオプションを設定します:
- Pulsar Topic Name:先に作成した
persistent://public/default/my-topicを入力します。変数はサポートされていません。 - Partition Strategy:プロデューサーがメッセージをPulsarのパーティションに割り振る方法を選択します。
random、roundrobin、key_dispatchがあります。 - Compression:Pulsarメッセージのレコード圧縮に使用するアルゴリズムを指定します。
no_compression、snappy、zlibのいずれかを選択可能です。 - Retention Period:Pulsarトピックにパブリッシュされたメッセージの保持期間を定義します。この設定により、メッセージがサブスクライバーに利用可能な期間を制御できます。デフォルトは
infinityで、メッセージの自動期限切れはありません。秒数で数値を指定すると、その時間を超えたメッセージは自動的に期限切れとなりトピックから削除されます。 - Message Key:Pulsarメッセージのキーを指定します。プレースホルダー(${var})を含む文字列も可能です。
- Message Value:Pulsarメッセージの値を指定します。こちらもプレースホルダーを含む文字列が使用可能です。
- Pulsar Topic Name:先に作成した
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらがトリガーされます。詳細はフォールバックアクションを参照してください。
高度な設定(任意):高度な設定を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてコネクターがPulsarサーバーに接続できるかテストします。
CreateボタンをクリックしてSinkの設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認してからCreateをクリックしルールを生成します。
これでルールの作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると、新しいPulsar Sinkが表示されます。
また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#のメッセージがPulsarに送信・保存されていることが確認できます。
ルールのテスト
MQTTXを使ってトピックt/1にメッセージを送信します:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Pulsar" }'Sinkの稼働状況を確認すると、1件の新規受信メッセージと1件の新規送信メッセージがあるはずです。
以下のPulsarコマンドで、メッセージがトピックpersistent://public/default/my-topicに書き込まれているか確認します:
docker exec -it pulsar bin/pulsar-client consume -n 0 -s mysubscriptionid -p Earliest persistent://public/default/my-topic高度な設定
このセクションでは、Pulsar Sinkのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションについて説明します。Sink作成時にAdvanced Settingsを展開し、ビジネスニーズに応じて以下の設定を行えます。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| Max Inflight | プロデューサーが各パーティションに送信できるメッセージバッチの最大数。これを超えると応答を待つ必要があります。 数値を大きくするとスループットが向上します。 | 10 |
| Sync Publish Timeout | 同期パブリッシュ操作で、メッセージが正常に配信された確認を待つ最大時間(秒)。 配信問題やネットワーク障害時に無限待機を防ぎ、データ信頼性を確保します。 | 3秒 |
| Socket Send Buffer Size | ネットワーク送信性能を最適化するためのソケットバッファサイズ管理。 | 1 MB |
| Batch Size | 1つのPulsarメッセージにバッチングする個別リクエストの最大数。 | 100 |
| Max Batch Bytes | Pulsarバッチ内で収集可能なメッセージの最大バイト数。通常Pulsarブローカーのデフォルトは1 MBですが、EMQXはメッセージエンコードのオーバーヘッドを考慮し、1 MB未満に設定しています。単一メッセージがこの制限を超える場合は別バッチで送信されます。 | 900 KB |
| Query Mode | メッセージ送信を最適化するためにasynchronousまたはsynchronousのモードを選択可能。非同期モードではPulsarへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがPulsar到着前にメッセージを受信する可能性があります。 | Async |
| Buffer Mode | メッセージ送信前のバッファリング方法を定義。メモリバッファリングは送信速度を向上させます。memory: メモリにバッファ。EMQXノード再起動時にメッセージは失われます。disk: ディスクにバッファ。ノード再起動後もメッセージは保持されます。hybrid: 初期はメモリにバッファし、一定サイズ(segment_bytes設定参照)に達すると徐々にディスクにオフロードします。メモリモード同様、ノード再起動時にメッセージは失われます。 | memory |
| Pulsar Per-partition Buffer Limit | 各Pulsarパーティションの最大バッファサイズ(バイト)。上限に達すると古いメッセージが破棄され、新しいメッセージ用にバッファ領域が確保されます。 メモリ使用量と性能のバランス調整に役立ちます。 | 2 GB |
| Segment File Bytes | バッファモードがdiskまたはhybridの場合に適用。メッセージ保存用の分割ファイルサイズを制御し、ディスクストレージの最適化に影響します。 | 100 MB |
| Memory Overload Protection | バッファモードがmemoryの場合に適用。メモリ使用率が高い場合に古いバッファメッセージを自動破棄し、システムの安定性を維持します。注意:Linuxシステムのみ有効です。 | disabled |
| Start Timeout | コネクターが自動起動したリソースの正常状態を待つ最大時間(秒)。Polarなどのインスタンスが完全に稼働し、データ処理準備が整うまで操作を進めないようにします。 | 5秒 |
| Health Check Interval | Sinkの稼働状態をチェックする間隔。 | 1秒 |