MongoDBへのMQTTデータ取り込み
MongoDBは、柔軟なスキーマ設計、スケーラビリティ、大量の構造化および半構造化データの保存能力で知られる主要なNoSQLデータベースです。EMQX CloudとMongoDBを統合することで、ユーザーはMQTTメッセージやクライアントイベントを直接MongoDBに効率的に取り込むことができます。これにより、MongoDB内での長期的な時系列データの保存や高度なクエリが可能になります。この統合は一方向のデータフローを保証し、EMQX CloudからのMQTTメッセージがMongoDBデータベースに書き込まれます。この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。
本ページでは、EMQX CloudとMongoDB間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を提供します。
動作の仕組み
MongoDBデータ統合は、MQTTベースのIoTデータとMongoDBの強力なデータ保存機能をつなぐためにEMQX Cloudに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQX CloudからMongoDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。
以下の図は、EMQXとMongoDB間のデータ統合の典型的なアーキテクチャを示しています。

MongoDBへのMQTTデータ取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQX Cloudに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQX Cloudで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、MongoDBにルーティングすべきメッセージを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
- MongoDBへのデータ取り込み:ルールエンジンがMongoDB保存対象のメッセージを特定すると、メッセージをMongoDBに転送するアクションがトリガーされます。処理済みデータはMongoDBデータベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データがMongoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、接続車両の分野では、保存されたデータを利用して車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などが可能です。同様にIIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に活用できます。
この統合システムを利用することで、電力・エネルギー分野の企業はグリッドの状態を継続的に監視し、需要予測や潜在的な停電の早期検知が可能になります。リアルタイムおよび履歴データから得られる価値は、運用効率の向上だけでなく、コスト削減や顧客体験の向上にもつながります。
特長とメリット
MongoDBとのデータ統合は、効果的なデータ処理と保存を確実にするための多彩な特長とメリットを提供します:
IoTデータ管理の効率化
複雑な統合や面倒なデータ移行を排除し、IoTデータの取り込み、保存、処理、分析を一元的に行えます。データのサイロ化を解消し、IoTデータの統合ビューを実現します。
リアルタイムデータ処理
EMQX Cloudはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMongoDBへの効率的かつ信頼性の高いデータ伝送を保証します。即時の洞察とアクションが求められるユースケースに最適です。
柔軟なMongoDB接続オプション
単一のMongoDBインスタンスからレプリカセットの堅牢な構成まで、両方の接続をネイティブにサポートし、インフラニーズに応じた柔軟な対応が可能です。
高性能かつスケーラブル
EMQXの分散アーキテクチャとMongoDBのカラム型ストレージ形式により、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を維持します。IoT展開の拡大に伴い、データ保存能力も容易に拡張できます。
柔軟なデータ変換
EMQX Cloudは強力なSQLベースのルールエンジンを提供し、MongoDBに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様な変換機構をサポートし、組織のニーズに合わせてデータを整形可能です。
NoSQLの柔軟性
MongoDBのスキーマレスアーキテクチャにより、多様なMQTTメッセージ構造を堅牢なスキーマなしで容易に保存でき、IoTデータの動的な性質に対応します。
信頼性の高いデータ保存
EMQX Cloudのルールエンジンがメッセージを処理・ルーティングした後、MongoDBに保存され、プラットフォームの実績ある信頼性によりデータの整合性と継続的な可用性が保証されます。
運用指標と高度な分析
総メッセージ数、送信トラフィック率などの指標を取得可能です。これらの指標とMongoDBの強力なクエリ機能を組み合わせることで、データフローの監視、分析、最適化が可能となり、予測分析や異常検知などの高度な活用を支援します。
最新MongoDBバージョン対応
データ統合はMongoDBの最新バージョンに対応しており、ユーザーはデータベースプラットフォームの最新機能、最適化、セキュリティアップデートを享受できます。
このMongoDBデータ統合により、デバイスが生成する膨大なデータを単に保存するだけでなく、将来のクエリや分析に備えて活用可能な状態に保つことができます。セットアップの容易さと運用の優秀性は、IoTシステムの効率性と信頼性を大幅に向上させます。
はじめる前に
このセクションでは、EMQX CloudでMongoDBデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
DockerでのMongoDBインストール
以下のコマンドを使用してDocker経由でMongoDBをインストールし、Dockerイメージを起動し、ユーザーを作成できます。
# MongoDBのDockerイメージを起動し、パスワードをpublicに設定
docker run -d --name mongodb -p 27017:27017 mongo
# コンテナにアクセス
docker exec -it mongodb bash
# コンテナ内でMongoDBサーバーを起動
mongosh
# ユーザー作成
use admin
db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })
# データベースemqx_dataを作成
use emqx_data
# コレクションemqx_messagesを作成
db.createCollection('emqx_messages')MongoDBコネクターの作成
データ統合ルールを作成する前に、MongoDBサーバーにアクセスするためのMongoDBコネクターを作成する必要があります。
デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるMongoDBを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのMongoDBを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- MongoDBモード:実際のデプロイメントモードに基づき接続するMongoDBのタイプを選択します。このデモでは例として
singleを選択できます。single:単一のスタンドアロンMongoDBインスタンス。rs:同じデータセットを維持するmongodプロセスのグループであるレプリカセット。sharded:MongoDBのシャーディングクラスター。
- サーバーホスト:サーバーのIPアドレスとポート。
- データベース名:
emqx_dataを入力。 - 書き込みモード:デフォルト値の
unsafeを保持。 - ユーザー名:
adminを入力。 - パスワード:
publicを入力。 - 認証ソース:ユーザー資格情報に関連付けられたデータベース名を入力。
- レガシープロトコルを使用:MongoDBのレガシー通信プロトコルを使用するかどうかを決定します(MongoDBはバージョン3.6で新しいワイヤープロトコルを導入し、後方互換性のためにレガシープロトコルを残しています)。この設定は
true、false、autoのいずれかに設定可能です。autoモード(デフォルト)では、EMQXが検出したMongoDBバージョンに基づき自動的に使用するプロトコルを判断します。 - Srvレコード:デフォルトで無効です。有効にすると、EMQX CloudがDNS SRVレコードを使用して接続すべきMongoDBホストを検出できるようになり、レプリカセットやシャーディングクラスターへの接続が容易になります。
- 暗号化接続を確立したい場合は、TLSを有効にするトグルスイッチをクリックします。
- MongoDBモード:実際のデプロイメントモードに基づき接続するMongoDBのタイプを選択します。このデモでは例として
詳細設定(任意):詳細設定を参照してください。
テストボタンをクリックします。MongoDBサービスにアクセス可能な場合、成功メッセージが表示されます。
新規ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをMongoDBに転送するアクションをルールに追加する必要があります。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時刻
up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードを読み取ります。また、このトピックから温度と湿度を取得しています。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
ペイロードテンプレートを設定し、
client_id、up_timestamp、temp、humをMongoDBに保存します。このテンプレートはMongoDBのinsertコマンドで実行され、サンプルコードは以下の通りです:json{ "client_id": "${client_id}", "timestamp": ${up_timestamp}, "temp": ${temp}, "hum": ${hum} }ペイロードテンプレート設定時の注意点:
すべての
キーはダブルクォーテーション"で囲む必要があります。値のデータ型の自動判別はサポートされていません:
- 文字列は
"で囲む必要があり、囲まないとエラーになります。 - 数値などの値は囲まないでください。囲むと文字列として認識されます。
- タイムスタンプ、日付、時間型は特別な処理をしないと数値または文字列として認識されます。日付や時間として保存するには、ルールSQL内で
mongo_date関数を使用してフィールドを処理してください。
- 文字列は
値がJSONオブジェクトの場合はネストされたオブジェクトが許可されます:
- テンプレート内で値を
"で囲んでネストすることはできません。実行エラーの原因となります。 - オブジェクトは自身の構造に従ってネストされて保存されます。
- テンプレート内で値を
オブジェクトをJSON文字列として保存したい場合は、ルールSQLで
json_encode関数を使って変換し、テンプレート内の対応する値は"で囲まないでください。
詳細設定(任意):詳細設定を参照してください。
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度・湿度データの報告をシミュレートするために、MQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
データダンプ結果を確認します。
emqx_data> db.temp_hum.find()
[
{
_id: ObjectId('65fba3d0ac7ad8048e000001'),
client_id: 'test_client',
hum: 41.8,
temp: 27.5,
timestamp: 1710990288
}
]- コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、そのルールの統計情報およびルール配下のすべてのアクションの統計を確認できます。