Skip to content

TimescaleDB に MQTT データをストリームする

TimescaleDB(Timescale)は、時系列データの保存と分析に特化したデータベースです。優れたデータスループットと信頼性の高いパフォーマンスにより、IoT(モノのインターネット)分野に最適な選択肢となっており、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存と分析ソリューションを提供します。

本ページでは、TimescaleDB データ統合の機能的特徴を詳しく紹介し、作成手順について実践的なガイドを提供します。内容は、TimescaleDB コネクターの作成、ルールの作成、ルールのテストを含みます。シミュレートした温度・湿度データを MQTT プロトコル経由で EMQX Cloud に報告し、設定したデータ統合を通じて TimescaleDB に保存する方法を示します。

動作の仕組み

TimescaleDB データ統合は、EMQX に組み込まれた機能であり、EMQX のリアルタイムデータキャプチャおよび転送能力と TimescaleDB のデータ保存・分析能力を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX から TimescaleDB へのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、産業用 IoT における EMQX と TimescaleDB のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud Integration TimescaleDB

EMQX と TimescaleDB は、エネルギー消費データをリアルタイムで効率的に収集・分析するスケーラブルな IoT プラットフォームを提供します。このアーキテクチャでは、EMQX がデバイス接続、メッセージ送受信、データルーティングを担う IoT プラットフォームとして機能し、TimescaleDB はデータ保存と分析プラットフォームとして役割を果たします。

EMQX はルールエンジンと Sink を通じてデバイスデータを TimescaleDB に転送します。TimescaleDB は SQL 文を用いてデータを分析し、レポートやチャートなどの分析結果を生成し、TimescaleDB の可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスは MQTT プロトコルで EMQX に正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージを処理します。メッセージ到着時にルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
  3. TimescaleDB へのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージを TimescaleDB に書き込む操作を実行します。TimescaleDB Sink は SQL テンプレートを提供し、特定のメッセージフィールドを対応するテーブル・カラムに柔軟に書き込むデータ形式を定義できます。

エネルギー消費データが TimescaleDB に書き込まれた後は、SQL 文を用いて柔軟にデータ分析が可能です。例えば:

  • Grafana などの可視化ツールに接続し、チャートを生成してエネルギー消費データを表示する。
  • ERP などのアプリケーションシステムに接続し、生産分析や生産計画の調整に活用する。
  • 業務システムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を支援する。

特徴と利点

EMQX における TimescaleDB データ統合は、以下の特徴と利点をビジネスにもたらします。

  • 効率的なデータ処理:EMQX は多数の IoT デバイス接続とメッセージスループットを効率的に処理可能です。TimescaleDB はデータの書き込み、保存、クエリに優れており、IoT シナリオのデータ処理ニーズをシステムに負荷をかけずに満たします。
  • メッセージ変換:メッセージは EMQX のルール内で豊富な処理や変換を経てから TimescaleDB に書き込まれます。
  • 効率的な保存とスケーラビリティ:EMQX と TimescaleDB はいずれもクラスターのスケールアウト機能を持ち、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
  • 高度なクエリ機能:TimescaleDB はタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT 時系列データから精緻な洞察を引き出せます。

はじめる前に

本節では、EMQX Cloud で TimescaleDB データ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

Timescale のインストールとデータテーブルの作成

EMQX Cloud は、セルフホストの TimescaleDB またはクラウド上の Timescale サービスとの統合をサポートしています。Timescale サービスをクラウドサービスとして利用するか、Docker を用いて TimescaleDB インスタンスをデプロイできます。

クラウド上で Timescale サービスインスタンスとテーブルを作成する

TimescaleDB Cloud を初めて利用する場合は、ヘルプドキュメントを参照してください。

  1. Timescale アカウントをお持ちでない場合は、Timescale アカウントの作成を参照してアカウントを作成します。

  2. Timescale ポータルにログインし、Timescale サービスの作成を行います。サービスのパスワードを保存してください。

  3. サービス概要ページから接続情報を取得します。EMQX が必要とする項目は、データベース名ホストポートユーザー名です。

  4. psql クライアントでサービスに接続します。

    bash
    # サービス URL による接続例
    psql "postgres://tsdbadmin@xxxxx.xxxxx.tsdb.cloud.timescale.com:xxxxx/tsdb?sslmode=require"
  5. クライアントからのメッセージデータを保存するための temp_hum テーブルを作成します。

    bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );
    
    SELECT create_hypertable('temp_hum', 'up_timestamp');

    テストデータを挿入して確認します。

    bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
    
    SELECT * from temp_hum;

    テーブル作成後、Services の Explorer タブで temp_hum テーブルの情報を確認できます。

  6. Timescale インスタンスの Max_Connections を最適化します。

    Timescale インスタンス詳細ページの Operations -> Database Parameters -> Common Parameters に移動し、最大接続数を 100 以上に増やします。

    EMQX Cloud Integration TimescaleDB

Docker で TimescaleDB をインストールしテーブルを作成する

  1. Docker 環境がない場合は、Docker のインストールを参照してください。

  2. Docker で TimescaleDB コンテナを作成し、POSTGRES_PASSWORD 環境変数でデータベースのパスワードを設定します。

    bash
    docker run -d --name timescaledb \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=public \
    timescale/timescaledb:latest-pg13
  3. クライアントデータを保存するデータベースを作成します。

    bash
    docker exec -it timescaledb psql -U postgres
    
    ## tsdb データベースを作成
    CREATE database tsdb;
    
    \c tsdb;
  4. クライアントからのメッセージデータを保存する temp_hum テーブルを作成します。このテーブルはデバイスから報告される温度・湿度データを保存します。

    bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );
    
    SELECT create_hypertable('temp_hum', 'up_timestamp');

    テストデータを挿入して確認します。

    bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
    
    SELECT * from temp_hum;

TimescaleDB コネクターの作成

データ統合ルールを作成する前に、TimescaleDB サーバーにアクセスするためのコネクターを作成する必要があります。

  1. ご自身のデプロイメントにアクセスし、左側のナビゲーションメニューから データ統合 をクリックします。
  2. 初めてコネクターを作成する場合は、データ永続化 カテゴリの中から TimescaleDB を選択します。既にコネクターを作成済みの場合は、新規コネクター を選択し、データ転送 カテゴリの中から TimescaleDB を選択します。
  3. 新規コネクター ページで以下のオプションを設定します。
    • サーバーホスト:TimescaleDB サービスのホストとポートを入力します。Docker を使用している場合は http://<サーバーアドレス>:5432 を指定します。
    • データベース名:TimescaleDB のデータベース名を入力します。
    • ユーザー名とパスワード:TimescaleDB のユーザー名とパスワードを入力します。Docker でデプロイした場合は以下の認証情報を使用します。
      • データベース名:tsdb
      • ユーザー名:postgres
      • パスワード:public
    • 詳細設定:接続プールサイズは TimescaleDB Cloud を使用する場合、1 に設定してください。
    • その他の設定はデフォルトのままか、ビジネス要件に応じて設定してください。
  4. テスト ボタンをクリックします。TimescaleDB サービスにアクセス可能であれば、成功メッセージが表示されます。
  5. 新規作成 ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定するルールを作成し、そのルールに対応するアクションを追加して処理済みデータを TimescaleDB に転送します。

  1. ルールエリアで 新規ルール をクリックするか、作成したコネクターの アクション 列にある新規ルールアイコンをクリックします。

  2. SQL エディター にルールのマッチング SQL 文を入力します。以下の SQL 例は、temp_hum/emqx トピックに送信されたメッセージから報告時刻 up_timestamp、クライアント ID、メッセージ本体(ペイロード)を読み取り、温度と湿度を抽出します。

    sql
     SELECT 
     timestamp div 1000 AS up_timestamp, 
     clientid AS client_id, 
     payload.temp AS temp, 
     payload.hum AS hum
    
     FROM
     "temp_hum/emqx"
  3. Try It Out を使ってデータ入力をシミュレートし、結果をテストできます。

  4. 次へ をクリックしてアクションを追加します。

  5. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  6. SQL テンプレートに以下のデータを入力し、SQL テンプレートに挿入します。

    sql
     INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES (to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum})

    詳細設定(任意)については、詳細設定を参照してください。

  7. 確定 ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功 のポップアップで ルールに戻る をクリックし、データ統合設定の一連の作業を完了します。

ルールのテスト

温度・湿度データの報告をシミュレートするために、MQTTX の使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": 27.5,
        "hum": 41.8
      }
  2. メッセージが TimescaleDB に転送されているか確認します。

    ターミナルで以下のコマンドを使用し、Timescale データベースに取り込まれたデータを表示します。

    bash
    SELECT * from temp_hum order by up_timestamp desc limit 10;

    TimescaleDB の temp_hum テーブルを確認し、新しいレコードが挿入されていることを検証してください。

  3. コンソールで運用データを確認します。ルール一覧でルール ID をクリックすると、ルールの統計情報とそのルールに属するすべてのアクションの統計情報が表示されます。