Skip to content

PostgreSQLへのMQTTデータ取り込み

PostgreSQLは、世界で最も高度なオープンソースのリレーショナルデータベースであり、シンプルなアプリケーションから複雑なデータ処理まで対応可能な強力なデータ処理能力を備えています。EMQXプラットフォームはPostgreSQLとの統合をサポートしており、IoTデバイスからのリアルタイムデータストリームを効率的に処理できます。この統合により、大規模なデータ保存、正確なクエリ、複雑なデータ関連分析が可能となり、データの整合性も確保されます。EMQXの効率的なメッセージルーティングとPostgreSQLの柔軟なデータモデルを活用することで、デバイスの状態監視、イベント追跡、操作監査が容易になり、ビジネスに深いデータ洞察と強力なビジネスインテリジェンスの支援を提供します。

本ページでは、EMQXプラットフォームとPostgreSQL間のデータ統合について、作成方法と検証手順を実践的に紹介します。

TIP

本ページの内容はMatrixDBにも適用可能です。

動作概要

PostgreSQLデータ統合は、EMQXプラットフォームに標準搭載された機能で、MQTTベースのIoTデータとPostgreSQLの強力なデータ保存機能を橋渡しします。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからPostgreSQLへのデータ取り込みと管理が簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXプラットフォームとPostgreSQL間の典型的なデータ統合アーキテクチャを示しています。

EMQX Integration PostgreSQL

MQTTデータをPostgreSQLに取り込む流れは以下の通りです:

  • IoTデバイスがEMQXプラットフォームに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。このイベントにはデバイスID、送信元IPアドレスなどの情報が含まれます。
  • メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  • ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、特定のトピックに基づいてメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
  • PostgreSQLへの書き込み:ルールによりメッセージのPostgreSQLへの書き込みがトリガーされます。SQLテンプレートを活用し、ルール処理結果からデータを抽出してSQLを構築し、PostgreSQLに送信して実行します。これにより、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりできます。

イベントやメッセージデータがPostgreSQLに書き込まれた後は、PostgreSQLに接続してデータを読み取り、柔軟なアプリケーション開発が可能です。例えば:

  • Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
  • デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス挙動を検知して潜在的な問題を速やかに解消する。

特長とメリット

PostgreSQLは豊富な機能を持つ人気のオープンソースリレーショナルデータベースです。PostgreSQLとのデータ統合により、以下の特長とメリットをビジネスにもたらします:

  • 柔軟なイベント処理:EMQXルールエンジンを通じて、PostgreSQLはデバイスのライフサイクルイベントを処理可能で、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に促進します。イベントデータを分析することで、デバイスの故障や異常挙動、トレンド変化を迅速に検知し、適切な対応を取ることができます。
  • メッセージ変換:EMQXプラットフォームのルールにより、メッセージはPostgreSQLに書き込まれる前に多様な処理や変換を受け、保存や利用がより便利になります。
  • 柔軟なデータ操作:PostgreSQLデータブリッジが提供するSQLテンプレートを活用し、特定フィールドのデータをPostgreSQLデータベースの対応テーブル・カラムに簡単に書き込みや更新ができ、柔軟なデータ保存と管理を実現します。
  • ビジネスプロセスの統合:PostgreSQLデータブリッジにより、デバイスデータをPostgreSQLの豊富なエコシステムアプリケーションと連携可能です。ERP、CRM、その他カスタムビジネスシステムとの統合を促進し、高度なビジネスプロセスや自動化を実現します。
  • IoTとGIS技術の融合:PostgreSQLはGISデータの保存・クエリ機能を備え、地理空間インデックス、ジオフェンシングやアラート、リアルタイム位置追跡、地理情報処理などをサポートします。EMQXの信頼性の高いメッセージ伝送能力と組み合わせることで、車両などのモバイルデバイスからの地理位置情報を効率的に処理・分析し、リアルタイム監視、インテリジェントな意思決定、ビジネス最適化を実現します。
  • ランタイムメトリクス:各ルールのランタイムメトリクス(総メッセージ数、成功/失敗数、現在のレートなど)を閲覧可能です。

柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に貢献します。

はじめる前に

本節では、EMQXプラットフォームでPostgreSQLデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

PostgreSQLのインストール

  1. DockerでPostgreSQLをインストールし、dockerイメージを起動します。
bash
# PostgreSQLのdockerイメージを起動し、パスワードをpublicに設定
docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres

# コンテナにアクセス
docker exec -it PostgreSQL bash

# コンテナ内でPostgreSQLサーバーに接続し、設定したパスワードを入力
psql -U postgres -W

# データベースを作成し、選択する

CREATE DATABASE emqx_data;

\c emqx_data;
  1. テーブルを作成します。以下のSQLコマンドでtemp_humテーブルを作成します。このテーブルはデバイスから報告される温度・湿度データの保存に使用します。
sql
CREATE TABLE temp_hum (
  up_timestamp   TIMESTAMPTZ       NOT NULL,
  client_id      TEXT              NOT NULL,
  temp           DOUBLE PRECISION  NULL,
  hum            DOUBLE PRECISION  NULL
);
  1. テストデータを挿入し、確認します。
sql
INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
VALUES (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
bash
emqx_data=# SELECT * FROM temp_hum;
      up_timestamp      |  client_id   | temp | hum
------------------------+--------------+------+-----
 2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 |  55
(1 row)

コネクターの作成

データ統合ルールを作成する前に、PostgreSQLサーバーにアクセスするためのPostgreSQLコネクターを作成する必要があります。

  1. デプロイメントにアクセスし、左側ナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からPostgreSQLを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのPostgreSQLを選択します。

  3. コネクター名:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • サーバーホスト:サーバーのIPアドレスとポート番号
    • データベース名emqx_dataを入力
    • ユーザー名postgresを入力
    • パスワードpublicを入力
  5. 暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをクリックします。

  6. 詳細設定(任意)

  7. テストボタンをクリックし、PostgreSQLサービスにアクセス可能であれば成功メッセージが表示されます。

  8. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをPostgreSQLに転送するためのアクションをルールに追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時刻up_timestamp、クライアントID、temp_hum/emqxトピックからのメッセージ本文(ペイロード)内の温度と湿度をそれぞれ読み取ります。

    sql
     SELECT
       timestamp div 1000 as up_timestamp,
       clientid as client_id,
       payload.temp as temp,
       payload.hum as hum
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、テストを有効にするを使ってSQLルールを学習・検証できます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 利用する機能に基づいてSQLテンプレートを設定します。注意:これは事前処理されたSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
     INSERT INTO temp_hum(up_timestamp, client_id, temp, hum)
     VALUES (
       to_timestamp(${up_timestamp}),
       ${client_id},
       ${temp},
       ${hum}
     )
  6. 詳細設定(任意)

  7. 確認ボタンをクリックしてルール作成を完了します。

  8. ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温度・湿度データの報告をシミュレーションするために、MQTTXの使用を推奨しますが、他のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. データダンプ結果を確認します。

bash
emqx_data=# SELECT * from temp_hum ORDER BY up_timestamp DESC LIMIT 10;
      up_timestamp      |  client_id   | temp | hum
------------------------+--------------+------+------
 2024-03-20 09:39:17+00 | test_client  | 27.5 | 41.8
 2020-10-29 09:23:34+00 | temp_hum-001 | 19.1 |   55
(2 rows)
  1. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報を閲覧できます。