MySQLへのMQTTデータ取り込み
MySQLは、高い信頼性と安定性を持つ広く利用されているリレーショナルデータベースであり、迅速にインストール、設定、利用が可能です。MySQLデータ統合により、MQTTメッセージを効率的にMySQLデータベースに保存でき、イベントトリガーを通じてMySQL内のデータをリアルタイムに更新または削除することもサポートしています。MySQLデータ統合を活用することで、メッセージの保存、デバイスのオンライン/オフライン状態更新、デバイスの動作記録などの機能を簡単に実装し、柔軟なIoTデータストレージおよびデバイス管理機能を実現できます。
本ページでは、EMQXとMySQL間のデータ統合について、実践的な作成および検証手順を紹介します。
動作の仕組み
MySQLデータ統合はEMQXの標準機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的なIoTアプリケーションでは、EMQXがIoTプラットフォームとしてデバイス接続とメッセージの中継を担当し、MySQLがデータストレージプラットフォームとしてデバイス状態やメタデータ、メッセージデータの保存および分析を担当します。

EMQXはルールエンジンとSinkを介してデバイスイベントとデータをMySQLに転送します。アプリケーションはMySQL内のデータを読み取り、デバイス状態の把握やオンライン/オフライン記録の取得、デバイスデータの分析を行います。具体的なワークフローは以下の通りです:
- IoTデバイスがEMQXに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの属性情報が含まれます。
- メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンにより、特定のソースからのメッセージやイベントをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報付加などの処理を行います。
- MySQLへの書き込み:ルールによりメッセージのMySQLへの書き込みがトリガーされます。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQLを構築、MySQLに送信して実行することで、メッセージの特定フィールドをデータベースの対応テーブルやカラムに書き込みまたは更新します。
イベントおよびメッセージデータがMySQLに書き込まれた後は、MySQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です:
- Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成して変化を表示。
- デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題を早期に解消。
特長と利点
MySQLとのデータ統合により、以下の機能と利点が得られます:
- 柔軟なイベント処理:EMQXルールエンジンを通じてMySQLはデバイスのライフサイクルイベントを処理でき、IoTアプリケーション実装に必要な各種管理や監視タスクの開発が大幅に容易になります。イベントデータを分析することで、デバイスの故障や異常動作、傾向変化を迅速に検知し適切な対応が可能です。
- メッセージ変換:メッセージはEMQXルールで広範に処理・変換されてからMySQLに書き込まれるため、保存や利用がより便利になります。
- 柔軟なデータ操作:MySQL Sinkが提供するSQLテンプレートを用いて、特定フィールドのデータをMySQLの対応テーブルやカラムに簡単に書き込み・更新でき、柔軟なデータ保存・管理が可能です。
- ビジネスプロセスの統合:データ統合により、デバイスデータをMySQLの豊富なエコシステムアプリケーションと連携可能で、ERP、CRM、その他カスタムビジネスシステムとの統合を促進し、高度な業務プロセスや自動化を実現します。
- ランタイムメトリクス:各Sinkのランタイムメトリクス(総メッセージ数、成功/失敗数、現在のレートなど)を確認できます。
柔軟なイベント処理、広範なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。
はじめる前に
このセクションでは、EMQXダッシュボードでMySQLデータ統合を作成する前に必要な準備について説明します。MySQLサーバーのインストールやデータテーブルの作成が含まれます。
前提条件
MySQLサーバーのインストール
Dockerを使ってMySQLサーバーをインストールし、Dockerイメージを起動します。
# MySQLのDockerイメージを起動し、パスワードをpublicに設定
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=public -d mysql
# コンテナにアクセス
docker exec -it mysql bash
# コンテナ内でMySQLサーバーに接続し、設定したパスワードを入力
mysql -u root -p
# データベースを作成し、選択
CREATE DATABASE emqx_data CHARACTER SET utf8mb4;
use emqx_data;
データテーブルの作成
以下のSQL文を使って、MySQLデータベースに
emqx_messages
テーブルを作成します。このテーブルは各メッセージのクライアントID、トピック、ペイロード、作成日時を保存します。sqlCREATE TABLE emqx_messages ( id INT AUTO_INCREMENT PRIMARY KEY, clientid VARCHAR(255), topic VARCHAR(255), payload TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
注意:バイナリペイロードが必要な場合は、カラムを"BLOB"型で宣言してください。
以下のSQL文を使って、MySQLデータベースに
emqx_client_events
テーブルを作成します。このテーブルは各イベントのクライアントID、イベントタイプ、作成日時を保存します。sqlCREATE TABLE emqx_client_events ( id INT AUTO_INCREMENT PRIMARY KEY, clientid VARCHAR(255), event VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
コネクターの作成
このセクションでは、SinkをMySQLサーバーに接続するためのコネクターを作成する方法を説明します。
以下の手順は、EMQXとMySQLをローカルマシンで実行していることを前提としています。MySQLやEMQXがリモートで稼働している場合は、設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectors をクリックします。
- ページ右上の Create をクリックします。
- Create Connector ページで MySQL を選択し、Next をクリックします。
- Configuration ステップで以下の情報を設定します:
- Connector name:コネクターの名前を英数字で入力します(例:
my_mysql
)。 - Server Host:
127.0.0.1:3306
またはMySQLサーバーがリモートの場合は実際のホスト名を入力。 - Database Name:
emqx_data
を入力。 - Username:
root
を入力。 - Password:
public
を入力。
- Connector name:コネクターの名前を英数字で入力します(例:
- 高度な設定(任意):高度な設定を参照してください。
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがMySQLサーバーに接続できるかテストできます。
- ページ下部の Create ボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックしてルール作成に進み、Sinkを用いてMySQLに転送するデータやクライアントイベントの記録を指定できます。詳細はメッセージ保存用MySQL Sinkのルール作成およびイベント記録用MySQL Sinkのルール作成を参照してください。
メッセージ保存用MySQL Sinkのルール作成
このセクションでは、ダッシュボードでMQTTトピックt/#
からのメッセージを処理し、設定済みのSinkを介してMySQLのemqx_messages
テーブルに保存するルールの作成方法を説明します。
EMQXとMySQLをローカルマシンで実行していることを前提としています。リモート環境の場合は設定を調整してください。
EMQXダッシュボードで Integration -> Rules をクリックします。
ページ右上の Create をクリックします。
ルールIDに
my_rule
と入力し、SQL Editor に以下のステートメントを設定します。これはトピックt/#
配下のMQTTメッセージをMySQLに保存することを意味します。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT
句に含めてください。sqlSELECT * FROM "t/#"
TIP
初心者の方は、SQL Examples をクリックし、Enable Test を有効にしてSQLルールを学習・テストできます。
- Add Action ボタンをクリックして、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをMySQLに送信します。
Type of Action ドロップダウンから
MySQL
を選択します。Action ドロップダウンはデフォルトのCreate Action
のままにします。既に作成済みのSinkがあれば選択可能ですが、この例では新規Sinkを作成します。Sinkの名前を入力します。名前は英数字の組み合わせにしてください。
Connector ドロップダウンから先ほど作成した
my_mysql
を選択します。新しいコネクターを作成するには、ドロップダウン横のボタンをクリックしてください。設定パラメータはコネクターの作成を参照。使用する機能に応じてSQL Templateを設定します。
注意:これは前処理済みのSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlINSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES( ${clientid}, ${topic}, ${payload}, FROM_UNIXTIME(${timestamp}/1000) )
SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの挙動を切り替えられます:
Disabled(デフォルト):ルールエンジンは文字列
undefined
をデータベースに挿入します。Enabled:変数が未定義の場合、ルールエンジンは
NULL
を挿入します。TIP
可能な限りこのオプションは有効にしてください。無効化は後方互換性確保のためのみ推奨されます。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。これらはプライマリSinkがメッセージ処理に失敗した際にトリガーされます。詳細はフォールバックアクションを参照してください。
高度な設定(任意):高度な設定を参照してください。
Create ボタンをクリックしてSinkの設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Rule ページに戻り、設定内容を確認してCreateをクリックしルールを生成します。
これでルールの作成が完了しました。Integration -> Rules ページで新規ルールを確認できます。Actions(Sink) タブをクリックすると、新しいMySQL Sinkが表示されます。
また、Integration -> Flow Designer をクリックするとトポロジーが表示され、トピックt/#
配下のメッセージがMySQLに送信・保存されていることが確認できます。
イベント記録用MySQL Sinkのルール作成
このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みのSinkを介してMySQLのemqx_client_events
テーブルに保存するルールの作成方法を説明します。
ルール作成手順はメッセージ保存用MySQL Sinkのルール作成とほぼ同様で、SQLルール構文とSQLテンプレートのみ異なります。
オンライン/オフライン状態記録用のSQLステートメントは以下の通りです:
SELECT
*
FROM
"$events/client_connected", "$events/client_disconnected"
クライアントイベントデータをテーブルに挿入するSQLテンプレートは以下の通りです:
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
${clientid},
${event},
FROM_UNIXTIME(${timestamp}/1000)
)
ルールのテスト
MQTTXを使ってトピックt/1
にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello MySQL" }'
2つのSinkの稼働状況を確認してください。新規の受信メッセージと送信メッセージが1件ずつ、イベントレコードが2件あるはずです。
emqx_messages
テーブルにデータが書き込まれているか確認します。
mysql> select * from emqx_messages;
+----+----------+-------+--------------------------+---------------------+
| id | clientid | topic | payload | created_at |
+----+----------+-------+--------------------------+---------------------+
| 1 | emqx_c | t/1 | { "msg": "hello MySQL" } | 2022-12-09 08:44:07 |
+----+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)
emqx_client_events
テーブルにデータが書き込まれているか確認します。
mysql> select * from emqx_client_events;
+----+----------+---------------------+---------------------+
| id | clientid | event | created_at |
+----+----------+---------------------+---------------------+
| 1 | emqx_c | client.connected | 2022-12-09 08:44:07 |
| 2 | emqx_c | client.disconnected | 2022-12-09 08:44:07 |
+----+----------+---------------------+---------------------+
2 rows in set (0.00 sec)
高度な設定
このセクションでは、MySQLコネクターおよびSinkの高度な設定オプションについて詳述します。ダッシュボードでコネクターやSinkを設定する際、Advanced Settingsに移動して以下のパラメータをニーズに合わせて調整してください。
項目 | 説明 | 推奨値 |
---|---|---|
Connection Pool Size | MySQLサービスと接続する際に、接続プール内で同時に維持可能な接続数を指定します。このオプションはEMQXとMySQL間のアクティブ接続数を制御し、アプリケーションのスケーラビリティやパフォーマンス管理に役立ちます。 注意:適切な接続プールサイズはシステムリソース、ネットワークレイテンシ、アプリケーションのワークロードに依存します。大きすぎるとリソース枯渇の恐れがあり、小さすぎるとスループットが制限されます。 | 8 |
Start Timeout | コネクターが自動起動されたリソース(例:MySQLのデータベースインスタンス)が正常な状態になるまで待機する最大時間(秒)を指定します。この設定により、接続先リソースが完全に稼働しデータ取引可能であることを確認してから処理を進めます。 | 5 秒 |
Buffer Pool Size | EMQXとMySQL間の送信(egress)タイプのSinkでデータフローを管理するバッファワーカープロセス数を指定します。これらのワーカーはデータを一時的に保持し、対象サービスへ送信する役割を担います。送信専用のSinkに関連する設定であり、受信(ingress)専用のSinkには「0」を設定できます。 | 16 |
Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストがバッファに入ってからこのTTLを超えて滞留するか、送信後にMySQLからの応答やアックがタイムリーに得られない場合、リクエストは期限切れと判断されます。 | 45 秒 |
Health Check Interval | コネクターがMySQLへの接続の自動ヘルスチェックを行う間隔(秒)を指定します。 | 15 秒 |
Max Buffer Queue Size | コネクター内の各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはMySQLに送信する前のデータを一時的に保管し、データフローの効率化に寄与します。システム性能やデータ転送要件に応じて調整してください。 | 256 MB |
Max Batch Size | EMQXからMySQLへ一度に送信するデータバッチの最大サイズを指定します。サイズ調整によりデータ転送の効率やパフォーマンスを最適化できます。 「1」に設定すると、データレコードはバッチ化せず個別に送信されます。 | 1 |
Query Mode | asynchronous (非同期)またはsynchronous (同期)クエリモードを選択可能で、要件に応じてメッセージ送信を最適化します。非同期モードではMySQLへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがMySQLに到達する前にメッセージを受信する可能性があります。 | Async |
Inflight Window | 「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。コネクターがMySQLと通信する際に同時に存在可能なインフライトクエリの最大数を制御します。 Query Modeが async の場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合は、この値を1に設定してください。 | 100 |
さらに詳しく
以下のリンクから詳細情報をご覧いただけます: