PostgreSQLへのMQTTデータ取り込み
PostgreSQLは、世界で最も高度なオープンソースのリレーショナルデータベースであり、シンプルなアプリケーションから複雑なデータ処理まで対応可能な強力なデータ処理能力を備えています。EMQX CloudはPostgreSQLとの統合をサポートしており、IoTデバイスからのリアルタイムデータストリームを効率的に処理できます。この統合により、大規模なデータ保存、精密なクエリ、複雑なデータ関連分析が可能となり、データの整合性も確保されます。EMQXの効率的なメッセージルーティングとPostgreSQLの柔軟なデータモデルを活用することで、デバイスの状態監視、イベント追跡、操作監査が容易になり、企業に深いデータ洞察と強力なビジネスインテリジェンス支援を提供します。
本ページでは、EMQX CloudとPostgreSQL間のデータ統合について、作成方法と検証手順を実践的に紹介します。
TIP
本ページの内容はMatrixDBにも適用可能です。
動作概要
PostgreSQLデータ統合は、EMQX Cloudに標準搭載された機能で、MQTTベースのIoTデータとPostgreSQLの強力なデータ保存機能を橋渡しします。組み込みのルールエンジンコンポーネントにより、EMQX CloudからPostgreSQLへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQX CloudとPostgreSQL間の典型的なデータ統合アーキテクチャを示しています。

PostgreSQLへのMQTTデータ取り込みの流れは以下の通りです:
- IoTデバイスがEMQX Cloudに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの情報が含まれます。
- メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、特定のトピックに基づくメッセージやイベントを処理します。ルールにマッチしたメッセージやイベントは、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
- PostgreSQLへの書き込み:ルールがトリガーされると、メッセージがPostgreSQLに書き込まれます。SQLテンプレートを利用して、ルール処理結果からデータを抽出しSQLを構築、PostgreSQLに送信して実行することで、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりします。
イベントやメッセージデータがPostgreSQLに書き込まれた後は、PostgreSQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です:
- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常動作を検知して潜在的な問題を迅速に解消。
特長とメリット
PostgreSQLは豊富な機能を持つ人気のオープンソースリレーショナルデータベースです。PostgreSQLとのデータ統合により、以下の特長と利点が得られます:
- 柔軟なイベント処理:EMQXのルールエンジンを介して、PostgreSQLはデバイスのライフサイクルイベントを処理可能であり、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータの分析により、デバイスの故障や異常動作、傾向変化を迅速に検知し、適切な対策を講じられます。
- メッセージ変換:メッセージはEMQX Cloudのルールを通じて多様な処理や変換が可能であり、PostgreSQLへの保存や利用がより便利になります。
- 柔軟なデータ操作:PostgreSQLデータブリッジが提供するSQLテンプレートを利用することで、特定フィールドのデータを対応するテーブルやカラムに容易に書き込み・更新でき、柔軟なデータ保存・管理を実現します。
- 業務プロセスの統合:PostgreSQLデータブリッジにより、デバイスデータをPostgreSQLの豊富なエコシステムアプリケーションと連携可能です。ERP、CRM、その他カスタム業務システムとの統合を促進し、高度な業務プロセスや自動化を実現します。
- IoTとGIS技術の融合:PostgreSQLはGISデータの保存・クエリ機能を備え、地理空間インデックス、ジオフェンシングやアラート、リアルタイム位置追跡、地理情報処理などをサポートします。EMQXの信頼性の高いメッセージ伝送機能と組み合わせることで、車両などのモバイルデバイスからの地理位置情報を効率的に処理・分析し、リアルタイム監視、インテリジェントな意思決定、業務最適化を実現します。
- ランタイムメトリクス:各ルールのランタイムメトリクス(総メッセージ数、成功/失敗数、現在のレートなど)の閲覧をサポート。
柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。
はじめる前に
このセクションでは、EMQX CloudでPostgreSQLデータ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
PostgreSQLのインストール
- Docker経由でPostgreSQLをインストールし、dockerイメージを起動します。
# PostgreSQLのdockerイメージを起動し、パスワードをpublicに設定
docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres
# コンテナにアクセス
docker exec -it PostgreSQL bash
# コンテナ内のPostgreSQLサーバーに接続し、設定したパスワードを入力
psql -U postgres -W
# データベースを作成し、選択
CREATE DATABASE emqx_data;
\c emqx_data;- テーブル作成。以下のSQLコマンドで
temp_humテーブルを作成します。このテーブルはデバイスから報告される温度・湿度データを保存します。
CREATE TABLE temp_hum (
up_timestamp TIMESTAMPTZ NOT NULL,
client_id TEXT NOT NULL,
temp DOUBLE PRECISION NULL,
hum DOUBLE PRECISION NULL
);- テストデータの挿入と確認
INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
VALUES (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);emqx_data=# SELECT * FROM temp_hum;
up_timestamp | client_id | temp | hum
------------------------+--------------+------+-----
2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 | 55
(1 row)コネクターの作成
データ統合ルールを作成する前に、PostgreSQLサーバーにアクセスするためのコネクターを作成する必要があります。
デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるPostgreSQLを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのPostgreSQLを選択します。
コネクター名:システムが自動でコネクター名を生成します。
接続情報を入力します:
- サーバーホスト:サーバーのIPアドレスとポート番号
- データベース名:
emqx_dataを入力 - ユーザー名:
postgresを入力 - パスワード:
publicを入力
暗号化接続を確立したい場合は、TLSを有効にするトグルスイッチをクリックします。
詳細設定(任意)
テストボタンをクリックし、PostgreSQLサービスにアクセス可能であれば成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをPostgreSQLに転送するためのアクションを追加するルールを作成します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、
temp_hum/emqxトピックからメッセージの報告時刻up_timestamp、クライアントID、メッセージ本文(ペイロード)から温度と湿度をそれぞれ取得しています。sqlSELECT timestamp div 1000 as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
利用する機能に応じてSQLテンプレートを設定します。注意:これは事前処理されたSQLのため、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlINSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES ( to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum} )詳細設定(任意)
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
データのダンプ結果を確認
emqx_data=# SELECT * from temp_hum ORDER BY up_timestamp DESC LIMIT 10;
up_timestamp | client_id | temp | hum
------------------------+--------------+------+------
2024-03-20 09:39:17+00 | test_client | 27.5 | 41.8
2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 | 55
(2 rows)- コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報および当該ルールに属するすべてのアクションの統計情報が表示されます。