# Data Integrationを使ったサブスクリプショントピック情報の取得

本記事では、EMQX Platformのデータ統合機能を利用して、クラウドサービスリソース（サードパーティのデータベースやメッセージキュー）からサブスクリプション関係を取得し、デバイスのサブスクリプションをプロキシする方法を、MySQLを例に解説します。

開始前に以下の準備を完了してください。

- EMQX Platform上にデプロイメント（EMQXクラスター）が作成されていること。
- Dedicatedプラン利用者の場合：[ピアリング接続の作成](../deployments/vpc_peering.md)を先に完了してください。以下に記載のIPはすべてリソースの内部ネットワークIPを指します。（Dedicatedプランで[NATゲートウェイ](../vas/nat-gateway.md)を利用している場合は、パブリックIPでの接続も可能です）
- BYOCプラン利用者の場合：BYOCがデプロイされているVPCとリソースがあるVPC間でピアリング接続を確立してください。以下に記載のIPはすべてリソースの内部IPを指します。パブリックIP経由でリソースにアクセスする場合は、BYOCがデプロイされているVPCに対してパブリッククラウドコンソールでNATゲートウェイを設定してください。

## MySQLの設定

1. MySQLのインストール

   ```bash
   docker run -d --restart=always \
     --name mysql \
     -p 3306:3306 \
     -e MYSQL_ROOT_PASSWORD=public \
     mysql/mysql-server:5.7
   ```

2. データベースの作成

   ```bash
   docker exec -it mysql mysql -uroot -ppublic
   CREATE DATABASE emqx;
   USE emqx;
   ```

3. サブスクリプション関係テーブルの作成

   ::: tip
   サブスクリプション関係テーブルの構造は変更できません。上記のSQL文を使用して作成してください。
   :::

   以下のSQL文で `mqtt_sub` テーブルを作成します。このテーブルはデバイスのサブスクリプション関係データを保存するために使用します。

   ```sql
   DROP TABLE IF EXISTS `mqtt_sub`;
   CREATE TABLE `mqtt_sub` (
       `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
       `clientid` varchar(64) DEFAULT NULL,
       `topic` varchar(180) DEFAULT NULL,
       `qos` tinyint(1) DEFAULT NULL,
       PRIMARY KEY (`id`),
       KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`),
       UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`),
       INDEX topic_index(`id`, `topic`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
   ```

4. EMQXクラスターのIPセグメントからデータベースへのアクセス許可設定（任意）

   Dedicatedデプロイメントの場合、デプロイメントのセグメントを確認したい場合は、Deployment Details → Peer Connection Informationの表示からデプロイメントVPCセグメントをコピーしてください。

   ```sql
   # Dedicatedデプロイメント
   GRANT ALL PRIVILEGES ON *.* TO root@'10.11.30.%' IDENTIFIED BY 'public' WITH GRANT OPTION;

   # Standardデプロイメント
   GRANT ALL PRIVILEGES ON *.* TO root@'%' IDENTIFIED BY 'public' WITH GRANT OPTION;
   ```

5. テストデータの挿入と確認

   ```sql
   INSERT INTO mqtt_sub(clientid, topic, qos) values("test", "t1", 1);
   
   select * from mqtt_sub;
   ```

## EMQX Platformのデータ統合設定

Deployment DetailsからEMQX Dashboardへアクセスし、ダッシュボードを開きます。

1. リソースの新規作成

   左メニューのData Integrations → Resourcesをクリックし、新規リソースを作成します。ドロップダウンからMySQLリソースタイプを選択し、先ほど作成したMySQLの情報を入力してTestをクリックします。エラーが出た場合は設定を再確認してください。
   ![data integrations](./_assets/data_integrations_get_sub_from_mysql.png)
   ![create resource](./_assets/create_mysql_resource_get_subs_from_mysql.png)

2. ルールの作成

   左メニューのData Integrationをクリックし、作成したリソースを選択してNew Ruleをクリックします。以下のSQL文を入力してルールを作成します。

   ```sql
   SELECT * FROM "$events/client_connected"
   ```

   ![create resource](./_assets/rule_get_subs_mysql.png)

3. レスポンスアクションの追加

   Nextをクリックし、最初に作成したリソースを選択します。アクションタイプのドロップダウンから「Proxy Subscriptions」→「Get Subscription List from MySQL」を選択します。

   ![mysql action](./_assets/get_subs_mysql_action.png)

4. ルール一覧に戻る

   ![rule list](./_assets/view_rule_engine_mysql_get_subs.png)

5. ルールのモニタリング確認

   ![monitor](./_assets/view_monitor_mysql_get_subs.png)

## テスト

1. MySQLにサブスクリプションデータを挿入

   クライアントIDが `client1`、サブスクライブトピックが `topic1`、QoSが1のサブスクリプションデータを挿入します。

   ```sql
   INSERT INTO mqtt_sub(clientid, topic, qos) values("client1", "topic1", 1);
   select * from mqtt_sub;
   ```

   ![mysql_data](./_assets/insert_subs_mysql.png)

2. [MQTTX](https://mqttx.app/)を使ってデプロイメントに接続

   broker.emqx.ioを作成したデプロイメントの接続アドレスに置き換え、EMQX Dashboardでクライアント認証情報を追加してください。
   この例ではclientIDを `client1` に設定します。
   ![mqttx_result](./_assets/connect_mqtt_get_subs_mysql.png)

3. ダッシュボードでサブスクリプション関係を確認

   Deployment DetailsからEMQX Dashboardへアクセスし、`Subscriptions`でクライアントのサブスクリプション関係を確認します。
   ![monitor](./_assets/dashboard_get_subs_mysql.png)
