RabbitMQへのMQTTデータ取り込み
RabbitMQは、Advanced Message Queuing Protocol(AMQP)を実装した広く利用されているオープンソースのメッセージブローカーです。分散システム間のメッセージングにおいて堅牢でスケーラブルなプラットフォームを提供します。EMQXプラットフォームはRabbitMQとの統合をサポートしており、MQTTメッセージやイベントをRabbitMQに転送できます。また、RabbitMQサーバーからデータを取得し、EMQXプラットフォームの特定のトピックにパブリッシュすることも可能で、RabbitMQからMQTTへのメッセージ配信を実現します。
本ページでは、EMQXプラットフォームとRabbitMQ間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。
動作概要
RabbitMQデータ統合は、EMQXプラットフォームに標準搭載された機能であり、MQTTベースのIoTデータとRabbitMQの強力なメッセージキュー処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを活用することで、EMQXプラットフォームからRabbitMQへのデータ取り込みをコード不要で簡単に実現できます。
以下の図は、EMQXプラットフォームとRabbitMQ間の典型的なデータ統合アーキテクチャを示しています。
MQTTデータをRabbitMQに取り込む流れは以下の通りです:
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXプラットフォームに正常に接続し、リアルタイムMQTTデータをパブリッシュします。EMQXプラットフォームがメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージ到着後、ルールエンジンを通過し、EMQXプラットフォームで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、RabbitMQにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
- RabbitMQへのメッセージ取り込み:ルールによる処理完了後、メッセージをRabbitMQに転送するアクションがトリガーされます。処理済みメッセージはシームレスにRabbitMQに書き込まれます。
- データの永続化と活用:RabbitMQはメッセージをキューに格納し、適切なコンシューマーに配信します。メッセージは他のアプリケーションやサービスによって消費され、データ分析、可視化、保存などのさらなる処理に利用されます。
特長と利点
RabbitMQとのデータ統合は、以下の特長とメリットをもたらします:
- 信頼性の高いIoTデータメッセージ配信:EMQXプラットフォームはデバイスからクラウドへの信頼性の高い接続とメッセージ配信を保証し、RabbitMQはメッセージの永続化と異なるサービス間の信頼性ある配信を担い、全体のデータ信頼性を確保します。
- MQTTメッセージの変換:ルールエンジンを用いて、EMQXプラットフォームはMQTTメッセージのフィルタリングや変換を実施可能です。データ抽出、フィルタリング、強化、変換を経てRabbitMQに送信されます。
- 柔軟なメッセージマッピング:RabbitMQデータ統合はMQTTトピックとRabbitMQのルーティングキーおよびエクスチェンジの柔軟なマッピングをサポートし、MQTTとRabbitMQ間のシームレスな連携を実現します。
- 高可用性とクラスター対応:EMQXプラットフォームとRabbitMQは共に高可用性のメッセージブローカークラスター構築をサポートし、ノード障害時もサービス継続を保証します。クラスター機能により優れたスケーラビリティも提供されます。
- 高スループットシナリオでの処理能力:RabbitMQデータ統合は同期および非同期書き込みモードをサポートし、レイテンシとスループットのバランスを柔軟に調整可能です。
はじめる前に
このセクションでは、RabbitMQデータ統合の作成を始める前に必要な準備、RabbitMQサーバーの作成方法およびテスト用のエクスチェンジとキューの作成方法を説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
RabbitMQサーバーの起動
ここでは、Dockerを用いてRabbitMQサーバーを起動する方法を紹介します。
管理プラグインを有効にしたRabbitMQサーバーを起動するには、以下のコマンドを実行してください。管理プラグインにより、WebインターフェースでRabbitMQの状態を確認できます。
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
Docker HubのRabbitMQのDockerイメージに関する詳細情報もご参照ください。
メッセージ受信用のエクスチェンジとキューの作成
RabbitMQサーバー起動後、RabbitMQ管理Webインターフェースを使って、EMQXプラットフォームから転送されるメッセージを受信するためのテスト用エクスチェンジとキューを作成します。既にテスト用のエクスチェンジとキューがある場合は、このセクションはスキップ可能です。
Webブラウザで http://{ip address}:15672/ にアクセスし、RabbitMQ管理Webインターフェースを開きます。ログイン画面で以下のデフォルト認証情報を入力し、Loginをクリックします。
- Username:
guest
- Password:
guest
- Username:
上部メニューのExchangesタブをクリックします。Add a new exchangeを展開し、以下の情報を入力します:
- Name:
test_exchange
と入力 - Type:ドロップダウンリストから
direct
を選択 - Durability:
Durable
を選択し、エクスチェンジを永続化(RabbitMQ再起動後も存在) - Auto delete:
No
- Internal:
No
- Arguments:空欄のまま
- Name:
Add exchangeボタンをクリックしてテスト用エクスチェンジを作成します。
上部メニューのQueuesタブをクリックします。Add a new queueを展開し、以下の情報を入力します:
- Type:
Default for virtual host
- Name:
test_queue
と入力 - Durability:
Durable
を選択し、キューを永続化 - Arguments:空欄のまま
- Type:
Add queueボタンをクリックしてテスト用キューを作成します。新しい
test_queue
がAll queuesセクションに表示されます。キュー名のtest_queueをクリックして詳細ページを開きます。Bindingsを展開し、Add binding to this queueセクションに以下を入力します:
- From exchange:
test_exchange
と入力 - Routing key:
test_routing_key
と入力 - Arguments:空欄のまま
- From exchange:
Bindボタンをクリックして、
test_queue
を指定したルーティングキーでtest_exchange
にバインドします。
コネクターの作成
データ統合ルールを作成する前に、RabbitMQサーバーにアクセスするためのRabbitMQコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからData Integrationをクリックします。初めてコネクターを作成する場合は、Data Forwardカテゴリの下にあるRabbitMQを選択します。既にコネクターを作成済みの場合は、New Connectorを選択し、続けてData ForwardカテゴリのRabbitMQを選択します。
Connector Name:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- Server:RabbitMQサーバーがローカルの場合は
localhost
、リモートの場合は実際のホスト名またはIPアドレスを入力 - Port:通常は
5672
、異なる場合は実際のポート番号を入力 - Username:
guest
- Password:
guest
- Virtual Host:RabbitMQの仮想ホスト。デフォルトは
/
- Enable TLS:暗号化接続を確立したい場合はトグルスイッチをオンにする
- ビジネスニーズに応じて詳細設定を構成(任意)
- Server:RabbitMQサーバーがローカルの場合は
Testボタンをクリックします。RabbitMQサービスにアクセス可能であれば、connector availableのプロンプトが返されます。
Newボタンをクリックして作成を完了します。
プロデューサールールの作成
このセクションでは、RabbitMQプロデューサールールを作成し、EMQXプラットフォームコンソールからルールにアクションを追加する方法を示します。
ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。
使用する機能に基づき、SQL Editorでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQLを記述します。sqlSELECT timestamp, clientid, payload FROM "temp_hum/emqx"
TIP
初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことを推奨します。
Nextをクリックしてアクションを追加します。
Connectorのドロップダウンボックスから先ほど作成したコネクターを選択します。
EMQXプラットフォームからRabbitMQサービスにメッセージをパブリッシュするための情報を設定します:
Exchange:前に作成した
test_exchange
を入力。メッセージはこのエクスチェンジにパブリッシュされます。Routing Key:前に作成した
test_routing_key
を入力。RabbitMQでメッセージパブリッシュ時に使用されるルーティングキーです。Message Delivery Modeのドロップダウンから
non_persistent
またはpersistent
を選択:non_persistent
(デフォルト):メッセージはディスクに永続化されず、RabbitMQの再起動やクラッシュ時に失われる可能性があります。persistent
:メッセージはディスクに永続化され、RabbitMQの再起動やクラッシュ時にも耐久性があります。
TIP
RabbitMQが再起動した場合にメッセージを失わないように、キューとエクスチェンジも永続化(Durable)に設定する必要があります。詳細はRabbitMQのドキュメントをご参照ください。
Payload Template:デフォルトは空文字列で、メッセージペイロードはJSON形式のテキストとしてRabbitMQにそのまま転送されます。
プレースホルダーを使ってカスタムのメッセージペイロードフォーマットを定義することも可能です。例えば、MQTTメッセージのペイロードとタイムスタンプをRabbitMQメッセージに含めたい場合は、以下のテンプレートを使用します:
json{"payload": "${payload}", "timestamp": ${timestamp}}
このテンプレートは、転送されるMQTTメッセージのペイロードとタイムスタンプを含むJSON形式のメッセージを生成します。
${payload}
と${timestamp}
はプレースホルダーで、実際のメッセージの値に置き換えられます。Wait for Publish Confirmations:デフォルトで有効。メッセージがRabbitMQに正常にパブリッシュされたことを確認します。
TIP
このオプションを有効にすると、RabbitMQブローカーはメッセージの受信をアック(ACK)し、成功したパブリッシュとして扱うため、メッセージ配信の信頼性が向上します。
Advanced Settingsを展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します(任意)。
Confirmボタンをクリックしてルール作成を完了します。
Successful new ruleのポップアップでBack to Rulesをクリックし、データ統合設定の一連の作業を完了します。
プロデューサールールのテスト
温度と湿度のデータ報告をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
アクションとルールが正常に作成されていれば、指定したルーティングキーでRabbitMQサーバーの指定エクスチェンジにメッセージがパブリッシュされているはずです。RabbitMQ管理コンソール(http://{ip address}:15672)にアクセスし、Queuesセクションに移動してください。
メッセージが適切なキューにルーティングされていることを確認します。キューをクリックし、**Get Message(s)**ボタンを押して詳細なメッセージ内容を確認します。
ペイロード例:
json{"payload": "{ "temp": "27.5", "hum": "41.8" }", "timestamp": 1711333401673 }
RabbitMQソースルールの作成
このセクションでは、RabbitMQソースルールを作成し、RabbitMQソースから消費したメッセージをEMQXプラットフォームに転送してトピックt/1
にパブリッシュするリパブリッシュアクションを追加する方法を示します。
ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。
ルールIDに
my_rule_source
を入力します。ルールをトリガーするソース(データ入力)を設定します。ページ右側の**Actions (Source)**タブをクリックし、**New Action (Source)**をクリックしてRabbitMQソースを作成します。
スライドパネルでソースタイプに**RabbitMQ (Source)**を選択し、Nextをクリックして設定ステップに進みます。
Connectorのドロップダウンから、先に作成した
my-rabbitmq
コネクターを選択します。ドロップダウン横の作成ボタンをクリックすると、ポップアップで新規コネクターを素早く作成可能です。必要な設定パラメータはコネクターの作成を参照してください。RabbitMQからEMQXプラットフォームへメッセージを消費するためのソース情報を設定します:
- Queue:RabbitMQで先に作成したキュー名
message-send
を入力 - No Ack:状況に応じて選択。
no_ack
モードでRabbitMQからメッセージを消費するかどうかを指定します。no_ack
を有効にすると、RabbitMQはメッセージをコンシューマーが正常に処理したかに関わらず即座にキューから削除します。 - Wait for Publish Confirmations:メッセージパブリッシャーのアックを使用する際に、RabbitMQからの確認を待つかどうかを指定します。
- Queue:RabbitMQで先に作成したキュー名
高度な設定(任意):デフォルト値を使用します。
Confirmボタンをクリックしてソース作成を完了し、ルールのデータ入力に追加します。同時にルールSQLが以下のように変更されます。
sqlSELECT * FROM "$bridges/rabbitmq:my-rabbitmq-source"
RabbitMQソースから以下のフィールドにアクセス可能であり、SQLを調整してデータ処理を行えます。ここではデフォルトSQLを使用します。
フィールド名 説明 payload RabbitMQメッセージの内容 event イベントトピック。形式は $bridges/rabbitmq:<source name>
metadata ルールID情報 timestamp メッセージがEMQXに到着したタイムスタンプ node メッセージが到着したEMQXノード名 queue メッセージが消費されたキュー名 exchange メッセージがルーティングされたエクスチェンジ名 routing_key エクスチェンジからキューへメッセージをルーティングする際のルーティングキー Nextをクリックし、出力アクションを作成します。
新しい出力アクションでRepublishを選択します。
メッセージのリパブリッシュ設定を入力します:
- Topic:MQTTにパブリッシュするトピック。ここでは
t/1
を入力。 - QoS:
0
、1
、2
、または${qos}
を選択。${qos}
を選ぶと元のメッセージのQoSに従います。 - Retain:
true
またはfalse
を選択。メッセージをリテインメッセージとしてパブリッシュするかを決定。プレースホルダーも使用可能。ここではfalse
を選択。 - Payload:転送するメッセージペイロードのテンプレート。空欄の場合はルールの出力結果をそのまま転送。ここでは
${payload}
と入力し、ペイロードのみを転送。 - MQTT 5.0 Message Properties:デフォルトで無効。詳細はリパブリッシュアクションの追加を参照。
- Topic:MQTTにパブリッシュするトピック。ここでは
その他の設定はデフォルトのままにし、Confirmボタンをクリックして出力アクションの作成を完了します。
作成成功後、New Ruleページに戻ります。Rulesリストに新規作成したルールが表示されます。リパブリッシュアクションは現在**Actions (Source)**には表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部にリパブリッシュ出力アクションが確認できます。
RabbitMQソースルールのテスト
MQTTX CLIを使用してトピック
t/1
をサブスクライブします:bashmqttx sub -t t/1
以下のコマンドでRabbitMQにメッセージをパブリッシュできます:
bashrabbitmqadmin --username=guest --password=guest \ publish routing_key=message-send \ payload="{ \"msg\": \"Hello EMQX\"}"
publish
はメッセージをパブリッシュするコマンドです。routing_key=message-send
オプションはメッセージのルーティングキーを設定します。この例ではキュー名をルーティングキーとして使用しています。payload="{ \"msg\": \"Hello EMQX\"}"
オプションはメッセージの内容を設定します。
または、RabbitMQ管理インターフェースからもメッセージをパブリッシュ可能です:
- 上部メニューのQueuesタブをクリックします。
- Name列のmessage-sendをクリックして詳細ページを開きます。
- Publish messageを展開し、Payloadボックスに
"Hello EMQX"
と入力して、Publish messageボタンをクリックします。
MQTTXで以下のような出力が確認できます:
bash[2024-2-23] [16:59:28] › payload: {"payload":{"msg":"Hello EMQX"},"event":"$bridges/rabbitmq:my-rabbitmq-source","metadata":{"rule_id":"rule_0ly1"},"timestamp":1708678768449,"node":"emqx@127.0.0.1"}