Skip to content

MongoDBへのMQTTデータ取り込み

MongoDBは、スキーマ設計の柔軟性、スケーラビリティ、大量の構造化および半構造化データの保存能力で知られる主要なNoSQLデータベースです。EMQX PlatformとMongoDBを統合することで、ユーザーはMQTTメッセージやクライアントイベントを直接MongoDBに効率的に取り込むことができます。これにより、MongoDB内での長期的な時系列データの保存や高度なクエリ機能が可能になります。この統合は一方向のデータフローを保証し、EMQX PlatformからのMQTTメッセージがMongoDBデータベースに書き込まれます。この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。

本ページでは、EMQX PlatformとMongoDB間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。

動作概要

MongoDBデータ統合は、MQTTベースのIoTデータとMongoDBの強力なデータ保存機能をつなぐためにEMQX Platformに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQX PlatformからMongoDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとMongoDB間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Platform MongoDBデータ統合

MQTTデータをMongoDBに取り込む流れは以下の通りです。

  1. メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQX Platformに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQX Platformで定義されたルールに基づいて処理されます。ルールは事前定義された条件に基づき、MongoDBにルーティングすべきメッセージを判別します。ペイロード変換を指定するルールがある場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. MongoDBへのデータ取り込み:ルールエンジンがMongoDBへの保存対象メッセージを特定すると、メッセージをMongoDBに転送するアクションをトリガーします。処理済みデータはMongoDBのコレクションにシームレスに書き込まれます。
  4. データの保存と活用:データがMongoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、接続車両の分野では、保存されたデータを用いて車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などが可能です。同様にIIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に利用されます。

この統合システムを活用することで、電力・エネルギー分野の企業はグリッドの健全性を継続的に監視し、需要予測や潜在的な障害の早期検知が可能になります。リアルタイムおよび履歴データから得られる価値は、運用効率の向上だけでなく、コスト削減や顧客体験の強化にもつながります。

特長とメリット

MongoDBとのデータ統合は、効果的なデータ処理と保存を実現するために設計された多彩な特長とメリットを提供します。

  • IoTデータ管理の効率化

    複雑な統合や面倒なデータ移行を排除し、IoTデータの取り込み、保存、処理、分析を一元的に行えます。データサイロを解消し、IoTデータの統合ビューを実現します。

  • リアルタイムデータ処理

    EMQX Platformはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMongoDBへの効率的かつ信頼性の高いデータ伝送を保証します。即時の洞察やアクションが必要なユースケースに適しています。

  • 柔軟なMongoDB接続オプション

    単一インスタンスのMongoDBからレプリカセットの堅牢な構成まで、両方の接続をネイティブにサポートし、インフラ要件に応じた柔軟な対応が可能です。

  • 高性能かつスケーラブル

    EMQXの分散アーキテクチャとMongoDBのカラム型ストレージ形式により、データ量の増加に伴うスケーラビリティをシームレスに実現します。大規模データセットでも一貫したパフォーマンスと応答性を維持します。

  • 柔軟なデータ変換

    EMQX Platformの強力なSQLベースのルールエンジンにより、MongoDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様な変換機能をサポートし、ニーズに応じたデータ整形が可能です。

  • NoSQLの利点

    MongoDBのスキーマレスアーキテクチャにより、多様なMQTTメッセージ構造を厳格なスキーマなしで容易に保存でき、IoTデータの動的な性質に対応します。

  • 信頼性の高いデータ保存

    EMQX Platformのルールエンジンがメッセージを処理・ルーティングした後、MongoDBに保存され、プラットフォームの実績ある信頼性によりデータの整合性と継続的な可用性が保証されます。

  • 運用指標と高度な分析

    総メッセージ数、送信トラフィック率などの指標を取得可能です。これらの指標とMongoDBの強力なクエリ機能を組み合わせることで、データフローの監視、分析、最適化が可能となり、予測分析や異常検知などの高度な活用を支援します。

  • 最新のMongoDBバージョン対応

    最新バージョンのMongoDBと互換性があり、最新機能、最適化、セキュリティアップデートの恩恵を受けられます。

このMongoDBデータ統合は、デバイスから生成される膨大なデータを単に保存するだけでなく、将来のクエリや分析に備えて準備することで、IoTインフラの強化に貢献します。セットアップの容易さと運用の優秀性により、IoTシステムの効率と信頼性を大幅に向上させます。

はじめる前に

このセクションでは、EMQX PlatformでMongoDBデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

DockerでMongoDBをインストール

以下のコマンドでDocker経由でMongoDBをインストールし、コンテナを起動し、ユーザーを作成できます。

bash
# MongoDBのDockerイメージを起動し、パスワードをpublicに設定
docker run -d --name mongodb -p 27017:27017 mongo

# コンテナにアクセス
docker exec -it mongodb bash

# コンテナ内でMongoDBサーバーに接続
mongosh

# ユーザー作成
use admin
db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })

# データベースemqx_dataを作成
use emqx_data

# コレクションemqx_messagesを作成
db.createCollection('emqx_messages')

MongoDBコネクターの作成

データ統合ルールを作成する前に、MongoDBサーバーにアクセスするためのMongoDBコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューからData Integrationをクリックします。

  2. 初めてコネクターを作成する場合は、Data Persistenceカテゴリの下にあるMongoDBを選択します。既にコネクターを作成済みの場合は、New Connectorを選択し、続いてData Persistenceカテゴリの下のMongoDBを選択します。

  3. Connector name:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • MongoDB Mode:実際のデプロイメントモードに基づき接続するMongoDBの種類を選択します。このデモでは例としてsingleを選択できます。
      • single:単一のスタンドアロンMongoDBインスタンス。
      • rs:同じデータセットを維持する複数のmongodプロセスからなるレプリカセット。
      • sharded:MongoDBのシャーディングクラスター。
    • Server Host:サーバーのIPアドレスとポート。
    • Database Nameemqx_dataを入力。
    • Write Mode:デフォルトのunsafeを保持。
    • Usernameadminを入力。
    • Passwordpublicを入力。
    • Auth Source:ユーザー認証に関連付けられたデータベース名を入力。
    • Use Legacy Protocol:MongoDBのレガシー通信プロトコルを使用するかどうかを指定(MongoDBはバージョン3.6で新しいワイヤープロトコルを導入し、後方互換性のためレガシープロトコルを保持)。truefalseautoのいずれかを設定可能で、auto(デフォルト)ではEMQXがMongoDBのバージョンに応じて自動判別します。
    • Srv Record:デフォルトで無効。これを有効にすると、EMQX PlatformがDNS SRVレコードを使用して接続すべきMongoDBホストを検出でき、レプリカセットやシャーディングクラスターへの接続が容易になります。
    • 暗号化接続を確立したい場合は、Enable TLSトグルスイッチをオンにします。
  5. 詳細設定(任意):詳細設定を参照してください。

  6. Testボタンをクリックし、MongoDBサービスにアクセスできる場合は成功メッセージが表示されます。

  7. Newボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをMongoDBに転送するアクションをルールに追加するためのルールを作成します。

  1. ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。

  2. SQL editorにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時間up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードを読み取ります。また、このトピックから温度と湿度を取得します。

    sql
      SELECT
        timestamp as up_timestamp,
        clientid as client_id,
        payload.temp as temp,
        payload.hum as hum
        FROM
      "temp_hum/emqx"

    TIP

    初心者の方は、SQL ExamplesEnable TestをクリックしてSQLルールの学習とテストを行うことをおすすめします。

  3. Nextをクリックしてアクションを追加します。

  4. Connectorドロップダウンから先ほど作成したコネクターを選択します。

  5. Payload templateを設定し、client_idup_timestamptemphumをMongoDBに保存します。このテンプレートはMongoDBのinsertコマンドで実行され、サンプルコードは以下の通りです。

    json
    {
      "client_id": "${client_id}",
      "timestamp": ${up_timestamp},
      "temp": ${temp},
      "hum": ${hum}
    }

    ペイロードテンプレート設定時の注意点:

    • すべてのkeyはダブルクォーテーション"で囲む必要があります。
    • 値のデータ型の自動判別はサポートされていません。
      • 文字列は"で囲まないとエラーになります。
      • 数値などの値は"で囲まないでください。囲むと文字列として認識されます。
      • タイムスタンプ、日付、時間型は特別な処理をしないと数値または文字列として認識されます。日付や時間として保存したい場合は、ルールSQLのmongo_date関数でフィールドを処理してください。
    • 値がJSONオブジェクトの場合はネストされたオブジェクトが許可されます。
      • テンプレート内で値を"で囲んでネストすることはできません。実行エラーになります。
      • オブジェクトは自身の構造に従ってネストされ保存されます。
    • オブジェクトをJSON文字列として保存したい場合は、ルールSQLのjson_encode関数で変換し、テンプレート内の対応する"で囲まないでください。
  6. 詳細設定(任意):詳細設定を参照してください。

  7. Confirmボタンをクリックしてルール作成を完了します。

  8. Successful new ruleのポップアップでBack to Rulesをクリックし、データ統合設定の一連の作業を完了します。

ルールのテスト

温度と湿度のデータ報告をシミュレートするために、MQTTXの使用を推奨しますが、他のクライアントでも可能です。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. データダンプ結果を確認します。

bash
emqx_data> db.temp_hum.find()
[
  {
    _id: ObjectId('65fba3d0ac7ad8048e000001'),
    client_id: 'test_client',
    hum: 41.8,
    temp: 27.5,
    timestamp: 1710990288
  }
]
  1. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計を確認できます。