# MQTTデータをCouchbaseに取り込む

[Couchbase](https://couchbase.com/)は、リレーショナルデータベース（SQLやACIDトランザクションなど）の利点とJSONの柔軟性を兼ね備えた多用途の分散データベースです。Couchbaseのアーキテクチャは高性能かつスケーラブルであり、ユーザープロファイル、動的な製品カタログ、生成AIアプリケーション、ベクトル検索、キャッシュなど、さまざまな業界で広く利用されています。

本ページでは、EMQX PlatformとCouchbaseの統合について包括的に紹介し、データ統合の作成および検証に関する実践的なガイダンスを提供します。

## 動作概要

Couchbaseデータ統合はEMQX Platformに標準搭載された機能であり、MQTTのリアルタイムデータ取得・送信機能とCouchbaseの強力なデータ処理機能を組み合わせることを目的としています。組み込みの[ルールエンジン](https://docs.emqx.com/en/emqx/latest/data-integration/rules.html)コンポーネントを通じて、EMQX PlatformからCouchbaseへのデータ取り込みを複雑なコーディングなしで簡素化します。

以下の図は、EMQX PlatformとCouchbaseのデータ統合における典型的なアーキテクチャを示しています。

![EMQX Platform Couchbase Data Integration](./_assets/data_integration_couchbase.png)

MQTTデータをCouchbaseに取り込むワークフローは以下の通りです。

1. **メッセージのパブリッシュと受信**：産業用IoTデバイスはMQTTプロトコルを介してEMQX Platformに正常に接続し、機械やセンサー、生産ラインの稼働状況や計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQX Platformにパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジンでのマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージが到着すると、EMQX Platformに定義されたルールに基づきルールエンジンで処理されます。ルールは事前定義された条件に基づいて、どのメッセージをCouchbaseにルーティングするかを決定します。ペイロードの変換を指定するルールがある場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などの適切な処理が適用されます。
3. **Couchbaseへのデータ取り込み**：ルールエンジンがCouchbaseに保存すべきメッセージを特定すると、そのメッセージをCouchbaseに転送するアクションをトリガーします。処理済みデータはシームレスにCouchbaseデータベースのデータセットに書き込まれます。
4. **データの保存と活用**：Couchbaseに保存された後、企業は強力なクエリ機能を活用してさまざまなユースケースを支援できます。例えば、動的な製品カタログのシナリオでは、Couchbaseを用いて製品情報の効率的な管理・取得、リアルタイムの在庫更新、顧客へのパーソナライズされた推奨を実現し、購買体験の向上と売上増加に貢献します。

## 特長と利点

Couchbaseデータ統合の特長と利点は以下の通りです。

- **リアルタイムデータストリーム**：EMQX Platformはリアルタイムデータストリームの処理に最適化されており、ソースシステムからCouchbaseへの効率的かつ信頼性の高いデータ転送を実現します。これにより、即時の洞察やアクションが必要なユースケースに最適なリアルタイム分析が可能です。
- **高性能かつスケーラブル**：EMQXの分散アーキテクチャとCouchbaseのカラムナストレージ形式により、データ量の増加に応じてシームレスにスケールアウト可能です。大量データの処理時でも一貫したパフォーマンスと応答性を維持します。
- **柔軟なデータ変換**：EMQX Platformは強力なSQLベースのルールエンジンを提供し、Couchbaseに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、拡充など多様なデータ変換機能をサポートし、ニーズに応じたデータ整形を支援します。
- **簡単なデプロイと管理**：EMQX Platformはデータソースの設定、前処理ルール、Couchbase保存設定をユーザーフレンドリーなインターフェースで提供し、データ統合のセットアップと運用管理を簡素化します。
- **高度な分析機能**：Couchbaseの強力なSQLベースのクエリ言語と複雑な分析機能のサポートにより、IoTデータから価値ある洞察を得ることができ、予測分析や異常検知などを促進します。

## はじめる前に

このセクションでは、EMQX PlatformでCouchbaseデータ統合を作成する前の準備事項を説明します。

### 前提条件

- [データ統合](./introduction.md)の理解
- [ルール](./rules.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Couchbaseのインストール

CouchbaseはDockerでのインストール、またはCouchbase Cloudでのサービス作成を選択できます。

#### DockerでのCouchbaseインストール

Docker上でCouchbaseを実行する方法は、[公式ドキュメント](https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html)をご参照ください。

1. 以下のコマンドでCouchbaseサーバーを起動します。  
   サーバーは以下のポートを開放している必要があります：8093（接続およびデータ挿入用）、8091（Web UIアクセス用）。

    ```bash
    docker run -t --name db -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase/server:enterprise-7.2.0
    ```

    このコマンドを実行すると、DockerがCouchbaseサーバーをダウンロードおよびインストールします。Docker仮想環境内でCouchbaseサーバーが起動すると、以下のメッセージが表示されます。

    ```bash
    Starting Couchbase Server -- Web UI available at http://<ip>:8091
    and logs available in /opt/couchbase/var/lib/couchbase/logs
    ```

2. ブラウザで http://x.x.x.x:8091 にアクセスし、Couchbase Webコンソールを開きます。

3. **Setup New Cluster**をクリックし、クラスター名を入力します。開始を簡単にするために、管理者ユーザー名とパスワードをそれぞれ`admin`と`password`に設定します。

    利用規約に同意し、**Finish with Defaults**をクリックしてデフォルト値で設定を完了します。

4. 設定情報を入力後、右下の**Save & Finish**ボタンをクリックします。これにより設定が反映され、Couchbase Webコンソールのダッシュボードが開きます。

5. 左側のナビゲーションパネルで**Buckets**を選択し、**ADD BUCKET**ボタンをクリックしてバケット名（例：`emqx`）を入力し、**Create**をクリックしてバケットを作成します。

6. デフォルトコレクションに対してプライマリインデックスを作成します。

   ```bash
   docker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "create primary index on default:emqx._default._default;"
   ```

#### Couchbase Cloudでのサービス作成

1. [Couchbase Cloud](https://cloud.couchbase.com/sign-in)にログインします。

2. Couchbase UIを開き、Operationalページで**Create Cluster**をクリックし、プロジェクトを選択します。

3. クラスター作成ページでクラスタータイプを選択し、クラスター名を入力、クラウドプロバイダーを選択し、その他のオプションはデフォルトのままにして続行し、クラスターを作成します。

4. クラスター作成後、**Home**ページでクラスター名をクリックして管理画面に入り、**Data Tools**ページで**Bucket**、**Scope**、**Collection**を作成します。

5. **Connect**をクリックし、**Public Connection String**を確認して接続用に控えます。

6. **Cluster Access**にて、認証用のCluster access nameとPasswordを入力し、**Bucket-Level Access**で4で作成したBucketに適切な権限を付与します。

7. **Allowed IP Addresses**に移動し、**Add Allowed IP**をクリックしてIPホワイトリストを追加します。

これでCouchbase Cloudインスタンスの作成が完了です。

## Couchbaseコネクターの作成

データ統合のルールを作成する前に、CouchbaseサーバーにアクセスするためのCouchbaseコネクターを作成する必要があります。

1. デプロイメントメニューで**Data Integration**を選択し、データ永続化サービスカテゴリの中からCouchbaseサービスを選択します。すでに他のコネクターを作成している場合は、**New Connector**をクリックし、同様にCouchbaseサービスを選択します。

2. **Connector Name**はシステムが自動的に生成します。

3. 接続情報を入力します。

   - **Server Address**：サーバーのIPアドレスとポートを入力します。Couchbase Cloudインスタンスの場合は`couchbases://`のプレフィックスを除去してください。デフォルトポートは`18093`で、TLSが有効である必要があります。
   - **Authentication Information**：Couchbaseのインストールに応じて**Username**と**Password**を設定します。
   - **Advanced Settings（任意）**：[詳細設定](https://docs.emqx.com/en/emqx/latest/data-integration/data-bridge-couchbase.html#advanced-configuration)を参照してください。

4. **Test Connection**ボタンをクリックし、Couchbaseサービスに正常にアクセスできると成功メッセージが表示されます。

5. **Create**ボタンをクリックしてコネクターの作成を完了します。

## ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータをCouchbaseに転送するアクションを追加します。

1. コネクター一覧の**Actions**列にある新規ルールアイコンをクリックするか、**Rules List**で**New Rule**をクリックして**Create New Rule**作成画面に入ります。

2. SQLエディターにルールのSQLを入力します。例えば、クライアントが`temp_hum/emqx`トピックに温度と湿度のメッセージを送信した際にルールエンジンをトリガーするには、以下のSQLを入力します。

   ```sql
    SELECT 
    timestamp,
    clientid, 
    payload.temp as temp, 
    payload.hum as hum

    FROM
    "temp_hum/emqx"
   ```

   ::: tip

   SQLに不慣れな場合は、**SQL Examples**や**Enable Test**をクリックしてルールSQLの学習や結果のテストが可能です。

   :::

3. **Next**をクリックしてアクション作成を開始します。

4. **Use Connector**のドロップダウンから、先に作成したコネクターを選択します。

5. SQLテンプレートに以下のコマンドを入力します。

   ```bash
    insert into emqx(key, value) values(${.clientid}, {"Timestamp": ${.timestamp}, "Temp": ${.temp}, "Hum": ${.hum}})
   ```

6. 必要に応じて詳細設定オプションを構成します（任意）。詳細は[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-Couchbase.html#advanced-settings)を参照してください。

7. **Confirm**ボタンをクリックしてアクション設定を完了します。

8. 成功メッセージのポップアップで**Return to Rule List**をクリックし、データ統合設定を完了します。

## ルールのテスト

温度・湿度データの送信シミュレーションには[MQTTX](https://mqttx.app/)の利用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. パブリッシュボタンをクリックしてメッセージを送信します。Couchbaseサーバーの`emqx`バケットにエントリーが挿入されているはずです。以下のコマンドをターミナルで実行して確認できます。

   ```bash
     docker exec -t db /opt/couchbase/bin/cbq -u admin -p password -engine=http://127.0.0.1:8091/ -script "SELECT * FROM emqx._default._default LIMIT 5;"
   ```

   正常に動作していれば、以下のような出力が得られます（`requestID`やメトリクスは異なる場合があります）。

   ```bash
   {
       "requestID": "858b9a9a-986e-467f-b9ed-9d585bce43be",
       "signature": {
           "*": "*"
       },
       "results": [
       {
           "_default": {
               "Hum": "41.8",
               "Temp": "27.5",
               "Timestamp": 1727322935145
           }
       }
       ],
       "status": "success",
       "metrics": {
           "elapsedTime": "2.662873ms",
           "executionTime": "2.590901ms",
           "resultCount": 1,
           "resultSize": 133,
           "serviceLoad": 12
       }
   }
   ```

3. EMQX Platformコンソールでランタイムデータを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに紐づくすべてのアクションの実行統計ページが表示されます。
