MQTTデータをCouchbaseに取り込む
Couchbaseは、リレーショナルデータベース(SQLやACIDトランザクションなど)の利点とJSONの柔軟性を兼ね備えた多用途の分散データベースです。Couchbaseのアーキテクチャは高性能かつスケーラブルであり、ユーザープロファイル、動的な製品カタログ、生成AIアプリケーション、ベクトル検索、キャッシュなど、さまざまな業界で広く利用されています。
本ページでは、EMQX PlatformとCouchbaseの統合について包括的に紹介し、データ統合の作成および検証に関する実践的なガイダンスを提供します。
動作概要
Couchbaseデータ統合はEMQX Platformに標準搭載された機能であり、MQTTのリアルタイムデータ取得・送信機能とCouchbaseの強力なデータ処理機能を組み合わせることを目的としています。組み込みのルールエンジンコンポーネントを通じて、EMQX PlatformからCouchbaseへのデータ取り込みを複雑なコーディングなしで簡素化します。
以下の図は、EMQX PlatformとCouchbaseのデータ統合における典型的なアーキテクチャを示しています。
MQTTデータをCouchbaseに取り込むワークフローは以下の通りです。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQX Platformに正常に接続し、機械やセンサー、生産ラインの稼働状況や計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQX Platformにパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジンでのマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、EMQX Platformに定義されたルールに基づきルールエンジンで処理されます。ルールは事前定義された条件に基づいて、どのメッセージをCouchbaseにルーティングするかを決定します。ペイロードの変換を指定するルールがある場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの適切な処理が適用されます。
- Couchbaseへのデータ取り込み:ルールエンジンがCouchbaseに保存すべきメッセージを特定すると、そのメッセージをCouchbaseに転送するアクションをトリガーします。処理済みデータはシームレスにCouchbaseデータベースのデータセットに書き込まれます。
- データの保存と活用:Couchbaseに保存された後、企業は強力なクエリ機能を活用してさまざまなユースケースを支援できます。例えば、動的な製品カタログのシナリオでは、Couchbaseを用いて製品情報の効率的な管理・取得、リアルタイムの在庫更新、顧客へのパーソナライズされた推奨を実現し、購買体験の向上と売上増加に貢献します。
特長と利点
Couchbaseデータ統合の特長と利点は以下の通りです。
- リアルタイムデータストリーム:EMQX Platformはリアルタイムデータストリームの処理に最適化されており、ソースシステムからCouchbaseへの効率的かつ信頼性の高いデータ転送を実現します。これにより、即時の洞察やアクションが必要なユースケースに最適なリアルタイム分析が可能です。
- 高性能かつスケーラブル:EMQXの分散アーキテクチャとCouchbaseのカラムナストレージ形式により、データ量の増加に応じてシームレスにスケールアウト可能です。大量データの処理時でも一貫したパフォーマンスと応答性を維持します。
- 柔軟なデータ変換:EMQX Platformは強力なSQLベースのルールエンジンを提供し、Couchbaseに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機能をサポートし、ニーズに応じたデータ整形を支援します。
- 簡単なデプロイと管理:EMQX Platformはデータソースの設定、前処理ルール、Couchbase保存設定をユーザーフレンドリーなインターフェースで提供し、データ統合のセットアップと運用管理を簡素化します。
- 高度な分析機能:Couchbaseの強力なSQLベースのクエリ言語と複雑な分析機能のサポートにより、IoTデータから価値ある洞察を得ることができ、予測分析や異常検知などを促進します。
はじめる前に
このセクションでは、EMQX PlatformでCouchbaseデータ統合を作成する前の準備事項を説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Couchbaseのインストール
CouchbaseはDockerでのインストール、またはCouchbase Cloudでのサービス作成を選択できます。
DockerでのCouchbaseインストール
Docker上でCouchbaseを実行する方法は、公式ドキュメントをご参照ください。
以下のコマンドでCouchbaseサーバーを起動します。
サーバーは以下のポートを開放している必要があります:8093(接続およびデータ挿入用)、8091(Web UIアクセス用)。bashdocker run -t --name db -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase/server:enterprise-7.2.0
このコマンドを実行すると、DockerがCouchbaseサーバーをダウンロードおよびインストールします。Docker仮想環境内でCouchbaseサーバーが起動すると、以下のメッセージが表示されます。
bashStarting Couchbase Server -- Web UI available at http://<ip>:8091 and logs available in /opt/couchbase/var/lib/couchbase/logs
ブラウザで http://x.x.x.x:8091 にアクセスし、Couchbase Webコンソールを開きます。
Setup New Clusterをクリックし、クラスター名を入力します。開始を簡単にするために、管理者ユーザー名とパスワードをそれぞれ
admin
とpassword
に設定します。利用規約に同意し、Finish with Defaultsをクリックしてデフォルト値で設定を完了します。
設定情報を入力後、右下のSave & Finishボタンをクリックします。これにより設定が反映され、Couchbase Webコンソールのダッシュボードが開きます。
左側のナビゲーションパネルでBucketsを選択し、ADD BUCKETボタンをクリックしてバケット名(例:
emqx
)を入力し、Createをクリックしてバケットを作成します。デフォルトコレクションに対してプライマリインデックスを作成します。
bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "create primary index on default:emqx._default._default;"
Couchbase Cloudでのサービス作成
Couchbase Cloudにログインします。
Couchbase UIを開き、OperationalページでCreate Clusterをクリックし、プロジェクトを選択します。
クラスター作成ページでクラスタータイプを選択し、クラスター名を入力、クラウドプロバイダーを選択し、その他のオプションはデフォルトのままにして続行し、クラスターを作成します。
クラスター作成後、Homeページでクラスター名をクリックして管理画面に入り、Data ToolsページでBucket、Scope、Collectionを作成します。
Connectをクリックし、Public Connection Stringを確認して接続用に控えます。
Cluster Accessにて、認証用のCluster access nameとPasswordを入力し、Bucket-Level Accessで4で作成したBucketに適切な権限を付与します。
Allowed IP Addressesに移動し、Add Allowed IPをクリックしてIPホワイトリストを追加します。
これでCouchbase Cloudインスタンスの作成が完了です。
Couchbaseコネクターの作成
データ統合のルールを作成する前に、CouchbaseサーバーにアクセスするためのCouchbaseコネクターを作成する必要があります。
デプロイメントメニューでData Integrationを選択し、データ永続化サービスカテゴリの中からCouchbaseサービスを選択します。すでに他のコネクターを作成している場合は、New Connectorをクリックし、同様にCouchbaseサービスを選択します。
Connector Nameはシステムが自動的に生成します。
接続情報を入力します。
- Server Address:サーバーのIPアドレスとポートを入力します。Couchbase Cloudインスタンスの場合は
couchbases://
のプレフィックスを除去してください。デフォルトポートは18093
で、TLSが有効である必要があります。 - Authentication Information:Couchbaseのインストールに応じてUsernameとPasswordを設定します。
- Advanced Settings(任意):詳細設定を参照してください。
- Server Address:サーバーのIPアドレスとポートを入力します。Couchbase Cloudインスタンスの場合は
Test Connectionボタンをクリックし、Couchbaseサービスに正常にアクセスできると成功メッセージが表示されます。
Createボタンをクリックしてコネクターの作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータをCouchbaseに転送するアクションを追加します。
コネクター一覧のActions列にある新規ルールアイコンをクリックするか、Rules ListでNew RuleをクリックしてCreate New Rule作成画面に入ります。
SQLエディターにルールのSQLを入力します。例えば、クライアントが
temp_hum/emqx
トピックに温度と湿度のメッセージを送信した際にルールエンジンをトリガーするには、以下のSQLを入力します。sqlSELECT timestamp, clientid, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
SQLに不慣れな場合は、SQL ExamplesやEnable TestをクリックしてルールSQLの学習や結果のテストが可能です。
Nextをクリックしてアクション作成を開始します。
Use Connectorのドロップダウンから、先に作成したコネクターを選択します。
SQLテンプレートに以下のコマンドを入力します。
bashinsert into emqx(key, value) values(${.clientid}, {"Timestamp": ${.timestamp}, "Temp": ${.temp}, "Hum": ${.hum}})
必要に応じて詳細設定オプションを構成します(任意)。詳細は詳細設定を参照してください。
Confirmボタンをクリックしてアクション設定を完了します。
成功メッセージのポップアップでReturn to Rule Listをクリックし、データ統合設定を完了します。
ルールのテスト
温度・湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
パブリッシュボタンをクリックしてメッセージを送信します。Couchbaseサーバーの
emqx
バケットにエントリーが挿入されているはずです。以下のコマンドをターミナルで実行して確認できます。bashdocker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "SELECT * FROM emqx._default._default LIMIT 5;"
正常に動作していれば、以下のような出力が得られます(
requestID
やメトリクスは異なる場合があります)。bash{ "requestID": "858b9a9a-986e-467f-b9ed-9d585bce43be", "signature": { "*": "*" }, "results": [ { "_default": { "Hum": "41.8", "Temp": "27.5", "Timestamp": 1727322935145 } } ], "status": "success", "metrics": { "elapsedTime": "2.662873ms", "executionTime": "2.590901ms", "resultCount": 1, "resultSize": 133, "serviceLoad": 12 } }
EMQX Platformコンソールでランタイムデータを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに紐づくすべてのアクションの実行統計ページが表示されます。