Skip to content

OpenTSDBへのMQTTデータ取り込み

OpenTSDBはスケーラブルで分散型の時系列データベースです。EMQX CloudはOpenTSDBとの連携をサポートしており、MQTTメッセージをOpenTSDBに保存して後続の分析や取得に活用できます。

本ページでは、EMQX CloudとOpenTSDB間のデータ統合について包括的に紹介し、データ統合の作成および検証手順を実践的に解説します。

動作概要

OpenTSDBデータ統合は、EMQX Cloudに標準搭載された機能であり、EMQX Cloudのリアルタイムデータキャプチャおよび送信機能とOpenTSDBのデータ保存・分析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX CloudからOpenTSDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にしています。

以下の図は、EMQX CloudとOpenTSDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Cloud-Integration OpentsDB

EMQX Cloudはルールエンジンとアクションを通じてデバイスデータをOpenTSDBに挿入します。OpenTSDBは豊富なクエリ機能を提供し、レポートやチャート、その他のデータ分析結果の生成をサポートします。産業用エネルギー管理シナリオを例にとると、ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールと照合してメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
  3. OpenTSDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをOpenTSDBに書き込む操作が実行されます。

OpenTSDBにデータが書き込まれた後は、以下のように柔軟に活用できます。

  • Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データを表示するチャートを生成する。
  • 業務システムに接続し、エネルギー貯蔵装置の状態監視やアラート通知を行う。

特長と利点

OpenTSDBデータ統合は以下の特長と利点を提供します。

  • 効率的なデータ処理:EMQX Cloudは大量のIoTデバイス接続とメッセージスループットを処理可能であり、OpenTSDBはデータ書き込み、保存、クエリに優れた性能を発揮します。これによりIoTシナリオのデータ処理要件をシステムに過度な負荷をかけずに満たせます。
  • メッセージ変換:EMQX Cloudのルールを通じてメッセージは多様な処理や変換を経てからOpenTSDBに書き込まれます。
  • 大規模データ保存:EMQX CloudとOpenTSDBの連携により、大量のデバイスデータを直接OpenTSDBに保存可能です。OpenTSDBは大規模時系列データの保存とクエリに特化したデータベースであり、IoTデバイスから生成される膨大な時系列データを効率的に扱えます。
  • 豊富なクエリ機能:OpenTSDBの最適化されたストレージ構造とインデックスにより、数十億のデータポイントの高速書き込みとクエリが可能です。これはIoTデバイスデータのリアルタイム監視、分析、可視化に非常に有益です。
  • スケーラビリティ:EMQX CloudとOpenTSDBの両方がクラスター拡張に対応しており、ビジネスの成長に応じて柔軟にクラスターの水平拡張が可能です。

はじめる前に

本節では、OpenTSDBデータ統合の作成を開始する前に必要な準備について説明します。OpenTSDBサーバーのセットアップ方法も含まれます。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

OpenTSDBのインストール

Dockerを使ってOpenTSDBをインストールし、Dockerイメージを起動します(現時点ではx86プラットフォームのみ対応)。

bash
docker pull petergrace/opentsdb-docker

docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker

コネクターの作成

データ統合ルールを作成する前に、OpenTSDBサーバーへアクセスするためのOpenTSDBコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるOpenTSDBを選択します。すでにコネクターを作成している場合は、新規コネクターを選択し、続いてデータ永続化カテゴリのOpenTSDBを選択します。

  2. コネクター名:システムが自動的にコネクター名を生成します。

  3. 接続情報を入力します:

    • サーバーホストhttp://127.0.0.1:4242を入力するか、OpenTSDBサーバーがリモートで稼働している場合は実際のURLを入力します。
    • その他のオプションはデフォルトのままにします。
    • ビジネス要件に応じて詳細設定を行います(任意)。
  4. テストボタンをクリックします。OpenTSDBサービスにアクセス可能であれば、コネクター利用可能のメッセージが返されます。

  5. 新規作成ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

ルールの作成

本節では、EMQX Cloudコンソールを使ってOpenTSDBルールを作成し、ルールにアクションを追加する方法を示します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターで以下のステートメントを設定します。これはトピックt/#配下のMQTTメッセージをOpenTSDBに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、ルールで必要なすべてのフィールドがSELECT部分に含まれていることを確認してください。

    sql
    	SELECT
      		payload.metric as metric, payload.tags as tags, payload.value as value
    	FROM
      		"t/#"

    TIP

    初心者の方はSQL例をクリックし、試してみるでSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. データ書き込み欄で、MQTTメッセージをOpenTSDBが要求する形式に正しく変換するための設定を行います。例えば、クライアントが以下のデータを報告するとします。

    • トピック: t/opents
    • ペイロード:
    json
    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value": 12
    }

    提供されたペイロードデータ形式に基づき、以下の形式情報を設定します。

    • タイムスタンプ:OpenTSDBはデータポイントの時刻を記録するためにタイムスタンプを必要とします。MQTTメッセージにタイムスタンプが含まれない場合は、EMQX Cloudのアクション設定時に現在時刻をタイムスタンプとして使用するか、クライアントの報告データ形式を修正してタイムスタンプフィールドを含める必要があります。
    • メトリック:この例では"metric": "cpu"がメトリック名cpuを示しています。
    • タグ:タグはメトリックに関する追加情報を表します。ここでは"tags": {"host": "serverA"}で、このメトリックデータがホストserverAからのものであることを示します。
    • :実際のメトリック値です。この例では"value": 12で、メトリック値が12であることを示します。
  6. 詳細設定(任意):必要に応じて**同期(sync)または非同期(async)**クエリモードを選択します。

  7. 確定ボタンをクリックしてルール作成を完了します。

  8. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

MQTTXを使ってトピックt/opentsにメッセージをパブリッシュします。

bash
mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'

ルールの稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

OpenTSDBにデータが書き込まれているかを確認します。

bash
curl -X POST -H "Accept: Application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
    "start": "1h-ago",
    "queries": [
        {
            "aggregator": "last",
            "metric": "cpu",
            "tags": {
                "host": "*"
            }
        }
    ],
    "showTSUIDs": "true",
    "showQuery": "true",
    "delete": "false"
}'

クエリ結果の整形済み出力は以下の通りです。

json
[
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "aggregateTags": [],
    "query": {
      "aggregator": "last",
      "metric": "cpu",
      "tsuids": null,
      "downsample": null,
      "rate": false,
      "filters": [
        {
          "tagk": "host",
          "filter": "*",
          "group_by": true,
          "type": "wildcard"
        }
      ],
      "percentiles": null,
      "index": 0,
      "rateOptions": null,
      "filterTagKs": [
        "AAAB"
      ],
      "explicitTags": false,
      "useFuzzyFilter": true,
      "preAggregate": false,
      "rollupUsage": null,
      "rollupTable": "raw",
      "showHistogramBuckets": false,
      "useMultiGets": true,
      "tags": {
        "host": "wildcard(*)"
      },
      "histogramQuery": false
    },
    "tsuids": [
      "000001000001000001"
    ],
    "dps": {
      "1711678449": 12
    }
  }
]