Skip to content

Data Integrations を使用してサブスクリプショントピック情報を取得する

本記事では、EMQX Cloud のデータ統合を利用して、クラウドサービスリソース(サードパーティのデータベースやメッセージキュー)からサブスクリプション関係を取得し、デバイスのサブスクリプションをプロキシする方法を紹介します。例として MySQL を使用しています。

開始前に、以下の操作を完了してください。

  • EMQX Cloud 上にデプロイメント(EMQX クラスター)が作成されていること。
  • Dedicated Flex プランのユーザーは、まず ピアリング接続の作成を完了してください。以下に記載されているすべての IP はリソースの内部ネットワーク IP を指します。(Dedicated Flex プランで NAT ゲートウェイを利用している場合は、リソースへの接続にパブリック IP を使用することも可能です)
  • BYOC プランのユーザーは、BYOC をデプロイした VPC とリソースが存在する VPC 間でピアリング接続を確立してください。以下に記載されているすべての IP はリソースの内部 IP を指します。パブリック IP 経由でリソースにアクセスする必要がある場合は、BYOC をデプロイした VPC に対してパブリッククラウドコンソールで NAT ゲートウェイを設定してください。

MySQL の設定

  1. MySQL のインストール

    bash
    docker run -d --restart=always \
      --name mysql \
      -p 3306:3306 \
      -e MYSQL_ROOT_PASSWORD=public \
      mysql/mysql-server:5.7
  2. 新規データベースの作成

    bash
    docker exec -it mysql mysql -uroot -ppublic
    CREATE DATABASE emqx;
    USE emqx;
  3. 新規サブスクリプション関係テーブルの作成

    TIP

    サブスクリプション関係テーブルの構造は変更できません。必ず上記の SQL 文で作成してください。

    以下の SQL 文で mqtt_sub テーブルを作成します。このテーブルはデバイスのサブスクリプション関係データを保存するために使用します。

    sql
    DROP TABLE IF EXISTS `mqtt_sub`;
    CREATE TABLE `mqtt_sub` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `clientid` varchar(64) DEFAULT NULL,
        `topic` varchar(180) DEFAULT NULL,
        `qos` tinyint(1) DEFAULT NULL,
        PRIMARY KEY (`id`),
        KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`),
        UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`),
        INDEX topic_index(`id`, `topic`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
  4. EMQX クラスターの IP セグメントからデータベースへのアクセス許可設定(任意)

    Dedicated Flex デプロイメントの場合、デプロイメントのセグメントを取得したい場合は、Deployment Details → View Peer Connection Information からデプロイメント VPC セグメントをコピーしてください。

    sql
    # Dedicated Flex デプロイメント
    GRANT ALL PRIVILEGES ON *.* TO root@'10.11.30.%' IDENTIFIED BY 'public' WITH GRANT OPTION;
    
    # Standard デプロイメント
    GRANT ALL PRIVILEGES ON *.* TO root@'%' IDENTIFIED BY 'public' WITH GRANT OPTION;
  5. テストデータの挿入と確認

    sql
    INSERT INTO mqtt_sub(clientid, topic, qos) values("test", "t1", 1);
    
    select * from mqtt_sub;

EMQX Cloud データ統合の設定

Deployment Details に移動し、EMQX ダッシュボードをクリックしてダッシュボードにアクセスします。

  1. 新規リソースの作成

    左メニューの Data Integrations → Resources をクリックし、新規リソースを作成します。ドロップダウンから MySQL リソースタイプを選択し、先ほど作成した MySQL データベース情報を入力して「Test」をクリックします。エラーが出た場合は、データベース設定が正しいかすぐに確認してください。 data integrationscreate resource

  2. ルールの入力

    左メニューの Data Integration をクリックし、設定したリソースを探して「New Rule」をクリックします。以下の SQL 文にマッチするルールを入力します。

    sql
    SELECT * FROM "$events/client_connected"

    create resource

  3. レスポンスアクションの追加

    「Next」をクリックし、最初に作成したリソースを選択、アクションタイプから「Proxy Subscriptions」→「Get Subscription List from MySQL」を選択します。

    mysql action

  4. ルール一覧に戻る

    rule list

  5. ルールのモニタリングを確認

    monitor

テスト

  1. MySQL にサブスクリプションデータを挿入

    client ID が client1、サブスクライブトピックが topic1、QoS が 1 のサブスクリプションデータを挿入します。

    sql
    INSERT INTO mqtt_sub(clientid, topic, qos) values("client1", "topic1", 1);
    select * from mqtt_sub;

    mysql_data

  2. MQTTX を使ってデプロイメントに接続

    broker.emqx.io を作成したデプロイメントの接続アドレスに置き換え、EMQX ダッシュボードでクライアント認証情報を追加してください。 この例では clientID を client1 に設定します。 mqttx_result

  3. ダッシュボードでサブスクリプション関係を確認

    Deployment Details に戻り、EMQX ダッシュボードをクリックしてダッシュボードにアクセスし、「Subscriptions」でクライアントのサブスクリプション関係を確認します。 monitor