Skip to content

ClickHouseへのMQTTデータ取り込み

ClickHouse は、高性能なカラム指向のSQLデータベース管理システム(DBMS)であり、オンライン分析処理(OLAP)に優れています。大量データの低レイテンシ処理と分析に優れ、優れたクエリ性能、柔軟なデータモデル、スケーラブルな分散アーキテクチャを備えているため、さまざまなデータ分析シナリオに適しています。EMQX CloudはClickHouseとの統合をサポートしており、MQTTメッセージおよびイベントデータをClickHouseに取り込んで、さらなる分析や処理を行うことが可能です。

動作概要

ClickHouseデータ統合は、EMQX Cloudに標準搭載された機能であり、MQTTのリアルタイムデータキャプチャと伝送能力をClickHouseの強力なデータ処理機能と組み合わせることを目的としています。組み込みのルールエンジンコンポーネントにより、EMQX CloudからClickHouseへのデータ取り込みプロセスを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQX CloudとClickHouse間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud-ClickHouse Integration

MQTTデータをClickHouseに取り込む流れは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQX Cloudに接続し、機械、センサー、製品ラインの稼働状態、計測値、またはトリガーされたイベントに基づくリアルタイムMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータ処理:メッセージが到着すると、ルールエンジンを通過し、EMQX Cloudで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをClickHouseにルーティングするかを決定します。ペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、ペイロードへの追加コンテキストの付加などが適用されます。
  3. ClickHouseへのデータ取り込み:ルールエンジンがClickHouseへの保存対象メッセージを特定すると、メッセージの転送アクションをトリガーします。処理済みデータはClickHouseデータベースのコレクションにシームレスに書き込まれます。
  4. データの保存と活用:データがClickHouseに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、物流やサプライチェーン管理分野では、GPSトラッカー、温度センサー、在庫管理システムなどのIoTデバイスからのデータをリアルタイムで監視・分析し、追跡、ルート最適化、需要予測、効率的な在庫管理に役立てることができます。

特長と利点

ClickHouseとのデータ統合は、効率的なデータ伝送、保存、活用を実現するために以下の特長と利点を提供します。

  • リアルタイムデータストリーミング:EMQX Cloudはリアルタイムデータストリームの処理に特化しており、ソースシステムからClickHouseへの効率的かつ信頼性の高いデータ伝送を実現します。即時の洞察とアクションが必要なユースケースに最適です。
  • 高性能とスケーラビリティ:EMQX Cloudの分散アーキテクチャとClickHouseのカラムナーストレージ形式により、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫した性能と応答性を維持します。
  • 柔軟なデータ変換:EMQX Cloudは強力なSQLベースのルールエンジンを提供し、ClickHouseに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、エンリッチメントなど多様なデータ変換機能をサポートし、ニーズに応じてデータを整形できます。
  • 簡単なデプロイと管理:EMQX Cloudはデータソースの設定、前処理ルール、ClickHouse保存設定をユーザーフレンドリーなインターフェースで提供し、データ統合プロセスのセットアップと継続的な管理を容易にします。
  • 高度な分析:ClickHouseの強力なSQLクエリ言語と複雑な分析関数のサポートにより、IoTデータから価値ある洞察を得られ、予測分析や異常検知などに活用できます。

はじめる前に

このセクションでは、EMQX CloudコンソールでClickHouseデータ統合を作成する前に必要な準備について説明します。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

ClickHouseサーバーの起動

ClickHouseサーバーはDockerを使うか、ClickHouse Cloudを利用して起動し、データベースを作成できます。

Dockerを使ったClickHouseサーバーの起動

  1. 以下の初期化SQL文を含むinit.sqlファイルを作成します。このファイルはコンテナ起動時にデータベースを初期化するために使用します。

    bash
    cat >init.sql <<SQL_INIT
    CREATE DATABASE IF NOT EXISTS mqtt_data;
    CREATE TABLE IF NOT EXISTS mqtt_data.messages (
       data String,
       arrived TIMESTAMP
    ) ENGINE = MergeTree()
    ORDER BY arrived;
    SQL_INIT
  2. 以下のコマンドでClickHouseサーバーを起動します。このコマンドはデータベース名、ポート番号、ユーザー名、パスワードを指定し、カレントディレクトリのinit.sqlファイルをコンテナ内にマウントします。

    bash
    docker run \
    --rm \
    -e CLICKHOUSE_DB=mqtt_data \
    -e CLICKHOUSE_USER=emqx \
    -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
    -e CLICKHOUSE_PASSWORD=public \
    -p 18123:8123 \
    -p 19000:9000 \
    --ulimit nofile=262144:262144 \
    -v $pwd/init.sql:/docker-entrypoint-initdb.d/init.sql \
    clickhouse/clickhouse-server

DockerでのClickHouseの実行に関する詳細はdockerhubをご参照ください。

ClickHouse Cloudを使ったサーバー起動

  1. https://clickhouse.cloud/ にアクセスし、サービスを作成するためにサインアップします。

  2. 公式ドキュメントのClickHouse Cloud - クイックスタートでClickHouseの使い方を学びます。

  3. ClickHouse Cloudを作成後、Servicesページに移動し、対象のサービスをクリックしてSQLコンソールを開きます。

  4. SQLコンソールの左側でQueriesをクリックし、+New queryをクリックします。

  5. 以下のSQLでmqtt_dataデータベースを作成します。

    bash
    CREATE DATABASE IF NOT EXISTS mqtt_data;
    CREATE TABLE IF NOT EXISTS mqtt_data.messages (
       data String,
       arrived TIMESTAMP
    ) ENGINE = MergeTree()
    ORDER BY arrived;

コネクターの作成

データ統合ルールを作成する前に、ClickHouseサーバーにアクセスするためのClickHouseコネクターを作成する必要があります。

  1. ご自身のデプロイメントに移動し、左ナビゲーションメニューからData Integrationをクリックします。初めてコネクターを作成する場合は、Data Persistenceカテゴリの下にあるClickHouseを選択します。すでにコネクターを作成済みの場合は、New Connectorを選択し、同じくClickHouseを選択します。

  2. Connector Nameはシステムが自動生成します。

  3. 接続情報を入力します。

    • Connector name:コネクター名を入力します。大文字・小文字の英数字の組み合わせが推奨されます。例:my_clickhouse
    • Server URLhttp://{host}:{port}
    • Database Namemqtt_data
    • Usernameemqx
    • Passwordpublic
    • ビジネスニーズに応じて詳細設定を行います(任意)。
  4. Testボタンをクリックします。ClickHouseサービスにアクセス可能であれば、connector availableのメッセージが表示されます。

  5. Newボタンをクリックして作成を完了します。

ルールの作成

このセクションでは、EMQX CloudコンソールでClickHouseルールを作成し、ルールにアクションを追加する方法を説明します。

  1. ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づいてSQL Editorでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信したときにエンジンをトリガーするSQLを作成します。以下のSQLを使用します。

    sql
      SELECT
       timestamp div 1000 as up_timestamp,
       clientid as client_id,
       payload
      FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。

  3. Nextをクリックしてアクションを追加します。

  4. Connectorドロップダウンから先ほど作成したコネクターを選択します。

  5. Batch Value Separatorを設定します。複数の入力項目を区切るための区切り文字で、ここではデフォルトの,(カンマ)を使用します。

    注:デフォルトの,はVALUES形式に適しています。その他の区切り文字も使用可能です。詳細はClickHouseのデータフォーマットをご参照ください。

  6. SQLテンプレートに以下のコマンドを入力します(ルールエンジンを使用して、入力SQL文の文字列を適切にエスケープし、SQLインジェクション攻撃を防止してください)。

    sql
    INSERT INTO messages(data, arrived) VALUES ('${payload}', ${up_timestamp})

    ここで、${payload}${up_timestamp}はメッセージ内容とタイムスタンプを表し、後でメッセージ転送ルールで設定されます。EMQX Cloudは転送前にこれらを対応する内容に置き換えます。

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上のUndefined Vars as Nullスイッチでルールエンジンの動作を切り替えられます。

    • Disabled(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • Enabled:変数が未定義の場合、ルールエンジンはNULLを挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効化は後方互換性確保のためのみ推奨されます。

  7. 詳細設定(任意):高度な設定を参照してください。

  8. Confirmボタンをクリックしてルール作成を完了します。

  9. Successful new ruleポップアップでBack to Rulesをクリックし、データ統合設定チェーンを完了します。

ルールのテスト

MQTTXを使って温度・湿度データの送信をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. Publishをクリックしてメッセージを送信します。ClickHouseサーバーのmqtt_dataデータベース内のmessagesテーブルにエントリが挿入されているはずです。以下のコマンドで確認できます。

    bash
    curl -u emqx:public -X POST -d "SELECT * FROM mqtt_data.messages" http://{host}:18123

    正常に動作していれば、以下のような出力が得られます(タイムスタンプは異なります)。

    bash
    {\n  "temp": "27.5",\n  "hum": "41.8"\n}        2024-03-27 09:35:11

高度な設定

このセクションでは、EMQX Cloud ClickHouseコネクターの高度な設定オプションについて詳しく説明します。コンソールでコネクターを設定する際、Advanced Settingsに移動し、以下のパラメータをニーズに合わせて調整してください。

項目説明推奨値
Connection Pool SizeClickHouseサービスとの接続プールで維持可能な同時接続数を指定します。システムのスケーラビリティと性能を管理するための設定です。
注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションの負荷に依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の原因となります。
8
Clickhouse TimeoutClickHouseサーバーへの接続確立を試みる際の最大待機時間(秒)を指定します。
注意:パフォーマンスとリソース使用のバランスを取るため、ネットワーク環境に応じて最適な値をテストしてください。
15
Start Timeout自動起動されたリソースが正常な状態になるまで待機する最大時間(秒)を指定します。これにより、ClickHouseのデータベースインスタンスなどが完全に稼働準備できるまで処理を進めません。5
Buffer Pool SizeEMQX CloudとClickHouse間のデータ送信(イグレス)を管理するバッファワーカーの数を指定します。これらのワーカーは送信前のデータを一時的に保持・処理します。イングレスのみのブリッジでは「0」に設定可能です。16
Request TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。TTLを超えたリクエストや、ClickHouseからの応答・アックが得られない場合、リクエストは期限切れとみなされます。45
Health Check IntervalClickHouseとの接続状態を自動的にチェックする間隔(秒)を指定します。15
Max Buffer Queue SizeClickHouseコネクターの各バッファワーカーが保持可能な最大バイト数を指定します。データ転送要件やシステム性能に応じて調整してください。256
Batch SizeEMQX CloudからClickHouseへ一度に転送可能なデータバッチの最大サイズを指定します。1に設定すると、データはバッチ化せず個別に送信されます。1
Query Modeメッセージ送信を最適化するために、asynchronous(非同期)またはsynchronous(同期)クエリモードを選択します。非同期モードでは、ClickHouseへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがClickHouseへの到達前にメッセージを受信する可能性があります。Async
Inflight WindowClickHouseとの通信で応答待ちの「インフライトクエリ」の最大数を制御します。
Query Modeasyncの場合、この設定は特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、値を1に設定してください。
100

さらに詳しく

以下のリンクから詳細情報をご覧いただけます。

ブログ: