Skip to content

TimescaleDBへMQTTデータをストリーム送信する

TimescaleDB(Timescale)は、時系列データの保存と解析に特化したデータベースです。優れたデータスループットと信頼性の高いパフォーマンスにより、IoT(Internet of Things)分野に最適な選択肢であり、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存と解析ソリューションを提供します。

本ページでは、TimescaleDBデータ統合の機能について詳しく紹介し、作成方法の実践的な手順を解説します。内容はTimescaleDBコネクターの作成、ルールの作成、ルールのテストを含みます。シミュレートした温度・湿度データをMQTTプロトコル経由でEMQXプラットフォームに報告し、設定したデータ統合を通じてTimescaleDBにデータを保存する方法を示します。

動作概要

TimescaleDBデータ統合は、EMQXに組み込まれた機能であり、EMQXのリアルタイムデータキャプチャと転送機能とTimescaleDBのデータ保存・解析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからTimescaleDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、産業用IoTにおけるEMQXとTimescaleDBのデータ統合の典型的なアーキテクチャを示しています。

EMQX Platform Integration TimescaleDB

EMQXとTimescaleDBは、エネルギー消費データをリアルタイムで効率的に収集・解析するためのスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイスの接続、メッセージ転送、データルーティングを担当するIoTプラットフォームとして機能し、TimescaleDBはデータ保存と解析プラットフォームとして役割を担います。

EMQXはルールエンジンとSinkを通じてデバイスデータをTimescaleDBに転送します。TimescaleDBはSQL文でデータを解析し、レポートやグラフなどの解析結果を生成し、TimescaleDBの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定の送信元からのメッセージを処理します。メッセージが到着すると、ルールエンジンが対応するルールと照合し、メッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
  3. TimescaleDBへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTimescaleDBに書き込む操作をトリガーします。TimescaleDB SinkはSQLテンプレートを提供し、特定のメッセージフィールドをTimescaleDBの対応テーブルやカラムに柔軟に書き込むことが可能です。

エネルギー消費データがTimescaleDBに書き込まれた後は、SQL文を用いて柔軟にデータ解析が行えます。例えば:

  • Grafanaなどの可視化ツールに接続し、グラフを生成してエネルギー消費データを表示する。
  • ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
  • 業務システムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を支援する。

特長と利点

EMQXにおけるTimescaleDBデータ統合は、以下の特長と利点をビジネスにもたらします。

  • 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。TimescaleDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに過度な負担をかけずに満たします。
  • メッセージ変換:メッセージはEMQXのルール内で豊富な処理や変換を経てからTimescaleDBに書き込まれます。
  • 効率的な保存とスケーラビリティ:EMQXとTimescaleDBは共にクラスターのスケーリング機能を備え、ビジネスの成長に応じて柔軟に水平スケールが可能です。
  • 高度なクエリ機能:TimescaleDBはタイムスタンプデータの効率的なクエリ・解析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから正確な洞察を引き出せます。

はじめる前に

本節では、EMQXプラットフォームでTimescaleDBデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Timescaleのインストールとデータテーブルの作成

EMQXプラットフォームは、自身で構築したTimescaleDBまたはクラウド上のTimescaleサービスとの統合をサポートします。Timescaleサービスをクラウドサービスとして利用するか、Dockerを使ってTimescaleDBインスタンスをデプロイできます。

クラウド上でTimescaleサービスインスタンスとテーブルを作成する

TimescaleDB Cloudを初めて利用する場合は、ヘルプドキュメントを参照してください。

  1. Timescaleアカウントをお持ちでない場合は、Timescaleアカウントの作成を参考にアカウントを作成します。

  2. Timescaleポータルにログインし、Timescaleサービスの作成を行います。サービスのパスワードを控えておきます。

  3. サービス概要ページから接続情報を取得します。EMQXで必要な項目はデータベース名ホストポートユーザー名です。

  4. psqlクライアントでサービスに接続します。

    bash
    # サービスURLで接続
    psql "postgres://tsdbadmin@xxxxx.xxxxx.tsdb.cloud.timescale.com:xxxxx/tsdb?sslmode=require"
  5. クライアントからのメッセージデータを保存するためのテーブルtemp_humを作成します。

    bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );
    
    SELECT create_hypertable('temp_hum', 'up_timestamp');

    テストデータを挿入し、確認します。

    bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
    
    SELECT * from temp_hum;

    テーブル作成後、サービスのExplorerタブでtemp_humテーブルの情報を確認できます。

  6. TimescaleインスタンスのMax_Connectionsの最適化

    Timescaleインスタンス詳細ページのOperations -> Database Parameters -> Common Parametersに移動し、最大接続数を100以上に増やします。

    EMQX Platform Integration TimescaleDB

DockerでTimescaleDBをインストールしテーブルを作成する

  1. Docker環境がない場合は、Dockerのインストールを参照してください。

  2. DockerでTimescaleDBコンテナを作成し、POSTGRES_PASSWORD環境変数でデータベースのパスワードを設定します。

    bash
    docker run -d --name timescaledb \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=public \
    timescale/timescaledb:latest-pg13
  3. クライアントデータを保存するデータベースを作成します。

    bash
    docker exec -it timescaledb psql -U postgres
    
    ## tsdbデータベースを作成
    CREATE database tsdb;
    
    \c tsdb;
  4. クライアントからのメッセージデータを保存するテーブルtemp_humを作成します。デバイスから報告される温度・湿度データを保存するためのテーブルです。

    bash
    CREATE TABLE temp_hum (
    up_timestamp   TIMESTAMPTZ       NOT NULL,
    client_id      TEXT              NOT NULL,
    temp           DOUBLE PRECISION  NULL,
    hum            DOUBLE PRECISION  NULL
    );
    
    SELECT create_hypertable('temp_hum', 'up_timestamp');

    テストデータを挿入し、確認します。

    bash
    INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) values (to_timestamp(1603963414), 'temp_hum-001', 19.1, 55);
    
    SELECT * from temp_hum;

TimescaleDBコネクターを作成する

データ統合ルールを作成する前に、TimescaleDBサーバーにアクセスするためのコネクターを作成します。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からTimescaleDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、データ転送カテゴリの中からTimescaleDBを選択します。
  3. 新規コネクターページで以下のオプションを設定します。
    • サーバーホスト:TimescaleDBサービスのホストとポートを入力します。Docker利用時はhttp://<サーバーアドレス>:5432を使用します。
    • データベース名:TimescaleDBのデータベース名を入力します。
    • ユーザー名とパスワード:TimescaleDBのユーザー名とパスワードを入力します。Dockerでデプロイした場合のデフォルトは以下です。
      • データベース名:tsdb
      • ユーザー名:postgres
      • パスワード:public
    • 詳細設定:接続プールサイズはTimescaleDB Cloudを利用する場合、1に設定します。
    • その他の設定はデフォルト値を使用するか、ビジネス要件に応じて調整してください。
  4. テストボタンをクリックし、TimescaleDBサービスにアクセス可能か確認します。成功すると成功メッセージが表示されます。
  5. 新規作成ボタンをクリックして作成を完了します。

ルールを作成する

次に、書き込むデータを指定するルールを作成し、処理済みデータをTimescaleDBに転送するアクションをルールに追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のSQL例は、temp_hum/emqxトピックに送信されたメッセージから報告時刻up_timestamp、クライアントID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。

    sql
     SELECT 
     timestamp div 1000 AS up_timestamp, 
     clientid AS client_id, 
     payload.temp AS temp, 
     payload.hum AS hum
    
     FROM
     "temp_hum/emqx"
  3. テスト有効化を使ってデータ入力をシミュレートし、結果をテストできます。

  4. 次へをクリックしてアクションを追加します。

  5. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  6. SQLテンプレートに以下のデータを入力し、SQLテンプレートに挿入します。

    sql
     INSERT INTO temp_hum(up_timestamp, client_id, temp, hum) VALUES (to_timestamp(${up_timestamp}), ${client_id}, ${temp}, ${hum})

    詳細設定(任意)については、詳細設定を参照してください。

  7. 確定ボタンをクリックしてルール作成を完了します。

  8. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールをテストする

温度・湿度データの報告をシミュレートするには、MQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": 27.5,
        "hum": 41.8
      }
  2. メッセージがTimescaleDBに転送されているか確認します。

    ターミナルで以下のコマンドを実行し、Timescaleデータベースに取り込まれたデータを確認します。

    bash
    SELECT * from temp_hum order by up_timestamp desc limit 10;

    TimescaleDBのtemp_humテーブルを確認し、新しいレコードが挿入されていることを検証してください。

  3. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。