PostgreSQLへのMQTTデータ取り込み
PostgreSQLは、世界で最も高度なオープンソースのリレーショナルデータベースであり、シンプルなアプリケーションから複雑なデータ処理まで対応可能な強力なデータ処理能力を備えています。EMQXプラットフォームはPostgreSQLとの統合をサポートしており、IoTデバイスからのリアルタイムデータストリームを効率的に処理できます。この統合により、大規模なデータ保存、正確なクエリ、複雑なデータ関連分析が可能となり、データの整合性も確保されます。EMQXの効率的なメッセージルーティングとPostgreSQLの柔軟なデータモデルを活用することで、デバイスの状態監視、イベント追跡、操作監査が容易になり、ビジネスに深いデータ洞察と強力なビジネスインテリジェンスの支援を提供します。
本ページでは、EMQXプラットフォームとPostgreSQL間のデータ統合について、作成方法と検証手順を実践的に紹介します。
TIP
本ページの内容はMatrixDBにも適用可能です。
動作概要
PostgreSQLデータ統合は、EMQXプラットフォームに標準搭載された機能で、MQTTベースのIoTデータとPostgreSQLの強力なデータ保存機能を橋渡しします。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからPostgreSQLへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。
以下の図は、EMQXプラットフォームとPostgreSQL間の典型的なデータ統合アーキテクチャを示しています。
MQTTデータをPostgreSQLに取り込む流れは以下の通りです:
- IoTデバイスがEMQXプラットフォームに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。このイベントにはデバイスID、送信元IPアドレスなどの情報が含まれます。
- メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、特定のトピックに基づいてメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
- PostgreSQLへの書き込み:ルールによりメッセージのPostgreSQLへの書き込みがトリガーされます。SQLテンプレートを活用し、ルール処理結果からデータを抽出してSQLを構築し、PostgreSQLに送信して実行します。これにより、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりできます。
イベントやメッセージデータがPostgreSQLに書き込まれた後は、PostgreSQLに接続してデータを読み取り、柔軟なアプリケーション開発が可能です。例えば:
- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス挙動を検知して潜在的な問題を速やかに解消する。
特長とメリット
PostgreSQLは豊富な機能を持つ人気のオープンソースリレーショナルデータベースです。PostgreSQLとのデータ統合により、以下の特長とメリットをビジネスにもたらします:
- 柔軟なイベント処理:EMQXルールエンジンを通じて、PostgreSQLはデバイスのライフサイクルイベントを処理可能で、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に促進します。イベントデータを分析することで、デバイスの故障や異常挙動、トレンド変化を迅速に検知し、適切な対応を取ることができます。
- メッセージ変換:EMQXプラットフォームのルールにより、メッセージはPostgreSQLに書き込まれる前に多様な処理や変換を受け、保存や利用がより便利になります。
- 柔軟なデータ操作:PostgreSQLデータブリッジが提供するSQLテンプレートを活用し、特定フィールドのデータをPostgreSQLデータベースの対応テーブル・カラムに簡単に書き込みや更新ができ、柔軟なデータ保存と管理を実現します。
- ビジネスプロセスの統合:PostgreSQLデータブリッジにより、デバイスデータをPostgreSQLの豊富なエコシステムアプリケーションと連携可能です。ERP、CRM、その他カスタムビジネスシステムとの統合を促進し、高度なビジネスプロセスや自動化を実現します。
- IoTとGIS技術の融合:PostgreSQLはGISデータの保存・クエリ機能を備え、地理空間インデックス、ジオフェンシングやアラート、リアルタイム位置追跡、地理情報処理などをサポートします。EMQXの信頼性の高いメッセージ伝送能力と組み合わせることで、車両などのモバイルデバイスからの地理位置情報を効率的に処理・分析し、リアルタイム監視、インテリジェントな意思決定、ビジネス最適化を実現します。
- ランタイムメトリクス:各ルールのランタイムメトリクス(総メッセージ数、成功/失敗数、現在のレートなど)を閲覧可能です。
柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に貢献します。
はじめる前に
本節では、EMQXプラットフォームでPostgreSQLデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
PostgreSQLのインストール
- DockerでPostgreSQLをインストールし、dockerイメージを起動します。
# PostgreSQLのdockerイメージを起動し、パスワードをpublicに設定
docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres
# コンテナにアクセス
docker exec -it PostgreSQL bash
# コンテナ内でPostgreSQLサーバーに接続し、設定したパスワードを入力
psql -U postgres -W
# データベースを作成し、選択する
CREATE DATABASE emqx_data;
\c emqx_data;
- テーブルを作成します。以下のSQLコマンドで
temp_hum
テーブルを作成します。このテーブルはデバイスから報告される温度・湿度データの保存に使用します。
CREATE TABLE temp_hum (
up_timestamp TIMESTAMPTZ NOT NULL,
client_id TEXT NOT NULL,
temp DOUBLE PRECISION NULL,
hum DOUBLE PRECISION NULL
);
- テストデータを挿入し、確認します。
INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
VALUES (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
emqx_data=# SELECT * FROM temp_hum;
up_timestamp | client_id | temp | hum
------------------------+--------------+------+-----
2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 | 55
(1 row)
コネクターの作成
データ統合ルールを作成する前に、PostgreSQLサーバーにアクセスするためのPostgreSQLコネクターを作成する必要があります。
デプロイメントにアクセスし、左側ナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの中からPostgreSQLを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのPostgreSQLを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- サーバーホスト:サーバーのIPアドレスとポート番号
- データベース名:
emqx_data
を入力 - ユーザー名:
postgres
を入力 - パスワード:
public
を入力
暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをクリックします。
詳細設定(任意)
テストボタンをクリックし、PostgreSQLサービスにアクセス可能であれば成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをPostgreSQLに転送するためのアクションをルールに追加します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時刻
up_timestamp
、クライアントID、temp_hum/emqx
トピックからのメッセージ本文(ペイロード)内の温度と湿度をそれぞれ読み取ります。sqlSELECT timestamp div 1000 as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
初心者の方は、SQL例をクリックし、テストを有効にするを使ってSQLルールを学習・検証できます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
利用する機能に基づいてSQLテンプレートを設定します。注意:これは事前処理されたSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlINSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES ( to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum} )
詳細設定(任意)
確認ボタンをクリックしてルール作成を完了します。
ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度・湿度データの報告をシミュレーションするために、MQTTXの使用を推奨しますが、他のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
データダンプ結果を確認します。
emqx_data=# SELECT * from temp_hum ORDER BY up_timestamp DESC LIMIT 10;
up_timestamp | client_id | temp | hum
------------------------+--------------+------+------
2024-03-20 09:39:17+00 | test_client | 27.5 | 41.8
2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 | 55
(2 rows)
- コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報を閲覧できます。