# MQTTデータをInfluxDBに取り込む

[InfluxDB](https://www.influxdata.com/)は時系列データの保存と解析に特化したデータベースです。高いデータスループット性能と安定した動作により、IoT分野での活用に非常に適しています。EMQX Platformは、InfluxDB Cloud、InfluxDB OSS、InfluxDB Enterpriseの主要なバージョンへの接続をサポートしています。

本ページでは、EMQX PlatformとInfluxDB間のデータ統合について、作成方法や動作確認を含めて詳しく解説します。

## 動作の仕組み

InfluxDBデータ統合は、EMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータ取得・転送機能とInfluxDBのデータ保存・解析機能を組み合わせています。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQX PlatformからInfluxDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQX PlatformはルールエンジンとSinkを介してデバイスデータをInfluxDBに転送し保存・解析を行います。InfluxDBは解析結果としてレポートやグラフを生成し、InfluxDBの可視化ツールを通じてユーザーに提供します。

以下の図は、エネルギー貯蔵シナリオにおけるEMQXとInfluxDBの典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform InfluxDB Data Integration](./_assets/data_integration_influxdb.jpg)

EMQX PlatformとInfluxDBは、エネルギー消費データをリアルタイムに効率的に収集・解析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQX PlatformがIoTプラットフォームとしてデバイス接続、メッセージ送受信、データルーティングを担当し、InfluxDBがデータ保存と解析を担います。ワークフローは以下の通りです。

1. **メッセージのパブリッシュと受信**：エネルギー貯蔵機器や産業用IoT機器はMQTTプロトコルでEMQX Platformに接続し、電力消費量や入出力電力などのデータを定期的にパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：組み込みのルールエンジンを使い、特定のトピックに基づくメッセージを処理します。メッセージはルールエンジンを通過し、対応するルールにマッチすると、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
3. **InfluxDBへのデータ取り込み**：ルールエンジンで定義したルールがトリガーとなり、メッセージをInfluxDBに書き込む操作が実行されます。InfluxDB SinkはLine Protocolテンプレートを提供し、メッセージの特定フィールドをInfluxDBのメジャメントやフィールドに柔軟にマッピング可能です。

エネルギー消費データがInfluxDBに書き込まれた後は、Line Protocolを活用して以下のような解析が可能です。

- Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データのグラフを生成。
- 業務システムに接続し、エネルギー貯蔵機器の状態監視やアラート発報を実施。

## 特長とメリット

InfluxDBデータ統合は以下の特長と利点を提供します。

- **効率的なデータ処理**：EMQX Platformは大量のIoTデバイス接続とメッセージスループットを処理可能であり、InfluxDBは高速なデータ書き込み、保存、クエリ性能を持つため、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
- **メッセージ変換**：EMQX Platformのルールを用いて、InfluxDBに書き込む前にメッセージを多様に処理・変換可能です。
- **スケーラビリティ**：EMQX PlatformとInfluxDBの両方がクラスター拡張に対応し、ビジネスの成長に応じて柔軟に水平拡張できます。
- **豊富なクエリ機能**：InfluxDBは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと解析を実現し、IoTデータから価値あるインサイトを抽出します。
- **効率的なストレージ**：InfluxDBは高圧縮率のエンコード方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズ可能で、不要なデータによるストレージ占有を防止します。

## はじめる前に

本節では、EMQX PlatformでInfluxDBデータ統合を作成するための準備作業を紹介します。

### 前提条件

- [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2.5/reference/syntax/line-protocol/)の知識。
- [ルール](./rules.md)の理解。
- [データ統合](./introduction.md)の理解。

### ネットワーク設定

<!--@include: ./network-setting.md-->

### InfluxDBのインストールと設定

#### DockerによるInfluxDBのインストール

1. Dockerを使って[InfluxDB](https://docs.influxdata.com/influxdb/v2.5/install/)をインストールし、コンテナを起動します。

```bash
# InfluxDBのDockerイメージを起動
docker run --name influxdb -p 8086:8086 influxdb:2.5.1
```

2. InfluxDBが起動したら、サーバーの8086ポートにブラウザでアクセスし、**Username**、**Password**、**Organization Name**、**Bucket Name**を設定します。

3. InfluxDB UIで**Load Data** -> **API Token**をクリックし、[all-accessトークンの作成](https://docs.influxdata.com/influxdb/v2.5/install/#create-all-access-tokens)手順に従います。

#### InfluxDB Cloudでのサービス作成

1. [InfluxDB Cloud](https://cloud2.influxdata.com)にログインします。

2. InfluxDB UIで**Username**、**Password**、**Organization Name**、**Bucket Name**を設定します。

3. InfluxDB UIで**Load Data** -> **API Token**をクリックし、[all-accessトークンの作成](https://docs.influxdata.com/influxdb/v2.5/install/#create-all-access-tokens)手順に従います。

## InfluxDBコネクターの作成

データ統合ルールを作成する前に、まずInfluxDBサーバーにアクセスするためのInfluxDBコネクターを作成します。

1. デプロイメントに移動し、左メニューから**データ統合**をクリックします。

2. 初めてコネクターを作成する場合は、**Data Forward**カテゴリの下にある**InfluxDB**を選択します。すでにコネクターを作成済みの場合は、**New Connector**を選択し、続けて**Data Forward**カテゴリの**InfluxDB**を選択します。

3. **Connector Name**：システムが自動的にコネクター名を生成します。

4. 接続情報を入力します：

   - 必要に応じて**Version of InfluxDB**を選択（デフォルトは`V2`）。
   - **Server Host**：サーバーのIPアドレスとポート。InfluxDB Cloudを利用する場合はポート443を指定し、`{url}:443`の形式で入力し、**Enable TLS**を有効化してTLS接続を行います。
   - [InfluxDBのインストールと設定](#install-and-set-up-influxdb)に従い、**Token**、**Organization**、**Bucket**を設定します。InfluxDB v1を選択した場合は、**Database**、**Username**、**Password**の設定も必要です。

5. **Test**ボタンをクリックし、InfluxDBサービスにアクセスできるか確認します。成功すると成功メッセージが表示されます。

6. **New**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータをInfluxDBに転送するアクションを追加するルールを作成します。

1. ルールエリアで**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。

2. 使用する機能に基づいて**SQL Editor**でルールを設定します。ここでは、クライアントが`temp_hum/emqx`トピックに温度・湿度メッセージを送信した際にエンジンをトリガーするSQLの例を示します。

   ```sql
     SELECT
       timestamp,
       payload.location as location,
       payload.temp as temp,
       payload.hum as hum
     FROM "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は**SQL Examples**と**Enable Test**をクリックして、SQLルールの学習とテストが可能です。

   :::

3. **Next**をクリックしてアクションを追加します。

4. **Connector**ドロップダウンから先ほど作成したコネクターを選択します。

5. **Time Precision**を指定します。デフォルトは`millisecond`です。

6. **Data Format**を`JSON`または`Line Protocol`から選択し、InfluxDBへのデータ解析・書き込み方法を指定します。

   - JSON形式の場合、**Measurement**、**Timestamp**、**Fields**、**Tags**の解析方法を定義します。すべてのキー値は変数やプレースホルダーを使用可能で、[InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2.5/reference/syntax/line-protocol/)に準拠して設定できます。**Fields**はCSVファイルをインポートして一括設定も可能です。詳細は[バッチ設定](#batch-setting)を参照してください。

   - Line Protocol形式の場合、テキストベースのフォーマットで、メジャメント、タグセット、フィールドセット、タイムスタンプを指定し、プレースホルダーもサポートします。詳細は[InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/)および[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/)を参照してください。

     例：

     ```bash
      temp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}
     ```

     ::: tip

     - InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後に`i`を付けます。例：`${payload.int}i`。詳細は[InfluxDB 1.8整数値書き込み](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#write-the-field-value-1-as-an-integer-to-influxdb)を参照してください。
     - 符号なし整数型の値を書き込む場合は、`u`を付けます。例：`${payload.int}u`。詳細は同上を参照してください。

     :::

7. 高度な設定（任意）：[高度な設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-influxdb.html#advanced-configurations)を参照してください。

8. **Confirm**ボタンをクリックしてルール作成を完了します。

9. **Successful new rule**ポップアップで**Back to Rules**をクリックし、データ統合設定の一連の作業が完了します。

### バッチ設定

InfluxDBでは1つのデータエントリに数百のフィールドが含まれることが多く、データ形式の設定が複雑になる場合があります。これを解決するため、EMQX Platformはフィールドのバッチ設定機能を提供しています。

JSON形式でデータ形式を設定する際、CSVファイルからフィールドのキー・バリューを一括インポートできます。

1. **Fields**テーブルの**Import**ボタンをクリックし、**Import Batch Setting**ポップアップを開きます。

2. 指示に従い、まずバッチ設定テンプレートファイルをダウンロードし、テンプレートにフィールドのキー・バリューを記入します。テンプレートのデフォルト内容は以下の通りです。

   | Field  | Value              | 備考（任意）                                               |
   | :----- | :----------------- | :--------------------------------------------------------- |
   | temp   | ${payload.temp}    |                                                            |
   | hum    | ${payload.hum}     |                                                            |
   | precip | ${payload.precip}i | フィールド値に`i`を付けてInfluxDBに整数として保存する指示。 |

   - **Field**：フィールドキー。定数または`${var}`形式のプレースホルダーをサポート。
   - **Value**：フィールド値。定数またはプレースホルダーをサポートし、line protocolに従い型識別子を付加可能。
   - **備考**：CSVファイル内の注釈用で、EMQXへのインポート対象外。

   CSVファイルのバッチ設定データは2048行以内にしてください。

3. 記入済みテンプレートファイルを保存し、**Import Batch Setting**ポップアップにアップロード後、**Import**をクリックしてバッチ設定を完了します。

4. インポート後、**Fields**設定テーブルでキー・バリューの調整も可能です。

## ルールのテスト

温度・湿度データの送信シミュレーションには[MQTTX](https://mqttx.app/)の利用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": 27.5,
       "hum": 41.8,
       "location": "Prague"
     }
     ```

2. InfluxDB UIの**Data Explorer**ウィンドウで、メッセージがInfluxDBに書き込まれているか確認します。

3. InfluxDB V1を使用している場合は、InfluxDBコンテナに入り、データを確認します。

   ```bash
     $ docker exec -it influxdb influx
     $ use db
   ```

   ```bash
     > select * from "temp_hum"
     name: temp_hum
     time                hum  location temp
     ----                ---  -------- ----
     1711093437420000000 41.8 Prague   27.5
     >
   ```

4. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報および該当ルール下のすべてのアクション統計が表示されます。
