ClickHouseへのMQTTデータ取り込み
ClickHouse は、高性能なカラム指向のSQLデータベース管理システム(DBMS)であり、オンライン分析処理(OLAP)に優れています。大量データの低レイテンシ処理と分析に優れ、優れたクエリ性能、柔軟なデータモデル、スケーラブルな分散アーキテクチャを備えているため、さまざまなデータ分析シナリオに適しています。EMQX CloudはClickHouseとの統合をサポートしており、MQTTメッセージおよびイベントデータをClickHouseに取り込んで、さらなる分析や処理を行うことが可能です。
動作概要
ClickHouseデータ統合は、EMQX Cloudに標準搭載された機能であり、MQTTのリアルタイムデータキャプチャと伝送能力をClickHouseの強力なデータ処理機能と組み合わせることを目的としています。組み込みのルールエンジンコンポーネントにより、EMQX CloudからClickHouseへのデータ取り込みプロセスを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQX CloudとClickHouse間のデータ統合の典型的なアーキテクチャを示しています。

MQTTデータをClickHouseに取り込む流れは以下の通りです。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQX Cloudに接続し、機械、センサー、製品ラインの稼働状態、計測値、またはトリガーされたイベントに基づくリアルタイムMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータ処理:メッセージが到着すると、ルールエンジンを通過し、EMQX Cloudで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをClickHouseにルーティングするかを決定します。ペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、ペイロードへの追加コンテキストの付加などが適用されます。
- ClickHouseへのデータ取り込み:ルールエンジンがClickHouseへの保存対象メッセージを特定すると、メッセージの転送アクションをトリガーします。処理済みデータはClickHouseデータベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データがClickHouseに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、物流やサプライチェーン管理分野では、GPSトラッカー、温度センサー、在庫管理システムなどのIoTデバイスからのデータをリアルタイムで監視・分析し、追跡、ルート最適化、需要予測、効率的な在庫管理に役立てることができます。
特長と利点
ClickHouseとのデータ統合は、効率的なデータ伝送、保存、活用を実現するために以下の特長と利点を提供します。
- リアルタイムデータストリーミング:EMQX Cloudはリアルタイムデータストリームの処理に特化しており、ソースシステムからClickHouseへの効率的かつ信頼性の高いデータ伝送を実現します。即時の洞察とアクションが必要なユースケースに最適です。
- 高性能とスケーラビリティ:EMQX Cloudの分散アーキテクチャとClickHouseのカラムナーストレージ形式により、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫した性能と応答性を維持します。
- 柔軟なデータ変換:EMQX Cloudは強力なSQLベースのルールエンジンを提供し、ClickHouseに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、エンリッチメントなど多様なデータ変換機能をサポートし、ニーズに応じてデータを整形できます。
- 簡単なデプロイと管理:EMQX Cloudはデータソースの設定、前処理ルール、ClickHouse保存設定をユーザーフレンドリーなインターフェースで提供し、データ統合プロセスのセットアップと継続的な管理を容易にします。
- 高度な分析:ClickHouseの強力なSQLクエリ言語と複雑な分析関数のサポートにより、IoTデータから価値ある洞察を得られ、予測分析や異常検知などに活用できます。
はじめる前に
このセクションでは、EMQX CloudコンソールでClickHouseデータ統合を作成する前に必要な準備について説明します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
ClickHouseサーバーの起動
ClickHouseサーバーはDockerを使うか、ClickHouse Cloudを利用して起動し、データベースを作成できます。
Dockerを使ったClickHouseサーバーの起動
以下の初期化SQL文を含む
init.sqlファイルを作成します。このファイルはコンテナ起動時にデータベースを初期化するために使用します。bashcat >init.sql <<SQL_INIT CREATE DATABASE IF NOT EXISTS mqtt_data; CREATE TABLE IF NOT EXISTS mqtt_data.messages ( data String, arrived TIMESTAMP ) ENGINE = MergeTree() ORDER BY arrived; SQL_INIT以下のコマンドでClickHouseサーバーを起動します。このコマンドはデータベース名、ポート番号、ユーザー名、パスワードを指定し、カレントディレクトリの
init.sqlファイルをコンテナ内にマウントします。bashdocker run \ --rm \ -e CLICKHOUSE_DB=mqtt_data \ -e CLICKHOUSE_USER=emqx \ -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \ -e CLICKHOUSE_PASSWORD=public \ -p 18123:8123 \ -p 19000:9000 \ --ulimit nofile=262144:262144 \ -v $pwd/init.sql:/docker-entrypoint-initdb.d/init.sql \ clickhouse/clickhouse-server
DockerでのClickHouseの実行に関する詳細はdockerhubをご参照ください。
ClickHouse Cloudを使ったサーバー起動
https://clickhouse.cloud/ にアクセスし、サービスを作成するためにサインアップします。
公式ドキュメントのClickHouse Cloud - クイックスタートでClickHouseの使い方を学びます。
ClickHouse Cloudを作成後、Servicesページに移動し、対象のサービスをクリックしてSQLコンソールを開きます。
SQLコンソールの左側でQueriesをクリックし、+New queryをクリックします。
以下のSQLで
mqtt_dataデータベースを作成します。bashCREATE DATABASE IF NOT EXISTS mqtt_data; CREATE TABLE IF NOT EXISTS mqtt_data.messages ( data String, arrived TIMESTAMP ) ENGINE = MergeTree() ORDER BY arrived;
コネクターの作成
データ統合ルールを作成する前に、ClickHouseサーバーにアクセスするためのClickHouseコネクターを作成する必要があります。
ご自身のデプロイメントに移動し、左ナビゲーションメニューからData Integrationをクリックします。初めてコネクターを作成する場合は、Data Persistenceカテゴリの下にあるClickHouseを選択します。すでにコネクターを作成済みの場合は、New Connectorを選択し、同じくClickHouseを選択します。
Connector Nameはシステムが自動生成します。
接続情報を入力します。
- Connector name:コネクター名を入力します。大文字・小文字の英数字の組み合わせが推奨されます。例:
my_clickhouse - Server URL:
http://{host}:{port} - Database Name:
mqtt_data - Username:
emqx - Password:
public - ビジネスニーズに応じて詳細設定を行います(任意)。
- Connector name:コネクター名を入力します。大文字・小文字の英数字の組み合わせが推奨されます。例:
Testボタンをクリックします。ClickHouseサービスにアクセス可能であれば、connector availableのメッセージが表示されます。
Newボタンをクリックして作成を完了します。
ルールの作成
このセクションでは、EMQX CloudコンソールでClickHouseルールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。
使用する機能に基づいてSQL Editorでルールを設定します。ここでは、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信したときにエンジンをトリガーするSQLを作成します。以下のSQLを使用します。sqlSELECT timestamp div 1000 as up_timestamp, clientid as client_id, payload FROM "temp_hum/emqx"TIP
初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。
Nextをクリックしてアクションを追加します。
Connectorドロップダウンから先ほど作成したコネクターを選択します。
Batch Value Separatorを設定します。複数の入力項目を区切るための区切り文字で、ここではデフォルトの
,(カンマ)を使用します。注:デフォルトの
,はVALUES形式に適しています。その他の区切り文字も使用可能です。詳細はClickHouseのデータフォーマットをご参照ください。SQLテンプレートに以下のコマンドを入力します(ルールエンジンを使用して、入力SQL文の文字列を適切にエスケープし、SQLインジェクション攻撃を防止してください)。
sqlINSERT INTO messages(data, arrived) VALUES ('${payload}', ${up_timestamp})ここで、
${payload}と${up_timestamp}はメッセージ内容とタイムスタンプを表し、後でメッセージ転送ルールで設定されます。EMQX Cloudは転送前にこれらを対応する内容に置き換えます。SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上のUndefined Vars as Nullスイッチでルールエンジンの動作を切り替えられます。
Disabled(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。Enabled:変数が未定義の場合、ルールエンジンは
NULLを挿入します。TIP
可能な限りこのオプションは有効にしてください。無効化は後方互換性確保のためのみ推奨されます。
詳細設定(任意):高度な設定を参照してください。
Confirmボタンをクリックしてルール作成を完了します。
Successful new ruleポップアップでBack to Rulesをクリックし、データ統合設定チェーンを完了します。
ルールのテスト
MQTTXを使って温度・湿度データの送信をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxクライアントID:
test_clientペイロード:
json{ "temp": "27.5", "hum": "41.8" }
Publishをクリックしてメッセージを送信します。ClickHouseサーバーの
mqtt_dataデータベース内のmessagesテーブルにエントリが挿入されているはずです。以下のコマンドで確認できます。bashcurl -u emqx:public -X POST -d "SELECT * FROM mqtt_data.messages" http://{host}:18123正常に動作していれば、以下のような出力が得られます(タイムスタンプは異なります)。
bash{\n "temp": "27.5",\n "hum": "41.8"\n} 2024-03-27 09:35:11
高度な設定
このセクションでは、EMQX Cloud ClickHouseコネクターの高度な設定オプションについて詳しく説明します。コンソールでコネクターを設定する際、Advanced Settingsに移動し、以下のパラメータをニーズに合わせて調整してください。
| 項目 | 説明 | 推奨値 |
|---|---|---|
| Connection Pool Size | ClickHouseサービスとの接続プールで維持可能な同時接続数を指定します。システムのスケーラビリティと性能を管理するための設定です。 注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションの負荷に依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の原因となります。 | 8 |
| Clickhouse Timeout | ClickHouseサーバーへの接続確立を試みる際の最大待機時間(秒)を指定します。 注意:パフォーマンスとリソース使用のバランスを取るため、ネットワーク環境に応じて最適な値をテストしてください。 | 15 |
| Start Timeout | 自動起動されたリソースが正常な状態になるまで待機する最大時間(秒)を指定します。これにより、ClickHouseのデータベースインスタンスなどが完全に稼働準備できるまで処理を進めません。 | 5 |
| Buffer Pool Size | EMQX CloudとClickHouse間のデータ送信(イグレス)を管理するバッファワーカーの数を指定します。これらのワーカーは送信前のデータを一時的に保持・処理します。イングレスのみのブリッジでは「0」に設定可能です。 | 16 |
| Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。TTLを超えたリクエストや、ClickHouseからの応答・アックが得られない場合、リクエストは期限切れとみなされます。 | 45 |
| Health Check Interval | ClickHouseとの接続状態を自動的にチェックする間隔(秒)を指定します。 | 15 |
| Max Buffer Queue Size | ClickHouseコネクターの各バッファワーカーが保持可能な最大バイト数を指定します。データ転送要件やシステム性能に応じて調整してください。 | 256 |
| Batch Size | EMQX CloudからClickHouseへ一度に転送可能なデータバッチの最大サイズを指定します。1に設定すると、データはバッチ化せず個別に送信されます。 | 1 |
| Query Mode | メッセージ送信を最適化するために、asynchronous(非同期)またはsynchronous(同期)クエリモードを選択します。非同期モードでは、ClickHouseへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがClickHouseへの到達前にメッセージを受信する可能性があります。 | Async |
| Inflight Window | ClickHouseとの通信で応答待ちの「インフライトクエリ」の最大数を制御します。 Query Modeが asyncの場合、この設定は特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、値を1に設定してください。 | 100 |
さらに詳しく
以下のリンクから詳細情報をご覧いただけます。
ブログ: