RabbitMQへのMQTTデータ取り込み
RabbitMQは、Advanced Message Queuing Protocol(AMQP)を実装した広く利用されているオープンソースのメッセージブローカーです。分散システム間のメッセージングにおいて堅牢かつスケーラブルなプラットフォームを提供します。EMQX CloudはRabbitMQとの統合をサポートしており、MQTTメッセージやイベントをRabbitMQに転送できます。また、RabbitMQサーバーからデータを消費し、EMQX Cloudの特定トピックにパブリッシュすることも可能で、RabbitMQからMQTTへのメッセージ配信を実現します。
本ページでは、EMQX CloudとRabbitMQ間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。
動作概要
RabbitMQデータ統合は、MQTTベースのIoTデータとRabbitMQの強力なメッセージキュー処理機能を橋渡しするために、EMQX Cloudに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQX CloudからRabbitMQへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQX CloudとRabbitMQ間のデータ統合の典型的なアーキテクチャを示しています。

MQTTデータのRabbitMQへの取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、リアルタイムのMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloudで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、RabbitMQにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
- RabbitMQへのメッセージ取り込み:ルールによる処理が完了すると、メッセージをRabbitMQに転送するアクションがトリガーされます。処理済みのメッセージはシームレスにRabbitMQに書き込まれます。
- データの永続化と活用:RabbitMQはメッセージをキューに保存し、適切なコンシューマに配信します。メッセージは他のアプリケーションやサービスで消費され、データ分析、可視化、保存などのさらなる処理に利用されます。
特長とメリット
RabbitMQとのデータ統合は、以下の特長と利点をビジネスにもたらします:
- 信頼性の高いIoTデータメッセージ配信:EMQX Cloudはデバイスからクラウドへの信頼性の高い接続とメッセージ配信を保証し、RabbitMQはメッセージの永続化とサービス間の信頼性の高い配信を担い、各プロセスにおけるデータの信頼性を確保します。
- MQTTメッセージの変換:ルールエンジンを用いて、EMQX CloudはMQTTメッセージのフィルタリングや変換が可能です。メッセージはRabbitMQに送信される前にデータ抽出、フィルタリング、拡充、変換を受けられます。
- 柔軟なメッセージマッピング:RabbitMQデータ統合はMQTTトピックからRabbitMQのルーティングキーおよびエクスチェンジへの柔軟なマッピングをサポートし、MQTTとRabbitMQ間のシームレスな統合を実現します。
- 高可用性とクラスターサポート:EMQX CloudおよびRabbitMQは共に高可用なメッセージブローカークラスターの構築をサポートし、ノード障害時でもサービス提供を継続可能にします。クラスター機能を活用することで優れたスケーラビリティも実現します。
- 高スループットシナリオでの処理能力:RabbitMQデータ統合は同期および非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。
はじめる前に
このセクションでは、RabbitMQデータ統合の作成を開始する前に必要な準備として、RabbitMQサーバーの作成方法およびテスト用のRabbitMQエクスチェンジとキューの作成方法について説明します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
RabbitMQサーバーの起動
ここでは、Dockerを使用してRabbitMQサーバーを起動する方法を紹介します。
以下のコマンドを実行すると、管理プラグインが有効なRabbitMQサーバーが起動します。管理プラグインにより、WebインターフェースでRabbitMQを監視できます。
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-managementDocker HubのRabbitMQのDocker実行に関する情報もご参照ください。
メッセージ受信用のエクスチェンジとキューの作成
RabbitMQサーバー起動後、RabbitMQ管理Webインターフェースを使って、EMQX Cloudから転送されるメッセージ受信用のテスト用エクスチェンジとキューを作成できます。既にテスト用のエクスチェンジとキューがある場合は、このセクションはスキップ可能です。
Webブラウザで http://{ip address}:15672/ にアクセスし、RabbitMQ管理Webインターフェースを開きます。ログイン画面で以下のデフォルト認証情報を入力し、Loginをクリックします。
- Username:
guest - Password:
guest
- Username:
上部メニューのExchangesタブをクリックします。Add a new exchangeを展開し、以下の情報を入力します:
- Name:
test_exchangeと入力 - Type:ドロップダウンリストから
directを選択 - Durability:
Durableを選択し、エクスチェンジを永続化(RabbitMQサーバー再起動後も存在) - Auto delete:
No - Internal:
No - Arguments:空欄のまま
- Name:
Add exchangeボタンをクリックしてテスト用エクスチェンジを作成します。
上部メニューのQueuesタブをクリックします。Add a new queueを展開し、以下の情報を入力します:
- Type:
Default for virtual host - Name:
test_queueと入力 - Durability:
Durableを選択し、キューを永続化 - Arguments:空欄のまま
- Type:
Add queueボタンをクリックしてテスト用キューを作成します。新しい
test_queueがAll queuesセクションに表示されます。キュー名のtest_queueをクリックして詳細ページを開きます。Bindingsを展開し、Add binding to this queueセクションに以下を入力します:
- From exchange:
test_exchangeと入力 - Routing key:
test_routing_keyと入力 - Arguments:空欄のまま
- From exchange:
Bindボタンをクリックし、
test_queueを指定したルーティングキーでtest_exchangeにバインドします。
コネクターの作成
データ統合ルールを作成する前に、RabbitMQサーバーにアクセスするためのRabbitMQコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ転送カテゴリの下にあるRabbitMQを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ転送カテゴリの下のRabbitMQを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- Server:RabbitMQサーバーがローカルの場合は
localhost、リモートの場合は実際のホスト名またはIPアドレスを入力 - Port:
5672(異なる場合は実際のポート番号) - Username:
guest - Password:
guest - Virtual Host:RabbitMQの仮想ホスト。デフォルトは
/ - Enable TLS:暗号化接続を確立する場合はトグルスイッチをオンにする
- ビジネスニーズに応じて詳細設定を構成(任意)
- Server:RabbitMQサーバーがローカルの場合は
テストボタンをクリックします。RabbitMQサービスにアクセス可能な場合、connector availableのメッセージが表示されます。
新規ボタンをクリックして作成を完了します。
プロデューサールールの作成
ここでは、RabbitMQプロデューサールールを作成し、EMQX Cloudコンソールを通じてルールにアクションを追加する方法を示します。
ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
使用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信したときにエンジンをトリガーするSQLを記述します。sqlSELECT timestamp, clientid, payload FROM "temp_hum/emqx"TIP
初心者の方は、SQL Examplesをクリックし、Try It OutでSQLルールの学習とテストが可能です。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
EMQX CloudからRabbitMQサービスへのメッセージパブリッシュ情報を設定します:
Exchange:前に作成した
test_exchangeを入力。メッセージはこのエクスチェンジにパブリッシュされます。Routing Key:前に作成した
test_routing_keyを入力。RabbitMQでメッセージパブリッシュ時に使用されるルーティングキーです。Message Delivery Modeのドロップダウンで
non_persistentまたはpersistentを選択:non_persistent(デフォルト):メッセージはディスクに永続化されず、RabbitMQ再起動やクラッシュ時に失われる可能性があります。persistent:メッセージはディスクに永続化され、RabbitMQ再起動やクラッシュ時の耐久性を提供します。
TIP
RabbitMQ再起動時のメッセージ紛失を防ぐため、キューとエクスチェンジも永続化(Durable)に設定する必要があります。詳細はRabbitMQのドキュメントをご参照ください。
Payload Template:デフォルトは空文字列で、メッセージペイロードは変更されずJSON形式のテキストとしてRabbitMQに転送されます。
プレースホルダーを用いてカスタムメッセージペイロード形式を定義することも可能です。例えば、MQTTメッセージのペイロードとタイムスタンプをRabbitMQメッセージに含めたい場合、以下のテンプレートを使用します:
json{"payload": "${payload}", "timestamp": ${timestamp}}このテンプレートは、受信したMQTTメッセージのペイロードとタイムスタンプを含むJSON形式のメッセージを生成します。
${payload}と${timestamp}はプレースホルダーで、実際のメッセージ値に置き換えられます。Wait for Publish Confirmations:デフォルトで有効。メッセージがRabbitMQに正常にパブリッシュされたことを保証します。
TIP
このオプションを有効にすると、RabbitMQブローカーはメッセージの受信を確認してから成功とみなすため、メッセージ配信の信頼性が向上します。
詳細設定を展開し、同期/非同期モード、キューやバッチ、その他パラメータを必要に応じて設定します(任意)。
確認ボタンをクリックしてルール作成を完了します。
新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
プロデューサールールのテスト
温度と湿度データの送信をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも可能です。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxクライアントID:
test_clientペイロード:
json{ "temp": "27.5", "hum": "41.8" }
アクションとルールが正常に作成されていれば、指定したルーティングキーでRabbitMQサーバーの指定エクスチェンジにメッセージがパブリッシュされているはずです。RabbitMQ管理コンソール(http://{ip address}:15672)にアクセスし、Queuesセクションに移動します。
メッセージが適切なキューにルーティングされていることを確認します。キューをクリックし、**Get Message(s)**ボタンを押して詳細なメッセージ内容を確認します。
ペイロード:
json{"payload": "{ "temp": "27.5", "hum": "41.8" }", "timestamp": 1711333401673 }
RabbitMQソースルールの作成
ここでは、RabbitMQソースルールを作成し、RabbitMQソースから消費したメッセージをEMQX Cloudに転送し、トピックt/1にパブリッシュするリパブリッシュアクションをルールに追加する方法を示します。
ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
ルールIDに
my_rule_sourceを入力します。ルールをトリガーするソース(データ入力)を設定します。ページ右側の**アクション(ソース)**タブをクリックし、**新規アクション(ソース)**をクリックしてRabbitMQソースを作成します。
スライドパネルでソースタイプとして**RabbitMQ(Source)**を選択し、次へをクリックして設定ステップに進みます。
コネクターのドロップダウンから先ほど作成した
my-rabbitmqコネクターを選択します。ドロップダウン横の作成ボタンをクリックすると、ポップアップで新規コネクターを素早く作成可能です。設定パラメータはコネクターの作成を参照してください。RabbitMQからEMQX Cloudへメッセージを消費するためのソース情報を設定します:
- Queue:RabbitMQで事前に作成したキュー名
message-sendを入力 - No Ack:状況に応じて、RabbitMQからメッセージを
no_ackモードで消費するか選択。no_ackモードを有効にすると、RabbitMQはメッセージをコンシューマが処理完了を確認する前に即座にキューから削除します。 - Wait for Publish Confirmations:メッセージパブリッシャーのアックを使用する際に、RabbitMQの確認を待つかどうかを指定
- Queue:RabbitMQで事前に作成したキュー名
詳細設定(任意):デフォルト値を使用します。
確認ボタンをクリックしてソースの作成を完了し、ルールのデータ入力に追加します。同時に、ルールSQLが以下のように変更されます:
sqlSELECT * FROM "$bridges/rabbitmq:my-rabbitmq-source"RabbitMQソースから以下のフィールドにアクセス可能で、SQLを調整してデータ処理を行えます。ここではデフォルトのSQLを使用可能です。
フィールド名 説明 payload RabbitMQメッセージの内容 event イベントトピック。 $bridges/rabbitmq:<source name>形式metadata ルールID情報 timestamp メッセージがEMQXに到着したタイムスタンプ node メッセージが到着したEMQXノード名 queue メッセージが消費されたキュー名 exchange メッセージがルーティングされたエクスチェンジ名 routing_key エクスチェンジからキューへのメッセージルーティングに使用されたルーティングキー 次へをクリックし、出力アクションを作成します。
新規出力アクションでリパブリッシュを選択します。
メッセージリパブリッシュ設定を入力します:
- トピック:MQTTにパブリッシュするトピック。ここでは
t/1を入力。 - QoS:
0、1、2、または${qos}を選択。${qos}を選ぶと元のメッセージのQoSに従います。 - Retain:
trueまたはfalseを選択。メッセージをリテインメッセージとしてパブリッシュするかどうかを決定。プレースホルダーも使用可能。ここではfalseを選択。 - ペイロード:転送メッセージペイロード生成用のテンプレート。空欄の場合はルール出力結果をそのまま転送。ここでは
${payload}を入力し、ペイロードのみ転送。 - MQTT 5.0メッセージプロパティ:デフォルトで無効。詳細はリパブリッシュアクションの追加を参照。
- トピック:MQTTにパブリッシュするトピック。ここでは
その他の設定はデフォルトのままにし、確認ボタンをクリックして出力アクションの作成を完了します。
作成成功後、新規ルールページに戻り、ルール一覧に新規ルールが表示されます。リパブリッシュアクションは現在**アクション(ソース)**には表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部にリパブリッシュ出力アクションが確認できます。
RabbitMQソースルールのテスト
MQTTX CLIを使用してトピック
t/1をサブスクライブします:bashmqttx sub -t t/1以下のコマンドでRabbitMQにメッセージをパブリッシュできます:
bashrabbitmqadmin --username=guest --password=guest \ publish routing_key=message-send \ payload="{ \"msg\": \"Hello EMQX\"}"publishはメッセージをパブリッシュするコマンドです。routing_key=message-sendオプションはメッセージのルーティングキーを設定します。この例ではキュー名をルーティングキーとして使用しています。payload="{ \"msg\": \"Hello EMQX\"}"オプションはメッセージ内容を設定します。
または、RabbitMQ管理インターフェースからもメッセージをパブリッシュ可能です:
- 上部メニューのQueuesタブをクリックします。
- Name列のmessage-sendをクリックして詳細ページを開きます。
- Publish messageを展開し、Payloadボックスに
"Hello EMQX"と入力し、Publish messageボタンをクリックします。
MQTTXに以下のような出力が表示されます:
bash[2024-2-23] [16:59:28] › payload: {"payload":{"msg":"Hello EMQX"},"event":"$bridges/rabbitmq:my-rabbitmq-source","metadata":{"rule_id":"rule_0ly1"},"timestamp":1708678768449,"node":"emqx@127.0.0.1"}