MQTTデータをClickHouseに取り込む
ClickHouseは、高性能なカラム指向のSQLデータベース管理システム(DBMS)であり、オンライン分析処理(OLAP)に適しています。大量のデータを低レイテンシで処理・分析することに優れており、優れたクエリ性能、柔軟なデータモデル、スケーラブルな分散アーキテクチャを備えているため、さまざまなデータ分析シナリオに適しています。EMQXプラットフォームはClickHouseとの統合をサポートしており、MQTTメッセージおよびイベントデータをClickHouseに取り込んで、さらなる分析や処理を行うことが可能です。
動作概要
ClickHouseとのデータ統合は、EMQXプラットフォームに標準搭載された機能であり、MQTTのリアルタイムデータキャプチャと転送機能をClickHouseの強力なデータ処理機能と組み合わせることを目的としています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからClickHouseへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQXプラットフォームとClickHouse間のデータ統合の典型的なアーキテクチャを示しています。
MQTTデータをClickHouseに取り込む流れは以下の通りです:
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQXプラットフォームに正常に接続し、機械、センサー、製品ラインの稼働状態、計測値、またはトリガーされたイベントに基づくリアルタイムMQTTデータをEMQXプラットフォームにパブリッシュします。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQXプラットフォームで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをClickHouseにルーティングするかを決定します。ペイロードの変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
- ClickHouseへのデータ取り込み:ルールエンジンがClickHouseへの保存対象のメッセージを特定すると、メッセージをClickHouseに転送するアクションをトリガーします。処理済みデータはClickHouseデータベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データがClickHouseに保存されることで、企業はそのクエリ機能を活用してさまざまなユースケースに対応できます。例えば、物流やサプライチェーン管理分野では、GPSトラッカー、温度センサー、在庫管理システムなどのIoTデバイスからのデータをリアルタイムで監視・分析し、追跡、ルート最適化、需要予測、効率的な在庫管理に役立てることができます。
特長と利点
ClickHouseとのデータ統合は、効率的なデータ転送、保存、活用を実現するための多彩な特長と利点を提供します。
- リアルタイムデータストリーミング:EMQXプラットフォームはリアルタイムデータストリームの処理に最適化されており、ソースシステムからClickHouseへの効率的かつ信頼性の高いデータ転送を保証します。これにより、即時の洞察とアクションが必要なユースケースに最適です。
- 高性能かつスケーラブル:EMQXプラットフォームの分散アーキテクチャとClickHouseのカラムナストレージ形式により、データ量の増加に応じてシームレスにスケール可能です。大規模データセットでも一貫した性能と応答性を維持します。
- 柔軟なデータ変換:EMQXプラットフォームは強力なSQLベースのルールエンジンを提供し、ClickHouseに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様な変換機能をサポートし、ニーズに応じてデータを整形可能です。
- 簡単なデプロイと管理:EMQXプラットフォームはデータソースの設定、前処理ルール、ClickHouse保存設定のためのユーザーフレンドリーなインターフェースを提供し、データ統合プロセスのセットアップと運用管理を簡素化します。
- 高度な分析機能:ClickHouseの強力なSQLクエリ言語と複雑な分析関数のサポートにより、IoTデータから価値ある洞察を得られ、予測分析や異常検知などに活用できます。
はじめる前に
このセクションでは、EMQXプラットフォームコンソールでClickHouseデータ統合を作成する前に必要な準備について説明します。
前提条件
ネットワークの設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
ClickHouseサーバーの起動
ClickHouseサーバーは、Dockerを使う方法とClickHouse Cloudを使う方法のいずれかで起動し、データベースを作成できます。
Dockerを使ってClickHouseサーバーを起動する
以下の初期化SQL文を含む
init.sql
ファイルを作成します。このファイルはコンテナ起動時にデータベースを初期化するために使用されます。bashcat >init.sql <<SQL_INIT CREATE DATABASE IF NOT EXISTS mqtt_data; CREATE TABLE IF NOT EXISTS mqtt_data.messages ( data String, arrived TIMESTAMP ) ENGINE = MergeTree() ORDER BY arrived; SQL_INIT
以下のコマンドでClickHouseサーバーを起動します。このコマンドはデータベース名、ポート番号、ユーザー名、パスワードを指定し、カレントディレクトリの
init.sql
ファイルをDockerのディレクトリにマウントします。bashdocker run \ --rm \ -e CLICKHOUSE_DB=mqtt_data \ -e CLICKHOUSE_USER=emqx \ -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \ -e CLICKHOUSE_PASSWORD=public \ -p 18123:8123 \ -p 19000:9000 \ --ulimit nofile=262144:262144 \ -v $pwd/init.sql:/docker-entrypoint-initdb.d/init.sql \ clickhouse/clickhouse-server
DockerでのClickHouseの実行に関する詳細はdockerhubをご参照ください。
ClickHouse Cloudを使ってClickHouseサーバーを起動する
https://clickhouse.cloud/ にアクセスし、サインアップしてサービスを作成します。
公式ドキュメントのClickHouse Cloud - クイックスタートを参照して、ClickHouseの使い方を学びます。
ClickHouse Cloudを作成後、Servicesページで該当サービスをクリックし、SQLコンソールを開きます。
SQLコンソールの左側でQueriesをクリックし、+New queryをクリックします。
以下のSQLを実行して
mqtt_data
データベースを作成します。bashCREATE DATABASE IF NOT EXISTS mqtt_data; CREATE TABLE IF NOT EXISTS mqtt_data.messages ( data String, arrived TIMESTAMP ) ENGINE = MergeTree() ORDER BY arrived;
コネクターの作成
データ統合ルールを作成する前に、ClickHouseサーバーにアクセスするためのClickHouseコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるClickHouseを選択します。すでにコネクターを作成済みの場合は、新しいコネクターを選択し、同じくデータ永続化カテゴリのClickHouseを選択します。
コネクター名はシステムが自動的に生成します。
接続情報を入力します:
- コネクター名:英数字の組み合わせで名前を入力します(例:
my_clickhouse
)。 - サーバーURL:
http://{host}:{port}
- データベース名:
mqtt_data
- ユーザー名:
emqx
- パスワード:
public
- ビジネスニーズに応じて詳細設定を行います(任意)。
- コネクター名:英数字の組み合わせで名前を入力します(例:
テストボタンをクリックし、ClickHouseサービスにアクセス可能であれば「コネクター利用可能」のメッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
このセクションでは、EMQXプラットフォームコンソールを使ってClickHouseルールを作成し、ルールにアクションを追加する方法を説明します。
ルールエリアで新しいルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
使用する機能に基づいてSQLエディターでルールを設定します。例えば、クライアントが
temp_hum/emqx
トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするには、以下のSQLを使用します。sqlSELECT timestamp div 1000 as up_timestamp, clientid as client_id, payload FROM "temp_hum/emqx"
TIP
初心者の方はSQL例をクリックし、テスト有効化を使ってSQLルールの学習とテストを行うことをおすすめします。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
バッチ値区切り文字:複数の入力項目を区別するために、この例ではデフォルトの
,
をそのまま使用します。※デフォルトはカンマ(
,
)で、VALUES形式に適しています。その他の区切り文字も使用可能です。詳細はClickHouseのデータ形式をご参照ください。SQLテンプレートに以下のコマンドを入力します(ルールエンジンを使い、SQLインジェクション攻撃を防ぐために入力SQL内の文字列が適切にエスケープされていることを確認してください)。
sqlINSERT INTO messages(data, arrived) VALUES ('${payload}', ${up_timestamp})
ここで
${payload}
と${up_timestamp}
はそれぞれメッセージ内容とタイムスタンプを表し、後でルール内でメッセージ転送時に設定されます。EMQXプラットフォームは転送前にこれらを対応する内容に置き換えます。SQLテンプレート内でプレースホルダー変数が未定義の場合は、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチを切り替えてルールエンジンの動作を設定できます。
無効(デフォルト):ルールエンジンは文字列
undefined
をデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULL
を挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。
詳細設定(任意):高度な設定を参照してください。
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度・湿度データの報告をシミュレートするためにMQTTXの使用を推奨しますが、他のクライアントでも可能です。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
パブリッシュをクリックしてメッセージを送信します。ClickHouseサーバーの
mqtt_data
データベース内のmessages
テーブルにエントリが挿入されているはずです。以下のコマンドをターミナルで実行して確認できます。bashcurl -u emqx:public -X POST -d "SELECT * FROM mqtt_data.messages" http://{host}:18123
正常に動作していれば、以下のような出力が得られます(タイムスタンプは異なります)。
bash{\n "temp": "27.5",\n "hum": "41.8"\n} 2024-03-27 09:35:11
高度な設定
このセクションでは、EMQXプラットフォームのClickHouseコネクターで利用可能な高度な設定オプションについて詳述します。コンソールでコネクターを設定する際、詳細設定に移動して以下のパラメーターをニーズに合わせて調整してください。
項目 | 説明 | 推奨値 |
---|---|---|
接続プールサイズ | ClickHouseサービスとの接続プールで維持できる同時接続数を指定します。この設定はEMQXプラットフォームとClickHouse間のアクティブな接続数を制御し、アプリケーションのスケーラビリティと性能管理に役立ちます。 注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションのワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限の原因となります。 | 8 |
ClickHouseタイムアウト | ClickHouseサーバーへの接続確立時にコネクターが待機する最大時間(秒)を指定します。 注意:適切なタイムアウト設定はシステム性能とリソース利用のバランスを取るために重要です。ネットワーク状況に応じて最適値をテストしてください。 | 15 |
起動タイムアウト | 自動起動されたリソースが正常状態になるまでコネクターが待機する最大時間(秒)を指定します。この設定により、ClickHouseのデータベースインスタンスなどのリソースが完全に稼働し、データ処理準備が整うまで操作を進めないようにします。 | 5 |
バッファプールサイズ | EMQXプラットフォームとClickHouse間の出口方向(egress)データフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前に一時的にデータを保持・処理します。出口方向のみを扱うブリッジに適した設定であり、入口方向(ingress)のみのブリッジでは0 に設定可能です。 | 16 |
リクエストTTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストがTTLを超えてバッファに滞留するか、送信後にClickHouseからの応答やアックが得られない場合、そのリクエストは期限切れとみなされます。 | 45 |
ヘルスチェック間隔 | コネクターがClickHouseへの接続状態を自動的にチェックする間隔(秒)を指定します。 | 15 |
最大バッファキューサイズ | ClickHouseコネクターの各バッファワーカーがバッファリングできる最大バイト数を指定します。バッファワーカーはClickHouseへのデータ送信前にデータを一時的に保持し、データフローの効率化を図ります。システム性能やデータ転送要件に応じて調整してください。 | 256 |
バッチサイズ | EMQXプラットフォームからClickHouseへ一度に転送可能なデータバッチの最大サイズを指定します。サイズを調整することで、データ転送の効率と性能を最適化できます。 「最大バッチサイズ」が 1 に設定されている場合、データレコードはバッチ化されず個別に送信されます。 | 1 |
クエリモード | メッセージ転送要件に応じて非同期(asynchronous) または同期(synchronous) のクエリモードを選択できます。非同期モードでは、ClickHouseへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしません。ただし、クライアントがClickHouseへの書き込み完了前にメッセージを受信する可能性があります。 | Async |
インフライトウィンドウ | 「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。この設定は、ClickHouseと通信中に同時に存在可能なインフライトクエリの最大数を制御します。 クエリモードが async の場合、同一MQTTクライアントからのメッセージを厳密な順序で処理する必要がある場合、この値を1 に設定してください。 | 100 |
さらに詳しく
以下のリンクから詳細情報をご覧いただけます。
ブログ: