MQTTデータをCouchbaseに取り込む
Couchbaseは、リレーショナルデータベース(SQLやACIDトランザクションなど)の利点とJSONの柔軟性を兼ね備えた多用途の分散データベースです。Couchbaseのアーキテクチャは高性能かつスケーラブルであり、ユーザープロファイル、動的な製品カタログ、生成AIアプリケーション、ベクター検索、キャッシュなど、多くの業界で広く利用されています。
本ページでは、EMQX CloudとCouchbaseの統合について包括的に紹介し、データ統合の作成および検証に関する実践的なガイダンスを提供します。
動作概要
Couchbaseデータ統合はEMQX Cloudの標準機能であり、MQTTのリアルタイムデータキャプチャと伝送能力をCouchbaseの強力なデータ処理能力と組み合わせることを目的としています。組み込みのルールエンジンコンポーネントを通じて、EMQX CloudからCouchbaseへのデータ取り込みを複雑なコーディングなしで簡素化します。
以下の図は、EMQX CloudとCouchbaseのデータ統合における典型的なアーキテクチャを示しています。

MQTTデータをCouchbaseに取り込むワークフローは以下の通りです。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQX Cloudに正常に接続し、機械、センサー、生産ラインの稼働状況、計測値、またはトリガーされたイベントに基づいてリアルタイムのMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジンでのマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、EMQX Cloudで定義されたルールに基づきルールエンジンで処理されます。これらのルールは事前定義された条件に基づき、どのメッセージをCouchbaseにルーティングすべきかを決定します。ペイロード変換が指定されている場合は、データフォーマットの変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの適切な変換が適用されます。
- Couchbaseへのデータ取り込み:ルールエンジンがCouchbaseに保存すべきメッセージを特定すると、そのメッセージをCouchbaseに転送するアクションをトリガーします。処理済みデータはシームレスにCouchbaseデータベースのデータセットに書き込まれます。
- データの保存と活用:Couchbaseに保存された後、企業は強力なクエリ機能を活用してさまざまなユースケースを支援できます。例えば、動的な製品カタログのシナリオでは、Couchbaseを用いて製品情報の効率的な管理と取得、リアルタイムの在庫更新のサポート、顧客へのパーソナライズされた推奨を提供し、購買体験の向上と売上増加を実現します。
特長と利点
Couchbaseデータ統合には以下の特長と利点があります。
- リアルタイムデータストリーム:EMQX Cloudはリアルタイムデータストリームの処理に最適化されており、ソースシステムからCouchbaseへの効率的かつ信頼性の高いデータ転送を保証します。これにより、組織はリアルタイムでデータをキャプチャし分析でき、即時の洞察とアクションが求められるユースケースに最適です。
- 高性能かつスケーラブル:EMQXの分散アーキテクチャとCouchbaseのカラムナー形式ストレージにより、データ量の増加に応じてシームレスにスケール可能です。大量データ処理時でも一貫したパフォーマンスと応答性を維持します。
- 柔軟なデータ変換:EMQX Cloudは強力なSQLベースのルールエンジンを提供し、Couchbaseに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、強化など多様なデータ変換機能をサポートし、組織のニーズに合わせたデータ調整を支援します。
- 簡単なデプロイと管理:EMQX Cloudはデータソースの設定、前処理ルール、Couchbase保存設定のためのユーザーフレンドリーなインターフェースを提供し、データ統合プロセスのセットアップおよび継続的な管理を簡素化します。
- 高度な分析機能:Couchbaseの強力なSQLベースのクエリ言語と複雑な分析機能のサポートにより、IoTデータから価値ある洞察を得ることができ、予測分析や異常検知などを促進します。
はじめる前に
このセクションでは、EMQX CloudでCouchbaseデータ統合を作成する前に必要な準備について説明します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
Couchbaseのインストール
CouchbaseはDockerを使ってインストールするか、Couchbase Cloudでサービスを作成する方法があります。
DockerでのCouchbaseインストール
Docker上でCouchbaseを実行する方法の詳細は、公式ドキュメントをご参照ください。
以下のコマンドでCouchbaseサーバーを起動します。
サーバーは以下のポートを開放している必要があります:8093(接続およびデータ挿入用)、8091(Web UIアクセス用)。bashdocker run -t --name db -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase/server:enterprise-7.2.0このコマンドを実行すると、DockerがCouchbaseサーバーをダウンロードおよびインストールします。CouchbaseサーバーがDocker仮想環境で起動すると、以下のメッセージが表示されます。
bashStarting Couchbase Server -- Web UI available at http://<ip>:8091 and logs available in /opt/couchbase/var/lib/couchbase/logsブラウザで http://x.x.x.x:8091 にアクセスし、Couchbase Webコンソールを開きます。
Setup New Clusterをクリックし、クラスタ名を入力します。開始を簡単にするため、管理者ユーザー名とパスワードをそれぞれ
adminとpasswordに設定してください。利用規約に同意し、Finish with Defaultsをクリックしてデフォルト設定で構成を完了します。
設定情報を入力後、右下のSave & Finishボタンをクリックします。これにより設定に基づいてサーバーがセットアップされ、Couchbase Webコンソールのダッシュボードが開きます。
左側のナビゲーションパネルでBucketsを選択し、ADD BUCKETボタンをクリックしてバケット名(例:
emqx)を入力し、Createをクリックしてバケットを作成します。デフォルトコレクション用のプライマリインデックスを作成します。
bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "create primary index on default:emqx._default._default;"
Couchbase Cloudでのサービス作成
Couchbase Cloudにログインします。
Couchbase UIを開き、OperationalページでCreate Clusterをクリックし、プロジェクトを選択します。
Create Clusterページでクラスタタイプを選択し、クラスタ名を入力、クラウドプロバイダーを選択し、その他のオプションはデフォルトのままにして続行しクラスタを作成します。
クラスタ作成後、Homeページに移動し、クラスタ名をクリックして管理画面に入り、Data ToolsページでBucket、Scope、Collectionを作成します。
Connectをクリックし、Public Connection Stringを確認して接続情報を控えます。
Cluster Accessにてクラスタアクセス名とパスワードを入力し認証を設定し、Bucket-Level Accessでステップ4で作成したバケットへの適切な権限を付与します。
Allowed IP Addressesに移動し、Add Allowed IPをクリックしてIPホワイトリストを追加します。
これでCouchbase Cloudインスタンスの作成が完了です。
Couchbaseコネクターの作成
データ統合のルールを作成する前に、Couchbaseサーバーにアクセスするためのコネクターを作成する必要があります。
デプロイメントメニューでデータ統合を選択し、データ永続化サービスカテゴリの中からCouchbaseサービスを選択します。すでに他のコネクターを作成している場合は、New Connectorをクリックし、同様にCouchbaseサービスを選択します。
Connector Nameはシステムが自動で名前を生成します。
接続情報を入力します。
- Server Address:サーバーのIPアドレスとポートを入力します。Couchbase Cloudインスタンスの場合は
couchbases://のプレフィックスを削除してください。デフォルトポートは18093で、TLSが有効である必要があります。 - 認証情報:Couchbaseのインストールに応じてUsernameとPasswordを入力します。
- 詳細設定(任意):詳細設定を参照してください。
- Server Address:サーバーのIPアドレスとポートを入力します。Couchbase Cloudインスタンスの場合は
Test Connectionボタンをクリックし、Couchbaseサービスに正常にアクセスできれば成功メッセージが表示されます。
Createボタンをクリックしてコネクターの作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータをCouchbaseに転送するアクションを追加します。
コネクター一覧のActions列にある新規ルールアイコンをクリックするか、Rules ListでNew RuleをクリックしてCreate New Rule作成画面に入ります。
SQLエディターにルールSQLを入力します。例えば、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信した際にルールエンジンをトリガーするには、以下のSQLを入力します。sqlSELECT timestamp, clientid, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
SQLに不慣れな場合は、SQL ExamplesやTry It OutをクリックしてルールSQLの学習や結果のテストが可能です。
Nextをクリックしてアクション作成に進みます。
Use Connectorドロップダウンから先ほど作成したコネクターを選択します。
SQLテンプレートに以下のコマンドを入力します。
bashinsert into emqx(key, value) values(${.clientid}, {"Timestamp": ${.timestamp}, "Temp": ${.temp}, "Hum": ${.hum}})必要に応じて詳細設定オプションを構成します(任意)。詳細は詳細設定をご覧ください。
Confirmボタンをクリックしてアクション設定を完了します。
成功メッセージのポップアップでReturn to Rule Listをクリックし、データ統合設定全体を完了します。
ルールのテスト
MQTTXを使って温度と湿度のデータ報告をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
パブリッシュボタンをクリックしてメッセージを送信します。Couchbaseサーバーの
emqxバケットにエントリが挿入されているはずです。以下のコマンドをターミナルで実行して確認できます。bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "SELECT * FROM emqx._default._default LIMIT 5;"正常に動作していれば、以下のような出力が得られます(
requestIDやメトリクスは異なる場合があります)。bash{ "requestID": "858b9a9a-986e-467f-b9ed-9d585bce43be", "signature": { "*": "*" }, "results": [ { "_default": { "Hum": "41.8", "Temp": "27.5", "Timestamp": 1727322935145 } } ], "status": "success", "metrics": { "elapsedTime": "2.662873ms", "executionTime": "2.590901ms", "resultCount": 1, "resultSize": 133, "serviceLoad": 12 } }EMQX Cloudコンソールで実行時データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに紐づくすべてのアクションの統計を実行時統計ページで閲覧できます。