MQTTデータをMySQLに取り込む
MySQLは、高い信頼性と安定性を持つ広く使われているリレーショナルデータベースであり、迅速にインストール、設定、利用が可能です。MySQLデータ統合は、MQTTメッセージを効率的にMySQLデータベースに保存できるほか、イベントトリガーを通じてMySQL内のデータをリアルタイムに更新または削除することもサポートしています。MySQLデータ統合を活用することで、メッセージの保存、デバイスのオンライン/オフライン状態の更新、デバイスの動作記録などの機能を簡単に実装し、柔軟なIoTデータストレージおよびデバイス管理機能を実現できます。
本ページでは、EMQX CloudとMySQL間のデータ統合について、作成および検証の実践的な手順を紹介します。
動作概要
MySQLデータ統合はEMQX Cloudに標準搭載された機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的なIoTアプリケーションにおいて、EMQX CloudはIoTプラットフォームとしてデバイスの接続とメッセージの中継を担当し、MySQLはデータストレージプラットフォームとしてデバイスの状態やメタデータ、メッセージデータの保存およびデータ分析を担当します。

EMQX Cloudはルールエンジンを通じてデバイスのイベントやデータをMySQLに転送します。アプリケーションはMySQL内のデータを読み取り、デバイスの状態を把握したり、デバイスのオンライン・オフライン記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです。
- IoTデバイスがEMQX Cloudに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの属性情報が含まれます。
- メッセージのパブリッシュと受信:デバイスはテレメトリおよび状態データを特定のトピックにパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンにより、特定のトピックに基づいてメッセージやイベントを処理できます。ルールエンジンは該当するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
- MySQLへの書き込み:ルールによりメッセージをMySQLに書き込む処理がトリガーされます。SQLテンプレートを用いてルール処理結果からデータを抽出し、SQLを構築してMySQLに送信し、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりします。
イベントおよびメッセージデータがMySQLに書き込まれた後は、MySQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です。
- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題を迅速に解消する。
特長と利点
MySQLとのデータ統合は、以下のような特長と利点をビジネスにもたらします。
- 柔軟なイベント処理:EMQX Cloudのルールエンジンを通じて、MySQLはデバイスのライフサイクルイベントを処理でき、IoTアプリケーション実装に必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイスの故障や異常動作、トレンド変化を迅速に検知し、適切な対応を取れます。
- メッセージ変換:メッセージはEMQX Cloudのルールを通じて大規模な処理・変換が可能であり、保存や利用がより便利になります。
- 柔軟なデータ操作:EMQX Cloudが提供するSQLテンプレートを利用して、特定フィールドのデータをMySQLデータベースの対応テーブルやカラムに簡単に書き込み・更新でき、柔軟なデータ保存と管理を実現します。
- 業務プロセスの統合:データ統合により、デバイスデータをMySQLの豊富なエコシステムアプリケーションと連携可能であり、ERP、CRM、その他カスタム業務システムとの統合を促進し、高度な業務プロセスや自動化を実現します。
- ランタイムメトリクス:各ルールの総メッセージ数、成功/失敗数、現在の処理レートなどの実行時メトリクスの閲覧をサポートします。
柔軟なイベント処理、大規模なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。
はじめる前に
このセクションでは、EMQX CloudでMySQLデータ統合を作成する前に必要な準備について説明します。MySQLサーバーのインストールやデータテーブルの作成が含まれます。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
MySQLサーバーのインストール
- Dockerを使ってMySQLサーバーをインストールし、Dockerイメージを起動します。
# MySQLのDockerイメージを起動し、パスワードをpublicに設定
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=public -d mysql
# コンテナにアクセス
docker exec -it mysql bash
# コンテナ内でMySQLサーバーに接続し、パスワードを入力
mysql -u root -p
# データベースを作成し、選択
CREATE DATABASE emqx_data CHARACTER SET utf8mb4;
use emqx_data;- テーブルを作成します。以下のSQLコマンドで
temp_humテーブルを作成します。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。
CREATE TABLE `temp_hum` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`up_timestamp` timestamp NULL DEFAULT NULL,
`client_id` varchar(32) DEFAULT NULL,
`temp` float unsigned DEFAULT NULL,
`hum` float unsigned DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `up_timestamp_client_id` (`up_timestamp`,`client_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4;MySQLコネクターの作成
データ統合ルールを作成する前に、MySQLサーバーにアクセスするためのMySQLコネクターを作成する必要があります。
ご自身のデプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるMySQLを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリのMySQLを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- サーバーホスト:サーバーのIPアドレスとポート番号。
- データベース名:
emqx_dataを入力。 - ユーザー名:
rootを入力。 - パスワード:
publicを入力。 - 暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをクリックします。
詳細設定(任意):詳細設定を参照してください。
テストボタンをクリックします。MySQLサービスにアクセス可能であれば、成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをMySQLに転送するためのアクションをルールに追加する必要があります。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時間
up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードから温度と湿度を読み取ります。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
利用する機能に応じてSQLテンプレートを設定します。
注意:これは事前処理されたSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlINSERT INTO temp_hum (up_timestamp, client_id, temp, hum) VALUES ( FROM_UNIXTIME(${up_timestamp}/1000), ${client_id}, ${temp}, ${hum} )SQLテンプレート内でプレースホルダ変数が未定義の場合、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を設定できます。
無効(デフォルト):ルールエンジンは
undefinedという文字列をデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLをデータベースに挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。
詳細設定(任意):詳細設定を参照してください。
確認ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度と湿度のデータ報告をシミュレートするには、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
データのダンプ結果を確認します。
mysql> SELECT * FROM temp_hum ORDER BY up_timestamp DESC LIMIT 10;
+----+---------------------+-------------+------+------+
| id | up_timestamp | client_id | temp | hum |
+----+---------------------+-------------+------+------+
| 26 | 2024-03-20 08:44:55 | test_client | 27.5 | 41.8 |
+----+---------------------+-------------+------+------+
1 row in set (0.00 sec)- コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールおよびルール内の全アクションの統計情報が表示されます。