Skip to content

Bridge MQTT Data into RocketMQ

EMQXはRocketMQへのデータブリッジをサポートしており、MQTTメッセージやクライアントイベントをRocketMQに転送できます。例えば、RocketMQを利用してデバイスからのセンサーデータやログデータを収集することが可能です。

本ページでは、EMQXとRocketMQ間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。

注意事項

Alibaba Cloudが提供するRocketMQサービスを利用する場合、本データ統合はバッチモードをサポートしていません。

動作概要

RocketMQデータ統合は、EMQXに標準搭載された機能であり、EMQXのリアルタイムデータキャプチャと送信機能とRocketMQの強力なメッセージキュー処理機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからRocketMQへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXとRocketMQ間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration RocketMQ

MQTTデータをRocketMQに取り込む流れは以下の通りです:

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、リアルタイムのMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールに従って処理されます。ルールは事前に定義された条件に基づき、RocketMQへルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、ペイロードへの追加コンテキスト付与などの処理が適用されます。
  3. RocketMQへのデータ取り込み:ルールによる処理が完了すると、メッセージをRocketMQに転送するアクションがトリガーされます。処理済みデータはシームレスにRocketMQに書き込まれます。
  4. データの保存と活用:データがRocketMQに保存されることで、様々なユースケースに活用可能になります。例えば金融業界では、RocketMQを高信頼・高性能なメッセージキューとして利用し、決済端末や取引システムからのデータを管理します。メッセージはデータ分析や規制対応プラットフォームと連携し、リスク管理、不正検知・防止、コンプライアンス遵守などの要件を満たします。

特長とメリット

RocketMQとのデータ統合は、以下の特長と利点をビジネスにもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQXはMQTTメッセージを信頼性高くバッチ送信でき、IoTデバイスとRocketMQおよびアプリケーションシステムの統合を実現します。
  • MQTTメッセージの変換:ルールエンジンを用いてMQTTメッセージの抽出、フィルタリング、エンリッチメント、変換が可能で、RocketMQ送信前にデータを柔軟に加工できます。
  • クラウドネイティブな弾力的スケーリング:EMQXとRocketMQは共にクラウドネイティブアーキテクチャで構築されており、Kubernetes(K8s)との親和性が高く、クラウドネイティブエコシステムと統合可能です。ビジネスの急速な成長に応じて無限に弾力的にスケールできます。
  • 柔軟なトピックマッピング:RocketMQデータ統合はMQTTトピックとRocketMQトピックの柔軟なマッピングをサポートし、RocketMQメッセージのキー(Key)や値(Value)を簡単に設定可能です。
  • 高スループットシナリオでの処理能力:RocketMQデータ統合は同期・非同期の両方の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

はじめる前に

このセクションでは、RocketMQデータ統合の作成を開始する前に必要な準備、特にRocketMQサーバーのセットアップ方法を説明します。

前提条件

RocketMQのインストール

  1. RocketMQをセットアップするためのdocker-composeファイルrocketmq.yamlを用意します。
yaml
version: '3.9'

services:
  mqnamesrv:
    image: apache/rocketmq:4.9.4
    container_name: rocketmq_namesrv
    ports:
      - 9876:9876
    volumes:
      - ./rocketmq/logs:/opt/logs
      - ./rocketmq/store:/opt/store
    command: ./mqnamesrv

  mqbroker:
    image: apache/rocketmq:4.9.4
    container_name: rocketmq_broker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./rocketmq/logs:/opt/logs
      - ./rocketmq/store:/opt/store
      - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rocketmq_namesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms1024m -Xmx1024m -Xmn1024m"
    command: ./mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - mqnamesrv
  1. RocketMQの実行に必要なフォルダと設定を準備します。
bash
mkdir rocketmq
mkdir rocketmq/logs
mkdir rocketmq/store
mkdir rocketmq/conf
  1. 以下の内容を rocketmq/conf/broker.conf に保存します。
bash
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

brokerIP1=change me to your real IP address

defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true

listenPort=10911
deleteWhen=04

fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536

brokerRole=ASYNC_MASTER

flushDiskType=ASYNC_FLUSH
  1. サーバーを起動します。
bash
docker-compose -f rocketmq.yaml up
  1. コンシューマーを起動します。
docker run --rm -e NAMESRV_ADDR=host.docker.internal:9876 apache/rocketmq:4.9.4 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

TIP

Linux環境の場合は、host.docker.internalを実際のIPアドレスに変更してください。

コネクターの作成

このセクションでは、SinkをRocketMQサーバーに接続するためのコネクター作成手順を示します。

以下の手順は、EMQXとRocketMQをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでRocketMQを選択し、Nextをクリックします。
  4. Configurationステップで以下を設定します:
    • Connector name:コネクター名を入力します。英数字の組み合わせで、例:my_rocketmq
    • Servers127.0.0.1:9876を入力します。
    • Namespace:RocketMQサービスにネームスペース設定がある場合のみ入力し、なければ空欄のままにします。
    • AccessKeySecretKeySecret Token:RocketMQサービスの設定に応じて入力するか空欄のままにします。
    • その他はデフォルトのままにします。
  5. 詳細設定(任意):詳細はSinkの機能を参照してください。
  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがRocketMQサーバーに接続可能かテストできます。
  7. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを指定したルール作成に進めます。詳細はメッセージ保存用RocketMQ Sinkのルール作成およびイベント記録用RocketMQ Sinkのルール作成を参照してください。

メッセージ保存用RocketMQ Sinkのルール作成

このセクションでは、ダッシュボード上でMQTTソーストピックt/#からのメッセージを処理し、処理済みデータを設定済みSink経由でRocketMQトピックTopicTestに転送するルールの作成方法を示します。

  1. EMQXダッシュボードで Integration -> Rules をクリックします。

  2. 画面右上の Create をクリックします。

  3. ルールIDにmy_ruleを入力し、SQL Editorに以下のステートメントを入力します。これはトピックt/#配下のMQTTメッセージをRocketMQに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT句に含めていることを確認してください。

    sql
    SELECT 
      *
    FROM
      "t/#"

    TIP

    初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストしてください。

    • Add Actionボタンをクリックし、ルール発動時にトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをRocketMQに送信します。
  4. Type of ActionドロップダウンからRocketMQを選択します。ActionはデフォルトのCreate Actionのままにします。既存のSinkがあれば選択可能ですが、この例では新規Sinkを作成します。

  5. Sink名を入力します。英数字の組み合わせで入力してください。

  6. Connectorドロップダウンから先ほど作成したmy_rocketmqを選択します。新規コネクター作成も可能です。設定パラメータはコネクターの作成を参照してください。

  7. 以下のRocketMQ固有フィールドを入力します:

    • RocketMQ Topic:メッセージを転送するトピック名を入力します。例:TopicTest
    • Tag(任意):RocketMQタグを動的に割り当てるテンプレートフィールドです。${msg_type}${clientid}など、ルールSQLの結果を利用したプレースホルダーを含められます。空欄の場合はタグなしとなります。
    • Key(任意):各メッセージにキーを割り当てるテンプレートフィールドです。メッセージのトレースや重複排除に役立ちます。例:${device_id}${username}。こちらもルールSQLの結果からプレースホルダーを利用可能です。
  8. Message Templateフィールドでは、RocketMQに送信するメッセージペイロードの構造をカスタマイズできます。

    デフォルトは空欄で、空欄の場合はメッセージ全体がRocketMQに転送されます。

    テンプレートは任意の有効な文字列で、プレースホルダーを含められます。例:

    • ${id}, ${username}, ${clientid}, ${timestamp}
    • {"id": ${id}, "username": ${username}}

    実際の値は文字列で、JSON形式のテンプレートも可能です。プレースホルダーはルールSQLで選択したフィールドで実行時に置換されます。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  10. 詳細設定(任意):詳細はSinkの機能を参照してください。

  11. Createをクリックする前に、Test ConnectivityをクリックしてSinkがRocketMQサーバーに接続可能かテストできます。

  12. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  13. Create Ruleページで設定内容を確認し、Saveをクリックしてルールを生成します。

これでRocketMQ Sink用のルール作成が完了しました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると、新しいRocketMQ Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleで解析され、RocketMQに送信・保存されている様子を確認できます。

イベント記録用RocketMQ Sinkのルール作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSink経由でRocketMQトピックTestTopicに転送するルール作成方法を示します。

ルール作成手順はメッセージ保存用RocketMQ Sinkのルール作成とほぼ同様ですが、SQLルール構文が異なります。

オンライン/オフライン状態記録用のSQLルール文法は以下の通りです:

sql
SELECT
  *
FROM 
  "$events/client_connected", "$events/client_disconnected"

TIP

便宜上、オンライン/オフラインイベントの受け取りにTopicTestトピックを再利用します。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello RocketMQ" }'

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

データが TopicTest トピックに転送されているか確認してください。

以下のようなデータがコンシューマーにより出力されます。

bash
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=581, queueOffset=0, sysFlag=0, bornTimestamp=1679037578889, bornHost=/172.26.83.106:43920, storeTimestamp=1679037578891, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F000000000000060E, commitLogOffset=1550, bodyCRC=7414108, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=511, queueOffset=1, sysFlag=0, bornTimestamp=1679037580174, bornHost=/172.26.83.106:43920, storeTimestamp=1679037580176, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F0000000000000E61, commitLogOffset=3681, bodyCRC=1604860416, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=458, queueOffset=2, sysFlag=0, bornTimestamp=1679037584933, bornHost=/172.26.83.106:43920, storeTimestamp=1679037584934, storeHost=/172.26.83.106:10911, msgId=AC1A536A00002A9F000000000000166E, commitLogOffset=5742, bodyCRC=383397630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1679037605342, CLUSTER=DefaultCluster}, body=[...], transactionId='null'}]]