# MQTTデータをMySQLに取り込む

[MySQL](https://www.mysql.com/)は、高い信頼性と安定性を持つ広く使われているリレーショナルデータベースであり、迅速にインストール、設定、利用が可能です。MySQLデータ統合は、MQTTメッセージをMySQLデータベースに効率的に保存できるだけでなく、イベントトリガーを通じてMySQL内のデータをリアルタイムに更新または削除することもサポートしています。MySQLデータ統合を活用することで、メッセージ保存、デバイスのオンライン／オフライン状態更新、デバイス動作記録などの機能を簡単に実装し、柔軟なIoTデータストレージおよびデバイス管理機能を実現できます。

本ページでは、EMQX PlatformとMySQL間のデータ統合について、実践的な作成および検証手順を紹介します。

## 動作概要

MySQLデータ統合はEMQX Platformに標準搭載されている機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的なIoTアプリケーションでは、EMQX PlatformがIoTプラットフォームとしてデバイス接続とメッセージの中継を担当し、MySQLがデータストレージプラットフォームとしてデバイス状態やメタデータ、メッセージデータの保存およびデータ分析を担います。

![EMQX Platform MySQLデータ統合](./_assets/data_integration_mysql.jpg)

EMQX Platformはルールエンジンを通じてデバイスイベントやデータをMySQLに転送します。アプリケーションはMySQL内のデータを読み取り、デバイス状態を把握したり、デバイスのオンライン・オフライン記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです：

- **IoTデバイスがEMQX Platformに接続**：IoTデバイスがMQTTプロトコルを介して正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの属性情報が含まれます。
- **メッセージのパブリッシュと受信**：デバイスはテレメトリおよび状態データを特定のトピックにパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンにより、特定のトピックに基づくメッセージやイベントを処理できます。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報付加などの処理を行います。
- **MySQLへの書き込み**：ルールによりメッセージのMySQLへの書き込みがトリガーされます。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQLを構築、MySQLに送信して実行することで、メッセージの特定フィールドを対応するデータベースのテーブルやカラムに書き込んだり更新したりできます。

イベントおよびメッセージデータがMySQLに書き込まれた後は、MySQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です：

- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題をタイムリーに解消する。

## 特長とメリット

MySQLとのデータ統合は、以下の特長とメリットをビジネスにもたらします：

- **柔軟なイベント処理**：EMQX Platformのルールエンジンを通じて、MySQLはデバイスのライフサイクルイベントを処理でき、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイスの故障や異常動作、傾向変化を迅速に検知し、適切な対策を講じることが可能です。
- **メッセージ変換**：メッセージはEMQX Platformのルールを通じて多様な処理や変換が可能であり、MySQLへの保存や利用がより便利になります。
- **柔軟なデータ操作**：EMQX Platformが提供するSQLテンプレートを活用して、特定フィールドのデータをMySQLの対応テーブル・カラムに簡単に書き込み・更新でき、柔軟なデータ保存・管理が実現します。
- **ビジネスプロセスの統合**：データ統合により、デバイスデータをMySQLの豊富なエコシステムアプリケーションと連携可能にし、ERP、CRM、その他カスタムビジネスシステムとの統合を促進して高度な業務プロセスや自動化を実現します。
- **ランタイムメトリクス**：各ルールの総メッセージ数、成功／失敗数、現在の処理レートなどのランタイムメトリクスの閲覧をサポートします。

柔軟なイベント処理、多様なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

## はじめる前に

ここでは、EMQX PlatformでMySQLデータ統合を作成する前に必要な準備、MySQLサーバーのインストールやデータテーブルの作成について説明します。

### 前提条件

- [データ統合](./introduction.md)の知識
- EMQX Platformのデータ統合における[ルール](./rules.md)の知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### MySQLサーバーのインストール

1. Dockerを使ってMySQLサーバーをインストールし、Dockerイメージを起動します。

```bash
# MySQLのDockerイメージを起動し、パスワードをpublicに設定
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=public -d mysql

# コンテナにアクセス
docker exec -it mysql bash

# コンテナ内でMySQLサーバーに接続し、設定したパスワードを入力
mysql -u root -p

# データベースを作成し、選択
CREATE DATABASE emqx_data CHARACTER SET utf8mb4;
use emqx_data;
```

2. テーブルを作成します。以下のSQLコマンドを使って`temp_hum`テーブルを作成します。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。

```sql
CREATE TABLE `temp_hum` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `up_timestamp` timestamp NULL DEFAULT NULL,
  `client_id` varchar(32) DEFAULT NULL,
  `temp` float unsigned DEFAULT NULL,
  `hum` float unsigned DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `up_timestamp_client_id` (`up_timestamp`,`client_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4;
```

## MySQLコネクターの作成

データ統合ルールを作成する前に、MySQLサーバーにアクセスするためのMySQLコネクターを作成する必要があります。

1. デプロイメントに移動し、左側のナビゲーションメニューから**データ統合**をクリックします。
2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの下にある**MySQL**を選択します。すでにコネクターを作成済みの場合は、**新しいコネクター**を選択し、続いて**データ永続化**カテゴリの**MySQL**を選択します。
3. **コネクター名**：システムが自動的にコネクター名を生成します。
4. 接続情報を入力します：

   - **サーバーホスト**：サーバーのIPアドレスとポート。
   - **データベース名**：`emqx_data`と入力。
   - **ユーザー名**：`root`と入力。
   - **パスワード**：`public`と入力。
   - 暗号化接続を確立したい場合は、**TLSを有効にする**のトグルスイッチをクリックします。

5. 詳細設定（任意）：[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-mysql.html#advanced-configurations)を参照してください。
6. **テスト**ボタンをクリックします。MySQLサービスにアクセス可能であれば、成功のメッセージが表示されます。
7. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理済みデータをMySQLに転送するためのアクションをルールに追加します。

1. ルールエリアの**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールマッチング用のSQL文を入力します。以下のルールでは、メッセージが報告された時間`up_timestamp`、クライアントID、ペイロードを`temp_hum/emqx`トピックから読み取ります。また、このトピックから温度と湿度も取得します。

   ```sql
    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL例**をクリックし、**テストを有効にする**を使ってSQLルールを学習・テストできます。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. 利用する機能に応じて**SQLテンプレート**を設定します：

   注意：これは前処理済みのSQLのため、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

   ```sql
    INSERT INTO temp_hum (up_timestamp, client_id, temp, hum)
    VALUES (
      FROM_UNIXTIME(${up_timestamp}/1000),
      ${client_id},
      ${temp},
      ${hum}
    )
   ```

   SQLテンプレート内でプレースホルダー変数が未定義の場合は、**SQLテンプレート**上部の**未定義変数をNULLとして扱う**スイッチでルールエンジンの動作を設定できます：

   - **無効**（デフォルト）：ルールエンジンは文字列`undefined`をデータベースに挿入します。
   - **有効**：変数が未定義の場合、`NULL`をデータベースに挿入します。

     ::: tip

     可能な限りこのオプションは有効にすることを推奨します。無効にするのは後方互換性を確保する場合のみです。

     :::

6. 詳細設定（任意）：[詳細設定](https://docs.emqx.com/en/enterprise/latest/data-integration/data-bridge-mysql.html#advanced-configurations)を参照してください。

7. **確認**ボタンをクリックしてルール作成を完了します。

8. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合の設定チェーンが完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/)を使って温度と湿度のデータ報告をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. データダンプ結果を確認します。

```bash
mysql> SELECT * FROM temp_hum ORDER BY up_timestamp DESC LIMIT 10;
+----+---------------------+-------------+------+------+
| id | up_timestamp        | client_id   | temp | hum  |
+----+---------------------+-------------+------+------+
| 26 | 2024-03-20 08:44:55 | test_client | 27.5 | 41.8 |
+----+---------------------+-------------+------+------+
1 row in set (0.00 sec)
```

3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計情報を閲覧できます。
