Skip to content

Apache PulsarへMQTTデータをストリーミングする

Apache Pulsarは、アプリケーションやシステム間でリアルタイムデータストリームを効率的に送信するために設計された、人気のあるオープンソースの分散イベントストリーミングプラットフォームです。Apache Pulsarは、より高いスケーラビリティ、より高速なスループット、そして低いレイテンシを提供します。IoTアプリケーションでは、デバイスが生成するデータは通常、軽量なMQTTプロトコルを使って送信されます。Apache PulsarとEMQXのデータ統合により、ユーザーはMQTTデータを簡単にApache Pulsarへストリーミングし、IoTデバイスから生成されるデータのリアルタイム処理、保存、分析のために他のデータシステムと接続できます。

本ページでは、EMQXとPulsar間のデータ統合の詳細な概要と、データ統合の作成および検証に関する実践的な手順を提供します。

動作の仕組み

Apache Pulsarとのデータ統合は、EMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送信機能とPulsarの強力なデータ処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理のプロセスが簡素化されています。これにより、複雑なコーディングなしでMQTTデータをPulsarに送信し、Pulsarの強力なデータ処理機能を活用でき、IoTデータの管理と活用がより効率的かつ便利になります。

EMQX Data Integration - Apache Pulsar

EMQXはルールエンジンと設定されたSinkを通じてMQTTデータをApache Pulsarに転送し、全体のプロセスは以下の通りです:

  1. メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを介して正常に接続を確立し、その後特定のトピックにテレメトリおよびステータスデータをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンを使用して、特定のソースからのMQTTメッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
  3. Apache Pulsarへのデータストリーミング:ルールがメッセージをPulsarへ転送するアクションをトリガーし、データはPulsarのメッセージキーおよび値に簡単に設定できます。MQTTトピックはPulsarトピックにマッピング可能で、データの整理や識別が容易になり、後続のデータ処理や分析が促進されます。

MQTTメッセージデータがApache Pulsarに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:

  • Pulsarのコンシューマーアプリケーションを作成してこれらのメッセージをサブスクライブおよび処理します。ビジネスニーズに応じて、MQTTデータを他のデータソースと関連付けたり集約・変換したりして、リアルタイムのデータ同期と統合を実現できます。
  • 特定のMQTTメッセージを受信した際に、Pulsarのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現します。
  • Pulsar内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知を行ったり、条件に応じた対応アクションを実行したりします。
  • 複数のMQTTトピックからのデータを統合して単一のデータストリームに集約し、Pulsarの計算機能を活用してリアルタイムの集計、計算、分析を行い、より包括的なデータインサイトを得ます。

特長と利点

Pulsarとのデータ統合は、以下の特長と利点をビジネスにもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQXはMQTTメッセージをバッチで確実にPulsarへ送信でき、IoTデバイスとPulsarおよびアプリケーションシステムの統合を実現します。
  • MQTTメッセージ変換:ルールエンジンを活用し、EMQXはMQTTメッセージのフィルタリングや変換を行えます。メッセージはPulsarへ送信される前にデータ抽出、フィルタリング、付加、変換が可能です。
  • 柔軟なトピックマッピング:Pulsar SinkはMQTTトピックをPulsarトピックに柔軟にマッピングでき、Pulsarメッセージ内のキー(Key)や値(Value)の設定も簡単に行えます。
  • 柔軟なパーティション選択:Pulsar SinkはMQTTトピックやクライアントに基づいて異なる戦略でPulsarのパーティションを選択でき、データの整理や識別に柔軟性を提供します。
  • 高スループットシナリオでの処理能力:Pulsar Sinkは同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

はじめる前に

このセクションでは、EMQXダッシュボードでPulsarデータ統合を作成する前に必要な準備について説明します。

前提条件

Pulsarのインストール

DockerでPulsarを実行します。

bash
docker run --rm -it -p 6650:6650 --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone -nfw -nss

詳細な操作手順は、Pulsarドキュメントのクイックスタートセクションを参照してください。

Pulsarトピックの作成

EMQXでデータ統合を作成する前に、関連するPulsarトピックを作成しておく必要があります。以下のコマンドで、publicテナント、defaultネームスペースに1パーティションのトピックmy-topicを作成します。

bash
docker exec -it pulsar bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic -p 1

コネクターの作成

このセクションでは、SinkをPulsarサーバーに接続するためのコネクター作成手順を示します。

以下の手順は、EMQXとPulsarがローカルマシンで実行されていることを前提としています。リモートで実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでPulsarを選択し、Nextをクリックします。
  4. Configurationステップで以下を設定します:
    • コネクター名を入力します。英数字の組み合わせで、例:my_pulsar
    • Bridge RoleはデフォルトでProducerが選択されています。
    • Pulsarサーバーへの接続およびメッセージ書き込み情報を設定します:
      • Serverspulsar://localhost:6650を入力します。リモート環境の場合は適宜変更してください。
      • Authentication:認証方式をnoneBasic auth、またはtokenから選択します。Basic authの場合、EMQXはUsernamePassword:で連結して認証文字列を作成します。
      • Enable TLS:暗号化接続を確立したい場合はトグルスイッチをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
  5. 詳細設定(任意):詳細設定を参照してください。
  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがPulsarサーバーに接続できるかテストできます。
  7. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成に進めます。詳細はCreate a Rule with Pulsar Sinkを参照してください。

Pulsar Sinkを使ったルールの作成

このセクションでは、ソースMQTTトピックt/#からのメッセージを処理し、処理済みデータを設定したSinkを介してPulsarトピックmy-topicに保存するルール作成手順を示します。

  1. EMQXダッシュボードでIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDを入力します。例:my_rule

  4. SQL Editorに以下のステートメントを入力します。これはトピックt/#のMQTTメッセージをPulsarに保存する例です。

    注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT部分に含めていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    注意:初心者の方はSQL ExamplesEnable TestをクリックしてSQLルールを学習・テストできます。

  5. + Add Actionボタンをクリックして、ルールによりトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをPulsarに送信します。

  6. Action TypeドロップダウンからPulsarを選択します。

  7. ActionドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのSinkを選択することも可能です。この例では新規にSinkを作成します。

  8. Sinkの名前を入力します。英数字の組み合わせで指定してください。

  9. Connectorドロップダウンから先ほど作成したmy_pulsarを選択します。新規コネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメータはCreate a Connectorを参照してください。

  10. Sinkの以下オプションを設定します:

    • Pulsar Topic Name:事前に作成したpersistent://public/default/my-topicを入力します。変数はサポートされていません。
    • Partition Strategy:プロデューサーがPulsarのパーティションにメッセージを振り分ける方法を選択します。randomroundrobinkey_dispatchのいずれかを選択可能です。
    • Compression:圧縮アルゴリズムの使用有無と、Pulsarメッセージ内のレコード圧縮・解凍に使うアルゴリズムを指定します。選択肢はno_compressionsnappyzlibです。
    • Retention Period:Pulsarトピックにパブリッシュされたメッセージの保持期間を定義します。この設定により、メッセージがサブスクライバーに消費可能な期間を制御できます。デフォルトはinfinityで、メッセージの自動期限切れはありません。秒数で数値を指定すると、その時間を超えたメッセージは自動的に期限切れとなりトピックから削除されます。
    • Message Key:Pulsarメッセージのキーを指定します。プレーン文字列またはプレースホルダー(${var})を含む文字列を入力可能です。
    • Message Value:Pulsarメッセージの値を指定します。プレーン文字列またはプレースホルダー(${var})を含む文字列を入力可能です。
  11. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はFallback Actionsを参照してください。

  12. 詳細設定(任意)詳細設定を参照してください。

  13. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがPulsarサーバーに接続できるかテスト可能です。

  14. CreateボタンをクリックしてSinkの設定を完了します。新しいSinkがAction Outputsに追加されます。

  15. Create Ruleページに戻り、設定内容を確認してからCreateボタンをクリックしてルールを生成します。

これでルールの作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいPulsar Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#のメッセージがPulsarに送信・保存されていることが確認できます。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信します:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Pulsar" }'

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

以下のPulsarコマンドでトピックpersistent://public/default/my-topicにメッセージが書き込まれているか確認します:

bash
docker exec -it pulsar bin/pulsar-client consume -n 0 -s mysubscriptionid -p Earliest persistent://public/default/my-topic

詳細設定

このセクションでは、Pulsar Sinkのパフォーマンスを最適化し、特定のシナリオに応じて動作をカスタマイズするための高度な設定オプションを説明します。Sink作成時にAdvanced Settingsを展開し、ビジネスニーズに合わせて以下の設定を行えます。

フィールド説明推奨値
Max Inflightプロデューサーが各パーティションに送信できるメッセージバッチの最大数。これを超えると応答を待つ必要があります。
数値を大きくするとスループットが向上します。
10
Sync Publish Timeout同期パブリッシュ操作で、メッセージが正常に配信されたことを確認するまでの最大待機時間(秒)。
このタイムアウトは、配信問題やネットワーク障害時にパブリッシャーが無限にブロックされるのを防ぎ、データ信頼性を確保します。
3
Socket Send Buffer Sizeソケットバッファのサイズを管理し、ネットワーク送信性能を最適化します。1 MB
Batch Size1つのPulsarメッセージ内にバッチングされる個別リクエストの最大数を指定します。100
Max Batch BytesPulsarバッチ内でメッセージを収集する最大バイト数。通常、Pulsarブローカーのデフォルトバッチサイズは1MBですが、EMQXのデフォルト値はメッセージエンコードのオーバーヘッドを考慮し、1MBよりやや小さく設定されています。単一メッセージがこの制限を超える場合は別バッチとして送信されます。900 KB
Query Modeメッセージ送信を最適化するために、asynchronousまたはsynchronousのクエリモードを選択可能です。非同期モードではPulsarへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがPulsar到着前にメッセージを受信する可能性があります。Async
Buffer Modeメッセージを送信前にバッファリングするかどうかを定義します。メモリバッファリングは送信速度を向上させます。
memory:メッセージはメモリにバッファされます。EMQXノード再起動時に失われます。
disk:メッセージはディスクにバッファされ、EMQXノード再起動後も保持されます。
hybrid:初めはメモリにバッファし、一定サイズ(segment_bytes設定参照)に達するとディスクに徐々にオフロードします。メモリモード同様、EMQXノード再起動時に失われます。
memory
Pulsar Per-partition Buffer Limit各Pulsarパーティションの最大バッファサイズ(バイト)。この制限に達すると、古いメッセージが破棄されてバッファスペースが確保されます。
メモリ使用量とパフォーマンスのバランス調整に役立ちます。
2 GB
Segment File Bytesバッファモードがdiskまたはhybridの場合に適用される設定で、メッセージ保存用のセグメントファイルのサイズを制御し、ディスクストレージの最適化に影響します。100 MB
Memory Overload Protectionバッファモードがmemoryの場合に適用される設定で、高メモリ使用時に古いバッファメッセージを自動破棄し、システムの不安定化を防ぎ信頼性を確保します。
:Linuxシステムでのみ有効です。
disabled
Start Timeoutコネクターが自動起動したリソースの正常状態を確認するために待機する最大時間(秒)です。この設定により、Polarなどの接続リソースが完全に稼働しデータ処理可能になるまで操作を進めないようにします。5
Health Check IntervalSinkの稼働状態をチェックする間隔(秒)です。1