Skip to content

Bridge MQTT Data into RocketMQ

EMQX Cloud は RocketMQ へのデータブリッジをサポートしており、MQTT メッセージやクライアントイベントを RocketMQ に転送できます。例えば、RocketMQ を利用してデバイスからのセンサーデータやログデータを収集することが可能です。

本ページでは、EMQX Cloud と RocketMQ 間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。

How It Works

RocketMQ データ統合は、EMQX Cloud に標準搭載された機能であり、EMQX Cloud のリアルタイムデータキャプチャおよび送信機能と RocketMQ の強力なメッセージキュー処理機能を組み合わせています。組み込みの ルールエンジン コンポーネントを活用することで、EMQX Cloud から RocketMQ へのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQX Cloud と RocketMQ 間の典型的なデータ統合アーキテクチャを示しています。

EMQX Cloud-RocketMQ Integration

MQTT データを RocketMQ に取り込む流れは以下の通りです:

  1. メッセージのパブリッシュと受信:産業用 IoT デバイスは MQTT プロトコルを通じて EMQX Cloud に正常に接続し、リアルタイムの MQTT データを EMQX Cloud にパブリッシュします。EMQX Cloud はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloud に定義されたルールによって処理されます。ルールは事前定義された条件に基づき、RocketMQ にルーティングすべきメッセージを判別します。ペイロードの変換が指定されている場合は、データ形式の変換や特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. RocketMQ へのデータ取り込み:ルールによる処理が完了すると、メッセージを RocketMQ に転送するアクションがトリガーされます。処理済みのデータはシームレスに RocketMQ に書き込まれます。
  4. データの保存と活用:データが RocketMQ に保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば金融業界では、RocketMQ を信頼性の高い高性能メッセージキューとして利用し、決済端末や取引システムからのデータを管理します。メッセージをデータ分析や規制プラットフォームに連携させ、リスク管理、不正検知・防止、法令遵守などの要件を満たします。

Features and Benefits

RocketMQ とのデータ統合により、以下の特徴と利点が得られます:

  • 信頼性の高い IoT データメッセージ配送:EMQX Cloud は MQTT メッセージをバッチ処理で安定的に RocketMQ に送信でき、IoT デバイスと RocketMQ およびアプリケーションシステムの統合を実現します。
  • MQTT メッセージの変換:ルールエンジンを利用して、EMQX Cloud は MQTT メッセージのフィルタリングや変換を行えます。データ抽出、フィルタリング、強化、変換を経てから RocketMQ に送信可能です。
  • クラウドネイティブな弾力的スケーリング:EMQX Cloud と RocketMQ はいずれもクラウドネイティブアーキテクチャ上に構築されており、Kubernetes(K8s)をはじめとしたクラウドネイティブエコシステムと親和性が高いです。ビジネスの急速な成長に合わせて無限かつ弾力的にスケール可能です。
  • 柔軟なトピックマッピング:RocketMQ データ統合は MQTT トピックと RocketMQ トピックの柔軟なマッピングをサポートし、RocketMQ メッセージのキー(Key)や値(Value)の設定を簡単に行えます。
  • 高スループットシナリオでの処理能力:RocketMQ データ統合は同期・非同期の両書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。

Before You Start

このセクションでは、RocketMQ データ統合の作成を開始する前に必要な準備、特に RocketMQ サーバーのセットアップ方法について説明します。

Prerequisites

Set up Network

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

Install RocketMQ

  1. RocketMQ をセットアップするための docker-compose ファイル rocketmq.yaml を準備します。
yaml
version: '3.3'

services:
  mqnamesrv:
    image: apache/rocketmq:4.9.4
    container_name: rocketmq_namesrv
    ports:
      - 9876:9876
    volumes:
      - ./rocketmq/logs:/opt/logs
      - ./rocketmq/store:/opt/store
    command: ./mqnamesrv

  mqbroker:
    image: apache/rocketmq:4.9.4
    container_name: rocketmq_broker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./rocketmq/logs:/opt/logs
      - ./rocketmq/store:/opt/store
      - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: 'rocketmq_namesrv:9876'
      JAVA_OPTS: ' -Duser.home=/opt'
      JAVA_OPT_EXT: '-server -Xms1024m -Xmx1024m -Xmn1024m'
    command: ./mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - mqnamesrv
  1. RocketMQ 実行に必要なフォルダと設定を作成します。
bash
mkdir rocketmq
mkdir rocketmq/logs
mkdir rocketmq/store
mkdir rocketmq/conf
  1. 以下の内容を rocketmq/conf/broker.conf に保存します。
bash
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

brokerIP1=change me to your real IP address

defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true

listenPort=10911
deleteWhen=04

fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536

brokerRole=ASYNC_MASTER

flushDiskType=ASYNC_FLUSH
  1. サーバーを起動します。
bash
docker-compose -f rocketmq.yaml up
  1. コンシューマを起動します。
docker run --rm -e NAMESRV_ADDR=host.docker.internal:9876 apache/rocketmq:4.9.4 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

TIP

Linux 環境では host.docker.internal を実際の IP アドレスに変更してください。

Create a Connector

データ統合ルールを作成する前に、まず RocketMQ サーバーにアクセスするための RocketMQ コネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューから Data Integration をクリックします。初めてコネクターを作成する場合は、Data Forward カテゴリの中から RocketMQ を選択します。すでにコネクターを作成済みの場合は、New Connector を選択し、続いて Data Forward カテゴリの中から RocketMQ を選択します。

  2. Connector Name:システムが自動的にコネクター名を生成します。

  3. 接続情報を入力します:

    • Servers:接続したい RocketMQ サーバーのアドレスを入力します(例:localhost)。ホストエントリは Host[:Port] の形式です。ポート番号を省略した場合は RocketMQ のデフォルトポート 9876 が使用されます。
    • AccessKey:RocketMQ サーバーのアクセスキー。
    • SecretKey:RocketMQ サーバーのシークレットキー。
    • Security Token:RocketMQ サーバーのセキュリティトークン。
    • ビジネスニーズに応じて詳細設定を行います(任意)。
  4. Test ボタンをクリックします。RocketMQ サービスにアクセス可能であれば、connector available のメッセージが表示されます。

  5. New ボタンをクリックして作成を完了します。

続いて、このコネクターを基にデータブリッジルールを作成できます。

Create a Rule

このセクションでは、EMQX Cloud コンソールを使って RocketMQ ルールを作成し、ルールにアクションを追加する方法を説明します。

  1. ルールエリアで New Rule をクリックするか、先ほど作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。

  2. 利用したい機能に基づいて SQL Editor にルールを設定します。ここではクライアントが temp_hum/emqx トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーする例を示します。SQL は以下のようになります:

sql
  SELECT

   timestamp as up_timestamp,
   clientid as client_id,
   payload.temp as temp,
   payload.hum as hum

  FROM

   "temp_hum/emqx"

TIP

初心者の方は SQL ExamplesTry It Out をクリックして、SQL ルールの学習とテストを行うことをおすすめします。

  1. Next をクリックしてアクションを追加します。

  2. Connector ドロップダウンから先ほど作成したコネクターを選択します。

  3. EMQX Cloud から RocketMQ サービスへメッセージをパブリッシュするための情報を設定します:

    • RocketMQ topicTopicTest
    • Message Template:テンプレート。デフォルトは空です。空の場合はメッセージ全体が RocketMQ に保存されます。テンプレートはプレースホルダーを含む有効な文字列で、例:
      {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
  4. Advanced Settings を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを適宜設定します(任意)。

  5. Confirm ボタンをクリックしてルール作成を完了します。

  6. Successful new rule ポップアップで Back to Rules をクリックし、データ統合設定の一連の流れを完了します。

Test the Rule

温度・湿度データのレポートをシミュレートするために MQTTX の使用を推奨しますが、他のクライアントでも構いません。

  1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアント ID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. RocketMQ コンシューマのウィンドウに以下のような出力が表示されます。

    bash
    ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1711354009076, bornHost=/121.43.165.169:48850, storeTimestamp=1711354009085, storeHost=/118.178.124.161:10911, msgId=76B27CA100002A9F000000000000058D, commitLogOffset=1421, bodyCRC=1137462344, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1711354066863, CLUSTER=DefaultCluster}, body=[123, 34, 117, 112, 95, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 55, 49, 49, 51, 53, 52, 48, 48, 57, 48, 53, 54, 44, 34, 116, 101, 109, 112, 34, 58, 34, 50, 55, 46, 53, 34, 44, 34, 104, 117, 109, 34, 58, 34, 52, 49, 46, 56, 34, 44, 34, 99, 108, 105, 101, 110, 116, 95, 105, 100, 34, 58, 34, 109, 113, 116, 116, 120, 95, 97, 50, 97, 99, 102, 100, 49, 57, 34, 125], transactionId='null'}]]