# GreptimeDBへのMQTTデータ取り込み

[GreptimeDB](https://github.com/GreptimeTeam/greptimedb)は、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの利点を享受できます。EMQXプラットフォームは、主流のGreptimeDB、GreptimeCloud、GreptimeDB Enterpriseとの接続をサポートしています。

本ページでは、EMQXプラットフォームとGreptimeDB間のデータ統合について、実践的な手順を交えて包括的に紹介します。

## 動作概要

GreptimeDBデータ統合はEMQXプラットフォームに組み込まれた機能であり、EMQXプラットフォームのリアルタイムデータ取得・送信能力とGreptimeDBのデータ保存・分析能力を組み合わせています。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQXプラットフォームからGreptimeDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。ワークフローは以下の通りです。

以下の図は、EMQXプラットフォームとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform-Integration GreptimeDB](./_assets/data_integration_greptimedb.jpg)

1. **メッセージのパブリッシュと受信**：産業機器はMQTTプロトコルを用いてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。  
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータフォーマットの変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
3. **GreptimeDBへのデータ取り込み**：ルールエンジンで定義されたルールがトリガーとなり、メッセージをGreptimeDBに書き込む操作が実行されます。GreptimeDBアクションはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むデータフォーマット定義が可能です。

エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば：

- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示する。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を促進する。

## 特長と利点

GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。

- **使いやすさ**：EMQXプラットフォームとGreptimeDBはどちらも開発者に優しい設計です。EMQXプラットフォームは標準のMQTTプロトコルに加え、多様な認証、認可、クラスタリング機能を標準で提供します。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を備えています。両者の統合により、ビジネス統合と開発のスピードアップが期待できます。
- **効率的なデータ処理**：EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに負担をかけずに満たします。
- **メッセージ変換**：メッセージはEMQXプラットフォームのルール内で豊富な処理・変換を経てからGreptimeDBに書き込まれます。
- **効率的なストレージとスケーラビリティ**：EMQXプラットフォームとGreptimeDBはどちらもクラスターのスケールアウト機能を備え、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
- **高度なクエリ機能**：GreptimeDBはタイムスタンプデータの効率的なクエリ・分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を抽出できます。

## はじめる前に

このセクションでは、GreptimeDBデータ統合の作成を始める前に必要な準備、特にGreptimeDBサーバーのインストール方法について説明します。

### 前提条件

- [データ統合](./introduction.md)に関する知識
- EMQXプラットフォームのデータ統合[ルール](./rules.md)に関する知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### GreptimeDBサーバーのインストール

1. Dockerを使って[GreptimeDB](https://greptime.com/download)をインストールし、Dockerイメージを起動します。

   ```bash
   # GreptimeDBのDockerイメージを起動する
   docker run -p 4000-4004:4000-4004 \
   -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \
   --name greptime --rm \
   greptime/greptimedb standalone start \
   --http-addr 0.0.0.0:4000 \
   --rpc-addr 0.0.0.0:4001 \
   --mysql-addr 0.0.0.0:4002 \
   --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
   ```

2. `user-provider`パラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細は[ドキュメント](https://docs.greptime.com/user-guide/clients/authentication#authentication)を参照してください。
3. GreptimeDBが起動したら、[http://localhost:4000/dashboard](http://localhost:4000/dashboard)にアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれ`greptime_user`と`greptime_pwd`です。

## コネクターの作成

データ統合ルールを作成する前に、GreptimeDBサーバーにアクセスするためのGreptimeDBコネクターを作成する必要があります。

1. デプロイメントに移動し、左側ナビゲーションメニューから**データ統合**をクリックします。初めてコネクターを作成する場合は、**データ永続化**カテゴリの下にある**GreptimeDB**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、続けて**データ永続化**カテゴリの下の**GreptimeDB**を選択します。

2. **コネクター名**はシステムが自動生成します。

3. 接続情報を入力します：

   - **サーバーホスト**：`{host}:4001`を入力します。GreptimeCloudに接続する場合はポートを443にして`{url}:443`を入力してください。
   - **データベース**：`public`を入力します。GreptimeCloudに接続する場合はサービス名を入力します。
   - **ユーザー名**と**パスワード**：`greptime_user`と`greptime_pwd`を入力します（[GreptimeDBサーバーのインストール](#greptimedbサーバーのインストール)で設定したもの）。GreptimeCloudの場合はサービスのユーザー名とパスワードを入力してください。
   - **TLSを有効化**：暗号化接続を確立したい場合はトグルスイッチをオンにします。
   - ビジネス要件に応じて詳細設定を行います（任意）。

4. **テスト**ボタンをクリックします。GreptimeDBサービスにアクセス可能であれば、**connector available**のメッセージが返されます。

5. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

このセクションでは、EMQXプラットフォームコンソールを使ってGreptimeDBルールを作成し、ルールにアクションを追加する方法を示します。

1. ルールエリアの**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. 利用したい機能に基づいて**SQLエディター**でルールを設定します。ここではクライアントが`temp_hum/emqx`トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目標とします。SQLは以下のように記述します。

   ```sql
    SELECT
     timestamp, clientid, payload
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は**SQL例**をクリックし、**テストを有効化**してSQLルールの学習とテストを行うことを推奨します。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. **書き込み構文**を設定します。これはテキストベースのフォーマットで、データポイントの計測名、タグ、フィールド、タイムスタンプを指定し、[InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/)の構文に準拠したプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータフォーマットをサポートしています。  
   本チュートリアルの例は以下の構文です。

   ```sql
    myMeasurement,tag1=${clientid} fieldKey=${payload}
   ```

   ::: tip

   - GreptimeDBに符号付き整数型の値を書き込む場合、プレースホルダーの後に`i`を型識別子として付加します。例：`${payload.int}i`
   - 符号なし整数型の場合は`u`を付加します。例：`${payload.int}u`

   :::

6. **時間精度**を指定します。デフォルトは`millisecond`です。

7. **詳細設定**を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します（任意）。

8. **確定**ボタンをクリックしてルール作成を完了します。

9. **新規ルール作成成功**ポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

温度・湿度データの送信シミュレーションには[MQTTX](https://mqttx.app/)の利用を推奨しますが、他のクライアントでも構いません。

1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - クライアントID：`test_client`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. ルールの稼働状況を確認し、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。

3. GreptimeDBダッシュボードで`SQL`を使い、メッセージがGreptimeDBに書き込まれているかを確認できます。
