# Apache Doris に MQTT データを取り込む

::: tip 注意

Apache Doris とのデータ統合は、EMQX バージョン 5.91 以降の専用版で利用可能です。

:::

[Apache Doris](https://doris.apache.org/) は、高い同時接続性、高性能、使いやすさで知られる最新の大規模並列処理（MPP）分析データベースシステムです。リアルタイム分析やデータウェアハウジングのシナリオに特に適しています。EMQX 5.10.0 以降では、MQTT データを Apache Doris と統合でき、効率的な保存、リアルタイム分析、強力なデータ可視化を実現します。

本ガイドでは、EMQX プラットフォームと Apache Doris 間のデータ統合の設定および検証方法について実践的に説明します。

::: tip 注意

EMQX における Apache Doris データ統合は、Apache Doris バージョン 2.1.7 以降をサポートしています。

:::

## 動作概要

Apache Doris データ統合は EMQX プラットフォームの標準機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的な IoT アプリケーションでは、EMQX プラットフォームがデバイス接続とメッセージの転送を担当し、Apache Doris はデバイスの状態やメタデータ、メッセージデータの保存および分析を担当します。

<img src="./_assets/doris-integration.png" alt="doris-integration" style="zoom:67%;" />

EMQX プラットフォームはルールエンジンとアクションを通じてデバイスイベントやデータを Apache Doris に転送します。アプリケーションは Apache Doris のデータを読み取り、デバイスの状態を把握したり、オンライン・オフラインの記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです。

- **IoT デバイスが EMQX プラットフォームに接続**: IoT デバイスが MQTT プロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイス ID、送信元 IP アドレスなどの情報が含まれます。
- **メッセージのパブリッシュと受信**: デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQX プラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- **ルールエンジンによるメッセージ処理**: 組み込みのルールエンジンにより、特定のソースからのメッセージやイベントをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- **Apache Doris への書き込み**: ルールによりメッセージを Apache Doris に書き込む処理がトリガーされます。SQL テンプレートを利用して、ルール処理結果からデータを抽出し SQL を構築、Apache Doris に送信して実行することで、メッセージの特定フィールドを対応するテーブルやカラムに書き込んだり更新したりします。

イベントおよびメッセージデータが Apache Doris に書き込まれた後は、Apache Doris に接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です。

- Grafana などの可視化ツールに接続し、データに基づくチャートを生成してデータの変化を表示。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題をタイムリーに解消。

## 特長とメリット

Apache Doris とのデータ統合により、以下の特長と利点をビジネスにもたらします。

- **柔軟なイベント処理**: EMQX ルールエンジンを通じて、Apache Doris はデバイスのライフサイクルイベントを処理可能であり、IoT アプリケーション実装に必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイス故障や異常動作、トレンド変化を迅速に検知し、適切な対策を講じられます。
- **メッセージ変換**: メッセージは EMQX ルールを通じて多様な処理・変換が可能であり、Apache Doris への保存や利用をより便利にします。
- **リアルタイムデータ取り込み**: Apache Doris は HTTP および JDBC インターフェースによるリアルタイムデータ取り込みをサポートします。EMQX と統合することで、MQTT データを低レイテンシで直接 Doris テーブルに書き込め、即時のクエリや分析が必要なシナリオに最適です。
- **ストリーミング同期**: Apache Doris は Flink、Kafka、トランザクションデータベースなどのリアルタイムストリームの取り込みもサポートします。これにより、EMQX の MQTT データと他のストリーミングデータソースを統合した統一パイプラインを構築し、包括的なリアルタイム分析が可能です。
- **標準 SQL とエコシステム互換性**: Doris は MySQL 構文に完全互換で標準 SQL をサポートし、新たな言語を学ぶことなく強力な分析クエリを実行できます。BI ツールやクライアントアプリケーションと容易に統合でき、ダッシュボード、レポート、自動化ワークフローに活用できます。
- **実行時メトリクス**: 各アクションの実行時メトリクス（総メッセージ数、成功/失敗数、現在のレートなど）を閲覧可能です。

柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイム監視・分析機能により、効率的で信頼性が高くスケーラブルな IoT アプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。

## はじめる前に

このセクションでは、EMQX プラットフォームコンソールで Apache Doris データ統合を作成する前に必要な準備（Apache Doris サーバーのインストールやデータテーブルの作成）について説明します。

### 前提条件

- [ルール](./rules.md)を理解していること。
- [データ統合](./introduction.md)を理解していること。

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Apache Doris サーバーのインストール

[公式ガイド](https://doris.apache.org/docs/dev/gettingStarted/quick-start#use-docker-for-quick-deployment)に従い、Docker Compose を使ってローカルで Doris を起動してください。

### データテーブルの作成

MySQL クライアントを使って Doris Frontend に接続し、コマンドを発行します。詳細は[公式ドキュメント](https://doris.apache.org/docs/dev/gettingStarted/quick-start#run-queries)を参照してください。

例：

```sh
mysql -uroot -P9030 -h127.0.0.1
```

Apache Doris に以下のデータベースと2つのテーブルを作成してください。

- `emqx_messages` テーブルは、クライアント ID、トピック、ペイロード、作成日時を保存します。
- `emqx_client_events` テーブルは、クライアント ID、イベントタイプ、作成日時を保存します。

```sql
create database mqtt;
use mqtt;

create table if not exists
  emqx_messages(
    clientid varchar,
    topic string,
    payload string,
    created_at datetime
  )
  properties (replication_num = 1);

create table if not exists
  emqx_client_events(
    clientid varchar,
    event varchar,
    created_at datetime)
  properties (replication_num = 1);
```

## コネクターの作成

このセクションでは、EMQX プラットフォームが Apache Doris サーバーにデータを送信できるようにコネクターを作成する方法を説明します。

1. デプロイメントに移動し、左側メニューから **データ統合** をクリックします。
2. 初めてコネクターを作成する場合は、**データ永続化** カテゴリの下にある **Doris** を選択します。すでにコネクターを作成済みの場合は、**新規コネクター** を選択し、続いて **データ永続化** カテゴリの下の **Doris** を選択します。
3. **新規コネクター** ページで以下の情報を設定します。
   - **コネクター名**: システムが自動生成します。
   - **サーバーホスト**: `127.0.0.1:9030` または Apache Doris サーバーがリモートの場合は実際のホスト名を入力します。
   - **データベース名**: `mqtt` を入力します。
   - **ユーザー名**: `root` を入力します。
   - **パスワード**: `public` を入力します。
4. ビジネスニーズに応じて詳細設定（任意）を行います。
5. **新規** ボタンをクリックする前に、**テスト** をクリックしてコネクターが Apache Doris サーバーに接続できるか確認できます。
6. ページ下部の **新規** ボタンをクリックしてコネクターの作成を完了します。

## メッセージ保存用ルールの作成

このセクションでは、ソース MQTT トピック `t/#` からのメッセージを処理し、設定済みアクションを通じて Apache Doris のデータテーブル `emqx_messages` に保存するルールの作成方法を説明します。

1. ルールエリアの **新規ルール** をクリックするか、作成したコネクターの **アクション** 列にある新規ルールアイコンをクリックします。

2. ルール ID に `my_rule` を入力し、**SQL エディター** に以下のステートメントを設定します。これはトピック `t/#` 以下の MQTT メッセージを Apache Doris に保存することを意味します。

   注意: 独自の SQL 構文を指定する場合は、アクションで必要なすべてのフィールドを `SELECT` 部分に含めてください。

   ```sql
   SELECT
     *
   FROM
     "t/#"
   ```

   ::: tip

   初心者の方は **SQL Examples** をクリックし、**Enable Test** を有効にして SQL ルールを学習・テストできます。

   :::

3. **次へ** をクリックしてアクションを追加します。

4. **コネクター** ドロップダウンから先ほど作成したコネクターを選択します。

5. 利用する機能に応じて **SQL テンプレート** を設定します。

   注意: これは事前処理された SQL なので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

   ```sql
   INSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES(
     ${clientid},
     ${topic},
     ${payload},
     FROM_UNIXTIME(${timestamp}/1000)
   )
   ```

   SQL テンプレート内でプレースホルダ変数が未定義の場合、**SQL テンプレート** 上部の **Undefined Vars as Null** スイッチでルールエンジンの動作を切り替えられます。

   - **無効**（デフォルト）: ルールエンジンは文字列 `undefined` をデータベースに挿入します。
   - **有効**: 変数が未定義の場合、ルールエンジンは `NULL` を挿入します。

     ::: tip

     可能な限りこのオプションは常に有効にしてください。無効にするのは後方互換性を確保する場合のみです。

     :::

6. **フォールバックアクション（任意）**: メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細は [フォールバックアクション](./actions.md#fallback-actions) を参照してください。

7. **確認** ボタンをクリックしてアクション設定を完了します。

8. **新規ルール作成成功** ポップアップで **ルールに戻る** をクリックし、データ統合設定の一連の流れを完了します。

## イベント記録用ルールの作成

このセクションでは、クライアントのオンライン／オフライン状態を記録し、設定済みアクションを通じて Apache Doris テーブル `emqx_client_events` にイベントデータを保存するルールの作成方法を説明します。

ルール作成手順は[メッセージ保存用ルールの作成](#メッセージ保存用ルールの作成)とほぼ同様ですが、SQL ルール構文と SQL テンプレートが異なります。

オンライン／オフライン状態記録用のルールは、**SQL エディター** に以下のステートメントを入力します。

```sql
SELECT
  *
FROM
  "$events/client/connected", "$events/client/disconnected"
```

クライアントイベントデータをデータテーブルに挿入するための SQL テンプレートは以下の通りです。

```sql
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  ${clientid},
  ${event},
  FROM_UNIXTIME(${timestamp}/1000)
)
```

## ルールのテスト

MQTTX を使ってトピック `t/1` にメッセージを送信し、オンライン／オフラインイベントをトリガーします。

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Apache Doris" }'
```

2つのアクションの稼働状況を確認してください。新しい受信メッセージと送信メッセージがそれぞれ1件ずつ、イベントレコードが2件あるはずです。

`emqx_messages` データテーブルにデータが書き込まれているか確認します。

```bash
mysql> select * from emqx_messages;
+----------+-------+--------------------------+---------------------+
| clientid | topic | payload                  | created_at          |
+----------+-------+--------------------------+---------------------+
| emqx_c   | t/1   | { "msg": "hello Apache Doris" } | 2022-12-09 08:44:07 |
+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)
```

`emqx_client_events` テーブルにデータが書き込まれているか確認します。

```bash
mysql> select * from emqx_client_events;
+----------+---------------------+---------------------+
| clientid | event               | created_at          |
+----------+---------------------+---------------------+
| emqx_c   | client.connected    | 2022-12-09 08:44:07 |
| emqx_c   | client.disconnected | 2022-12-09 08:44:07 |
+----------+---------------------+---------------------+
2 rows in set (0.00 sec)
```
