# RabbitMQへのMQTTデータ取り込み

[RabbitMQ](https://www.rabbitmq.com/)は、Advanced Message Queuing Protocol（AMQP）を実装した広く利用されているオープンソースのメッセージブローカーです。分散システム間のメッセージングにおいて堅牢でスケーラブルなプラットフォームを提供します。EMQXプラットフォームはRabbitMQとの統合をサポートしており、MQTTメッセージやイベントをRabbitMQに転送できます。また、RabbitMQサーバーからデータを取得し、EMQXプラットフォームの特定のトピックにパブリッシュすることも可能で、RabbitMQからMQTTへのメッセージ配信を実現します。

本ページでは、EMQXプラットフォームとRabbitMQ間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。

## 動作概要

RabbitMQデータ統合は、EMQXプラットフォームに標準搭載された機能であり、MQTTベースのIoTデータとRabbitMQの強力なメッセージキュー処理機能を橋渡しします。組み込みの[ルールエンジン](./rules.md)コンポーネントを活用することで、EMQXプラットフォームからRabbitMQへのデータ取り込みをコード不要で簡単に実現できます。

以下の図は、EMQXプラットフォームとRabbitMQ間の典型的なデータ統合アーキテクチャを示しています。

![emqx-integration-rabbitmq](./_assets/emqx-integration-rabbitmq.png)

MQTTデータをRabbitMQに取り込む流れは以下の通りです：

1. **メッセージのパブリッシュと受信**：産業用IoTデバイスはMQTTプロトコルを通じてEMQXプラットフォームに正常に接続し、リアルタイムMQTTデータをパブリッシュします。EMQXプラットフォームがメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージ到着後、ルールエンジンを通過し、EMQXプラットフォームで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、RabbitMQにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
3. **RabbitMQへのメッセージ取り込み**：ルールによる処理完了後、メッセージをRabbitMQに転送するアクションがトリガーされます。処理済みメッセージはシームレスにRabbitMQに書き込まれます。
4. **データの永続化と活用**：RabbitMQはメッセージをキューに格納し、適切なコンシューマーに配信します。メッセージは他のアプリケーションやサービスによって消費され、データ分析、可視化、保存などのさらなる処理に利用されます。

## 特長と利点

RabbitMQとのデータ統合は、以下の特長とメリットをもたらします：

- **信頼性の高いIoTデータメッセージ配信**：EMQXプラットフォームはデバイスからクラウドへの信頼性の高い接続とメッセージ配信を保証し、RabbitMQはメッセージの永続化と異なるサービス間の信頼性ある配信を担い、全体のデータ信頼性を確保します。
- **MQTTメッセージの変換**：ルールエンジンを用いて、EMQXプラットフォームはMQTTメッセージのフィルタリングや変換を実施可能です。データ抽出、フィルタリング、強化、変換を経てRabbitMQに送信されます。
- **柔軟なメッセージマッピング**：RabbitMQデータ統合はMQTTトピックとRabbitMQのルーティングキーおよびエクスチェンジの柔軟なマッピングをサポートし、MQTTとRabbitMQ間のシームレスな連携を実現します。
- **高可用性とクラスター対応**：EMQXプラットフォームとRabbitMQは共に高可用性のメッセージブローカークラスター構築をサポートし、ノード障害時もサービス継続を保証します。クラスター機能により優れたスケーラビリティも提供されます。
- **高スループットシナリオでの処理能力**：RabbitMQデータ統合は同期および非同期書き込みモードをサポートし、レイテンシとスループットのバランスを柔軟に調整可能です。

## はじめる前に

このセクションでは、RabbitMQデータ統合の作成を始める前に必要な準備、RabbitMQサーバーの作成方法およびテスト用のエクスチェンジとキューの作成方法を説明します。

### 前提条件

- [データ統合](./introduction.md)の知識
- EMQXプラットフォームのデータ統合[ルール](./rules.md)の知識
- UNIXターミナルとコマンドの基本知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### RabbitMQサーバーの起動

ここでは、[Docker](https://www.docker.com/)を用いてRabbitMQサーバーを起動する方法を紹介します。

管理プラグインを有効にしたRabbitMQサーバーを起動するには、以下のコマンドを実行してください。管理プラグインにより、WebインターフェースでRabbitMQの状態を確認できます。

```bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
```

Docker Hubの[RabbitMQのDockerイメージ](https://hub.docker.com/_/rabbitmq)に関する詳細情報もご参照ください。

### メッセージ受信用のエクスチェンジとキューの作成

RabbitMQサーバー起動後、RabbitMQ管理Webインターフェースを使って、EMQXプラットフォームから転送されるメッセージを受信するためのテスト用エクスチェンジとキューを作成します。既にテスト用のエクスチェンジとキューがある場合は、このセクションはスキップ可能です。

1. Webブラウザで http://{ip address}:15672/ にアクセスし、RabbitMQ管理Webインターフェースを開きます。ログイン画面で以下のデフォルト認証情報を入力し、**Login**をクリックします。
   - **Username**：`guest`
   - **Password**：`guest`
2. 上部メニューの**Exchanges**タブをクリックします。**Add a new exchange**を展開し、以下の情報を入力します：

   - **Name**：`test_exchange`と入力
   - **Type**：ドロップダウンリストから`direct`を選択
   - **Durability**：`Durable`を選択し、エクスチェンジを永続化（RabbitMQ再起動後も存在）
   - **Auto delete**：`No`
   - **Internal**：`No`
   - **Arguments**：空欄のまま

3. **Add exchange**ボタンをクリックしてテスト用エクスチェンジを作成します。
4. 上部メニューの**Queues**タブをクリックします。**Add a new queue**を展開し、以下の情報を入力します：
   - **Type**：`Default for virtual host`
   - **Name**：`test_queue`と入力
   - **Durability**：`Durable`を選択し、キューを永続化
   - **Arguments**：空欄のまま
5. **Add queue**ボタンをクリックしてテスト用キューを作成します。新しい`test_queue`が**All queues**セクションに表示されます。
6. キュー名の**test_queue**をクリックして詳細ページを開きます。**Bindings**を展開し、**Add binding to this queue**セクションに以下を入力します：
   - **From exchange**：`test_exchange`と入力
   - **Routing key**：`test_routing_key`と入力
   - **Arguments**：空欄のまま
7. **Bind**ボタンをクリックして、`test_queue`を指定したルーティングキーで`test_exchange`にバインドします。

## コネクターの作成

データ統合ルールを作成する前に、RabbitMQサーバーにアクセスするためのRabbitMQコネクターを作成する必要があります。

1. デプロイメントに移動し、左ナビゲーションメニューから**Data Integration**をクリックします。初めてコネクターを作成する場合は、**Data Forward**カテゴリの下にある**RabbitMQ**を選択します。既にコネクターを作成済みの場合は、**New Connector**を選択し、続けて**Data Forward**カテゴリの**RabbitMQ**を選択します。
2. **Connector Name**：システムが自動的にコネクター名を生成します。
3. 接続情報を入力します：

   - **Server**：RabbitMQサーバーがローカルの場合は`localhost`、リモートの場合は実際のホスト名またはIPアドレスを入力
   - **Port**：通常は`5672`、異なる場合は実際のポート番号を入力
   - **Username**：`guest`
   - **Password**：`guest`
   - **Virtual Host**：RabbitMQの仮想ホスト。デフォルトは`/`
   - **Enable TLS**：暗号化接続を確立したい場合はトグルスイッチをオンにする
   - ビジネスニーズに応じて詳細設定を構成（任意）

4. **Test**ボタンをクリックします。RabbitMQサービスにアクセス可能であれば、**connector available**のプロンプトが返されます。
5. **New**ボタンをクリックして作成を完了します。

## プロデューサールールの作成

このセクションでは、RabbitMQプロデューサールールを作成し、EMQXプラットフォームコンソールからルールにアクションを追加する方法を示します。

1. ルールエリアで**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。
2. 使用する機能に基づき、**SQL Editor**でルールを設定します。ここでは、クライアントが`temp_hum/emqx`トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQLを記述します。

   ```sql
    SELECT
     timestamp, clientid, payload
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL Examples**をクリックし、**Enable Test**を有効にしてSQLルールの学習とテストを行うことを推奨します。

   :::

3. **Next**をクリックしてアクションを追加します。
4. **Connector**のドロップダウンボックスから先ほど作成したコネクターを選択します。
5. EMQXプラットフォームからRabbitMQサービスにメッセージをパブリッシュするための情報を設定します：

   - **Exchange**：前に作成した`test_exchange`を入力。メッセージはこのエクスチェンジにパブリッシュされます。
   - **Routing Key**：前に作成した`test_routing_key`を入力。RabbitMQでメッセージパブリッシュ時に使用されるルーティングキーです。
   - **Message Delivery Mode**のドロップダウンから`non_persistent`または`persistent`を選択：

     - `non_persistent`（デフォルト）：メッセージはディスクに永続化されず、RabbitMQの再起動やクラッシュ時に失われる可能性があります。

     - `persistent`：メッセージはディスクに永続化され、RabbitMQの再起動やクラッシュ時にも耐久性があります。

     ::: tip

     RabbitMQが再起動した場合にメッセージを失わないように、キューとエクスチェンジも永続化（Durable）に設定する必要があります。詳細はRabbitMQの[ドキュメント](https://www.rabbitmq.com/documentation.html)をご参照ください。

     :::

   - **Payload Template**：デフォルトは空文字列で、メッセージペイロードはJSON形式のテキストとしてRabbitMQにそのまま転送されます。

     プレースホルダーを使ってカスタムのメッセージペイロードフォーマットを定義することも可能です。例えば、MQTTメッセージのペイロードとタイムスタンプをRabbitMQメッセージに含めたい場合は、以下のテンプレートを使用します：

     ```json
     {"payload": "${payload}", "timestamp": ${timestamp}}
     ```

     このテンプレートは、転送されるMQTTメッセージのペイロードとタイムスタンプを含むJSON形式のメッセージを生成します。`${payload}`と`${timestamp}`はプレースホルダーで、実際のメッセージの値に置き換えられます。

   - **Wait for Publish Confirmations**：デフォルトで有効。メッセージがRabbitMQに正常にパブリッシュされたことを確認します。

     ::: tip

     このオプションを有効にすると、RabbitMQブローカーはメッセージの受信をアック（ACK）し、成功したパブリッシュとして扱うため、メッセージ配信の信頼性が向上します。

     :::

6. **Advanced Settings**を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します（任意）。
7. **Confirm**ボタンをクリックしてルール作成を完了します。
8. **Successful new rule**のポップアップで**Back to Rules**をクリックし、データ統合設定の一連の作業を完了します。

## プロデューサールールのテスト

温度と湿度のデータ報告をシミュレートするために、[MQTTX](https://mqttx.app/)の使用を推奨しますが、他のクライアントでも構いません。

1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - クライアントID：`test_client`
   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. アクションとルールが正常に作成されていれば、指定したルーティングキーでRabbitMQサーバーの指定エクスチェンジにメッセージがパブリッシュされているはずです。RabbitMQ管理コンソール（http://{ip address}:15672）にアクセスし、**Queues**セクションに移動してください。
3. メッセージが適切なキューにルーティングされていることを確認します。キューをクリックし、**Get Message(s)**ボタンを押して詳細なメッセージ内容を確認します。

   - ペイロード例：

     ```json
     {"payload":
      "{
      "temp": "27.5",
      "hum": "41.8"
       }",
       "timestamp": 1711333401673
     }
     ```

## RabbitMQソースルールの作成

このセクションでは、RabbitMQソースルールを作成し、RabbitMQソースから消費したメッセージをEMQXプラットフォームに転送してトピック`t/1`にパブリッシュするリパブリッシュアクションを追加する方法を示します。

1. ルールエリアで**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。
2. ルールIDに`my_rule_source`を入力します。
3. ルールをトリガーするソース（データ入力）を設定します。ページ右側の**Actions (Source)**タブをクリックし、**New Action (Source)**をクリックしてRabbitMQソースを作成します。
4. スライドパネルでソースタイプに**RabbitMQ (Source)**を選択し、**Next**をクリックして設定ステップに進みます。
5. **Connector**のドロップダウンから、先に作成した`my-rabbitmq`コネクターを選択します。ドロップダウン横の作成ボタンをクリックすると、ポップアップで新規コネクターを素早く作成可能です。必要な設定パラメータは[コネクターの作成](#コネクターの作成)を参照してください。
6. RabbitMQからEMQXプラットフォームへメッセージを消費するためのソース情報を設定します：

   - **Queue**：RabbitMQで先に作成したキュー名`message-send`を入力
   - **No Ack**：状況に応じて選択。`no_ack`モードでRabbitMQからメッセージを消費するかどうかを指定します。`no_ack`を有効にすると、RabbitMQはメッセージをコンシューマーが正常に処理したかに関わらず即座にキューから削除します。
   - **Wait for Publish Confirmations**：メッセージパブリッシャーのアックを使用する際に、RabbitMQからの確認を待つかどうかを指定します。

7. 高度な設定（任意）：デフォルト値を使用します。
8. **Confirm**ボタンをクリックしてソース作成を完了し、ルールのデータ入力に追加します。同時にルールSQLが以下のように変更されます。

    ```sql
    SELECT
    *
    FROM
    "$bridges/rabbitmq:my-rabbitmq-source"
    ```

    RabbitMQソースから以下のフィールドにアクセス可能であり、SQLを調整してデータ処理を行えます。ここではデフォルトSQLを使用します。

    | フィールド名  | 説明                                                        |
    | :------------ | :----------------------------------------------------------- |
    | payload       | RabbitMQメッセージの内容                                    |
    | event         | イベントトピック。形式は`$bridges/rabbitmq:<source name>`  |
    | metadata      | ルールID情報                                                |
    | timestamp     | メッセージがEMQXに到着したタイムスタンプ                    |
    | node          | メッセージが到着したEMQXノード名                            |
    | queue         | メッセージが消費されたキュー名                              |
    | exchange      | メッセージがルーティングされたエクスチェンジ名              |
    | routing_key   | エクスチェンジからキューへメッセージをルーティングする際のルーティングキー |

9. **Next**をクリックし、出力アクションを作成します。
10. 新しい出力アクションで**Republish**を選択します。
11. メッセージのリパブリッシュ設定を入力します：

    - **Topic**：MQTTにパブリッシュするトピック。ここでは`t/1`を入力。
    - **QoS**：`0`、`1`、`2`、または`${qos}`を選択。`${qos}`を選ぶと元のメッセージのQoSに従います。
    - **Retain**：`true`または`false`を選択。メッセージをリテインメッセージとしてパブリッシュするかを決定。プレースホルダーも使用可能。ここでは`false`を選択。
    - **Payload**：転送するメッセージペイロードのテンプレート。空欄の場合はルールの出力結果をそのまま転送。ここでは`${payload}`と入力し、ペイロードのみを転送。
    - **MQTT 5.0 Message Properties**：デフォルトで無効。詳細は[リパブリッシュアクションの追加](https://docs.emqx.com/en/emqx/latest/data-integration/rule-get-started.html#add-republish-action)を参照。

12. その他の設定はデフォルトのままにし、**Confirm**ボタンをクリックして出力アクションの作成を完了します。

作成成功後、**New Rule**ページに戻ります。**Rules**リストに新規作成したルールが表示されます。リパブリッシュアクションは現在**Actions (Source)**には表示されません。必要に応じてルール編集ボタンをクリックすると、ルール設定の下部にリパブリッシュ出力アクションが確認できます。

## RabbitMQソースルールのテスト

1. [MQTTX CLI](https://mqttx.app/cli)を使用してトピック`t/1`をサブスクライブします：

   ```bash
   mqttx sub -t t/1
   ```

2. 以下のコマンドでRabbitMQにメッセージをパブリッシュできます：

   ```bash
   rabbitmqadmin --username=guest --password=guest \
        publish routing_key=message-send \
        payload="{ \"msg\": \"Hello EMQX\"}"
   ```

   - `publish`はメッセージをパブリッシュするコマンドです。
   - `routing_key=message-send`オプションはメッセージのルーティングキーを設定します。この例ではキュー名をルーティングキーとして使用しています。
   - `payload="{ \"msg\": \"Hello EMQX\"}"`オプションはメッセージの内容を設定します。

   または、RabbitMQ管理インターフェースからもメッセージをパブリッシュ可能です：

   1. 上部メニューの**Queues**タブをクリックします。
   2. **Name**列の**message-send**をクリックして詳細ページを開きます。
   3. **Publish message**を展開し、**Payload**ボックスに`"Hello EMQX"`と入力して、**Publish message**ボタンをクリックします。

3. MQTTXで以下のような出力が確認できます：

   ```bash
   [2024-2-23] [16:59:28] › payload: {"payload":{"msg":"Hello EMQX"},"event":"$bridges/rabbitmq:my-rabbitmq-source","metadata":{"rule_id":"rule_0ly1"},"timestamp":1708678768449,"node":"emqx@127.0.0.1"}
   ```
