# **CockroachDB Cloud に MQTT データを取り込む**

[CockroachDB](https://www.cockroachlabs.com/product/cloud/) は、高可用性、スケーラビリティ、および複数のノードやリージョンにまたがる強い一貫性を実現する分散型 SQL データベースです。データベースインフラの展開と管理の複雑さを軽減しつつ、従来の SQL データベースの利点を提供します。CockroachDB は、分散環境での強い一貫性と ACID トランザクションを必要とするアプリケーションに特に適しています。EMQX プラットフォームと CockroachDB を統合することで、MQTT メッセージやイベントデータを効率的に CockroachDB に取り込み、信頼性の高い保存と高度な分析を実現できます。

## 動作概要

CockroachDB と EMQX プラットフォームの統合は、MQTT のリアルタイムデータ収集機能と CockroachDB の分散 SQL アーキテクチャの堅牢性を組み合わせたものです。EMQX プラットフォームに組み込まれたルールエンジンがデータ取り込みプロセスを簡素化し、MQTT メッセージを効率的にルーティング、処理、保存することが可能で、複雑なコーディングは不要です。

以下の図は、EMQX プラットフォームと CockroachDB 間のデータ統合の典型的なアーキテクチャを示しています。

![data_integration_cockroachdb](./_assets/data_integration_cockroachdb.png)

MQTT データを CockroachDB に取り込む流れは以下の通りです：

1. **メッセージのパブリッシュと受信:** 産業用 IoT デバイスが MQTT プロトコルを介して EMQX プラットフォームに接続し、稼働状態、センサーの読み取り値、またはトリガーイベントに基づくリアルタイムデータを送信します。EMQX プラットフォームはこれらの MQTT メッセージを受信し、さらなる処理と保存に必要な重要なデータを取得します。

2. **メッセージデータの処理:** メッセージ受信後、EMQX プラットフォームは SQL ベースのルールエンジンを通じてメッセージをルーティングします。ルールエンジンは、受信メッセージを事前定義されたルールと照合し、どのメッセージを CockroachDB に送信すべきかを判断します。必要に応じて、フィルタリング、集約、エンリッチメントなどのデータ変換も行い、CockroachDB に適した形式に変換します。

3. **CockroachDB へのデータ取り込み:** ルールエンジンが特定のメッセージを CockroachDB に保存する必要があると判断すると、処理済みデータのデータベースへの挿入アクションをトリガーします。CockroachDB の強い一貫性と ACID 特性により、分散環境下でもすべてのトランザクションが信頼性を持ち、一貫したデータが維持されます。

4. **データの保存と活用:** CockroachDB に安全に保存されたデータは、強力な SQL クエリ機能を活用してさまざまな用途に利用できます。例えば、温度センサー、GPS トラッカー、産業機器から収集した IoT データを CockroachDB に保存し、予知保全、運用監視、意思決定のためのリアルタイム分析を実現できます。

## 特長と利点

CockroachDB と EMQX プラットフォームの統合は、データの送信、保存、活用を最適化する以下の特長と利点を提供します：

- **リアルタイムデータストリーミング:** EMQX プラットフォームはリアルタイムデータストリームの処理に最適化されており、IoT デバイスから CockroachDB への高速かつ信頼性の高いデータ送信を保証します。製造業やエネルギー分野での IoT デバイスの監視や制御など、リアルタイムの洞察が求められるアプリケーションに最適です。
- **高可用性とスケーラビリティ:** CockroachDB の分散アーキテクチャにより、データは複数のノードに複製され、高可用性とフォールトトレランスを実現します。データ量が増加しても、CockroachDB は水平スケーリングにより一貫したパフォーマンスを維持します。
- **柔軟なデータ変換:** EMQX プラットフォームの SQL ベースのルールエンジンは、多様なデータ変換機能を提供します。組織は CockroachDB への取り込み前に不要なデータのフィルタリング、センサー読み取り値の集約、ペイロードの追加コンテキストによるエンリッチメントなどの前処理を行えます。
- **強い一貫性と ACID トランザクション:** CockroachDB は強い一貫性を保証し、分散環境でもすべてのトランザクションが ACID 準拠であることを保証します。金融システムや重要インフラ管理システムなど、トランザクションの整合性が求められるアプリケーションに適しています。
- **簡単なデプロイと管理:** EMQX プラットフォームは、データソースの設定、前処理ルールの作成、CockroachDB ストレージ設定の管理を直感的なインターフェースで提供します。これにより、データ統合パイプラインの構築と維持が簡素化され、システム管理の複雑さを軽減します。
- **高度な分析:** CockroachDB の強力な SQL クエリ機能により、IoT データに対して高度な分析を実行できます。大規模データセットに対する複雑なクエリを実行することで、異常検知、トレンド分析、予測モデリングなどの貴重な洞察を得られます。
- **既存システムとの統合:** CockroachDB は豊富な SQL 機能をサポートしており、既存のデータツールやシステムとの互換性があります。組織は IoT データをビジネスインテリジェンスツール、ダッシュボード、機械学習プラットフォームなどの他のエンタープライズアプリケーションと容易に統合し、より深い洞察を得られます。

## はじめる前に

このセクションでは、EMQX プラットフォームコンソールで CockroachDB Cloud データ統合を作成する前に必要な準備について説明します。

### 前提条件

- [データ統合](https://github.com/emqx/cloud-docs/blob/master/en_US/data_integration/introduction.md) に関する知識
- データ統合の [ルール](https://github.com/emqx/cloud-docs/blob/master/en_US/data_integration/rules.md) に関する知識

### Cockroach Cloud サービスの開始

1. [Cockroach Labs](https://www.cockroachlabs.com/) でアカウントを作成し、新しいプロジェクトとクラスターを作成します。
2. Cockroach Labs の [クイックスタートガイド](https://www.cockroachlabs.com/docs/cockroachcloud/create-your-cluster) に従ってクラスターを作成します。
   
    ::: tip 重要なお知らせ
    
    [VPC ピアリング](https://www.cockroachlabs.com/docs/cockroachcloud/network-authorization#vpc-peering) と [AWS PrivateLink](https://www.cockroachlabs.com/docs/cockroachcloud/network-authorization#aws-privatelink) は、Standard Deployments 以上でのみサポートされています。
    
    :::
3. Cockroach Cloud クラスターを作成したら、Cockroach Cloud コンソールの左側ナビゲーションメニューから **SQL Shell** をクリックします。SQL クエリを実行できるインターフェースが開きます。
4. テーブルを作成するために以下の SQL クエリを実行します。例：
      
    ```sql
    CREATE TABLE sensor_data (
        id SERIAL PRIMARY KEY,
        sensor_id STRING NOT NULL,
        temperature FLOAT,
        humidity FLOAT,
        timestamp TIMESTAMPTZ DEFAULT now(),
        created_at TIMESTAMPTZ DEFAULT now()
    );
    ```
    
5. クエリ実行後、**Databases** セクションの **Tables** タブに戻り、ページを更新すると新しいテーブルが表示されます。

### PrivateLink の設定

CockroachDB Cloud への接続前に、EMQX プラットフォームで PrivateLink を作成し、CockroachDB Cloud で PrivateLink エンドポイントサービスを設定して、EMQX デプロイメントと CockroachDB Cloud サーバー間のプライベートネットワーク接続を確立する必要があります。

1. [EMQX Platform コンソール](https://cloud-intl.emqx.com/console/) にログインし、対象のデプロイメントの概要ページに移動します。
2. **Network Management** に移動し、**PrivateLink** セクションの **"+ Private Connection"** ボタンをクリックします。
3. プロンプトを確認し、**Next Step** をクリックします。
4. CockroachDB Cloud プラットフォームにログインし、クラスターを開きます。左側ナビゲーションメニューから **Networking** をクリックします。
5. **Private endpoint** を見つけ、**Add a private endpoint** をクリックします。続いて **Service name** を確認し、後で使用するために保存します。

    ![cockroach_image](./_assets/cockroach_image.png)

6. EMQX プラットフォームに **Service name** をエンドポイントサービス名として入力し、**Create Private Connection** をクリックします。**Endpoint ID** が表示されるまで待ちます。

    ![cockroach_image 1](./_assets/cockroach_image 1.png)

7. CockroachDB Cloud プラットフォームに戻り、EMQX プラットフォームからの **Endpoint ID** を **Private Endpoints** に追加し、説明（任意）を入力して **Create Endpoint** をクリックします。
8. デプロイメント概要ページで接続状態が **Running** に変わるまで待ちます。
9. 「成功」メッセージが表示されたら、CockroachDB Cloud の **Private endpoints** セクションで新しく設定した Endpoint ID が表示されていることを確認します。これにより、EMQX プラットフォームと CockroachDB Cloud 間の PrivateLink が正常に確立されたことがわかります。

## コネクターの作成

データ統合ルールを作成する前に、CockroachDB Cloud サーバーにアクセスするための CockroachDB コネクターを作成する必要があります。

1. デプロイメントに移動し、左側ナビゲーションメニューから **Data Integration** をクリックします。初めてコネクターを作成する場合は、**Data Persistence** カテゴリの下の **PostgreSQL** を選択します。すでにコネクターを作成している場合は、**New Connector** を選択し、続けて **Data Persistence** カテゴリの下の **PostgreSQL** を選択します。

2. **New Connector** ページで以下の接続情報を設定します：
    - **Connector Name**：システムが自動的にコネクター名を生成します。
    - **Server Host**：CockroachDB クラスターのサーバーホストを入力します。
      
      CockroachDB Cloud コンソールの右上にある **Connect** をクリックします。プライベートリンクを確立している場合は、接続タイプとして **Private connection** を選択し、画像のハイライト部分をコピー＆ペーストします。
      
      ![cockroach_image 2](./_assets/cockroach_image 2.png)
      
    - **Database Name**：データベース名を入力します。例：`defaultdb`（Cockroach Cloud クラスター起動時のデフォルトデータベース）。
    - **Username and Password**：CockroachDB Cloud デプロイメント作成時に設定したユーザー名とパスワードを入力します。
    - **Enable TLS**：トグルスイッチをクリックして TLS 接続を有効にします。**SNI** テキストボックスにサーバーホストを入力します。

3. **Test** ボタンをクリックします。Cockroach Cloud サービスにアクセス可能であれば、コネクターが利用可能である旨のプロンプトが表示されます。

4. **New** ボタンをクリックして作成を完了します。

## ルールの作成

このセクションでは、EMQX プラットフォームコンソールを使って CockroachDB ルールを作成し、ルールにアクションを追加する方法を説明します。

1. ルールエリアの **New Rule** をクリックするか、作成したコネクターの **Actions** 列にある新規ルールアイコンをクリックします。
2. 使用する機能に基づいて **SQL Editor** でルールを設定します。クライアントが `sensor/#` トピックに温度と湿度のメッセージを送信したときにエンジンをトリガーするには、以下の SQL を使用できます。
   
    ```sql
    SELECT 
        payload.sensor_id AS sensor_id,
        payload.temperature AS temperature,
        payload.humidity AS humidity,
        payload.timestamp AS timestamp
    FROM "sensor/#"
    ```
    
3. **Next** をクリックしてアクションを追加します。
4. **Connector** ドロップダウンボックスから先ほど作成したコネクターを選択します。
5. SQL テンプレートに以下のコマンドを入力します（[ルールエンジン](./rules.md) を使用して、入力 SQL 文中の文字列を適切にエスケープし、SQL インジェクション攻撃を防止してください）：
   
    ```sql
    INSERT INTO public.sensor_data(
        id, 
        sensor_id, 
        temperature, 
        humidity, 
        "timestamp", 
        created_at
    )
    VALUES (
        DEFAULT,
        ${sensor_id},
        ${temperature},
        ${humidity},
        TO_TIMESTAMP(${timestamp} / 1000),
        DEFAULT
    );
    ```
    
6. 詳細設定（任意）を行います。
7. **Confirm** ボタンをクリックしてルール作成を完了します。
8. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合設定の一連の作業を完了します。

## ルールのテスト

温度と湿度のデータ報告をシミュレートするために [MQTTX](https://mqttx.app/) の使用を推奨しますが、他のクライアントでも可能です。

1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
    - トピック：`sensor/#`
    - クライアント ID：`test_client`
    - ペイロード：
      
        ```json
        {
          "sensor_id": "sensor_001",
          "temperature": 25.5,
          "humidity": 60.0,
          "timestamp": 1674392048000
        }
        ```
    
2. 送信ボタンをクリックしてメッセージを送信します。Cockroach Cloud サーバーの `defaultdb` データベース内の `sensor_data` テーブルにエントリが挿入されているはずです。
