TimescaleDBへMQTTデータをストリーム送信する
TimescaleDB(Timescale)は、時系列データの保存と解析に特化したデータベースです。優れたデータスループットと信頼性の高いパフォーマンスにより、IoT(Internet of Things)分野に最適な選択肢であり、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存と解析ソリューションを提供します。
本ページでは、TimescaleDBデータ統合の機能について詳しく紹介し、作成方法の実践的な手順を解説します。内容はTimescaleDBコネクターの作成、ルールの作成、ルールのテストを含みます。シミュレートした温度・湿度データをMQTTプロトコル経由でEMQXプラットフォームに報告し、設定したデータ統合を通じてTimescaleDBにデータを保存する方法を示します。
動作概要
TimescaleDBデータ統合は、EMQXに組み込まれた機能であり、EMQXのリアルタイムデータキャプチャと転送機能とTimescaleDBのデータ保存・解析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからTimescaleDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、産業用IoTにおけるEMQXとTimescaleDBのデータ統合の典型的なアーキテクチャを示しています。
EMQXとTimescaleDBは、エネルギー消費データをリアルタイムで効率的に収集・解析するためのスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイスの接続、メッセージ転送、データルーティングを担当するIoTプラットフォームとして機能し、TimescaleDBはデータ保存と解析プラットフォームとして役割を担います。
EMQXはルールエンジンとSinkを通じてデバイスデータをTimescaleDBに転送します。TimescaleDBはSQL文でデータを解析し、レポートやグラフなどの解析結果を生成し、TimescaleDBの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです。
- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定の送信元からのメッセージを処理します。メッセージが到着すると、ルールエンジンが対応するルールと照合し、メッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
- TimescaleDBへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTimescaleDBに書き込む操作をトリガーします。TimescaleDB SinkはSQLテンプレートを提供し、特定のメッセージフィールドをTimescaleDBの対応テーブルやカラムに柔軟に書き込むことが可能です。
エネルギー消費データがTimescaleDBに書き込まれた後は、SQL文を用いて柔軟にデータ解析が行えます。例えば:
- Grafanaなどの可視化ツールに接続し、グラフを生成してエネルギー消費データを表示する。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
- 業務システムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を支援する。
特長と利点
EMQXにおけるTimescaleDBデータ統合は、以下の特長と利点をビジネスにもたらします。
- 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TimescaleDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに過度な負担をかけずに満たします。
- メッセージ変換:メッセージはEMQXのルール内で豊富な処理や変換を経てからTimescaleDBに書き込まれます。
- 効率的な保存とスケーラビリティ:EMQXとTimescaleDBは共にクラスターのスケーリング機能を備え、ビジネスの成長に応じて柔軟に水平スケールが可能です。
- 高度なクエリ機能:TimescaleDBはタイムスタンプデータの効率的なクエリ・解析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確な洞察を引き出せます。
はじめる前に
本節では、EMQXプラットフォームでTimescaleDBデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Timescaleのインストールとデータテーブルの作成
EMQXプラットフォームは、自身で構築したTimescaleDBまたはクラウド上のTimescaleサービスとの統合をサポートします。Timescaleサービスをクラウドサービスとして利用するか、Dockerを使ってTimescaleDBインスタンスをデプロイできます。
クラウド上でTimescaleサービスインスタンスとテーブルを作成する
TimescaleDB Cloudを初めて利用する場合は、ヘルプドキュメントを参照してください。
Timescaleアカウントをお持ちでない場合は、Timescaleアカウントの作成を参考にアカウントを作成します。
Timescaleポータルにログインし、Timescaleサービスの作成を行います。サービスのパスワードを控えておきます。
サービス概要ページから接続情報を取得します。EMQXで必要な項目はデータベース名、ホスト、ポート、ユーザー名です。
psqlクライアントでサービスに接続します。
bash# サービスURLで接続 psql "postgres://tsdbadmin@xxxxx.xxxxx.tsdb.cloud.timescale.com:xxxxx/tsdb?sslmode=require"
クライアントからのメッセージデータを保存するためのテーブル
temp_hum
を作成します。bashCREATE TABLE temp_hum ( up_timestamp TIMESTAMPTZ NOT NULL, client_id TEXT NOT NULL, temp DOUBLE PRECISION NULL, hum DOUBLE PRECISION NULL ); SELECT create_hypertable('temp_hum', 'up_timestamp');
テストデータを挿入し、確認します。
bashINSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55); SELECT * from temp_hum;
テーブル作成後、サービスのExplorerタブで
temp_hum
テーブルの情報を確認できます。TimescaleインスタンスのMax_Connectionsの最適化
Timescaleインスタンス詳細ページの
Operations
->Database Parameters
->Common Parameters
に移動し、最大接続数を100以上に増やします。
DockerでTimescaleDBをインストールしテーブルを作成する
Docker環境がない場合は、Dockerのインストールを参照してください。
DockerでTimescaleDBコンテナを作成し、POSTGRES_PASSWORD環境変数でデータベースのパスワードを設定します。
bashdocker run -d --name timescaledb \ -p 5432:5432 \ -e POSTGRES_PASSWORD=public \ timescale/timescaledb:latest-pg13
クライアントデータを保存するデータベースを作成します。
bashdocker exec -it timescaledb psql -U postgres ## tsdbデータベースを作成 CREATE database tsdb; \c tsdb;
クライアントからのメッセージデータを保存するテーブル
temp_hum
を作成します。デバイスから報告される温度・湿度データを保存するためのテーブルです。bashCREATE TABLE temp_hum ( up_timestamp TIMESTAMPTZ NOT NULL, client_id TEXT NOT NULL, temp DOUBLE PRECISION NULL, hum DOUBLE PRECISION NULL ); SELECT create_hypertable('temp_hum', 'up_timestamp');
テストデータを挿入し、確認します。
bashINSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55); SELECT * from temp_hum;
TimescaleDBコネクターを作成する
データ統合ルールを作成する前に、TimescaleDBサーバーにアクセスするためのコネクターを作成します。
- デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
- 初めてコネクターを作成する場合は、データ永続化カテゴリの中からTimescaleDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、データ転送カテゴリの中からTimescaleDBを選択します。
- 新規コネクターページで以下のオプションを設定します。
- サーバーホスト:TimescaleDBサービスのホストとポートを入力します。Docker利用時は
http://<サーバーアドレス>:5432
を使用します。 - データベース名:TimescaleDBのデータベース名を入力します。
- ユーザー名とパスワード:TimescaleDBのユーザー名とパスワードを入力します。Dockerでデプロイした場合のデフォルトは以下です。
- データベース名:
tsdb
- ユーザー名:
postgres
- パスワード:
public
- データベース名:
- 詳細設定:接続プールサイズはTimescaleDB Cloudを利用する場合、1に設定します。
- その他の設定はデフォルト値を使用するか、ビジネス要件に応じて調整してください。
- サーバーホスト:TimescaleDBサービスのホストとポートを入力します。Docker利用時は
- テストボタンをクリックし、TimescaleDBサービスにアクセス可能か確認します。成功すると成功メッセージが表示されます。
- 新規作成ボタンをクリックして作成を完了します。
ルールを作成する
次に、書き込むデータを指定するルールを作成し、処理済みデータをTimescaleDBに転送するアクションをルールに追加します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のSQL例は、
temp_hum/emqx
トピックに送信されたメッセージから報告時刻up_timestamp
、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。sqlSELECT timestamp div 1000 AS up_timestamp, clientid AS client_id, payload.temp AS temp, payload.hum AS hum FROM "temp_hum/emqx"
テスト有効化を使ってデータ入力をシミュレートし、結果をテストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
SQLテンプレートに以下のデータを入力し、SQLテンプレートに挿入します。
sqlINSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES (to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum})
詳細設定(任意)については、詳細設定を参照してください。
確定ボタンをクリックしてルール作成を完了します。
新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールをテストする
温度・湿度データの報告をシミュレートするには、MQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": 27.5, "hum": 41.8 }
メッセージがTimescaleDBに転送されているか確認します。
ターミナルで以下のコマンドを実行し、Timescaleデータベースに取り込まれたデータを確認します。
bashSELECT * from temp_hum order by up_timestamp desc limit 10;
TimescaleDBの
temp_hum
テーブルを確認し、新しいレコードが挿入されていることを検証してください。コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。