Skip to content

RabbitMQへのMQTTデータ取り込み

RabbitMQは、Advanced Message Queuing Protocol(AMQP)を実装した広く使われているオープンソースのメッセージブローカーです。分散システム間のメッセージングにおいて堅牢でスケーラブルなプラットフォームを提供します。EMQXはRabbitMQとの統合をサポートしており、MQTTメッセージやイベントをRabbitMQに転送することが可能です。また、RabbitMQサーバーからデータを消費し、EMQXの特定のトピックにパブリッシュすることもでき、RabbitMQからMQTTへのメッセージ配信を実現します。

本ページでは、EMQXとRabbitMQ間のデータ統合について詳細に解説し、データ統合の作成および検証に関する実践的な手順を提供します。

動作概要

RabbitMQデータ統合は、MQTTベースのIoTデータとRabbitMQの強力なメッセージキュー処理機能を橋渡しするためにEMQXに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQXからRabbitMQへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

RabbitMQ Sinkを例にとると、以下の図はEMQXとRabbitMQ間の典型的なデータ統合アーキテクチャを示しています:

EMQX Integration RabbitMQ

MQTTデータをRabbitMQに取り込む流れは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXに正常に接続し、リアルタイムのMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQXで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、RabbitMQにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、ペイロードの付加的なコンテキストによる強化などの変換が適用されます。
  3. RabbitMQへのメッセージ取り込み:ルールによる処理が完了すると、RabbitMQへのメッセージ転送アクションがトリガーされます。処理済みのメッセージはシームレスにRabbitMQに書き込まれます。
  4. データの永続化と活用:RabbitMQはメッセージをキューに保存し、適切なコンシューマーに配信します。メッセージは他のアプリケーションやサービスによって消費され、データ分析、可視化、保存などのさらなる処理に利用されます。

特徴と利点

RabbitMQとのデータ統合は、以下の特徴とメリットをビジネスにもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQXはデバイスからクラウドへの信頼性の高い接続とメッセージ配信を保証し、RabbitMQはメッセージの永続化と異なるサービス間での信頼性の高い配信を担い、全体のデータ信頼性を確保します。
  • MQTTメッセージの変換:ルールエンジンを利用して、EMQXはMQTTメッセージのフィルタリングや変換を行えます。データ抽出、フィルタリング、強化、変換を経てRabbitMQに送信可能です。
  • 柔軟なメッセージマッピング:RabbitMQデータ統合はMQTTトピックとRabbitMQのルーティングキーおよびエクスチェンジの柔軟なマッピングをサポートし、MQTTとRabbitMQ間のシームレスな統合を実現します。
  • 高可用性およびクラスター対応:EMQXとRabbitMQは共に高可用なメッセージブローカークラスターの構築をサポートし、ノード障害時にもサービス継続を保証します。クラスター機能により優れたスケーラビリティも提供します。
  • 高スループットシナリオでの処理能力:RabbitMQデータ統合は同期および非同期の書き込みモードをサポートし、レイテンシとスループットのバランスをシナリオに応じて柔軟に調整可能です。

はじめる前に

このセクションでは、RabbitMQデータ統合の作成を開始する前に必要な準備について説明します。RabbitMQサーバーの作成方法やテスト用のエクスチェンジ・キューの作成方法も含みます。

前提条件

RabbitMQサーバーの起動

ここではDockerを使用してRabbitMQサーバーを起動する方法を紹介します。

管理プラグインを有効にしたRabbitMQサーバーを起動するには、以下のコマンドを実行してください。管理プラグインによりWebインターフェースでRabbitMQを確認できます。

bash
docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management

詳細はDocker HubのRabbitMQのドキュメントをご覧ください。

メッセージ受信用のエクスチェンジとキューの作成

RabbitMQサーバー起動後、RabbitMQ管理Webインターフェースを使って、EMQXから転送されるメッセージ受信用のテスト用エクスチェンジとキューを作成できます。既にテスト用のエクスチェンジとキューがある場合はこのセクションはスキップ可能です。

  1. ブラウザで http://localhost:15672/ にアクセスし、RabbitMQ管理Webインターフェースを開きます。ログイン画面で以下のデフォルト認証情報を入力し、Loginをクリックします。
    • Username: guest
    • Password: guest
  2. 上部メニューのExchangesタブをクリックし、Add a new exchangeを展開して以下を入力します:
    • Name: test_exchange と入力
    • Type: ドロップダウンから direct を選択
    • Durability: Durable を選択し、RabbitMQサーバー再起動後もエクスチェンジが存在するように設定
    • Auto delete: No
    • Internal: No
    • Arguments: 空欄のまま
  3. Add exchangeボタンをクリックしてエクスチェンジを作成します。
  4. 上部メニューのQueuesタブをクリックし、Add a new queueを展開して以下を入力します:
    • Type: Default for virtual host
    • Name: test_queue と入力
    • Durability: Durable を選択し、キューを永続化
    • Arguments: 空欄のまま
  5. Add queueボタンをクリックしてキューを作成します。新しいtest_queueAll queuesセクションに表示されます。
  6. キュー名のtest_queueをクリックして詳細ページを開き、Bindingsを展開します。Add binding to this queueセクションに以下を入力します:
    • From exchange: test_exchange と入力
    • Routing key: test_routing_key と入力
    • Arguments: 空欄のまま
  7. Bindボタンをクリックし、test_queuetest_exchangeに指定したルーティングキーでバインドします。

メッセージ送信用のキュー作成

RabbitMQ管理Webインターフェースを使って、RabbitMQメッセージ送信用のキューを作成できます。

  1. RabbitMQ管理Webインターフェースにログインします。
  2. 上部メニューのQueuesタブをクリックし、Add a new queueを展開して以下を入力します。
    • Type: Default for virtual host
    • Name: message-send と入力
    • Durability: Durable を選択し、キューを永続化
    • Arguments: 空欄のまま
  3. Add queueボタンをクリックしてキューを作成します。新しいmessage-sendAll queuesセクションに表示されます。

コネクターの作成

このセクションでは、Rabbit Sink/SourceをRabbitMQサーバーに接続するためのコネクター作成方法を説明します。

以下の手順はEMQXとRabbitMQをローカルマシンで実行していることを前提としています。RabbitMQが別の場所にある場合は設定を適宜調整してください。

  1. ダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. 画面右上のCreateをクリックします。
  3. Create ConnectorページでRabbitMQを選択し、Nextをクリックします。
  4. コネクター名を入力します。大文字・小文字の英数字の組み合わせで、例:my_rabbitmq
  5. 接続情報を入力します。
    • Server: RabbitMQサーバーがローカルならlocalhost、リモートなら実際のホスト名/IPを入力
    • Port: デフォルトは5672、異なる場合は実際のポートを入力
    • Username: guest
    • Password: guest
    • Virtual Host: RabbitMQの仮想ホスト、デフォルトは/
    • 暗号化接続を確立したい場合はEnable TLSトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがRabbitMQサーバーに接続可能かテストできます。
  7. 画面下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector ListまたはCreate Ruleを選択可能です。Create Ruleを選択すると以下のオプションがあります:

RabbitMQ Sinkを使ったルールの作成

このセクションでは、ダッシュボードでMQTTのソーストピックt/#からメッセージを処理し、処理済みデータを設定済みのRabbitMQキューtest_queueに転送するルールの作成方法を説明します。

SQLを定義したルールの作成

  1. EMQXダッシュボードで、Integration -> Rulesをクリックします。

  2. 画面右上のCreateをクリックします。

  3. ルールIDを入力します。例:my_rule

  4. SQLエディターに以下のステートメントを入力します。これはトピックパターンt/#にマッチするMQTTメッセージを転送します。

    sql
    SELECT
      payload,
      now_timestamp() as timestamp
    FROM
      "t/#"

    TIP

    初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。

  5. ルールにアクションを追加し、Sinkを設定します。詳細はAdd RabbitMQ Sink to the Ruleを参照してください。

  6. アクション追加後、Action Outputsセクションに新しいSinkが表示されます。Create RuleページのSaveボタンをクリックしてルール作成を完了します。

これでルールが正常に作成されました。Rulesページで新規ルールを確認でき、**Actions (Sink)**タブで新しいRabbitMQ Sinkも確認できます。

また、Integration -> Flow Designerをクリックするとトポロジーを視覚的に確認できます。トポロジーはトピックt/#のメッセージがルールmy_ruleで解析され、RabbitMQに書き込まれる流れを示します。

RabbitMQ Sinkの追加

このセクションでは、処理結果をRabbitMQに書き込むためにルールにSinkを追加する方法を説明します。

  1. Create Ruleページで、Action OutputsセクションのAdd Actionをクリックし、ルールでトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをRabbitMQに送信します。

  2. Type of ActionドロップダウンリストからRabbitMQを選択します。ActionドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能ですが、ここでは新規Sinkを作成します。

  3. Sinkの名前を入力します。大文字・小文字の英数字の組み合わせで指定してください。

  4. Connectorドロップダウンからmy_rabbitmqを選択します。新規コネクターを作成する場合はドロップダウン横のボタンから作成可能です。設定パラメータはCreate a Connectorを参照してください。

  5. Sinkの設定を以下のように行います:

    • Exchange: 事前に作成したtest_exchangeを入力します。ここにメッセージがパブリッシュされます。

      注意

      RabbitMQにエクスチェンジが存在することを確認してください。存在しない場合、アクションは一時的に動作しなくなり、定期的に再接続を試みます。

    • Routing Key: 事前に作成したtest_routing_keyを入力します。RabbitMQのメッセージパブリッシュ用ルーティングキーです。

      TIP

      エクスチェンジとルーティングキーはテンプレート値として設定可能で、プレースホルダーを使い受信したMQTTメッセージのペイロードから動的に値を抽出しルーティングできます。

      例:ルーティングキーをペイロード内のフィールドに基づいて動的に設定する場合、${payload.akey}と設定します。これによりペイロードのakeyフィールドの値がルーティングキーとして使われます。

      注意:バッチモードでは、エクスチェンジとルーティングキーのテンプレート値はバッチ内の全メッセージで一定でなければなりません。これにより一貫したルーティングが保証され、バッチ処理時の競合を防ぎます。

    • Virtual Host: RabbitMQの仮想ホストを入力します。デフォルトは/です。

    • Message Delivery Modeドロップダウンでnon_persistentまたはpersistentを選択します:

      • non_persistent(デフォルト):メッセージはディスクに永続化されず、RabbitMQの再起動やクラッシュ時に失われる可能性があります。

      • persistent:メッセージはディスクに永続化され、RabbitMQの再起動やクラッシュ時にも耐久性があります。

        TIP

        RabbitMQのキューとエクスチェンジも永続化設定にする必要がある場合があります。詳細はRabbitMQのドキュメントを参照してください。

    • Wait for Publish Confirmations:デフォルトで有効。メッセージがRabbitMQに正常にパブリッシュされたことを確認します。

      TIP

      このオプションを有効にすると、RabbitMQブローカーはメッセージ受領をアック(ACK)してからパブリッシュ成功とみなすため、メッセージ配信の信頼性が向上します。

    • Headers TemplateおよびProperties Template:テンプレートを使ってRabbitMQのカスタムヘッダーおよびプロパティを定義できます。詳細はSet Headers and Properties Templatesを参照してください。

    • Payload Template:デフォルトは空文字列で、メッセージペイロードはJSON形式のテキストとしてRabbitMQにそのまま転送されます。

      プレースホルダーを使い、受信したMQTTメッセージのデータを動的に含めるカスタムペイロードフォーマットも定義可能です。例えば、MQTTメッセージのペイロードとタイムスタンプをRabbitMQメッセージに含めたい場合、以下のテンプレートを使えます:

      json
       {"payload": "${payload}", "timestamp": ${timestamp}}

      このテンプレートは、受信したMQTTメッセージのペイロードとタイムスタンプを含むJSON形式のメッセージを生成します。${payload}${timestamp}はプレースホルダーで、実際の値に置き換えられます。

  6. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のために、1つ以上のフォールバックアクションを定義可能です。詳細はFallback Actionsを参照してください。

  7. 詳細設定(任意)

    • Publish Confirmation Timeout:デフォルトは30秒。パブリッシュ確認のタイムアウト時間で、ブローカーのアックを待つ最大時間です。
    • 必要に応じてsyncまたはasyncクエリモードを選択可能です。詳細はFeatures of Sinkを参照してください。
  8. Createをクリックする前に、Test ConnectivityをクリックしてSinkがRabbitMQサーバーに接続可能かテストできます。

  9. CreateボタンをクリックしてSinkの設定を完了します。作成成功後、ルール作成ページに戻り、新しいSinkがAction Outputsに追加されます。

HeadersおよびPropertiesテンプレートの設定

EMQX 6.0以降、RabbitMQ Sinkアクション作成時にカスタムのRabbitMQヘッダーおよびプロパティを定義可能です。これにより、メッセージにメタデータを直接付加し、RabbitMQ内での互換性やルーティングの柔軟性が向上します。

これらのフィールドはルールSQLの結果変数(例:${payload.device_id})を使ってテンプレート化できます。ヘッダーおよびプロパティのテンプレートは任意で、空欄の場合は追加メタデータは付加されません。

Headersテンプレートの設定方法

1つ以上のキー・バリューのペアをRabbitMQヘッダーとして追加できます。これらはユーザー定義のメタデータで、RabbitMQのコンシューマーが解釈可能です。

  • Key:ヘッダー名。文字列で指定。
  • Value:キーに対応する値。静的文字列またはテンプレート変数を使用可能。

例:MQTTペイロードのデバイスIDを含める場合

KeyValue
device_id${payload.device_id}
Propertiesテンプレートの設定方法

RabbitMQは標準のメッセージプロパティセットをサポートしています。EMQXではこれらを定義し、コンテンツタイプや相関IDなどのメッセージレベルのメタデータを提供可能です。

  • Key:以下の有効なプロパティキーから選択(無効なキーは無視されます)。
  • Value:静的値またはテンプレート変数を設定。

有効なプロパティキー:

  • content_type
  • content_encoding
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id

例:コンテンツタイプとアプリケーションIDを指定する場合

KeyValue
content_typeapplication/json
app_idmy_iot_app
利用例

MQTTメッセージペイロードが以下の場合:

json
{
  "device_id": "sensor-123",
  "status": "ok"
}

以下の設定を行うとします:

  • ヘッダーdevice_idをMQTTペイロードから設定
  • プロパティapp_idを静的値で設定

設定例:

Headers Template:

KeyValue
device_id${payload.device_id}

Properties Template:

KeyValue
app_idmy_app

この設定により、RabbitMQに転送されるすべてのメッセージには以下が含まれます:

  • コンシューマー向けのカスタムメタデータ(Headers)
  • メッセージ処理やデバッグ用の標準メタデータ(Properties)

RabbitMQ Sinkを使ったルールのテスト

EMQXダッシュボード内蔵のWebSocketクライアントを使ってルールとSinkのテストが可能です。

  1. ダッシュボード左のナビゲーションメニューからDiagnose -> WebSocket Clientをクリックします。

  2. 現在のEMQXインスタンスへの接続情報を入力します。

    • ローカルでEMQXを実行している場合はデフォルト値を使用可能です。
    • 認証設定を変更している場合はユーザー名・パスワードの入力が必要です。
  3. ConnectをクリックしてクライアントをEMQXに接続します。

  4. ページ下部のパブリッシュエリアで以下を入力します:

    • Topic: t/test
    • Payload: Hello World RabbitMQ from EMQX
    • QoS: 2
  5. Publishをクリックしてメッセージを送信します。

    Sinkとルールが正常に作成されていれば、指定したルーティングキーでRabbitMQサーバーの指定エクスチェンジにメッセージがパブリッシュされているはずです。

  6. http://localhost:15672 のRabbitMQ管理コンソールにアクセスし、Queuesセクションに移動します。

    TIP

    デフォルト設定の場合、ユーザー名・パスワードともにguestを使用してください。

  7. メッセージが適切なキューにルーティングされていることを確認します。キューをクリックし、**Get Message(s)**ボタンを押すと詳細なメッセージ内容を確認できます。

bridge_igress

RabbitMQ Sourceを使ったルールの作成

このセクションでは、RabbitMQキューからEMQXへデータを転送するルールの作成方法を説明します。RabbitMQ Sourceとメッセージリパブリッシュアクションの両方を作成し、RabbitMQサービスからメッセージを消費してEMQXに転送します。

  1. ダッシュボードのIntegration -> Rulesページに移動します。

  2. 画面右上のCreateをクリックします。

  3. ルールIDにmy_rule_sourceを入力します。

  4. ルールをトリガーするソース(Data Inputs)を設定します。画面右側のData Inputsタブをクリックし、デフォルトのMessages入力を削除後、Add InputをクリックしてRabbitMQ Sourceを作成します。

  5. Add Inputポップアップで、Input TypeドロップダウンからRabbitMQを選択します。SourceドロップダウンはデフォルトのCreate Sourceのままにします。この例では新規Sourceを作成しルールに追加します。

  6. SourceのNameと(任意で)Descriptionを入力します。名前は大文字・小文字の英数字の組み合わせで、例:my-rabbitmq-source

  7. Connectorドロップダウンから先ほど作成したmy-rabbitmqコネクターを選択します。新規コネクター作成はドロップダウン横のボタンから可能です。設定パラメータはCreate a Connectorを参照してください。

  8. RabbitMQからEMQXへメッセージを消費するためのSource情報を設定します:

    • Queue:RabbitMQで作成済みのmessage-sendキュー名を入力
    • No Ack:RabbitMQのno_ackモードでメッセージを消費するか選択。no_ack有効時は、RabbitMQはメッセージを消費者からのアックを待たず即座にキューから削除します。
    • Wait for Publish Confirmations:メッセージパブリッシャーのアックを利用する場合、RabbitMQの確認を待つか指定
  9. 詳細設定(任意):デフォルト値を使用

  10. CreateボタンをクリックしてSourceを作成し、ルールのデータ入力に追加します。同時にルールSQLは以下のように変更されます:

    sql
    SELECT
    *
    FROM
    "$bridges/rabbitmq:my-rabbitmq-source"

    ルールSQLはRabbitMQ Sourceから以下のフィールドにアクセス可能で、SQLを調整してデータ処理が可能です。ここではデフォルトSQLを使用します。

    フィールド名説明
    payloadRabbitMQメッセージの内容
    eventイベントトピック。形式は$bridges/rabbitmq:<source name>
    metadataルールID情報
    timestampメッセージがEMQXに到着したタイムスタンプ
    nodeメッセージが到着したEMQXノード名
    queueメッセージを消費したキュー名
    exchangeメッセージがルーティングされたエクスチェンジ
    routing_keyエクスチェンジからキューへのルーティングに使われたルーティングキー

ここまででRabbitMQ Sourceの作成は完了しましたが、購読したデータはまだEMQXに直接パブリッシュされません。次に、SourceのメッセージをEMQXに転送するためのメッセージリパブリッシュアクションを作成します。

rabbitmq_source

ルールに再パブリッシュアクションを追加

このセクションでは、RabbitMQ Sourceから消費したメッセージを転送し、EMQXトピックt/1にパブリッシュするためのリパブリッシュアクションの追加方法を説明します。

  1. 画面右側のAction Outputタブを選択し、Add Actionボタンをクリックします。Type of ActionドロップダウンからRepublishアクションを選択します。

  2. メッセージリパブリッシュの設定を入力します:

    • Topic:MQTTにパブリッシュするトピック。ここではt/1を入力。
    • QoS012${qos}のいずれかを選択、またはプレースホルダーを入力して他のフィールドからQoSを設定可能。${qos}を選択すると元のメッセージのQoSに従います。
    • Retaintrueまたはfalseを選択。メッセージをリテインメッセージとしてパブリッシュするかどうかを決定。プレースホルダーも利用可能。ここではfalseを選択。
    • Payload:転送するメッセージペイロードのテンプレート。空欄の場合はルールの出力結果をそのまま転送。ここでは${payload}を入力し、ペイロードのみ転送。
    • MQTT 5.0 Message Properties:デフォルトで無効。詳細設定はAdd Republish Actionを参照。
  3. Createをクリックしてアクション作成を完了します。成功するとルール作成ページに戻り、リパブリッシュアクションがAction Outputsタブに追加されます。

  4. ルール作成ページでCreateボタンをクリックし、ルール全体の作成を完了します。

これでルールが正常に作成されました。Rulesページで新規ルールを確認でき、SourcesタブでRabbitMQ Sourceも確認できます。

また、Integrate -> Flow Designerをクリックするとトポロジーを視覚的に確認でき、RabbitMQ Sourceからのメッセージがリパブリッシュを通じてt/1にパブリッシュされる様子が直感的に把握できます。

RabbitMQ Sourceを使ったルールのテスト

RabbitMQ Source付きルールのテスト

  1. MQTTX CLIを使い、トピックt/1をサブスクライブします。

    bash
    mqttx sub -t t/1
  2. 以下のコマンドでRabbitMQにメッセージを送信できます:

    bash
    rabbitmqadmin --username=guest --password=guest \
         publish routing_key=message-send \
         payload="{ \"msg\": \"Hello EMQX\"}"
    • publishはメッセージをパブリッシュするコマンドです。
    • routing_key=message-sendはメッセージのルーティングキーを設定します。この例ではキュー名をルーティングキーとして使用しています。
    • payload="{ \"msg\": \"Hello EMQX\"}"はメッセージ内容を設定します。

    または、RabbitMQ管理インターフェースからもメッセージをパブリッシュ可能です:

    1. 上部メニューのQueuesタブをクリック。
    2. Name列のmessage-sendをクリックし詳細ページを開く。
    3. Publish messageを展開し、Payload欄に"Hello EMQX"と入力し、Publish messageボタンをクリック。
  3. MQTTXに以下のような出力が表示されます。

    bash
    [2024-2-23] [16:59:28] › payload: {"payload":{"msg":"Hello EMQX"},"event":"$bridges/rabbitmq:my-rabbitmq-source","metadata":{"rule_id":"rule_0ly1"},"timestamp":1708678768449,"node":"emqx@127.0.0.1"}