TimescaleDBへのMQTTデータ取り込み
TimescaleDB(Timescale)は、時系列データの保存と分析に特化したデータベースです。優れたデータスループットと信頼性の高いパフォーマンスにより、IoT(モノのインターネット)分野に最適であり、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存と分析ソリューションを提供します。
本ページでは、EMQXとTimescaleDBのデータ連携について、実践的な手順を交えて包括的に紹介します。
仕組み
TimescaleDBデータ連携はEMQXに組み込まれた機能であり、EMQXのリアルタイムデータキャプチャと送信能力をTimescaleDBのデータ保存・分析能力と組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからTimescaleDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、産業用IoTにおけるEMQXとTimescaleDBの典型的な連携アーキテクチャを示しています。
EMQXとTimescaleDBは、エネルギー消費データをリアルタイムに効率よく収集・分析するためのスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイスの接続、メッセージ送信、データルーティングを担うIoTプラットフォームとして機能し、TimescaleDBがデータ保存・分析プラットフォームとしてデータの保存と分析を担当します。
EMQXはルールエンジンとSinkを通じてデバイスデータをTimescaleDBに転送します。TimescaleDBはSQL文でデータを分析し、レポートやチャートなどの分析結果を生成し、TimescaleDBの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです:
- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子や消費値が含まれます。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
- TimescaleDBへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTimescaleDBに書き込む操作をトリガーします。TimescaleDB SinkはSQLテンプレートを提供し、特定のメッセージフィールドをTimescaleDBの対応テーブル・カラムに柔軟に書き込めるようにします。
エネルギー消費データがTimescaleDBに書き込まれた後は、SQL文を使って柔軟にデータ分析が可能です。例えば:
- Grafanaなどの可視化ツールに接続し、チャートを生成してエネルギー消費データを表示する。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整に活用する。
- 業務システムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援する。
特長とメリット
EMQXのTimescaleDBデータ連携は、以下の特長と利点をビジネスにもたらします:
- 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TimescaleDBはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理ニーズをシステムに負荷をかけずに満たします。
- メッセージ変換:メッセージはEMQXのルール内で豊富に処理・変換されてからTimescaleDBに書き込まれます。
- 効率的な保存とスケーラビリティ:EMQXとTimescaleDBはどちらもクラスターのスケールアウト機能を持ち、ビジネスの成長に応じて柔軟に水平スケールが可能です。
- 高度なクエリ機能:TimescaleDBはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を引き出せます。
はじめる前に
このセクションでは、TimescaleDBデータ連携を作成する前に必要な準備、TimescaleDBのインストールやデータテーブルの作成について説明します。
前提条件
Timescaleのインストールとデータテーブル作成
EMQXはセルフホストのTimescaleDBまたはクラウドのTimescale Serviceとの連携をサポートしています。Timescale Serviceをクラウドサービスとして利用するか、DockerでTimescaleDBインスタンスをデプロイできます。
コネクターの作成
TimescaleDB Sinkを作成する前に、TimescaleDBサービスに接続するためのTimescaleDBコネクターを作成する必要があります。
以下の手順は、EMQXとTimescaleDB(セルフホストの場合)をローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。
- EMQXダッシュボードにアクセスし、左のナビゲーションメニューからIntegration -> Connectorをクリックします。
- ページ右上のCreateをクリックします。
- コネクター一覧からTimescaleDBを選択し、Nextをクリックします。
- Connector Nameに名前を入力します。例:
my-timescale
。名前は英数字の組み合わせにしてください。 - TimescaleDBのデプロイ方法に応じて接続情報を入力します。Dockerでデプロイした場合は、Server Hostに
127.0.0.1:5432
、Database Nameにtsdb
、Usernameにpostgres
、Passwordにpublic
を入力します。 - 詳細設定(任意):詳細はSinkの機能を参照してください。
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTimescaleDBサーバーに接続できるか確認できます。
- Createボタンをクリックしてコネクター作成を完了します。
これでTimescaleDBコネクターが作成されました。次にルールとSinkを作成し、TimescaleDBデータベースに書き込むデータを指定します。
TimescaleDB Sink付きルールの作成
このセクションでは、ダッシュボードでMQTTトピックt/#
からのメッセージを処理し、処理結果を設定済みのSink経由でTimescaleDBに送信するルールの作成方法を示します。
EMQXダッシュボードにアクセスし、左のナビゲーションメニューからIntegration -> Rulesをクリックします。
ページ右上の**+ Create**をクリックします。
ルール作成ページで、ルールIDに
my_rule
を入力します。SQL Editorに以下のSQLルールを入力し、トピック
t/#
のMQTTメッセージをTimescaleDBに保存します:sqlSELECT payload.temp as temp, payload.humidity as humidity, payload.location as location FROM "t/#"
注:初心者の方はSQL Examplesをクリックし、Enable TestでSQLルールの学習とテストが可能です。
+ Add Actionボタンをクリックして、ルールによりトリガーされるアクションを定義します。Type of Actionドロップダウンリストから
TimescaleDB
を選択すると、EMQXはルールで処理したデータをTimescaleDBに送信します。Actionドロップダウンは
Create Action
のままにするか、既存のTimescaleDBアクションを選択できます。本デモでは新しいSinkを作成しルールに追加します。NameとDescriptionテキストボックスにSinkの名前と説明を入力します。
Connectorドロップダウンから先ほど作成した
my-timescale
を選択します。隣のボタンで新規コネクター作成も可能です。設定パラメータの詳細はコネクター作成を参照してください。以下のSQL文を使ってSQL Templateを設定します。
注:これは前処理済みのSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlINSERT INTO sensor_data (time, location, temperature, humidity) VALUES (NOW(), ${location}, ${temp}, ${humidity})
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):詳細設定を参照してください。
AddボタンをクリックしてSinkの設定を完了します。ルール作成ページのAction Outputsタブに新しいSinkが表示されます。
ルール作成ページで設定内容を確認し、Createボタンをクリックしてルールを生成します。作成したルールはルール一覧に表示され、statusは
connected
となります。
これでルールの作成が完了し、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいTimescaleDB Sinkが確認できます。
また、Integration -> Flow Designerをクリックするとトポロジーを確認できます。トピックt/#
のメッセージがルールmy_rule
で解析され、TimescaleDBに送信・保存されていることがわかります。
ルールのテスト
MQTTXを使ってトピックt/1
にメッセージを送信し、同時にオンライン/オフラインイベントをトリガーします:
mqttx pub -i emqx_c -t t/1 -m '{"temp":24,"humidity":30,"location":"hangzhou"}'
Sinkの稼働状況を確認すると、1件の新しいMatchedと1件のSent Successfullyメッセージがあるはずです。
TimescaleDBのsensor_data
テーブルを確認すると、新しいレコードが挿入されています:
tsdb=# select * from sensor_data;
time | location | temperature | humidity
-------------------------------+----------+-------------+----------
2023-07-10 08:28:48.813988+00 | hangzhou | 24 | 30
2023-07-10 08:28:57.737768+00 | hangzhou | 24 | 30
2023-07-10 08:28:58.599537+00 | hangzhou | 24 | 30
(3 rows)
詳細設定
このセクションでは、TimescaleDB Sinkの詳細設定オプションについて説明します。ダッシュボードでSinkを設定する際、Advanced Settingsに移動して以下のパラメータをニーズに合わせて調整できます。
項目 | 説明 | 推奨値 |
---|---|---|
Connection Pool Size | Timescaleサービスと接続する際に維持可能な同時接続数を指定します。この設定はEMQXとTimescaleDB間のアクティブな接続数を制御し、アプリケーションのスケーラビリティとパフォーマンスを管理します。 注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、ワークロードに依存します。大きすぎるとリソース枯渇、小さすぎるとスループット制限となる可能性があります。 | 8 |
Start Timeout | 自動起動されたリソースが正常な状態になるまで、コネクターがリソース作成要求に応答せずに待機する最大秒数を指定します。これにより、TimescaleDBのデータベースインスタンスなどのリソースが完全に稼働し、データ処理準備が整うまで操作を進めないようにします。 | 5 |
Buffer Pool Size | EMQXとTimescaleDB間の送信タイプSinkでデータフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータ送信前に一時的にデータを保持・処理します。受信のみのSinkでは不要なため0 に設定可能です。 | 16 |
Request TTL | バッファに入ったリクエストが有効とみなされる最大秒数を指定します。リクエストがTTLを超えてバッファに留まるか、送信後にTimescaleDBからの応答やアックがタイムリーに得られない場合、そのリクエストは期限切れとみなされます。 | 45 |
Health Check Interval | SinkがTimescaleDBへの接続状態を自動的にチェックする間隔(秒)を指定します。 | 15 |
Max Buffer Queue Size | TimescaleDB Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータ送信前の一時保管を担い、システム性能やデータ転送要件に応じて調整可能です。 | 256 |
Max Batch Size | EMQXからTimescaleDBへ一度に送信するデータバッチの最大サイズを指定します。サイズ調整によりデータ転送の効率とパフォーマンスを最適化できます。1 に設定すると、データレコードはバッチ化せず個別に送信されます。 | 1 |
Query Mode | メッセージ送信を最適化するために、asynchronous (非同期)またはsynchronous (同期)クエリモードを選択できます。非同期モードではTimescaleDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受信するタイミングがTimescaleDBへの書き込みより先行する可能性があります。 | Async |
Inflight Window | 「インフライトクエリ」とは開始されたがまだ応答やアックを受け取っていないクエリを指します。SinkがTimescaleDBと通信する際に同時に存在可能なインフライトクエリの最大数を制御します。 Query Modeが async の場合、この値は特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合は1 に設定してください。 | 100 |
さらに詳しく
以下のリンクから詳細情報をご覧いただけます:
ブログ: