# MongoDBへのMQTTデータ取り込み

[MongoDB](https://www.mongodb.com/)は、スキーマ設計の柔軟性、スケーラビリティ、大量の構造化および半構造化データの保存能力で知られる主要なNoSQLデータベースです。EMQX PlatformとMongoDBを統合することで、ユーザーはMQTTメッセージやクライアントイベントを直接MongoDBに効率的に取り込むことができます。これにより、MongoDB内での長期的な時系列データの保存や高度なクエリ機能が可能になります。この統合は一方向のデータフローを保証し、EMQX PlatformからのMQTTメッセージがMongoDBデータベースに書き込まれます。この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。

本ページでは、EMQX PlatformとMongoDB間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。

## 動作概要

MongoDBデータ統合は、MQTTベースのIoTデータとMongoDBの強力なデータ保存機能をつなぐためにEMQX Platformに標準搭載された機能です。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQX PlatformからMongoDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとMongoDB間のデータ統合の典型的なアーキテクチャを示しています。

![EMQX Platform MongoDBデータ統合](./_assets/data_integration_mongodb.png)

MQTTデータをMongoDBに取り込む流れは以下の通りです。

1. **メッセージのパブリッシュと受信**：接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQX Platformに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージが到着すると、ルールエンジンを通過し、EMQX Platformで定義されたルールに基づいて処理されます。ルールは事前定義された条件に基づき、MongoDBにルーティングすべきメッセージを判別します。ペイロード変換を指定するルールがある場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
3. **MongoDBへのデータ取り込み**：ルールエンジンがMongoDBへの保存対象メッセージを特定すると、メッセージをMongoDBに転送するアクションをトリガーします。処理済みデータはMongoDBのコレクションにシームレスに書き込まれます。
4. **データの保存と活用**：データがMongoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、接続車両の分野では、保存されたデータを用いて車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などが可能です。同様にIIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に利用されます。

この統合システムを活用することで、電力・エネルギー分野の企業はグリッドの健全性を継続的に監視し、需要予測や潜在的な障害の早期検知が可能になります。リアルタイムおよび履歴データから得られる価値は、運用効率の向上だけでなく、コスト削減や顧客体験の強化にもつながります。

## 特長とメリット

MongoDBとのデータ統合は、効果的なデータ処理と保存を実現するために設計された多彩な特長とメリットを提供します。

- **IoTデータ管理の効率化**

  複雑な統合や面倒なデータ移行を排除し、IoTデータの取り込み、保存、処理、分析を一元的に行えます。データサイロを解消し、IoTデータの統合ビューを実現します。

- **リアルタイムデータ処理**

  EMQX Platformはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMongoDBへの効率的かつ信頼性の高いデータ伝送を保証します。即時の洞察やアクションが必要なユースケースに適しています。

- **柔軟なMongoDB接続オプション**

  単一インスタンスのMongoDBからレプリカセットの堅牢な構成まで、両方の接続をネイティブにサポートし、インフラ要件に応じた柔軟な対応が可能です。

- **高性能かつスケーラブル**

  EMQXの分散アーキテクチャとMongoDBのカラム型ストレージ形式により、データ量の増加に伴うスケーラビリティをシームレスに実現します。大規模データセットでも一貫したパフォーマンスと応答性を維持します。

- **柔軟なデータ変換**

  EMQX Platformの強力なSQLベースのルールエンジンにより、MongoDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様な変換機能をサポートし、ニーズに応じたデータ整形が可能です。

- **NoSQLの利点**

  MongoDBのスキーマレスアーキテクチャにより、多様なMQTTメッセージ構造を厳格なスキーマなしで容易に保存でき、IoTデータの動的な性質に対応します。

- **信頼性の高いデータ保存**

  EMQX Platformのルールエンジンがメッセージを処理・ルーティングした後、MongoDBに保存され、プラットフォームの実績ある信頼性によりデータの整合性と継続的な可用性が保証されます。

- **運用指標と高度な分析**

  総メッセージ数、送信トラフィック率などの指標を取得可能です。これらの指標とMongoDBの強力なクエリ機能を組み合わせることで、データフローの監視、分析、最適化が可能となり、予測分析や異常検知などの高度な活用を支援します。

- **最新のMongoDBバージョン対応**

  最新バージョンのMongoDBと互換性があり、最新機能、最適化、セキュリティアップデートの恩恵を受けられます。

このMongoDBデータ統合は、デバイスから生成される膨大なデータを単に保存するだけでなく、将来のクエリや分析に備えて準備することで、IoTインフラの強化に貢献します。セットアップの容易さと運用の優秀性により、IoTシステムの効率と信頼性を大幅に向上させます。

## はじめる前に

このセクションでは、EMQX PlatformでMongoDBデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解
- [MongoDB](https://www.mongodb.com/)の知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### DockerでMongoDBをインストール

以下のコマンドでDocker経由でMongoDBをインストールし、コンテナを起動し、ユーザーを作成できます。

```bash
# MongoDBのDockerイメージを起動し、パスワードをpublicに設定
docker run -d --name mongodb -p 27017:27017 mongo

# コンテナにアクセス
docker exec -it mongodb bash

# コンテナ内でMongoDBサーバーに接続
mongosh

# ユーザー作成
use admin
db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })

# データベースemqx_dataを作成
use emqx_data

# コレクションemqx_messagesを作成
db.createCollection('emqx_messages')
```

<!-- TODO: MongoDB Atlasを使ったMongoDBサービスの作成 -->

## MongoDBコネクターの作成

データ統合ルールを作成する前に、MongoDBサーバーにアクセスするためのMongoDBコネクターを作成する必要があります。

1. デプロイメントに移動し、左側ナビゲーションメニューから**Data Integration**をクリックします。
2. 初めてコネクターを作成する場合は、**Data Persistence**カテゴリの下にある**MongoDB**を選択します。既にコネクターを作成済みの場合は、**New Connector**を選択し、続いて**Data Persistence**カテゴリの下の**MongoDB**を選択します。
3. **Connector name**：システムが自動的にコネクター名を生成します。
4. 接続情報を入力します：

   - **MongoDB Mode**：実際のデプロイメントモードに基づき接続するMongoDBの種類を選択します。このデモでは例として`single`を選択できます。
     - `single`：単一のスタンドアロンMongoDBインスタンス。
     - `rs`：同じデータセットを維持する複数の`mongod`プロセスからなるレプリカセット。
     - `sharded`：MongoDBのシャーディングクラスター。
   - **Server Host**：サーバーのIPアドレスとポート。
   - **Database Name**：`emqx_data`を入力。
   - **Write Mode**：デフォルトの`unsafe`を保持。
   - **Username**：`admin`を入力。
   - **Password**：`public`を入力。
   - **Auth Source**：ユーザー認証に関連付けられたデータベース名を入力。
   - **Use Legacy Protocol**：MongoDBのレガシー通信プロトコルを使用するかどうかを指定（MongoDBはバージョン3.6で新しいワイヤープロトコルを導入し、後方互換性のためレガシープロトコルを保持）。`true`、`false`、`auto`のいずれかを設定可能で、`auto`（デフォルト）ではEMQXがMongoDBのバージョンに応じて自動判別します。
   - **Srv Record**：デフォルトで無効。これを有効にすると、EMQX PlatformがDNS SRVレコードを使用して接続すべきMongoDBホストを検出でき、レプリカセットやシャーディングクラスターへの接続が容易になります。
   - 暗号化接続を確立したい場合は、**Enable TLS**トグルスイッチをオンにします。

5. 詳細設定（任意）：[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-mongodb.html#advanced-configurations)を参照してください。
6. **Test**ボタンをクリックし、MongoDBサービスにアクセスできる場合は成功メッセージが表示されます。
7. **New**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータをMongoDBに転送するアクションをルールに追加するためのルールを作成します。

1. ルールエリアで**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。

2. **SQL editor**にルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時間`up_timestamp`、クライアントID、`temp_hum/emqx`トピックのペイロードを読み取ります。また、このトピックから温度と湿度を取得します。

   ```sql
     SELECT
       timestamp as up_timestamp,
       clientid as client_id,
       payload.temp as temp,
       payload.hum as hum
       FROM
     "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL Examples**と**Enable Test**をクリックしてSQLルールの学習とテストを行うことをおすすめします。

   :::

3. **Next**をクリックしてアクションを追加します。

4. **Connector**ドロップダウンから先ほど作成したコネクターを選択します。

5. **Payload template**を設定し、`client_id`、`up_timestamp`、`temp`、`hum`をMongoDBに保存します。このテンプレートはMongoDBのinsertコマンドで実行され、サンプルコードは以下の通りです。

   ```json
   {
     "client_id": "${client_id}",
     "timestamp": ${up_timestamp},
     "temp": ${temp},
     "hum": ${hum}
   }
   ```

   ペイロードテンプレート設定時の注意点：

   - すべての`key`はダブルクォーテーション`"`で囲む必要があります。
   - 値のデータ型の自動判別はサポートされていません。
     - 文字列は`"`で囲まないとエラーになります。
     - 数値などの値は`"`で囲まないでください。囲むと文字列として認識されます。
     - タイムスタンプ、日付、時間型は特別な処理をしないと数値または文字列として認識されます。日付や時間として保存したい場合は、ルールSQLの`mongo_date`関数でフィールドを処理してください。
   - 値がJSONオブジェクトの場合はネストされたオブジェクトが許可されます。
     - テンプレート内で値を`"`で囲んでネストすることはできません。実行エラーになります。
     - オブジェクトは自身の構造に従ってネストされ保存されます。
   - オブジェクトをJSON文字列として保存したい場合は、ルールSQLの`json_encode`関数で変換し、テンプレート内の対応する**値**は`"`で囲まないでください。

6. 詳細設定（任意）：[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-mongodb.html#advanced-configurations)を参照してください。

7. **Confirm**ボタンをクリックしてルール作成を完了します。

8. **Successful new rule**のポップアップで**Back to Rules**をクリックし、データ統合設定の一連の作業を完了します。

## ルールのテスト

温度と湿度のデータ報告をシミュレートするために、[MQTTX](https://mqttx.app/)の使用を推奨しますが、他のクライアントでも可能です。

1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. データダンプ結果を確認します。

```bash
emqx_data> db.temp_hum.find()
[
  {
    _id: ObjectId('65fba3d0ac7ad8048e000001'),
    client_id: 'test_client',
    hum: 41.8,
    temp: 27.5,
    timestamp: 1710990288
  }
]
```

3. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計を確認できます。
