# OpenTSDBへのMQTTデータ取り込み

[OpenTSDB](http://opentsdb.net/)はスケーラブルで分散型の時系列データベースです。EMQX PlatformはOpenTSDBとの連携をサポートしており、MQTTメッセージをOpenTSDBに保存して後続の分析や取得に利用できます。

本ページでは、EMQX PlatformとOpenTSDB間のデータ統合について包括的に解説し、データ統合の作成および検証方法を実践的に紹介します。

## 動作概要

OpenTSDBデータ統合はEMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータキャプチャと転送機能とOpenTSDBのデータ保存および分析機能を組み合わせています。内蔵の[ルールエンジン](./rules.md)コンポーネントにより、EMQX PlatformからOpenTSDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQX PlatformとOpenTSDB間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform-Integration OpentsDB](./_assets/data_integration_opentsdb.jpg)

EMQX Platformはルールエンジンとアクションを通じてデバイスデータをOpenTSDBに挿入します。OpenTSDBは豊富なクエリ機能を提供し、レポートやチャート、その他のデータ分析結果の生成をサポートします。産業用エネルギー管理シナリオを例にすると、ワークフローは以下の通りです。

1. **メッセージのパブリッシュと受信**：産業用デバイスはMQTTプロトコルを介してEMQX Platformに正常に接続し、エネルギー消費データを定期的にパブリッシュします。このデータには生産ラインの識別子やエネルギー消費値が含まれます。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：内蔵のルールエンジンはトピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールと照合されてメッセージデータが処理されます。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
3. **OpenTSDBへのデータ取り込み**：ルールエンジンで定義されたルールがトリガーとなり、メッセージをOpenTSDBに書き込む操作が実行されます。

OpenTSDBにデータが書き込まれた後は、以下のように柔軟にデータを活用できます。

- Grafanaなどの可視化ツールと接続し、エネルギー蓄積データを表示するチャートを生成する。
- 業務システムと連携してエネルギー蓄積装置の状態監視やアラート発報を行う。

## 特長と利点

OpenTSDBデータ統合は以下の特長と利点を提供します。

- **効率的なデータ処理**：EMQX Platformは膨大な数のIoTデバイス接続とメッセージスループットを処理可能であり、OpenTSDBはデータ書き込み、保存、クエリに優れた性能を発揮します。これによりIoTシナリオのデータ処理要件をシステムに過度な負荷をかけずに満たせます。
- **メッセージ変換**：EMQX Platformのルールを通じてメッセージは多様な処理や変換を経てからOpenTSDBに書き込まれます。
- **大規模データ保存**：EMQX PlatformとOpenTSDBを統合することで、大量のデバイスデータを直接OpenTSDBに保存可能です。OpenTSDBは大規模時系列データの保存とクエリに特化したデータベースであり、IoTデバイスが生成する膨大な時系列データを効率的に扱えます。
- **豊富なクエリ機能**：OpenTSDBの最適化されたストレージ構造とインデックスにより、数十億のデータポイントの高速な書き込みとクエリが可能です。これはIoTデバイスデータのリアルタイム監視、分析、可視化を必要とするアプリケーションに非常に有益です。
- **スケーラビリティ**：EMQX PlatformとOpenTSDBは共にクラスター拡張に対応しており、ビジネスの成長に応じて柔軟に水平スケールが可能です。

## はじめる前に

本節ではOpenTSDBデータ統合の作成を開始する前に必要な準備、特にOpenTSDBサーバーのセットアップ方法について説明します。

### 前提条件

- [データ統合](./introduction.md)に関する知識
- データ統合の[ルール](./rules.md)に関する知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### OpenTSDBのインストール

Dockerを使ってOpenTSDBをインストールし、Dockerイメージを起動します（現時点ではx86プラットフォームのみ対応）。

```bash
docker pull petergrace/opentsdb-docker

docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker
```

## コネクターの作成

データ統合ルールを作成する前に、OpenTSDBサーバーにアクセスするためのOpenTSDBコネクターを作成する必要があります。

1. デプロイメントに移動し、左側ナビゲーションメニューから**データ統合**をクリックします。初めてコネクターを作成する場合は、**データ永続化**カテゴリの下にある**OpenTSDB**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、続いて**データ永続化**カテゴリの**OpenTSDB**を選択します。

2. **コネクター名**：システムが自動的にコネクター名を生成します。

3. 接続情報を入力します：

    - **サーバーホスト**に`http://127.0.0.1:4242`を入力、またはOpenTSDBサーバーがリモートで稼働している場合は実際のURLを入力します。
    - その他のオプションはデフォルトのままにします。
    - ビジネス要件に応じて詳細設定を行います（任意）。

4. **テスト**ボタンをクリックします。OpenTSDBサービスにアクセス可能であれば、**コネクター利用可能**のメッセージが返されます。

5. **新規作成**ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

## ルールの作成

本節では、EMQX Platformコンソールを使ってOpenTSDBルールを作成し、ルールにアクションを追加する手順を示します。

1. ルールエリアの**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**で以下の文を使ってルールを設定します。これはトピック`t/#`配下のMQTTメッセージをOpenTSDBに保存することを意味します。

   注意：独自のSQL構文を指定する場合は、ルールで必要なすべてのフィールドが`SELECT`部分に含まれていることを確認してください。

   ```sql
   	SELECT
     		payload.metric as metric, payload.tags as tags, payload.value as value
   	FROM
     		"t/#"
   ```

   ::: tip

   初心者の方は**SQL例**をクリックし、**テスト有効化**を使ってSQLルールの学習とテストを行うことを推奨します。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. **データ書き込み**欄で、MQTTメッセージをOpenTSDBが要求する形式に正しく変換するための書き込み方法を指定します。例えば、クライアントが以下のデータを報告したとします。

   - トピック：`t/opents`
   - ペイロード：

   ```json
   {
     "metric": "cpu",
     "tags": {
       "host": "serverA"
     },
     "value": 12
   }
   ```

   上記ペイロードのデータ形式に基づき、以下のフォーマット情報を設定します。

   - **タイムスタンプ**：OpenTSDBはデータポイントの時刻を記録するためにタイムスタンプを必要とします。MQTTメッセージにタイムスタンプが含まれない場合は、EMQX Platformのアクション設定時に現在時刻をタイムスタンプとして使用するか、クライアントの報告データ形式にタイムスタンプフィールドを追加する必要があります。
   - **メトリック**：例では`"metric": "cpu"`であり、メトリック名は`cpu`です。
   - **タグ**：タグはメトリックに関する追加情報を表します。ここでは`"tags": {"host": "serverA"}`で、このメトリックデータがホスト`serverA`からのものであることを示します。
   - **値**：実際のメトリック値です。例では`"value": 12`で、メトリック値は12です。

6. 詳細設定（任意）：必要に応じて**同期**または**非同期**クエリモードを選択します。

7. **確認**ボタンをクリックしてルール作成を完了します。

8. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

MQTTXを使ってトピック`t/opents`にメッセージをパブリッシュします。

```bash
mqttx pub -i emqx_c -t t/opents -m '{"metric":"cpu","tags":{"host":"serverA"},"value":12}'
```

ルールの実行状況を確認すると、1件の新規受信メッセージと1件の新規送信メッセージがあるはずです。

OpenTSDBにデータが書き込まれているか確認します。

```bash
curl -X POST -H "Accept: Application/json" -H "Content-Type: application/json" http://localhost:4242/api/query -d '{
    "start": "1h-ago",
    "queries": [
        {
            "aggregator": "last",
            "metric": "cpu",
            "tags": {
                "host": "*"
            }
        }
    ],
    "showTSUIDs": "true",
    "showQuery": "true",
    "delete": "false"
}'
```

クエリ結果の整形済み出力は以下の通りです。

```json
[
  {
    "metric": "cpu",
    "tags": {
      "host": "serverA"
    },
    "aggregateTags": [],
    "query": {
      "aggregator": "last",
      "metric": "cpu",
      "tsuids": null,
      "downsample": null,
      "rate": false,
      "filters": [
        {
          "tagk": "host",
          "filter": "*",
          "group_by": true,
          "type": "wildcard"
        }
      ],
      "percentiles": null,
      "index": 0,
      "rateOptions": null,
      "filterTagKs": [
        "AAAB"
      ],
      "explicitTags": false,
      "useFuzzyFilter": true,
      "preAggregate": false,
      "rollupUsage": null,
      "rollupTable": "raw",
      "showHistogramBuckets": false,
      "useMultiGets": true,
      "tags": {
        "host": "wildcard(*)"
      },
      "histogramQuery": false
    },
    "tsuids": [
      "000001000001000001"
    ],
    "dps": {
      "1711678449": 12
    }
  }
]%
```
