MongoDBへのMQTTデータ取り込み
MongoDBは、スキーマ設計の柔軟性、スケーラビリティ、大量の構造化および半構造化データの保存能力で知られる主要なNoSQLデータベースです。EMQXとMongoDBを統合することで、ユーザーはMQTTメッセージやクライアントイベントを直接MongoDBに効率的に取り込むことができます。これにより、MongoDB内での長期的な時系列データの保存や高度なクエリ機能が可能になります。この統合は一方向のデータフローを保証し、EMQXからのMQTTメッセージがMongoDBデータベースに書き込まれます。この強力な組み合わせは、IoTデータを効果的に管理したい企業にとって堅実な基盤となります。
本ページでは、EMQXとMongoDB間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。
動作の仕組み
MongoDBデータ統合は、MQTTベースのIoTデータとMongoDBの強力なデータ保存機能をつなぐためにEMQXに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、EMQXからMongoDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQXとMongoDB間のデータ統合の典型的なアーキテクチャを示しています。

MongoDBへのMQTTデータ取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを介してEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQXで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、MongoDBにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、ペイロードへの追加コンテキスト付加などの変換が適用されます。
- MongoDBへのデータ取り込み:ルールエンジンがMongoDB保存対象のメッセージを特定すると、MongoDBへの転送アクションがトリガーされます。処理済みデータはMongoDBデータベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データがMongoDBに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに利用できます。例えば、接続車両分野では、車両の健康状態の把握、リアルタイム指標に基づくルート最適化、資産追跡などに活用できます。同様にIIoT環境では、機械の健康監視、メンテナンス予測、生産スケジュールの最適化に利用されます。
この統合システムを活用することで、電力・エネルギー分野の企業はグリッドの状態を継続的に監視し、需要予測や障害発生前の検知が可能になります。リアルタイムおよび履歴データから得られる価値は、運用効率の向上だけでなく、コスト削減や顧客体験の向上にもつながります。
特長と利点
MongoDBとのデータ統合は、効果的なデータ処理と保存を保証するために特化した多様な特長と利点を提供します:
IoTデータ管理の効率化
複雑な統合や面倒なデータ移行を排除し、IoTデータの取り込み、保存、処理、分析を一元的に行えます。データサイロを解消し、IoTデータの統合ビューを実現します。
リアルタイムデータ処理
EMQXはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMongoDBへの効率的かつ信頼性の高いデータ伝送を保証します。即時の洞察とアクションが求められるユースケースに理想的です。
柔軟なMongoDB接続オプション
単一のMongoDBインスタンスでもレプリカセットの堅牢性を活用する場合でも、ネイティブサポートを提供し、インフラ要件に応じて柔軟に対応可能です。
高性能かつスケーラブル
EMQXの分散アーキテクチャとMongoDBのカラムナリーストレージ形式により、データ量の増加に伴うスムーズなスケーラビリティを実現します。大規模データセットでも一貫したパフォーマンスと応答性を維持し、IoT展開の拡大に合わせてストレージ能力を拡張可能です。
柔軟なデータ変換
EMQXの強力なSQLベースのルールエンジンにより、MongoDB保存前のデータ前処理が可能です。フィルタリング、ルーティング、集約、エンリッチメントなど多様な変換手法をサポートし、ニーズに応じたデータ整形を実現します。
NoSQLの柔軟性
MongoDBのスキーマレスアーキテクチャにより、多様なMQTTメッセージ構造を厳格なスキーマなしで容易に保存でき、IoTデータの動的性質に対応します。
信頼性の高いデータ保存
EMQXルールエンジンがメッセージを処理・ルーティングした後、MongoDBに保存され、プラットフォームの実績ある信頼性によりデータ整合性と継続的な可用性が保証されます。
運用メトリクスと高度な分析
総メッセージ数、送信トラフィック率などのメトリクスを活用可能です。これらのメトリクスとMongoDBの強力なクエリ機能を組み合わせることで、データフローの監視、分析、最適化が可能となり、予測分析や異常検知などの価値ある洞察を得られます。
最新のMongoDBバージョン対応
データ統合は最新のMongoDBバージョンに対応しており、ユーザーは最新機能、最適化、セキュリティアップデートの恩恵を受けられます。
コスト効率
EMQXとMongoDBはともにオープンソースソリューションであり、商用ソリューションに比べてコスト効率が高いです。これにより、IoTプロジェクトの総所有コスト削減と投資収益率の向上に寄与します。
このMongoDBデータ統合は、デバイスから生成される膨大なデータを単に保存するだけでなく、将来的なクエリや分析に備えて準備することで、IoTインフラを強化します。セットアップの容易さと運用の卓越性は、IoTシステムの効率性と信頼性を大幅に向上させます。
はじめる前に
このセクションでは、EMQXダッシュボードでMongoDBデータ統合を作成する前に完了すべき準備について説明します。
前提条件
MongoDBサーバーのセットアップ
以下のコマンドを使用して、Docker経由でMongoDBをインストールし、Dockerイメージを起動し、ユーザーを作成できます。
# MongoDBのDockerイメージを起動し、パスワードをpublicに設定
docker run -d --name mongodb -p 27017:27017 mongo
# コンテナにアクセス
docker exec -it mongodb bash
# コンテナ内でMongoDBサーバーを起動(4.xバージョンでは `mongo` ではなく `mongosh` を使用)
mongosh
# ユーザー作成
use admin
db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })データベースの作成
以下のコマンドでMongoDBにデータベースとコレクションを作成できます。
# データベース emqx_data を作成
use emqx_data
# コレクション emqx_messages を作成
db.createCollection('emqx_messages')コネクターの作成
このセクションでは、MongoDB SinkをMongoDBサーバーに接続するためのコネクターの作成方法を示します。
以下の手順は、EMQXとMongoDBの両方をローカルマシンで実行していることを前提としています。MongoDBが別の場所にデプロイされている場合は、設定を適宜調整してください。
EMQXダッシュボードに入り、Integration -> Connectors をクリックします。
ページ右上の Create をクリックします。
Create Connector ページで MongoDB を選択し、Next をクリックします。
コネクターの名前を入力します。名前は大文字・小文字の英数字の組み合わせにしてください。例:
my_mongodbMongoDBサーバーの接続情報を設定します。必須項目(アスタリスク付き)を入力してください。
- MongoDB Mode:実際のMongoDBのデプロイモードに応じて接続タイプを選択します。この例では
singleを選択します。single:単一のスタンドアロンMongoDBインスタンスrs:同じデータセットを維持するmongodプロセスのグループであるレプリカセットsharded:MongoDBのシャーディングクラスター
- Server Host:
127.0.0.1:27017またはMongoDBサーバーがリモートの場合は実際のURLを入力 - Database Name:
emqx_dataを入力 - Write Mode:デフォルトの
unsafeのまま - Username:
adminを入力 - Password:
publicを入力 - Auth Source:ユーザー認証に使用するデータベース名を入力
- Use Legacy Protocol:MongoDBのレガシー通信プロトコルを使用するかを設定(MongoDB 3.6で新しいワイヤープロトコルが導入され、レガシープロトコルは後方互換のために残されています)。
true、false、autoのいずれかで設定可能。auto(デフォルト)では、EMQXがMongoDBのバージョンに基づき自動判別します。 - Srv Record:デフォルトで無効。有効にすると、DNS SRVレコードを使用してMongoDBホストを自動検出し、レプリカセットやシャーディングクラスターへの接続が容易になります。
- 暗号化接続を確立する場合は、Enable TLS のトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
- MongoDB Mode:実際のMongoDBのデプロイモードに応じて接続タイプを選択します。この例では
フォールバックアクション(オプション):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
詳細設定(オプション):詳細は詳細設定を参照してください。
Createをクリックする前に、Test Connectivity をクリックしてコネクターがMongoDBサーバーに接続できるかテストできます。
ページ下部の Create ボタンをクリックしてコネクター作成を完了します。ポップアップダイアログで Back to Connector List または Create Rule を選択し、ルールとSinkの作成を続行できます。詳細はルールとMongoDB Sinkの作成を参照してください。
MongoDB Sinkを用いたルールの作成
このセクションでは、ソースMQTTトピック t/# からのメッセージを処理し、処理済みデータをMongoDBに保存するルールの作成方法をダッシュボードで説明します。
EMQXダッシュボードで Integration -> Rules をクリックします。
ページ右上の Create をクリックします。
ルールIDに
my_ruleを入力し、SQL Editor にルールを設定します。トピックt/#のMQTTメッセージをMongoDBに保存したい場合、以下のSQL構文を使用できます。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT部分に含めてください。sqlSELECT * FROM "t/#"例えば、
timestampを日時型として保存し、payloadをJSON文字列として保存する場合は以下のSQL構文を使用できます:sqlSELECT *, mongo_date(timestamp) as timestamp, json_encode(payload) as payload FROM "t/#"注意:初心者の方は SQL Examples をクリックし、Enable Test を有効にしてSQLルールを学習・テストしてください。
- Add Action ボタンをクリックして、ルール発動時にトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをMongoDBに送信します。
Type of Action ドロップダウンリストから
MongoDBを選択します。Action ドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能ですが、この例では新規Sinkを作成します。Sinkの名前を入力します。名前は大文字・小文字の英数字の組み合わせにしてください。
Connector ドロップダウンから
my_mongodbを選択します。新規コネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメータの詳細はコネクターの作成を参照してください。Collection フィールドにデータを保存するコレクション名を入力します。プレースホルダー
${var_name}を用いた動的設定も可能です。この例ではemqx_messagesと入力します。Payload template を設定し、
clientid、topic、qos、timestamp、payloadをMongoDBに保存します。このテンプレートはMongoDBのinsertコマンドで実行され、サンプルコードは以下の通りです:json{ "clientid": "${clientid}", "topic": "${topic}", "qos": ${qos}, "timestamp": ${timestamp}, "payload": ${payload} }ペイロードテンプレート設定時の注意点:
すべての
keyはダブルクォーテーション"で囲む必要があります。値のデータ型の自動判別はサポートされていません:
- 文字列は
"で囲む必要があります。囲まないとエラーになります。 - 数値などは囲まないでください。囲むと文字列として認識されます。
- timestamp、date、time型は特別な処理がない場合、数値型または文字列型として扱われます。日付や時刻として保存するには、ルールSQLのMongoDBタイム関数を使用してください。(注:これらの関数の戻り値はMongoDBアクション専用で、他のアクションでは使用できません。)
- 文字列は
値がJSONオブジェクトの場合はネストされたオブジェクトが許可されます:
- テンプレート内で値を
"で囲んでネストすることはできません。実行エラーになります。 - オブジェクトは自身の構造に従ってネストされて保存されます。
- テンプレート内で値を
オブジェクトをJSON文字列として保存する場合は、ルールSQLの
json_encode関数で変換し、テンプレート内の対応する値は"で囲まないでください。
詳細設定(オプション):詳細は詳細設定を参照してください。
Create をクリックする前に、Test Connectivity をクリックしてSinkがMongoDBサーバーに接続可能かテストできます。
Create ボタンをクリックしてSink設定を完了します。新しいSinkが Action Outputs に追加されます。
Create Rule ページに戻り、設定内容を確認後、Create ボタンをクリックしてルールを生成します。
これでルールが正常に作成され、Rule ページに新しいルールが表示されます。Actions(Sink) タブをクリックすると、新しいMongoDB Sinkが確認できます。
また、Integration -> Flow Designer をクリックするとトポロジーが表示され、トピック t/# のメッセージがルール my_rule によって解析されMongoDBに送信・保存されている様子が確認できます。
ルールのテスト
ルールとSinkが期待通りに動作するかをテストするために、MQTTXを使ってクライアントをシミュレートし、EMQXにMQTTメッセージをパブリッシュできます。
MQTTXを使ってトピック
t/1にメッセージを送信します:bashmqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello MongoDB" }'Sinkの稼働状況を確認し、「Matched」が1件、「Sent Successfully」が1件増えていることを確認します。
メッセージがコレクション
emqx_messagesに書き込まれているか確認します:> db.emqx_messages.find().pretty() { "_id" : ObjectId("63db7059df489d01ed000009"), "clientid" : "emqx_c", "payload" : { "msg" : "hello MongoDB" }, "qos" : 0, "timestamp" : NumberLong("1675325529070"), "topic" : "t/1" }ルール設定で2番目のSQL構文を使用した場合、返される情報は以下のようになります:
> db.emqx_messages.find().pretty() { "_id" : ObjectId("63db7535df489d01ed000013"), "clientid" : "emqx_c", "payload" : "{ \"msg\": \"hello MongoDB\" }", "qos" : 0, "timestamp" : ISODate("2023-02-02T08:33:36.715Z"), "topic" : "t/1" }
詳細設定
このセクションでは、EMQX MongoDBコネクターおよびSinkの高度な設定オプションについて紹介します。コネクターやSinkの設定時に Advanced Settings を展開し、以下のパラメータをニーズに合わせて調整できます。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| Connect Timeout | EMQXがMongoDBへの接続確立を試みる際のタイムアウト時間。 | 30秒 |
| Socket Timeout | MongoDBとのソケット通信でデータ送受信を試みる際のタイムアウト時間。 | 30秒 |
| Max Overflow Workers | 既存のワーカーが全て占有されている場合に追加で作成可能なワーカー数。負荷急増時の同時接続数増加に重要。 | 0 |
| Wait Queue Timeout | MongoDB接続が利用可能になるまでワーカーがアイドル状態で待機可能な最大時間。 | 10秒 |
| Heartbeat Period | ドライバーがMongoDBの状態をチェックする間隔。ハートビート信号の送信頻度を制御。 | 200秒 |
| Minimum Heartbeat Period | ハートビート間の最短間隔。MongoDB状態チェックの過剰な頻度を防ぎ、効率的な通信を確保。 | 200秒 |
さらに詳しく
以下のリンクから詳細情報をご覧いただけます:
ブログ:
MQTTとMongoDB:IoTデータ管理のためのシームレスなシナジーの構築
レポート:
MQTTパフォーマンスベンチマークテスト:EMQX-MongoDB統合
動画: