Skip to content

Data Integrationを使ったサブスクリプショントピック情報の取得

本記事では、EMQX Platformのデータ統合機能を利用して、クラウドサービスリソース(サードパーティのデータベースやメッセージキュー)からサブスクリプション関係を取得し、デバイスのサブスクリプションをプロキシする方法を、MySQLを例に解説します。

開始前に以下の準備を完了してください。

  • EMQX Platform上にデプロイメント(EMQXクラスター)が作成されていること。
  • Dedicatedプラン利用者の場合:ピアリング接続の作成を先に完了してください。以下に記載のIPはすべてリソースの内部ネットワークIPを指します。(DedicatedプランでNATゲートウェイを利用している場合は、パブリックIPでの接続も可能です)
  • BYOCプラン利用者の場合:BYOCがデプロイされているVPCとリソースがあるVPC間でピアリング接続を確立してください。以下に記載のIPはすべてリソースの内部IPを指します。パブリックIP経由でリソースにアクセスする場合は、BYOCがデプロイされているVPCに対してパブリッククラウドコンソールでNATゲートウェイを設定してください。

MySQLの設定

  1. MySQLのインストール

    bash
    docker run -d --restart=always \
      --name mysql \
      -p 3306:3306 \
      -e MYSQL_ROOT_PASSWORD=public \
      mysql/mysql-server:5.7
  2. データベースの作成

    bash
    docker exec -it mysql mysql -uroot -ppublic
    CREATE DATABASE emqx;
    USE emqx;
  3. サブスクリプション関係テーブルの作成

    TIP

    サブスクリプション関係テーブルの構造は変更できません。上記のSQL文を使用して作成してください。

    以下のSQL文で mqtt_sub テーブルを作成します。このテーブルはデバイスのサブスクリプション関係データを保存するために使用します。

    sql
    DROP TABLE IF EXISTS `mqtt_sub`;
    CREATE TABLE `mqtt_sub` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `clientid` varchar(64) DEFAULT NULL,
        `topic` varchar(180) DEFAULT NULL,
        `qos` tinyint(1) DEFAULT NULL,
        PRIMARY KEY (`id`),
        KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`),
        UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`),
        INDEX topic_index(`id`, `topic`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
  4. EMQXクラスターのIPセグメントからデータベースへのアクセス許可設定(任意)

    Dedicatedデプロイメントの場合、デプロイメントのセグメントを確認したい場合は、Deployment Details → Peer Connection Informationの表示からデプロイメントVPCセグメントをコピーしてください。

    sql
    # Dedicatedデプロイメント
    GRANT ALL PRIVILEGES ON *.* TO root@'10.11.30.%' IDENTIFIED BY 'public' WITH GRANT OPTION;
    
    # Standardデプロイメント
    GRANT ALL PRIVILEGES ON *.* TO root@'%' IDENTIFIED BY 'public' WITH GRANT OPTION;
  5. テストデータの挿入と確認

    sql
    INSERT INTO mqtt_sub(clientid, topic, qos) values("test", "t1", 1);
    
    select * from mqtt_sub;

EMQX Platformのデータ統合設定

Deployment DetailsからEMQX Dashboardへアクセスし、ダッシュボードを開きます。

  1. リソースの新規作成

    左メニューのData Integrations → Resourcesをクリックし、新規リソースを作成します。ドロップダウンからMySQLリソースタイプを選択し、先ほど作成したMySQLの情報を入力してTestをクリックします。エラーが出た場合は設定を再確認してください。 data integrationscreate resource

  2. ルールの作成

    左メニューのData Integrationをクリックし、作成したリソースを選択してNew Ruleをクリックします。以下のSQL文を入力してルールを作成します。

    sql
    SELECT * FROM "$events/client_connected"

    create resource

  3. レスポンスアクションの追加

    Nextをクリックし、最初に作成したリソースを選択します。アクションタイプのドロップダウンから「Proxy Subscriptions」→「Get Subscription List from MySQL」を選択します。

    mysql action

  4. ルール一覧に戻る

    rule list

  5. ルールのモニタリング確認

    monitor

テスト

  1. MySQLにサブスクリプションデータを挿入

    クライアントIDが client1、サブスクライブトピックが topic1、QoSが1のサブスクリプションデータを挿入します。

    sql
    INSERT INTO mqtt_sub(clientid, topic, qos) values("client1", "topic1", 1);
    select * from mqtt_sub;

    mysql_data

  2. MQTTXを使ってデプロイメントに接続

    broker.emqx.ioを作成したデプロイメントの接続アドレスに置き換え、EMQX Dashboardでクライアント認証情報を追加してください。 この例ではclientIDを client1 に設定します。 mqttx_result

  3. ダッシュボードでサブスクリプション関係を確認

    Deployment DetailsからEMQX Dashboardへアクセスし、Subscriptionsでクライアントのサブスクリプション関係を確認します。 monitor