Skip to content

RabbitMQへのMQTTデータ取り込み

RabbitMQは、Advanced Message Queuing Protocol(AMQP)を実装した広く利用されているオープンソースのメッセージブローカーです。分散システム間のメッセージングにおいて堅牢かつスケーラブルなプラットフォームを提供します。EMQX CloudはRabbitMQとの統合をサポートしており、MQTTメッセージやイベントをRabbitMQに転送できます。また、RabbitMQサーバーからデータを消費し、EMQX Cloudの特定トピックにパブリッシュすることも可能で、RabbitMQからMQTTへのメッセージ配信を実現します。

本ページでは、EMQX CloudとRabbitMQ間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。

動作概要

RabbitMQデータ統合は、MQTTベースのIoTデータとRabbitMQの強力なメッセージキュー処理機能を橋渡しするために、EMQX Cloudに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQX CloudからRabbitMQへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQX CloudとRabbitMQ間のデータ統合の典型的なアーキテクチャを示しています。

emqx-integration-rabbitmq

MQTTデータのRabbitMQへの取り込みは以下のように動作します:

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、リアルタイムのMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloudで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、RabbitMQにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
  3. RabbitMQへのメッセージ取り込み:ルールによる処理が完了すると、メッセージをRabbitMQに転送するアクションがトリガーされます。処理済みのメッセージはシームレスにRabbitMQに書き込まれます。
  4. データの永続化と活用:RabbitMQはメッセージをキューに保存し、適切なコンシューマに配信します。メッセージは他のアプリケーションやサービスで消費され、データ分析、可視化、保存などのさらなる処理に利用されます。

特長とメリット

RabbitMQとのデータ統合は、以下の特長と利点をビジネスにもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQX Cloudはデバイスからクラウドへの信頼性の高い接続とメッセージ配信を保証し、RabbitMQはメッセージの永続化とサービス間の信頼性の高い配信を担い、各プロセスにおけるデータの信頼性を確保します。
  • MQTTメッセージの変換:ルールエンジンを用いて、EMQX CloudはMQTTメッセージのフィルタリングや変換が可能です。メッセージはRabbitMQに送信される前にデータ抽出、フィルタリング、拡充、変換を受けられます。
  • 柔軟なメッセージマッピング:RabbitMQデータ統合はMQTTトピックからRabbitMQのルーティングキーおよびエクスチェンジへの柔軟なマッピングをサポートし、MQTTとRabbitMQ間のシームレスな統合を実現します。
  • 高可用性とクラスターサポート:EMQX CloudおよびRabbitMQは共に高可用なメッセージブローカークラスターの構築をサポートし、ノード障害時でもサービス提供を継続可能にします。クラスター機能を活用することで優れたスケーラビリティも実現します。
  • 高スループットシナリオでの処理能力:RabbitMQデータ統合は同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

はじめる前に

このセクションでは、RabbitMQデータ統合の作成を開始する前に必要な準備として、RabbitMQサーバーの作成方法およびテスト用のRabbitMQエクスチェンジとキューの作成方法について説明します。

前提条件

  • データ統合の知識
  • EMQX Cloudデータ統合のルールの知識
  • UNIXターミナルおよびコマンドの基本知識

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

RabbitMQサーバーの起動

ここでは、Dockerを使用してRabbitMQサーバーを起動する方法を紹介します。

以下のコマンドを実行すると、管理プラグインが有効なRabbitMQサーバーが起動します。管理プラグインにより、WebインターフェースでRabbitMQを監視できます。

bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management

Docker HubのRabbitMQのDocker実行に関する情報もご参照ください。

メッセージ受信用のエクスチェンジとキューの作成

RabbitMQサーバー起動後、RabbitMQ管理Webインターフェースを使って、EMQX Cloudから転送されるメッセージ受信用のテスト用エクスチェンジとキューを作成できます。既にテスト用のエクスチェンジとキューがある場合は、このセクションはスキップ可能です。

  1. Webブラウザで http://{ip address}:15672/ にアクセスし、RabbitMQ管理Webインターフェースを開きます。ログイン画面で以下のデフォルト認証情報を入力し、Loginをクリックします。

    • Usernameguest
    • Passwordguest
  2. 上部メニューのExchangesタブをクリックします。Add a new exchangeを展開し、以下の情報を入力します:

    • Nametest_exchangeと入力
    • Type:ドロップダウンリストからdirectを選択
    • DurabilityDurableを選択し、エクスチェンジを永続化(RabbitMQサーバー再起動後も存在)
    • Auto deleteNo
    • InternalNo
    • Arguments:空欄のまま
  3. Add exchangeボタンをクリックしてテスト用エクスチェンジを作成します。

  4. 上部メニューのQueuesタブをクリックします。Add a new queueを展開し、以下の情報を入力します:

    • TypeDefault for virtual host
    • Nametest_queueと入力
    • DurabilityDurableを選択し、キューを永続化
    • Arguments:空欄のまま
  5. Add queueボタンをクリックしてテスト用キューを作成します。新しいtest_queueAll queuesセクションに表示されます。

  6. キュー名のtest_queueをクリックして詳細ページを開きます。Bindingsを展開し、Add binding to this queueセクションに以下を入力します:

    • From exchangetest_exchangeと入力
    • Routing keytest_routing_keyと入力
    • Arguments:空欄のまま
  7. Bindボタンをクリックし、test_queueを指定したルーティングキーでtest_exchangeにバインドします。

コネクターの作成

データ統合ルールを作成する前に、RabbitMQサーバーにアクセスするためのRabbitMQコネクターを作成する必要があります。

  1. デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ転送カテゴリの下にあるRabbitMQを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ転送カテゴリの下のRabbitMQを選択します。

  2. コネクター名:システムが自動的にコネクター名を生成します。

  3. 接続情報を入力します:

    • Server:RabbitMQサーバーがローカルの場合はlocalhost、リモートの場合は実際のホスト名またはIPアドレスを入力
    • Port5672(異なる場合は実際のポート番号)
    • Usernameguest
    • Passwordguest
    • Virtual Host:RabbitMQの仮想ホスト。デフォルトは/
    • Enable TLS:暗号化接続を確立する場合はトグルスイッチをオンにする
    • ビジネスニーズに応じて詳細設定を構成(任意)
  4. テストボタンをクリックします。RabbitMQサービスにアクセス可能な場合、connector availableのメッセージが表示されます。

  5. 新規ボタンをクリックして作成を完了します。

プロデューサールールの作成

ここでは、RabbitMQプロデューサールールを作成し、EMQX Cloudコンソールを通じてルールにアクションを追加する方法を示します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信したときにエンジンをトリガーするSQLを記述します。

    sql
     SELECT
      timestamp, clientid, payload
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL Examplesをクリックし、Try It OutでSQLルールの学習とテストが可能です。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. EMQX CloudからRabbitMQサービスへのメッセージパブリッシュ情報を設定します:

    • Exchange:前に作成したtest_exchangeを入力。メッセージはこのエクスチェンジにパブリッシュされます。

    • Routing Key:前に作成したtest_routing_keyを入力。RabbitMQでメッセージパブリッシュ時に使用されるルーティングキーです。

    • Message Delivery Modeのドロップダウンでnon_persistentまたはpersistentを選択:

      • non_persistent(デフォルト):メッセージはディスクに永続化されず、RabbitMQ再起動やクラッシュ時に失われる可能性があります。

      • persistent:メッセージはディスクに永続化され、RabbitMQ再起動やクラッシュ時の耐久性を提供します。

      TIP

      RabbitMQ再起動時のメッセージ紛失を防ぐため、キューとエクスチェンジも永続化(Durable)に設定する必要があります。詳細はRabbitMQのドキュメントをご参照ください。

    • Payload Template:デフォルトは空文字列で、メッセージペイロードは変更されずJSON形式のテキストとしてRabbitMQに転送されます。

      プレースホルダーを用いてカスタムメッセージペイロード形式を定義することも可能です。例えば、MQTTメッセージのペイロードとタイムスタンプをRabbitMQメッセージに含めたい場合、以下のテンプレートを使用します:

      json
      {"payload": "${payload}", "timestamp": ${timestamp}}

      このテンプレートは、受信したMQTTメッセージのペイロードとタイムスタンプを含むJSON形式のメッセージを生成します。${payload}${timestamp}はプレースホルダーで、実際のメッセージ値に置き換えられます。

    • Wait for Publish Confirmations:デフォルトで有効。メッセージがRabbitMQに正常にパブリッシュされたことを保証します。

      TIP

      このオプションを有効にすると、RabbitMQブローカーはメッセージの受信を確認してから成功とみなすため、メッセージ配信の信頼性が向上します。

  6. 詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを必要に応じて設定します(任意)。

  7. 確認ボタンをクリックしてルール作成を完了します。

  8. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

プロデューサールールのテスト

温度と湿度データの送信をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも可能です。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. アクションとルールが正常に作成されていれば、指定したルーティングキーでRabbitMQサーバーの指定エクスチェンジにメッセージがパブリッシュされているはずです。RabbitMQ管理コンソール(http://{ip address}:15672)にアクセスし、Queuesセクションに移動します。

  3. メッセージが適切なキューにルーティングされていることを確認します。キューをクリックし、**Get Message(s)**ボタンを押して詳細なメッセージ内容を確認します。

    • ペイロード:

      json
      {"payload":
       "{
       "temp": "27.5",
       "hum": "41.8"
        }",
        "timestamp": 1711333401673
      }

RabbitMQソースルールの作成

ここでは、RabbitMQソースルールを作成し、RabbitMQソースから消費したメッセージをEMQX Cloudに転送し、トピックt/1にパブリッシュするリパブリッシュアクションをルールに追加する方法を示します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. ルールIDにmy_rule_sourceを入力します。

  3. ルールをトリガーするソース(データ入力)を設定します。ページ右側の**アクション(ソース)**タブをクリックし、**新規アクション(ソース)**をクリックしてRabbitMQソースを作成します。

  4. スライドパネルでソースタイプとして**RabbitMQ(Source)**を選択し、次へをクリックして設定ステップに進みます。

  5. コネクターのドロップダウンから先ほど作成したmy-rabbitmqコネクターを選択します。ドロップダウン横の作成ボタンをクリックすると、ポップアップで新規コネクターを素早く作成可能です。設定パラメータはコネクターの作成を参照してください。

  6. RabbitMQからEMQX Cloudへメッセージを消費するためのソース情報を設定します:

    • Queue:RabbitMQで事前に作成したキュー名message-sendを入力
    • No Ack:状況に応じて、RabbitMQからメッセージをno_ackモードで消費するか選択。no_ackモードを有効にすると、RabbitMQはメッセージをコンシューマが処理完了を確認する前に即座にキューから削除します。
    • Wait for Publish Confirmations:メッセージパブリッシャーのアックを使用する際に、RabbitMQの確認を待つかどうかを指定
  7. 詳細設定(任意):デフォルト値を使用します。

  8. 確認ボタンをクリックしてソースの作成を完了し、ルールのデータ入力に追加します。同時に、ルールSQLが以下のように変更されます:

    sql
    SELECT
    *
    FROM
    "$bridges/rabbitmq:my-rabbitmq-source"

    RabbitMQソースから以下のフィールドにアクセス可能で、SQLを調整してデータ処理を行えます。ここではデフォルトのSQLを使用可能です。

    フィールド名説明
    payloadRabbitMQメッセージの内容
    eventイベントトピック。$bridges/rabbitmq:<source name>形式
    metadataルールID情報
    timestampメッセージがEMQXに到着したタイムスタンプ
    nodeメッセージが到着したEMQXノード名
    queueメッセージが消費されたキュー名
    exchangeメッセージがルーティングされたエクスチェンジ名
    routing_keyエクスチェンジからキューへのメッセージルーティングに使用されたルーティングキー
  9. 次へをクリックし、出力アクションを作成します。

  10. 新規出力アクションでリパブリッシュを選択します。

  11. メッセージリパブリッシュ設定を入力します:

    • トピック:MQTTにパブリッシュするトピック。ここではt/1を入力。
    • QoS012、または${qos}を選択。${qos}を選ぶと元のメッセージのQoSに従います。
    • Retaintrueまたはfalseを選択。メッセージをリテインメッセージとしてパブリッシュするかどうかを決定。プレースホルダーも使用可能。ここではfalseを選択。
    • ペイロード:転送メッセージペイロード生成用のテンプレート。空欄の場合はルール出力結果をそのまま転送。ここでは${payload}を入力し、ペイロードのみ転送。
    • MQTT 5.0メッセージプロパティ:デフォルトで無効。詳細はリパブリッシュアクションの追加を参照。
  12. その他の設定はデフォルトのままにし、確認ボタンをクリックして出力アクションの作成を完了します。

作成成功後、新規ルールページに戻り、ルール一覧に新規ルールが表示されます。リパブリッシュアクションは現在**アクション(ソース)**には表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部にリパブリッシュ出力アクションが確認できます。

RabbitMQソースルールのテスト

  1. MQTTX CLIを使用してトピックt/1をサブスクライブします:

    bash
    mqttx sub -t t/1
  2. 以下のコマンドでRabbitMQにメッセージをパブリッシュできます:

    bash
    rabbitmqadmin --username=guest --password=guest \
         publish routing_key=message-send \
         payload="{ \"msg\": \"Hello EMQX\"}"
    • publishはメッセージをパブリッシュするコマンドです。
    • routing_key=message-sendオプションはメッセージのルーティングキーを設定します。この例ではキュー名をルーティングキーとして使用しています。
    • payload="{ \"msg\": \"Hello EMQX\"}"オプションはメッセージ内容を設定します。

    または、RabbitMQ管理インターフェースからもメッセージをパブリッシュ可能です:

    1. 上部メニューのQueuesタブをクリックします。
    2. Name列のmessage-sendをクリックして詳細ページを開きます。
    3. Publish messageを展開し、Payloadボックスに"Hello EMQX"と入力し、Publish messageボタンをクリックします。
  3. MQTTXに以下のような出力が表示されます:

    bash
    [2024-2-23] [16:59:28] › payload: {"payload":{"msg":"Hello EMQX"},"event":"$bridges/rabbitmq:my-rabbitmq-source","metadata":{"rule_id":"rule_0ly1"},"timestamp":1708678768449,"node":"emqx@127.0.0.1"}