# MQTTデータをClickHouseに取り込む

[ClickHouse](https://clickhouse.com/)は、高性能なカラム指向のSQLデータベース管理システム（DBMS）であり、オンライン分析処理（OLAP）に適しています。大量のデータを低レイテンシで処理・分析することに優れており、優れたクエリ性能、柔軟なデータモデル、スケーラブルな分散アーキテクチャを備えているため、さまざまなデータ分析シナリオに適しています。EMQXプラットフォームはClickHouseとの統合をサポートしており、MQTTメッセージおよびイベントデータをClickHouseに取り込んで、さらなる分析や処理を行うことが可能です。

## 動作概要

ClickHouseとのデータ統合は、EMQXプラットフォームに標準搭載された機能であり、MQTTのリアルタイムデータキャプチャと転送機能をClickHouseの強力なデータ処理機能と組み合わせることを目的としています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからClickHouseへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXプラットフォームとClickHouse間のデータ統合の典型的なアーキテクチャを示しています。

![EMQXプラットフォームとClickHouseの統合](./_assets/data_integration_clickhouse.jpg)

MQTTデータをClickHouseに取り込む流れは以下の通りです：

1. **メッセージのパブリッシュと受信**：産業用IoTデバイスはMQTTプロトコルを介してEMQXプラットフォームに正常に接続し、機械、センサー、製品ラインの稼働状態、計測値、またはトリガーされたイベントに基づくリアルタイムMQTTデータをEMQXプラットフォームにパブリッシュします。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージが到着すると、ルールエンジンを通過し、EMQXプラットフォームで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをClickHouseにルーティングするかを決定します。ペイロードの変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
3. **ClickHouseへのデータ取り込み**：ルールエンジンがClickHouseへの保存対象のメッセージを特定すると、メッセージをClickHouseに転送するアクションをトリガーします。処理済みデータはClickHouseデータベースのコレクションにシームレスに書き込まれます。
4. **データの保存と活用**：データがClickHouseに保存されることで、企業はそのクエリ機能を活用してさまざまなユースケースに対応できます。例えば、物流やサプライチェーン管理分野では、GPSトラッカー、温度センサー、在庫管理システムなどのIoTデバイスからのデータをリアルタイムで監視・分析し、追跡、ルート最適化、需要予測、効率的な在庫管理に役立てることができます。

## 特長と利点

ClickHouseとのデータ統合は、効率的なデータ転送、保存、活用を実現するための多彩な特長と利点を提供します。

- **リアルタイムデータストリーミング**：EMQXプラットフォームはリアルタイムデータストリームの処理に最適化されており、ソースシステムからClickHouseへの効率的かつ信頼性の高いデータ転送を保証します。これにより、即時の洞察とアクションが必要なユースケースに最適です。
- **高性能かつスケーラブル**：EMQXプラットフォームの分散アーキテクチャとClickHouseのカラムナストレージ形式により、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫した性能と応答性を維持します。
- **柔軟なデータ変換**：EMQXプラットフォームは強力なSQLベースのルールエンジンを提供し、ClickHouseに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様な変換機能をサポートし、ニーズに応じてデータを整形可能です。
- **簡単なデプロイと管理**：EMQXプラットフォームはデータソースの設定、前処理ルール、ClickHouse保存設定のためのユーザーフレンドリーなインターフェースを提供し、データ統合プロセスのセットアップと運用管理を簡素化します。
- **高度な分析機能**：ClickHouseの強力なSQLクエリ言語と複雑な分析関数のサポートにより、IoTデータから価値ある洞察を得られ、予測分析や異常検知などに活用できます。

## はじめる前に

このセクションでは、EMQXプラットフォームコンソールでClickHouseデータ統合を作成する前に必要な準備について説明します。

### 前提条件

- [データ統合](./introduction.md)に関する知識
- データ統合の[ルール](./rules.md)に関する知識

### ネットワークの設定

<!--@include: ./network-setting.md-->

### ClickHouseサーバーの起動

ClickHouseサーバーは、[Docker](https://www.docker.com/)を使う方法とClickHouse Cloudを使う方法のいずれかで起動し、データベースを作成できます。

#### Dockerを使ってClickHouseサーバーを起動する

1. 以下の初期化SQL文を含む`init.sql`ファイルを作成します。このファイルはコンテナ起動時にデータベースを初期化するために使用されます。

   ```bash
   cat >init.sql <<SQL_INIT
   CREATE DATABASE IF NOT EXISTS mqtt_data;
   CREATE TABLE IF NOT EXISTS mqtt_data.messages (
      data String,
      arrived TIMESTAMP
   ) ENGINE = MergeTree()
   ORDER BY arrived;
   SQL_INIT
   ```

2. 以下のコマンドでClickHouseサーバーを起動します。このコマンドはデータベース名、ポート番号、ユーザー名、パスワードを指定し、カレントディレクトリの`init.sql`ファイルをDockerのディレクトリにマウントします。

   ```bash
   docker run \
   --rm \
   -e CLICKHOUSE_DB=mqtt_data \
   -e CLICKHOUSE_USER=emqx \
   -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
   -e CLICKHOUSE_PASSWORD=public \
   -p 18123:8123 \
   -p 19000:9000 \
   --ulimit nofile=262144:262144 \
   -v $pwd/init.sql:/docker-entrypoint-initdb.d/init.sql \
   clickhouse/clickhouse-server
   ```

DockerでのClickHouseの実行に関する詳細は[dockerhub](https://hub.docker.com/r/clickhouse/clickhouse-server)をご参照ください。

#### ClickHouse Cloudを使ってClickHouseサーバーを起動する

1. https://clickhouse.cloud/ にアクセスし、サインアップしてサービスを作成します。
2. 公式ドキュメントの[ClickHouse Cloud - クイックスタート](https://clickhouse.com/docs/en/cloud-quick-start)を参照して、ClickHouseの使い方を学びます。
3. ClickHouse Cloudを作成後、**Services**ページで該当サービスをクリックし、SQLコンソールを開きます。
4. SQLコンソールの左側で**Queries**をクリックし、**+New query**をクリックします。
5. 以下のSQLを実行して`mqtt_data`データベースを作成します。

   ```bash
   CREATE DATABASE IF NOT EXISTS mqtt_data;
   CREATE TABLE IF NOT EXISTS mqtt_data.messages (
      data String,
      arrived TIMESTAMP
   ) ENGINE = MergeTree()
   ORDER BY arrived;
   ```

## コネクターの作成

データ統合ルールを作成する前に、ClickHouseサーバーにアクセスするためのClickHouseコネクターを作成する必要があります。

1. デプロイメントに移動し、左ナビゲーションメニューから**データ統合**をクリックします。初めてコネクターを作成する場合は、**データ永続化**カテゴリの下にある**ClickHouse**を選択します。すでにコネクターを作成済みの場合は、**新しいコネクター**を選択し、同じく**データ永続化**カテゴリの**ClickHouse**を選択します。
2. **コネクター名**はシステムが自動的に生成します。
3. 接続情報を入力します：

   - **コネクター名**：英数字の組み合わせで名前を入力します（例：`my_clickhouse`）。
   - **サーバーURL**：`http://{host}:{port}`
   - **データベース名**：`mqtt_data`
   - **ユーザー名**：`emqx`
   - **パスワード**：`public`
   - ビジネスニーズに応じて詳細設定を行います（任意）。
4. **テスト**ボタンをクリックし、ClickHouseサービスにアクセス可能であれば「コネクター利用可能」のメッセージが表示されます。
5. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

このセクションでは、EMQXプラットフォームコンソールを使ってClickHouseルールを作成し、ルールにアクションを追加する方法を説明します。

1. ルールエリアで**新しいルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。
2. 使用する機能に基づいて**SQLエディター**でルールを設定します。例えば、クライアントが`temp_hum/emqx`トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするには、以下のSQLを使用します。

   ```sql
     SELECT
      timestamp div 1000 as up_timestamp,
      clientid as client_id,
      payload
     FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は**SQL例**をクリックし、**テスト有効化**を使ってSQLルールの学習とテストを行うことをおすすめします。

   :::

3. **次へ**をクリックしてアクションを追加します。
4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。
5. **バッチ値区切り文字**：複数の入力項目を区別するために、この例ではデフォルトの`,`をそのまま使用します。

   ※デフォルトはカンマ（`,`）で、VALUES形式に適しています。その他の区切り文字も使用可能です。詳細は[ClickHouseのデータ形式](https://clickhouse.com/docs/en/sql-reference/statements/insert-into)をご参照ください。

6. SQLテンプレートに以下のコマンドを入力します（[ルールエンジン](./rules.md)を使い、SQLインジェクション攻撃を防ぐために入力SQL内の文字列が適切にエスケープされていることを確認してください）。

   ```sql
   INSERT INTO messages(data, arrived) VALUES ('${payload}', ${up_timestamp})
   ```

   ここで`${payload}`と`${up_timestamp}`はそれぞれメッセージ内容とタイムスタンプを表し、後でルール内でメッセージ転送時に設定されます。EMQXプラットフォームは転送前にこれらを対応する内容に置き換えます。

   SQLテンプレート内でプレースホルダー変数が未定義の場合は、**SQLテンプレート**上部の**未定義変数をNULLとして扱う**スイッチを切り替えてルールエンジンの動作を設定できます。

   - **無効**（デフォルト）：ルールエンジンは文字列`undefined`をデータベースに挿入します。
   - **有効**：変数が未定義の場合、ルールエンジンは`NULL`を挿入します。

     ::: tip

     可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。

     :::

7. 詳細設定（任意）：[高度な設定](#advanced-configurations)を参照してください。
8. **確認**ボタンをクリックしてルール作成を完了します。
9. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合の設定チェーンを完了します。

## ルールのテスト

温度・湿度データの報告をシミュレートするために[MQTTX](https://mqttx.app/)の使用を推奨しますが、他のクライアントでも可能です。

1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`
   - クライアントID：`test_client`
   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. **パブリッシュ**をクリックしてメッセージを送信します。ClickHouseサーバーの`mqtt_data`データベース内の`messages`テーブルにエントリが挿入されているはずです。以下のコマンドをターミナルで実行して確認できます。

   ```bash
   curl -u emqx:public -X POST -d "SELECT * FROM mqtt_data.messages" http://{host}:18123
   ```

   正常に動作していれば、以下のような出力が得られます（タイムスタンプは異なります）。

   ```bash
   {\n  "temp": "27.5",\n  "hum": "41.8"\n}        2024-03-27 09:35:11
   ```

## 高度な設定

このセクションでは、EMQXプラットフォームのClickHouseコネクターで利用可能な高度な設定オプションについて詳述します。コンソールでコネクターを設定する際、**詳細設定**に移動して以下のパラメーターをニーズに合わせて調整してください。

| **項目**                  | **説明**                                                                                                                  | **推奨値**          |
| ------------------------- | ------------------------------------------------------------------------------------------------------------------------- | ------------------- |
| **接続プールサイズ**      | ClickHouseサービスとの接続プールで維持できる同時接続数を指定します。この設定はEMQXプラットフォームとClickHouse間のアクティブな接続数を制御し、アプリケーションのスケーラビリティと性能管理に役立ちます。<br/>**注意**：適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションのワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の原因となります。 | `8`                 |
| **ClickHouseタイムアウト** | ClickHouseサーバーへの接続確立時にコネクターが待機する最大時間（秒）を指定します。<br/>**注意**：適切なタイムアウト設定はシステム性能とリソース利用のバランスを取るために重要です。ネットワーク状況に応じて最適値をテストしてください。 | `15`                |
| **起動タイムアウト**      | 自動起動されたリソースが正常状態になるまでコネクターが待機する最大時間（秒）を指定します。この設定により、ClickHouseのデータベースインスタンスなどのリソースが完全に稼働し、データ処理準備が整うまで操作を進めないようにします。 | `5`                 |
| **バッファプールサイズ**  | EMQXプラットフォームとClickHouse間の出口方向（egress）データフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前に一時的にデータを保持・処理します。出口方向のみを扱うブリッジに適した設定であり、入口方向（ingress）のみのブリッジでは`0`に設定可能です。 | `16`                |
| **リクエストTTL**         | バッファに入ったリクエストが有効とみなされる最大時間（秒）を指定します。リクエストがTTLを超えてバッファに滞留するか、送信後にClickHouseからの応答やアックが得られない場合、そのリクエストは期限切れとみなされます。 | `45`                |
| **ヘルスチェック間隔**    | コネクターがClickHouseへの接続状態を自動的にチェックする間隔（秒）を指定します。 | `15`                |
| **最大バッファキューサイズ** | ClickHouseコネクターの各バッファワーカーがバッファリングできる最大バイト数を指定します。バッファワーカーはClickHouseへのデータ送信前にデータを一時的に保持し、データフローの効率化を図ります。システム性能やデータ転送要件に応じて調整してください。 | `256`               |
| **バッチサイズ**          | EMQXプラットフォームからClickHouseへ一度に転送可能なデータバッチの最大サイズを指定します。サイズを調整することで、データ転送の効率と性能を最適化できます。<br />「最大バッチサイズ」が`1`に設定されている場合、データレコードはバッチ化されず個別に送信されます。 | `1`                 |
| **クエリモード**          | メッセージ転送要件に応じて`非同期（asynchronous）`または`同期（synchronous）`のクエリモードを選択できます。非同期モードでは、ClickHouseへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしません。ただし、クライアントがClickHouseへの書き込み完了前にメッセージを受信する可能性があります。 | `Async`             |
| **インフライトウィンドウ** | 「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。この設定は、ClickHouseと通信中に同時に存在可能なインフライトクエリの最大数を制御します。<br/>**クエリモード**が`async`の場合、同一MQTTクライアントからのメッセージを厳密な順序で処理する必要がある場合、この値を`1`に設定してください。 | `100`               |

## さらに詳しく

以下のリンクから詳細情報をご覧いただけます。

**ブログ**：

- [EMQX + ClickHouseによるIoTデータ収集と分析の実装](https://www.emqx.com/en/blog/emqx-and-clickhouse-for-iot-data-access-and-analysis)
- [MQTTからClickHouseへの統合：リアルタイムIoTデータ分析を加速](https://www.emqx.com/en/blog/mqtt-to-clickhouse-integration)
