Skip to content

OpenTSDBへのMQTTデータ取り込み

OpenTSDBはスケーラブルで分散型の時系列データベースです。EMQX PlatformはOpenTSDBとの連携をサポートしており、MQTTメッセージをOpenTSDBに保存して後続の分析や取得に利用できます。

本ページでは、EMQX PlatformとOpenTSDB間のデータ統合について包括的に解説し、データ統合の作成および検証方法を実践的に紹介します。

動作概要

OpenTSDBデータ統合はEMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータキャプチャと転送機能とOpenTSDBのデータ保存および分析機能を組み合わせています。内蔵のルールエンジンコンポーネントにより、EMQX PlatformからOpenTSDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQX PlatformとOpenTSDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Platform-Integration OpentsDB

EMQX Platformはルールエンジンとアクションを通じてデバイスデータをOpenTSDBに挿入します。OpenTSDBは豊富なクエリ機能を提供し、レポートやチャート、その他のデータ分析結果の生成をサポートします。産業用エネルギー管理シナリオを例にすると、ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを介してEMQX Platformに正常に接続し、エネルギー消費データを定期的にパブリッシュします。このデータには生産ラインの識別子やエネルギー消費値が含まれます。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:内蔵のルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールと照合されてメッセージデータが処理されます。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
  3. OpenTSDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをOpenTSDBに書き込む操作が実行されます。

OpenTSDBにデータが書き込まれた後は、以下のように柔軟にデータを活用できます。

  • Grafanaなどの可視化ツールと接続し、エネルギー蓄積データを表示するチャートを生成する。
  • 業務システムと連携してエネルギー蓄積装置の状態監視やアラート発報を行う。

特長と利点

OpenTSDBデータ統合は以下の特長と利点を提供します。

  • 効率的なデータ処理:EMQX Platformは膨大な数のIoTデバイス接続とメッセージスループットを処理可能であり、OpenTSDBはデータ書き込み、保存、クエリに優れた性能を発揮します。これによりIoTシナリオのデータ処理要件をシステムに過度な負荷をかけずに満たせます。
  • メッセージ変換:EMQX Platformのルールを通じてメッセージは多様な処理や変換を経てからOpenTSDBに書き込まれます。
  • 大規模データ保存:EMQX PlatformとOpenTSDBを統合することで、大量のデバイスデータを直接OpenTSDBに保存可能です。OpenTSDBは大規模時系列データの保存とクエリに特化したデータベースであり、IoTデバイスが生成する膨大な時系列データを効率的に扱えます。
  • 豊富なクエリ機能:OpenTSDBの最適化されたストレージ構造とインデックスにより、数十億のデータポイントの高速な書き込みとクエリが可能です。これはIoTデバイスデータのリアルタイム監視、分析、可視化を必要とするアプリケーションに非常に有益です。
  • スケーラビリティ:EMQX PlatformとOpenTSDBは共にクラスター拡張に対応しており、ビジネスの成長に応じて柔軟に水平スケールが可能です。

はじめる前に

本節ではOpenTSDBデータ統合の作成を開始する前に必要な準備、特にOpenTSDBサーバーのセットアップ方法について説明します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

OpenTSDBのインストール

Dockerを使ってOpenTSDBをインストールし、Dockerイメージを起動します(現時点ではx86プラットフォームのみ対応)。

bash
docker pull petergrace/opentsdb-docker

docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker

コネクターの作成

データ統合ルールを作成する前に、OpenTSDBサーバーにアクセスするためのOpenTSDBコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるOpenTSDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのOpenTSDBを選択します。

  2. コネクター名:システムが自動的にコネクター名を生成します。

  3. 接続情報を入力します:

    • サーバーホストhttp://127.0.0.1:4242を入力、またはOpenTSDBサーバーがリモートで稼働している場合は実際のURLを入力します。
    • その他のオプションはデフォルトのままにします。
    • ビジネス要件に応じて詳細設定を行います(任意)。
  4. テストボタンをクリックします。OpenTSDBサービスにアクセス可能であれば、コネクター利用可能のメッセージが返されます。

  5. 新規作成ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

ルールの作成

本節では、EMQX Platformコンソールを使ってOpenTSDBルールを作成し、ルールにアクションを追加する手順を示します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターで以下の文を使ってルールを設定します。これはトピックt/#配下のMQTTメッセージをOpenTSDBに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、ルールで必要なすべてのフィールドがSELECT部分に含まれていることを確認してください。

    sql
    	SELECT
      		payload.metric as metric, payload.tags as tags, payload.value as value
    	FROM
      		"t/#"

    TIP

    初心者の方はSQL例をクリックし、テスト有効化を使ってSQLルールの学習とテストを行うことを推奨します。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. データ書き込み欄で、MQTTメッセージをOpenTSDBが要求する形式に正しく変換するための書き込み方法を指定します。例えば、クライアントが以下のデータを報告したとします。

    • トピック:t/opents
    • ペイロード:
    json
    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value": 12
    }

    上記ペイロードのデータ形式に基づき、以下のフォーマット情報を設定します。

    • タイムスタンプ:OpenTSDBはデータポイントの時刻を記録するためにタイムスタンプを必要とします。MQTTメッセージにタイムスタンプが含まれない場合は、EMQX Platformのアクション設定時に現在時刻をタイムスタンプとして使用するか、クライアントの報告データ形式にタイムスタンプフィールドを追加する必要があります。
    • メトリック:例では"metric": "cpu"であり、メトリック名はcpuです。
    • タグ:タグはメトリックに関する追加情報を表します。ここでは"tags": {"host": "serverA"}で、このメトリックデータがホストserverAからのものであることを示します。
    • :実際のメトリック値です。例では"value": 12で、メトリック値は12です。
  6. 詳細設定(任意):必要に応じて同期または非同期クエリモードを選択します。

  7. 確認ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

MQTTXを使ってトピックt/opentsにメッセージをパブリッシュします。

bash
mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'

ルールの実行状況を確認すると、1件の新規受信メッセージと1件の新規送信メッセージがあるはずです。

OpenTSDBにデータが書き込まれているか確認します。

bash
curl -X POST -H "Accept: Application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
    "start": "1h-ago",
    "queries": [
        {
            "aggregator": "last",
            "metric": "cpu",
            "tags": {
                "host": "*"
            }
        }
    ],
    "showTSUIDs": "true",
    "showQuery": "true",
    "delete": "false"
}'

クエリ結果の整形済み出力は以下の通りです。

json
[
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "aggregateTags": [],
    "query": {
      "aggregator": "last",
      "metric": "cpu",
      "tsuids": null,
      "downsample": null,
      "rate": false,
      "filters": [
        {
          "tagk": "host",
          "filter": "*",
          "group_by": true,
          "type": "wildcard"
        }
      ],
      "percentiles": null,
      "index": 0,
      "rateOptions": null,
      "filterTagKs": [
        "AAAB"
      ],
      "explicitTags": false,
      "useFuzzyFilter": true,
      "preAggregate": false,
      "rollupUsage": null,
      "rollupTable": "raw",
      "showHistogramBuckets": false,
      "useMultiGets": true,
      "tags": {
        "host": "wildcard(*)"
      },
      "histogramQuery": false
    },
    "tsuids": [
      "000001000001000001"
    ],
    "dps": {
      "1711678449": 12
    }
  }
]%