Data Integrations を使用してサブスクリプショントピック情報を取得する
本記事では、EMQX Cloud のデータ統合を利用して、クラウドサービスリソース(サードパーティのデータベースやメッセージキュー)からサブスクリプション関係を取得し、デバイスのサブスクリプションをプロキシする方法を紹介します。例として MySQL を使用しています。
開始前に、以下の操作を完了してください。
- EMQX Cloud 上にデプロイメント(EMQX クラスター)が作成されていること。
- Dedicated Flex プランのユーザーは、まず ピアリング接続の作成を完了してください。以下に記載されているすべての IP はリソースの内部ネットワーク IP を指します。(Dedicated Flex プランで NAT ゲートウェイを利用している場合は、リソースへの接続にパブリック IP を使用することも可能です)
- BYOC プランのユーザーは、BYOC をデプロイした VPC とリソースが存在する VPC 間でピアリング接続を確立してください。以下に記載されているすべての IP はリソースの内部 IP を指します。パブリック IP 経由でリソースにアクセスする必要がある場合は、BYOC をデプロイした VPC に対してパブリッククラウドコンソールで NAT ゲートウェイを設定してください。
MySQL の設定
MySQL のインストール
bashdocker run -d --restart=always \ --name mysql \ -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=public \ mysql/mysql-server:5.7新規データベースの作成
bashdocker exec -it mysql mysql -uroot -ppublic CREATE DATABASE emqx; USE emqx;新規サブスクリプション関係テーブルの作成
TIP
サブスクリプション関係テーブルの構造は変更できません。必ず上記の SQL 文で作成してください。
以下の SQL 文で
mqtt_subテーブルを作成します。このテーブルはデバイスのサブスクリプション関係データを保存するために使用します。sqlDROP TABLE IF EXISTS `mqtt_sub`; CREATE TABLE `mqtt_sub` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `clientid` varchar(64) DEFAULT NULL, `topic` varchar(180) DEFAULT NULL, `qos` tinyint(1) DEFAULT NULL, PRIMARY KEY (`id`), KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`), UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`), INDEX topic_index(`id`, `topic`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;EMQX クラスターの IP セグメントからデータベースへのアクセス許可設定(任意)
Dedicated Flex デプロイメントの場合、デプロイメントのセグメントを取得したい場合は、Deployment Details → View Peer Connection Information からデプロイメント VPC セグメントをコピーしてください。
sql# Dedicated Flex デプロイメント GRANT ALL PRIVILEGES ON *.* TO root@'10.11.30.%' IDENTIFIED BY 'public' WITH GRANT OPTION; # Standard デプロイメント GRANT ALL PRIVILEGES ON *.* TO root@'%' IDENTIFIED BY 'public' WITH GRANT OPTION;テストデータの挿入と確認
sqlINSERT INTO mqtt_sub(clientid, topic, qos) values("test", "t1", 1); select * from mqtt_sub;
EMQX Cloud データ統合の設定
Deployment Details に移動し、EMQX ダッシュボードをクリックしてダッシュボードにアクセスします。
新規リソースの作成
左メニューの Data Integrations → Resources をクリックし、新規リソースを作成します。ドロップダウンから MySQL リソースタイプを選択し、先ほど作成した MySQL データベース情報を入力して「Test」をクリックします。エラーが出た場合は、データベース設定が正しいかすぐに確認してください。


ルールの入力
左メニューの
Data Integrationをクリックし、設定したリソースを探して「New Rule」をクリックします。以下の SQL 文にマッチするルールを入力します。sqlSELECT * FROM "$events/client_connected"
レスポンスアクションの追加
「Next」をクリックし、最初に作成したリソースを選択、アクションタイプから「Proxy Subscriptions」→「Get Subscription List from MySQL」を選択します。

ルール一覧に戻る

ルールのモニタリングを確認

テスト
MySQL にサブスクリプションデータを挿入
client ID が client1、サブスクライブトピックが topic1、QoS が 1 のサブスクリプションデータを挿入します。
sqlINSERT INTO mqtt_sub(clientid, topic, qos) values("client1", "topic1", 1); select * from mqtt_sub;
MQTTX を使ってデプロイメントに接続
broker.emqx.io を作成したデプロイメントの接続アドレスに置き換え、EMQX ダッシュボードでクライアント認証情報を追加してください。 この例では clientID を client1 に設定します。

ダッシュボードでサブスクリプション関係を確認
Deployment Details に戻り、EMQX ダッシュボードをクリックしてダッシュボードにアクセスし、「Subscriptions」でクライアントのサブスクリプション関係を確認します。
