# ClickHouse Cloud への MQTT データ取り込み

[ClickHouse Cloud](https://clickhouse.com/cloud) は、ClickHouse が公式に提供するフルマネージドのクラウドサービスです。ClickHouse の強力な機能を継承しつつ、データベースインフラのデプロイや運用の複雑さを排除しています。ClickHouse Cloud により、データベースインスタンスを迅速に立ち上げ、高可用性、自動スケーリング、組み込みのセキュリティを活用でき、従量課金制の料金体系によりデータ分析やビジネス成長に集中できます。

EMQX Cloud は ClickHouse Cloud との統合をサポートしており、MQTT メッセージやイベントデータを ClickHouse Cloud に取り込み、さらなる分析や処理を行うことが可能です。

## 動作概要

EMQX Cloud に ClickHouse Cloud を統合する機能は、MQTT のリアルタイムデータ取得・転送機能と ClickHouse Cloud の強力なデータ処理機能を融合させるための標準機能です。組み込みのルールエンジンを活用することで、EMQX Cloud から ClickHouse へのデータ取り込みプロセスを簡素化し、効率的なデータ保存と分析を実現し、複雑なコーディングを不要にします。

以下の図は、EMQX Cloud と ClickHouse Cloud 間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Cloud-ClickHouse Integration](./_assets/data_integration_clickhouse.jpg)

ClickHouse Cloud への MQTT データ取り込みは以下のように動作します：

1. **メッセージのパブリッシュと受信**：産業用 IoT デバイスは MQTT プロトコルを介して EMQX Cloud に正常に接続し、稼働状況や計測値、トリガーされたイベントに基づくリアルタイムデータをパブリッシュします。EMQX Cloud がこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。

2. **メッセージデータの処理**：メッセージ到着後、ルールエンジンを通過し、あらかじめ定義されたルールに基づいて ClickHouse Cloud へルーティングすべきメッセージが判別されます。ペイロードの変換（データ形式の変換、フィルタリング、追加コンテキストによる拡充など）が指定されている場合は、それらの変換処理が実行されます。

3. **ClickHouse Cloud へのデータ取り込み**：ルールエンジンが ClickHouse への保存対象メッセージを特定すると、これらのメッセージを ClickHouse Cloud に転送するアクションがトリガーされます。処理済みデータはシームレスに ClickHouse データベースに書き込まれます。

4. **データの保存と活用**：ClickHouse Cloud に安全に保存されたデータは、強力なクエリ機能を活用して様々な用途に利用可能です。例えば、物流やサプライチェーン管理では、GPS トラッカー、温度センサー、在庫管理システムからの IoT データを監視・分析し、リアルタイムトラッキング、ルート最適化、需要予測、効果的な在庫管理に役立てられます。

## 特長とメリット

ClickHouse Cloud との統合は、効率的なデータ転送、保存、活用を実現するために多くの特長とメリットを提供します：

- **リアルタイムデータストリーミング**：EMQX Cloud はリアルタイムデータストリームの処理に最適化されており、ソースシステムから ClickHouse Cloud への効率的かつ信頼性の高い転送を保証します。即時の洞察とアクションが求められるユースケースに最適です。

- **高性能かつスケーラブル**：EMQX Cloud の分散アーキテクチャと ClickHouse Cloud のカラムナーストレージ形式の組み合わせにより、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を維持します。

- **柔軟なデータ変換**：EMQX Cloud の強力な SQL ベースのルールエンジンにより、ClickHouse Cloud に保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、拡充など多様な変換が可能で、組織のニーズに合わせてデータを整形できます。

- **簡単なデプロイと管理**：EMQX Cloud はデータソースの設定、前処理ルールの作成、ClickHouse Cloud ストレージ設定の管理をユーザーフレンドリーなインターフェースで提供し、データ統合プロセスのセットアップと運用を簡素化します。

- **高度な分析機能**：ClickHouse Cloud の強力な SQL ベースのクエリ言語と複雑な分析関数のサポートにより、IoT データから価値あるインサイトを抽出できます。予測分析や異常検知なども可能です。

ClickHouse Cloud との統合により、強力なクラウドストレージと処理能力を活用しつつ、運用プロセスを簡素化した効率的かつ柔軟な IoT データアーキテクチャを構築でき、リアルタイムのデータ分析と洞察を実現します。

## はじめる前に

このセクションでは、EMQX Cloud コンソールで ClickHouse Cloud データ統合を作成する前に必要な準備について説明します。

### 前提条件

- [データ統合](./introduction.md) に関する知識
- データ統合の [ルール](./rules.md) に関する知識

### ClickHouse Cloud サービスの開始

1. [ClickHouse Cloud](https://clickhouse.cloud/) にアクセスし、サインアップしてサービスを作成します。
2. 公式ドキュメントの [ClickHouse Cloud - クイックスタート](https://clickhouse.com/docs/en/cloud-quick-start) で ClickHouse の使い方を学びます。
3. ClickHouse Cloud を作成後、**Services** ページに移動し、対象のサービスをクリックして SQL コンソールを開きます。
4. SQL コンソールの左側で **Queries** をクリックし、**+New query** をクリックします。
5. `mqtt_data` という名前のデータベースを作成します。
   ```bash
   CREATE DATABASE IF NOT EXISTS mqtt_data;
   CREATE TABLE IF NOT EXISTS mqtt_data.messages (
      data String,
      arrived TIMESTAMP
   ) ENGINE = MergeTree()
   ORDER BY arrived;
   ```
### ネットワーク設定

ClickHouse Cloud への接続前に、EMQX Cloud で PrivateLink を作成し、ClickHouse Cloud 側で PrivateLink エンドポイントサービスを設定して、EMQX のデプロイメントと ClickHouse Cloud サーバー間のプライベートネットワーク接続を確立する必要があります。

1. [EMQX Cloud コンソール](https://cloud-intl.emqx.com/console/) にログインし、対象のデプロイメントの概要ページに移動します。

2. **Network Management** に進み、**PrivateLink** セクションで **"+ Private Connection"** ボタンをクリックします。

3. プロンプトを確認して **Next Step** をクリックします。

4. ClickHouse Cloud プラットフォームにログインし、対象の ClickHouse Cloud サービスを開きます。

5. 左ナビゲーションメニューから **Settings** をクリックし、**Private endpoints** を探します。**Set up private endpoint** をクリックし、**Service name** と **DNS name** を確認して控えておきます。

  <img src="./_assets/data_integration_clickhousecloud_01.png" alt="ClickHouse Cloud のプライベートエンドポイント設定画面" style="zoom:67%;" />

6. EMQX Cloud に戻り、**Service name** をエンドポイントサービス名として入力し、**Create Private Connection** をクリックします。

7. デプロイメント詳細ページで接続状態が **Running** に変わるまで待ちます。**Endpoint ID** を控えておきます。

  ![data_integration_clickhousecloud_02.png](./_assets/data_integration_clickhousecloud_02.png)

8. 再度 ClickHouse Cloud プラットフォームに戻り、EMQX Cloud から控えた **Endpoint ID** を **Endpoint ID** 欄に入力し、説明（任意）を記入して **Create Endpoint** をクリックします。

9. 「success」メッセージが表示されるまで待ち、ClickHouse Cloud の **Private endpoints** セクションで新しく設定した Endpoint ID が表示されていることを確認します。これにより EMQX Cloud と ClickHouse Cloud 間の PrivateLink が正常に確立されたことが分かります。

## コネクターの作成

データ統合ルールを作成する前に、ClickHouse サーバーにアクセスするための ClickHouse コネクターを作成する必要があります。

1. デプロイメントに移動し、左ナビゲーションメニューから **Data Integration** をクリックします。初めてコネクターを作成する場合は、**Data Persistence** カテゴリの下にある **ClickHouse** を選択します。既にコネクターを作成済みの場合は、**New Connector** を選択し、続けて **Data Persistence** カテゴリの下の **ClickHouse** を選択します。

2. **New Connector** ページで以下の設定を行います：

   - **Connector Name**：システムが自動的にコネクター名を生成します。

   - **Server URL**：`https://` をプレフィックスとして付け、先に控えた ClickHouse Cloud のプライベートエンドポイントの DNS 名とポート番号 8443 を組み合わせて入力します（例：`https://{ClickHouse Cloud DNS name}:8443`）。

   - **Database Name**：`mqtt_data`

   - **Username と Password**：ClickHouse Cloud ページの左ナビゲーションメニューから **Connect** をクリックし、現在のユーザー名とパスワードを確認して、EMQX Cloud ページに入力します。

     ![data_integration_clickhousecloud_03](./_assets/data_integration_clickhousecloud_03.png)

   - ビジネス要件に応じて詳細設定を行います（任意）。

   ::: tip

   ここで入力する DNS 名はプライベートアドレスであり、Connect セクションに表示されるパブリック IP とは異なることに注意してください。

   :::

3. **Test** ボタンをクリックします。ClickHouse サービスにアクセス可能であれば、**connector available** のメッセージが返されます。

4. **New** ボタンをクリックして作成を完了します。

## ルールの作成

このセクションでは、EMQX Cloud コンソールで ClickHouse ルールを作成し、ルールにアクションを追加する方法を説明します。

1. ルールエリアの **New Rule** をクリックするか、作成したコネクターの **Actions** 列にある新規ルールアイコンをクリックします。

2. 使用する機能に基づいて **SQL Editor** でルールを設定します。ここでは、クライアントが `temp_hum/emqx` トピックに温度と湿度のメッセージを送信したときにエンジンをトリガーすることを目的としています。以下のような SQL を記述します：

   ```sql
     SELECT
      timestamp div 1000 as up_timestamp,
      clientid as client_id,
      payload
     FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の場合は、**SQL Examples** をクリックし、**Enable Test** を有効にして SQL ルールを学習・テストできます。

   :::

3. **Next** をクリックしてアクションを追加します。

4. **Connector** ドロップダウンから先ほど作成したコネクターを選択します。

5. **Batch Value Separator** を設定します。複数の入力項目を区別するため、ここではデフォルトの `,` をそのまま使用します。

   注意：デフォルトはカンマ `,` で、VALUES 形式に適しています。その他の区切り文字も使用可能です。詳細は [ClickHouse のデータフォーマット](https://clickhouse.com/docs/en/sql-reference/statements/insert-into) を参照してください。

6. SQL テンプレートに以下のコマンドを入力します（[ルールエンジン](./rules.md) を使用して、SQL 文中の文字列が適切にエスケープされ、SQL インジェクション攻撃を防止できるようにしてください）：

   ```sql
   INSERT INTO messages(data, arrived) VALUES ('${payload}', ${up_timestamp})
   ```

   ここで `${payload}` と `${up_timestamp}` はメッセージ内容とタイムスタンプを表し、後でメッセージ転送ルールで設定します。EMQX Cloud は転送前にこれらを対応する内容に置き換えます。

   SQL テンプレート内でプレースホルダー変数が未定義の場合、**SQL template** 上部の **Undefined Vars as Null** スイッチでルールエンジンの動作を切り替えられます：

   - **Disabled**（デフォルト）：ルールエンジンは文字列 `undefined` をデータベースに挿入します。

   - **Enabled**：変数が未定義の場合、ルールエンジンは `NULL` を挿入します。

     ::: tip

     可能な限りこのオプションは有効にすべきであり、無効化は後方互換性確保のためにのみ使用してください。

     :::

7. 詳細設定（任意）：[高度な設定](#advanced-configurations) を参照してください。

8. **Confirm** ボタンをクリックしてルール作成を完了します。

9. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/) を使って温湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。

1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - クライアント ID：`test_client`
   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. **Publish** をクリックしてメッセージを送信します。ClickHouse Cloud サーバーの `mqtt_data` データベース内の `messages` テーブルにエントリが挿入されているはずです。SQL コンソールで `mqtt_data` データベースの `messages` テーブルを確認して検証できます。

## 高度な設定

このセクションでは、EMQX Cloud ClickHouse コネクターの高度な設定オプションについて詳述します。コンソールでコネクターを設定する際、**Advanced Settings** に移動して以下のパラメータをビジネス要件に合わせて調整してください。

| **項目**                  | **説明**                                                                                         | **推奨値**           |
| ------------------------- | ------------------------------------------------------------------------------------------------ | --------------------- |
| **Connection Pool Size**  | ClickHouse サービスとの接続プールで同時に維持可能な接続数を指定します。この設定は EMQX Cloud と ClickHouse 間のアクティブ接続数を制御し、アプリケーションのスケーラビリティとパフォーマンスを管理します。<br/>**注意**：適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、ワークロードに依存します。大きすぎるとリソース枯渇の恐れがあり、小さすぎるとスループットが制限されます。 | `8`                   |
| **Clickhouse Timeout**    | ClickHouse サーバーへの接続確立を試みる際の最大待機時間（秒）を指定します。<br/>**注意**：システムパフォーマンスとリソース利用のバランスを取るため、ネットワーク条件を考慮して最適なタイムアウト値をテストすることが推奨されます。 | `15`                  |
| **Start Timeout**         | 自動起動されたリソースが正常状態になるまで待機する最大時間（秒）を指定します。これにより、ClickHouse のデータベースインスタンスなど接続先リソースが完全に稼働準備できるまで処理を進めないようにします。 | `5`                   |
| **Buffer Pool Size**      | EMQX Cloud と ClickHouse 間のイグレス（送信）データフローを管理するために割り当てるバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前の一時保存と処理を担当します。イングレス（受信）データのみを扱うブリッジの場合は「0」に設定可能です。 | `16`                  |
| **Request TTL**           | バッファに入ったリクエストが有効とみなされる最大期間（秒）を指定します。リクエストがこの TTL を超えてバッファに滞留するか、ClickHouse からの応答やアックが遅延した場合、リクエストは期限切れと判断されます。 | `45`                  |
| **Health Check Interval** | ClickHouse への接続状態を自動的に監視するヘルスチェックの間隔（秒）を指定します。 | `15`                  |
| **Max Buffer Queue Size** | ClickHouse コネクターの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータ送信前の一時保存を担当し、システム性能やデータ転送要件に応じて調整してください。 | `256`                 |
| **Batch Size**            | EMQX Cloud から ClickHouse へ一度に転送可能なデータバッチの最大サイズを指定します。サイズを調整することで転送効率やパフォーマンスを最適化できます。<br />「Max Batch Size」が「1」の場合、データはバッチ化されず個別に送信されます。 | `1`                   |
| **Query Mode**            | メッセージ転送要件に応じて `asynchronous`（非同期）または `synchronous`（同期）クエリモードを選択します。非同期モードでは ClickHouse への書き込みが MQTT メッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを ClickHouse への到着前に受信する可能性があります。 | `Async`               |
| **Inflight Window**       | 「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。ClickHouse との通信時に同時に存在可能なインフライトクエリの最大数を制御します。<br/>**Query Mode** が `async` の場合、この設定は特に重要です。同一 MQTT クライアントからのメッセージを厳密に順序処理したい場合は、値を 1 に設定してください。 | `100`                 |
