MQTTデータをRocketMQにブリッジする
EMQX PlatformはRocketMQへのデータブリッジをサポートしており、MQTTメッセージやクライアントイベントをRocketMQに転送できます。例えば、RocketMQを使ってデバイスからのセンサーデータやログデータを収集することが可能です。
本ページでは、EMQX PlatformとRocketMQ間のデータ統合について詳細に解説し、データ統合の作成および検証方法を実践的に説明します。
動作の仕組み
RocketMQデータ統合はEMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータキャプチャと送信機能と、RocketMQの強力なメッセージキュー処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX PlatformからRocketMQへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。
下図はEMQX PlatformとRocketMQ間の典型的なデータ統合アーキテクチャを示しています。
MQTTデータをRocketMQに取り込む流れは以下の通りです。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQX Platformに正常に接続し、リアルタイムのMQTTデータをパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Platformで定義されたルールに基づいて処理されます。ルールは事前定義された条件に従い、RocketMQへルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、ペイロードの付加情報による強化などが適用されます。
- RocketMQへのデータ取り込み:ルールで処理されたメッセージは、RocketMQへ転送するアクションをトリガーします。処理済みデータはシームレスにRocketMQに書き込まれます。
- データの保存と活用:データがRocketMQに保存されることで、企業はそのクエリ機能を活用可能です。例えば金融業界では、RocketMQを信頼性の高い高性能メッセージキューとして利用し、決済端末や取引システムからのデータを管理します。メッセージをデータ分析や規制プラットフォームに連携させ、リスク管理、不正検知・防止、法令遵守などの要件を満たします。
特長とメリット
RocketMQとのデータ統合は、以下の特長と利点をビジネスにもたらします。
- 信頼性の高いIoTデータメッセージ配信:EMQX PlatformはMQTTメッセージをバッチで確実にRocketMQへ送信でき、IoTデバイスとRocketMQおよびアプリケーションシステムの統合を実現します。
- MQTTメッセージの変換:ルールエンジンを利用し、EMQX PlatformはMQTTメッセージの抽出、フィルタリング、強化、変換を行い、RocketMQへ送信します。
- クラウドネイティブな弾力的スケーリング:EMQX PlatformとRocketMQは共にクラウドネイティブアーキテクチャで構築されており、Kubernetes(K8s)に対応し、クラウドネイティブエコシステムと統合可能です。ビジネスの急速な成長に合わせて無限かつ弾力的にスケールアウトできます。
- 柔軟なトピックマッピング:RocketMQデータ統合はMQTTトピックとRocketMQトピックの柔軟なマッピングをサポートし、RocketMQメッセージのキー(Key)や値(Value)の設定を簡単に行えます。
- 高スループットシナリオでの処理能力:RocketMQデータ統合は同期・非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。
はじめる前に
このセクションでは、RocketMQデータ統合の作成を始める前に必要な準備、特にRocketMQサーバーのセットアップ方法について説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
RocketMQのインストール
- RocketMQをセットアップするためのdocker-composeファイル
rocketmq.yaml
を用意します。
version: '3.3'
services:
mqnamesrv:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv
ports:
- 9876:9876
volumes:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
command: ./mqnamesrv
mqbroker:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
- ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: 'rocketmq_namesrv:9876'
JAVA_OPTS: ' -Duser.home=/opt'
JAVA_OPT_EXT: '-server -Xms1024m -Xmx1024m -Xmn1024m'
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrv
- RocketMQの実行に必要なフォルダと設定を準備します。
mkdir rocketmq
mkdir rocketmq/logs
mkdir rocketmq/store
mkdir rocketmq/conf
- 以下の内容を
rocketmq/conf/broker.conf
に保存します。
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerIP1=change me to your real IP address
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
- サーバーを起動します。
docker-compose -f rocketmq.yaml up
- コンシューマーを起動します。
docker run --rm -e NAMESRV_ADDR=host.docker.internal:9876 apache/rocketmq:4.9.4 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
TIP
Linux環境では、host.docker.internal
を実際のIPアドレスに変更してください。
コネクターの作成
データ統合ルールを作成する前に、RocketMQサーバーにアクセスするためのRocketMQコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ転送カテゴリの下にあるRocketMQを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ転送カテゴリのRocketMQを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します。
- Servers:接続したいRocketMQサーバーのアドレスを入力します(例:localhost)。ホストの形式はHost[:Port]です。ポート番号を指定しない場合、RocketMQのデフォルトポート9876が使用されます。
- AccessKey:RocketMQサーバーのAccessKey。
- SecretKey:RocketMQサーバーのSecretKey。
- Security Token:RocketMQサーバーのセキュリティトークン。
- ビジネス要件に応じて詳細設定を行います(任意)。
テストボタンをクリックします。RocketMQサービスにアクセス可能であれば、connector availableというメッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
これで、このコネクターを基にデータブリッジルールを作成できます。
ルールの作成
このセクションでは、EMQX Platformコンソールを使ってRocketMQルールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
利用したい機能に基づいてSQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温湿度メッセージを送信した際にエンジンをトリガーするSQL例を示します。
SELECT
timestamp as up_timestamp,
clientid as client_id,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"
TIP
初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことをおすすめします。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
EMQX PlatformからRocketMQサービスへメッセージをパブリッシュするための情報を設定します。
- RocketMQトピック:
TopicTest
- メッセージテンプレート:テンプレートはデフォルトで空です。空の場合、メッセージ全体がRocketMQに格納されます。テンプレートはプレースホルダーを含む有効な文字列で指定可能です。例:
{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
- RocketMQトピック:
詳細設定を展開し、同期/非同期モード、キューやバッチ、その他のパラメータを適宜設定します(任意)。
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温湿度データの報告をシミュレートするために、MQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
RocketMQコンシューマーのウィンドウに以下のような出力が表示されます。
bashConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1711354009076, bornHost=/121.43.165.169:48850, storeTimestamp=1711354009085, storeHost=/118.178.124.161:10911, msgId=76B27CA100002A9F000000000000058D, commitLogOffset=1421, bodyCRC=1137462344, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1711354066863, CLUSTER=DefaultCluster}, body=[123, 34, 117, 112, 95, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 55, 49, 49, 51, 53, 52, 48, 48, 57, 48, 53, 54, 44, 34, 116, 101, 109, 112, 34, 58, 34, 50, 55, 46, 53, 34, 44, 34, 104, 117, 109, 34, 58, 34, 52, 49, 46, 56, 34, 44, 34, 99, 108, 105, 101, 110, 116, 95, 105, 100, 34, 58, 34, 109, 113, 116, 116, 120, 95, 97, 50, 97, 99, 102, 100, 49, 57, 34, 125], transactionId='null'}]]