Apache PulsarへMQTTデータをストリームする
Apache Pulsar は、アプリケーションやシステム間でリアルタイムデータストリームを効率的に送信するために設計された、人気のあるオープンソースの分散型イベントストリーミングプラットフォームです。Apache Pulsarは、より高いスケーラビリティ、より高速なスループット、そして低いレイテンシを提供します。IoTアプリケーションでは、デバイスが生成するデータは通常、軽量なMQTTプロトコルを用いて送信されます。Apache PulsarとEMQX間のデータ統合により、ユーザーはMQTTデータを簡単にApache Pulsarへストリームし、他のデータシステムと連携してIoTデバイスが生成するデータのリアルタイム処理、保存、分析を行うことが可能になります。
本ページでは、EMQX CloudとPulsar間のデータ統合の詳細な概要と、データ統合の作成および検証に関する実践的な手順を提供します。
動作の仕組み
Apache Pulsarデータ統合は、EMQX Cloudの標準機能であり、EMQX Cloudのデバイス接続およびメッセージ送信機能とPulsarの強力なデータ処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理のプロセスが簡素化されています。これにより、複雑なコーディングを必要とせずにMQTTデータをPulsarへ簡単に送信し、Pulsarの強力なデータ処理機能を活用できるため、IoTデータの管理と活用がより効率的かつ便利になります。

EMQX Cloudはルールエンジンと設定されたアクションを通じてMQTTデータをApache Pulsarに転送し、その全体の流れは以下の通りです。
- メッセージのパブリッシュと受信: IoTデバイスはMQTTプロトコルを介して正常に接続を確立し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理: 組み込みのルールエンジンを用いて、特定のソースからのMQTTメッセージをトピックマッチングに基づいて処理できます。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
- Apache Pulsarへのデータストリーミング: ルールがトリガーされると、メッセージ転送アクションがPulsarへメッセージを送信します。データはPulsarのメッセージキーおよび値に簡単に設定でき、MQTTトピックはPulsarトピックにマッピング可能で、データの整理や識別が容易になり、後続のデータ処理や分析を促進します。
MQTTメッセージデータがApache Pulsarに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。
- Pulsarのコンシューマーアプリケーションを作成し、これらのメッセージをサブスクライブして処理します。ビジネスニーズに応じて、MQTTデータを他のデータソースと関連付けたり集約・変換したりして、リアルタイムのデータ同期と統合を実現できます。
- 特定のMQTTメッセージを受信した際に、Pulsarのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動型機能を実現します。
- Pulsar内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知や対応アクションを実行します。
- 複数のMQTTトピックからのデータを統合し、Pulsarの計算機能を用いてリアルタイムの集約・計算・分析を行い、より包括的なデータインサイトを得ることができます。
特長とメリット
Pulsarとのデータ統合により、以下の特長と利点がビジネスにもたらされます。
- 信頼性の高いIoTデータメッセージ配信: EMQXはMQTTメッセージをバッチ処理で確実にPulsarへ送信でき、IoTデバイスとPulsarおよびアプリケーションシステムの統合を実現します。
- MQTTメッセージの変換: ルールエンジンを活用して、EMQXはMQTTメッセージのフィルタリングや変換を行えます。メッセージはPulsarに送信される前にデータ抽出、フィルタリング、付加情報の追加、変換が可能です。
- 柔軟なトピックマッピング: PulsarアクションはMQTTトピックをPulsarトピックに柔軟にマッピングでき、Pulsarメッセージのキー(Key)や値(Value)を簡単に設定できます。
- 柔軟なパーティション選択: PulsarアクションはMQTTトピックやクライアントに基づいて異なる戦略でPulsarのパーティションを選択でき、データの整理や識別に柔軟性を提供します。
- 高スループットシナリオでの処理能力: Pulsarアクションは同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。
はじめる前に
このセクションでは、EMQX CloudでPulsarデータ統合を作成する前に必要な準備について説明します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
Pulsarのインストール
DockerでPulsarを実行します。
docker run --rm -it -p 6650:6650 --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone -nfw -nss詳細な操作手順は、Pulsarドキュメントのクイックスタートセクションを参照してください。
Pulsarトピックの作成
EMQXでデータ統合を作成する前に、関連するPulsarトピックを作成しておく必要があります。以下のコマンドで、publicテナントのdefaultネームスペースに、パーティション数1のmy-topicというトピックを作成します。
docker exec -it pulsar bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic -p 1コネクターの作成
データ統合ルールを作成する前に、まずPulsarサーバーにアクセスするためのPulsarコネクターを作成する必要があります。
デプロイメントに移動し、左側のナビゲーションメニューから データ統合 をクリックします。初めてコネクターを作成する場合は、データ転送 カテゴリの下にある Pulsar を選択します。すでにコネクターを作成済みの場合は、新規コネクター を選択し、続いて データ転送 カテゴリの下の Pulsar を選択します。
コネクター名: システムが自動的にコネクター名を生成します。
接続情報を入力します:
- Servers:
pulsar://xxx:6650を入力します。PulsarとEMQXがリモートで動作している場合は、xxxの部分を適宜設定してください。 - 認証: 実際の状況に応じて、
none、Basic auth、またはtokenを選択します。 - TLSを有効化: 暗号化接続を確立したい場合は、トグルスイッチをオンにします。
- ビジネスニーズに応じて高度な設定を行います(任意)。
- Servers:
テストボタンをクリックします。Pulsarサービスにアクセス可能であれば、connector available のメッセージが返されます。
新規作成ボタンをクリックして作成を完了します。
これで、このコネクターを基にデータブリッジルールを作成できます。
Pulsarプロデューサールールの作成
このセクションでは、EMQX Cloudコンソールを使ってPulsarプロデューサールールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアで 新規ルール をクリックするか、作成したコネクターの アクション 列にある新規ルールアイコンをクリックします。
利用したい機能に基づいて SQLエディター でルールを設定します。ここでは、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目標としています。以下のSQLを使用します。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL例 をクリックし、試してみる でSQLルールを学習・テストできます。
次へ をクリックしてアクションを追加します。
コネクター ドロップダウンから先ほど作成したコネクターを選択します。
EMQX CloudからPulsarサービスへメッセージをパブリッシュするための情報を設定します。
- Pulsarトピック名: 事前に作成した
persistent://public/default/my-topicを入力します。変数はここではサポートされていません。 - パーティション戦略: プロデューサーがPulsarのパーティションにメッセージを振り分ける方法を選択します。
random、roundrobin、key_dispatchのいずれかを選択可能です。 - 圧縮: 圧縮アルゴリズムの使用有無および、Pulsarメッセージ内のレコードを圧縮・解凍するアルゴリズムを指定します。選択肢は
no_compression、snappy、zlibです。 - メッセージキー: Pulsarメッセージのキーを入力します。プレーン文字列またはプレースホルダー(${var})を含む文字列が使用可能です。
- メッセージ値: Pulsarメッセージの値を入力します。プレーン文字列またはプレースホルダー(${var})を含む文字列が使用可能です。
- Pulsarトピック名: 事前に作成した
高度な設定(任意)については、高度な設定を参照してください。
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップで ルールに戻る をクリックし、データ統合の設定が完了します。
ルールのテスト
MQTTX を使って温度・湿度データの送信をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxクライアントID:
test_clientペイロード:
json{ "temp": "27.5", "hum": "41.8" }
以下のPulsarコマンドで、メッセージがトピック
persistent://public/default/my-topicに書き込まれているか確認します。bashdocker exec -it pulsar bin/pulsar-client consume -n 0 -s mysubscriptionid -p Earliest persistent://public/default/my-topic
高度な設定
このセクションでは、Pulsarアクションのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションについて説明します。アクション作成時に 高度な設定 を展開し、ビジネスニーズに応じて以下の設定を行えます。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| Max Inflight | プロデューサーが各パーティションに送信できるメッセージバッチの最大数。 この数を大きく設定するとスループットが向上します。 | 10 |
| Sync Publish Timeout | 同期パブリッシュ操作で、メッセージが正常に配信されたことの応答または確認を待つ最大時間。 このタイムアウト設定は、配信問題やネットワーク障害時にパブリッシャーの無限待機を防ぎ、データ信頼性確保に重要です。 | 3 秒 |
| Socket Send Buffer Size | ネットワーク送信性能を最適化するためのソケットバッファサイズを管理します。 | 1 MB |
| Batch Size | Pulsarメッセージ内にバッチングされる個別リクエストの最大数を指定します。 | 100 |
| Max Batch Bytes | Pulsarバッチ内で収集されるメッセージの最大サイズ(バイト単位)。通常、Pulsarブローカーのデフォルトバッチサイズ制限は1MBですが、EMQXのデフォルト値はPulsarメッセージのエンコードオーバーヘッドを考慮し、特に個々のメッセージが非常に小さい場合に備えて1MBよりやや低く設定されています。この制限を超える単一メッセージは別バッチとして送信されます。 | 900 KB |
| Connect Timeout | TCP接続確立の最大待機時間。認証が有効な場合は認証時間も含みます。 | 5 秒 |
| Buffer Mode | メッセージ送信前にバッファに保存するかどうかを定義します。メモリバッファリングは送信速度を向上させます。memory: メッセージはメモリにバッファされます。EMQXノード再起動時に失われます。disk: メッセージはディスクにバッファされ、EMQXノード再起動後も保持されます。hybrid: メッセージは最初メモリにバッファされ、一定の制限(segment_bytes設定参照)に達すると徐々にディスクにオフロードされます。メモリモード同様、EMQXノード再起動時に失われます。 | Memory |
| Per-partition Buffer Limit | 各Pulsarパーティションに許容される最大バッファサイズ(バイト単位)。この制限に達すると、古いメッセージが破棄されてバッファ領域が回収され、新しいメッセージのためのスペースが確保されます。 メモリ使用量とパフォーマンスのバランスを取るための設定です。 | 2 GB |
| Segment File Bytes | バッファモードがdiskまたはhybridの場合に適用される設定で、メッセージ保存に使われるセグメントファイルのサイズを制御し、ディスクストレージの最適化レベルに影響します。 | 100 MB |
| Memory Overload Protection | バッファモードがmemoryの場合に適用される設定で、高メモリ使用時にEMQXが古いバッファメッセージを自動的に破棄します。過剰なメモリ使用によるシステム不安定化を防ぎ、システムの信頼性を確保します。注意: 高メモリ使用の閾値は設定パラメータ sysmon.os.sysmem_high_watermark で定義されます。この設定はLinuxシステムのみ有効です。 | disabled |
| Start Timeout | コネクターが自動起動したリソースの正常状態到達を待機する最大時間(秒単位)です。これにより、Polarなどの接続リソースが完全に稼働しデータ処理可能になるまで操作を進めないようにします。 | 5 秒 |
| Health Check Interval | アクションの稼働状態をチェックする間隔時間です。 | 15 秒 |