MQTTデータをCockroachDB Cloudに取り込む
CockroachDBは、高可用性、スケーラビリティ、および複数のノードやリージョンにまたがる強い一貫性を実現する分散型SQLデータベースです。データベースインフラの展開と保守の複雑さを軽減しつつ、従来のSQLデータベースの利点を提供します。CockroachDBは、分散環境において強い一貫性とACIDトランザクションを必要とするアプリケーションに特に適しています。EMQX CloudとCockroachDBを統合することで、MQTTメッセージやイベントデータを効率的にCockroachDBに取り込み、信頼性の高い保存と高度な分析を実現できます。
動作の仕組み
CockroachDBをEMQX Cloudに統合することで、MQTTのリアルタイムデータ収集機能とCockroachDBの堅牢な分散SQLアーキテクチャを組み合わせています。EMQX Cloudの組み込みルールエンジンにより、MQTTメッセージのルーティング、処理、CockroachDBへの効率的な保存がコードの複雑さなしに簡素化されます。
以下の図は、EMQX CloudとCockroachDB間のデータ統合の典型的なアーキテクチャを示しています。

MQTTデータをCockroachDBに取り込む流れは以下の通りです:
メッセージのパブリッシュと受信: 産業用IoTデバイスがMQTTプロトコルを介してEMQX Cloudに接続し、稼働状態、センサーの読み取り値、またはトリガーされたイベントに基づくリアルタイムデータを送信します。EMQX CloudはこれらのMQTTメッセージを受信し、さらなる処理と保存のための重要なデータを取得します。
メッセージデータの処理: メッセージ受信後、EMQX CloudはSQLベースのルールエンジンを通じてメッセージをルーティングします。ルールエンジンは、受信メッセージを事前定義されたルールと照合し、どのメッセージをCockroachDBに送信するかを判断します。必要に応じて、フィルタリング、集約、またはペイロードの付加情報追加などのデータ変換も行い、CockroachDBに適した形式に変換します。
CockroachDBへのデータ取り込み: ルールエンジンが特定のメッセージをCockroachDBに保存する必要があると判断すると、処理済みデータをデータベースに挿入するアクションをトリガーします。CockroachDBの強い一貫性とACID特性により、分散環境下でもすべてのトランザクションが信頼性を持ち、一貫したデータが保たれます。
データの保存と活用: データがCockroachDBに安全に保存されることで、企業は強力なSQLクエリ機能を活用してさまざまな用途に利用できます。例えば、温度センサー、GPSトラッカー、産業機器から収集されたIoTデータをCockroachDBに保存し、予知保全、運用監視、意思決定のためのリアルタイム分析を可能にします。
特長とメリット
CockroachDBとEMQX Cloudの統合は、データの送信、保存、活用を最適化する以下の特長とメリットを提供します:
- リアルタイムデータストリーミング: EMQX Cloudはリアルタイムデータストリームの処理に最適化されており、IoTデバイスからCockroachDBへの高速かつ信頼性の高いデータ送信を保証します。この機能は、製造業やエネルギー分野でのIoTデバイスの監視・制御など、リアルタイムの洞察が必要なアプリケーションに最適です。
- 高可用性とスケーラビリティ: CockroachDBの分散アーキテクチャにより、データは複数のノードに複製され、高可用性とフォールトトレランスを実現します。データ量が増加しても水平スケールにより一貫したパフォーマンスを維持します。
- 柔軟なデータ変換: EMQX CloudのSQLベースのルールエンジンは多様なデータ変換機能を提供し、CockroachDBへの取り込み前に不要なデータのフィルタリング、センサー読み取り値の集約、ペイロードへの追加情報付加などの前処理が可能です。
- 強い一貫性とACIDトランザクション: CockroachDBは分散環境でもACID準拠の強い一貫性を保証し、金融システムや重要インフラ管理などトランザクションの整合性が求められるアプリケーションに適しています。
- 簡単なデプロイと管理: EMQX Cloudは直感的なインターフェースを提供し、データソースの設定、前処理ルールの構築、CockroachDBの保存設定を簡単に行えます。これにより、データ統合パイプラインの構築と管理の複雑さを軽減します。
- 高度な分析: CockroachDBの強力なSQLクエリ機能により、IoTデータに対して高度な分析が可能です。大規模データセットに対する複雑なクエリの実行により、異常検知、トレンド分析、予測モデリングなどの価値ある洞察を得られます。
- 既存システムとの統合: CockroachDBは豊富なSQL機能をサポートし、既存のデータツールやシステムと互換性があります。企業はIoTデータをビジネスインテリジェンスツール、ダッシュボード、機械学習プラットフォームなどの他のエンタープライズアプリケーションと容易に統合し、より深い洞察を得られます。
はじめる前に
このセクションでは、EMQX CloudコンソールでCockroachDB Cloudのデータ統合を作成する前に必要な準備について説明します。
前提条件
Cockroach Cloudサービスの開始
Cockroach Labsでアカウントを作成し、新しいプロジェクトとクラスターを作成します。
Cockroach Labsのクイックスタートガイドに従ってクラスターを作成します。
重要なお知らせ
VPC PeeringおよびAWS PrivateLinkは、Standard Deployments以上でのみサポートされています。
Cockroach Cloudクラスターを作成したら、Cockroach Cloudコンソールの左ナビゲーションメニューからSQL Shellをクリックします。ここでSQLクエリを実行できます。
テーブルを作成するために以下のSQLクエリを実行します。例:
sqlCREATE TABLE sensor_data ( id SERIAL PRIMARY KEY, sensor_id STRING NOT NULL, temperature FLOAT, humidity FLOAT, timestamp TIMESTAMPTZ DEFAULT now(), created_at TIMESTAMPTZ DEFAULT now() );クエリ実行後、DatabasesセクションのTablesタブに戻り、ページを更新すると新しいテーブルが一覧に表示されます。
PrivateLinkの設定
CockroachDB Cloudに接続する前に、EMQX CloudでPrivateLinkを作成し、CockroachDB CloudでPrivateLinkエンドポイントサービスを設定して、EMQXのデプロイメントとCockroachDB Cloudサーバー間のプライベートネットワーク接続を確立する必要があります。
EMQX Cloudコンソールにログインし、対象のデプロイメントの概要ページに移動します。
Network Managementに移動し、PrivateLinkセクションで**"+ Private Connection"**ボタンをクリックします。
プロンプトを確認し、Next Stepをクリックします。
CockroachDB Cloudプラットフォームにログインし、クラスターを開きます。左ナビゲーションメニューからNetworkingをクリックします。
Private endpointを見つけてAdd a private endpointをクリックします。次にService nameを見つけてメモしておきます。

EMQX CloudにService nameをエンドポイントサービス名として入力し、Create Private Connectionをクリックします。Endpoint IDが表示されるまで待ちます。

CockroachDB Cloudプラットフォームに戻り、EMQX CloudからのEndpoint IDをPrivate Endpointsに追加し、説明(任意)を入力してCreate Endpointをクリックします。
デプロイメント概要ページで接続状態がRunningに変わるまで待ちます。
「success」メッセージが表示されたら、CockroachDB CloudのPrivate endpointsセクションで新しく設定したEndpoint IDが表示されていることを確認します。これでEMQX CloudとCockroachDB Cloud間のPrivateLinkが正常に確立されたことを示します。
コネクターの作成
データ統合ルールを作成する前に、CockroachDB CloudサーバーにアクセスするためのCockroachDBコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからData Integrationをクリックします。初めてコネクターを作成する場合は、Data Persistenceカテゴリの下にあるPostgreSQLを選択します。すでにコネクターを作成している場合は、New Connectorを選択し、続けてData PersistenceカテゴリのPostgreSQLを選択します。
New Connectorページで以下の接続情報を設定します:
Connector Name:システムが自動的にコネクター名を生成します。
Server Host:CockroachDBクラスターのサーバーホストを入力します。
CockroachDB Cloudコンソールの右上にあるConnectをクリックします。PrivateLinkを確立している場合は接続タイプとしてPrivate connectionを選択し、画像のハイライト部分をコピーして貼り付けます。

Database Name:データベース名を入力します。例:
defaultdb(Cockroach Cloudクラスター起動時のデフォルトデータベース)。Username and Password:CockroachDB Cloudデプロイメント作成時に設定したユーザー名とパスワードを入力します。
Enable TLS:トグルスイッチをクリックしてTLS接続を有効にします。SNIテキストボックスにサーバーホストを入力します。
Testボタンをクリックします。Cockroach Cloudサービスにアクセス可能であれば、コネクターが利用可能である旨のプロンプトが返されます。
Newボタンをクリックして作成を完了します。
ルールの作成
このセクションでは、EMQX CloudコンソールでCockroachDBルールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアのNew Ruleをクリックするか、作成したばかりのコネクターのActions列にある新規ルールアイコンをクリックします。
利用したい機能に基づいてSQL Editorでルールを設定します。クライアントが
sensor/#トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするには、以下のSQLを使用できます:sqlSELECT payload.sensor_id AS sensor_id, payload.temperature AS temperature, payload.humidity AS humidity, payload.timestamp AS timestamp FROM "sensor/#"Nextをクリックしてアクションを追加します。
Connectorドロップダウンボックスから先ほど作成したコネクターを選択します。
SQLテンプレートに以下のコマンドを入力します(ルールエンジンを使用して、入力SQL文中の文字列が適切にエスケープされ、SQLインジェクション攻撃を防止できるようにしてください):
sqlINSERT INTO public.sensor_data( id, sensor_id, temperature, humidity, "timestamp", created_at ) VALUES ( DEFAULT, ${sensor_id}, ${temperature}, ${humidity}, TO_TIMESTAMP(${timestamp} / 1000), DEFAULT );高度な設定(任意)を行います。
Confirmボタンをクリックしてルール作成を完了します。
Successful new ruleポップアップでBack to Rulesをクリックし、データ統合設定の一連の作業を完了します。
ルールのテスト
温度と湿度のデータ報告をシミュレートするには、MQTTXの使用を推奨しますが、他の任意のクライアントでも可能です。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
sensor/#クライアントID:
test_clientペイロード:
json{ "sensor_id": "sensor_001", "temperature": 25.5, "humidity": 60.0, "timestamp": 1674392048000 }
送信ボタンをクリックしてメッセージを送信します。Cockroach Cloudサーバーの
defaultdbデータベース内のsensor_dataテーブルにエントリが挿入されているはずです。