Skip to content

MQTTデータをMySQLに取り込む

MySQLは、高い信頼性と安定性を持つ広く使われているリレーショナルデータベースであり、迅速にインストール、設定、利用が可能です。MySQLデータ統合は、MQTTメッセージを効率的にMySQLデータベースに保存できるほか、イベントトリガーを通じてMySQL内のデータをリアルタイムに更新または削除することもサポートしています。MySQLデータ統合を活用することで、メッセージの保存、デバイスのオンライン/オフライン状態の更新、デバイスの動作記録などの機能を簡単に実装し、柔軟なIoTデータストレージおよびデバイス管理機能を実現できます。

本ページでは、EMQX CloudとMySQL間のデータ統合について、作成および検証の実践的な手順を紹介します。

動作概要

MySQLデータ統合はEMQX Cloudに標準搭載された機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的なIoTアプリケーションにおいて、EMQX CloudはIoTプラットフォームとしてデバイスの接続とメッセージの中継を担当し、MySQLはデータストレージプラットフォームとしてデバイスの状態やメタデータ、メッセージデータの保存およびデータ分析を担当します。

EMQX Cloud MySQLデータ統合

EMQX Cloudはルールエンジンを通じてデバイスのイベントやデータをMySQLに転送します。アプリケーションはMySQL内のデータを読み取り、デバイスの状態を把握したり、デバイスのオンライン・オフライン記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです。

  • IoTデバイスがEMQX Cloudに接続:IoTデバイスがMQTTプロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの属性情報が含まれます。
  • メッセージのパブリッシュと受信:デバイスはテレメトリおよび状態データを特定のトピックにパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  • ルールエンジンによるメッセージ処理:組み込みのルールエンジンにより、特定のトピックに基づいてメッセージやイベントを処理できます。ルールエンジンは該当するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
  • MySQLへの書き込み:ルールによりメッセージをMySQLに書き込む処理がトリガーされます。SQLテンプレートを用いてルール処理結果からデータを抽出し、SQLを構築してMySQLに送信し、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりします。

イベントおよびメッセージデータがMySQLに書き込まれた後は、MySQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です。

  • Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
  • デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題を迅速に解消する。

特長と利点

MySQLとのデータ統合は、以下のような特長と利点をビジネスにもたらします。

  • 柔軟なイベント処理:EMQX Cloudのルールエンジンを通じて、MySQLはデバイスのライフサイクルイベントを処理でき、IoTアプリケーション実装に必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイスの故障や異常動作、トレンド変化を迅速に検知し、適切な対応を取れます。
  • メッセージ変換:メッセージはEMQX Cloudのルールを通じて大規模な処理・変換が可能であり、保存や利用がより便利になります。
  • 柔軟なデータ操作:EMQX Cloudが提供するSQLテンプレートを利用して、特定フィールドのデータをMySQLデータベースの対応テーブルやカラムに簡単に書き込み・更新でき、柔軟なデータ保存と管理を実現します。
  • 業務プロセスの統合:データ統合により、デバイスデータをMySQLの豊富なエコシステムアプリケーションと連携可能であり、ERP、CRM、その他カスタム業務システムとの統合を促進し、高度な業務プロセスや自動化を実現します。
  • ランタイムメトリクス:各ルールの総メッセージ数、成功/失敗数、現在の処理レートなどの実行時メトリクスの閲覧をサポートします。

柔軟なイベント処理、大規模なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。

はじめる前に

このセクションでは、EMQX CloudでMySQLデータ統合を作成する前に必要な準備について説明します。MySQLサーバーのインストールやデータテーブルの作成が含まれます。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

MySQLサーバーのインストール

  1. Dockerを使ってMySQLサーバーをインストールし、Dockerイメージを起動します。
bash
# MySQLのDockerイメージを起動し、パスワードをpublicに設定
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=public -d mysql

# コンテナにアクセス
docker exec -it mysql bash

# コンテナ内でMySQLサーバーに接続し、パスワードを入力
mysql -u root -p

# データベースを作成し、選択
CREATE DATABASE emqx_data CHARACTER SET utf8mb4;
use emqx_data;
  1. テーブルを作成します。以下のSQLコマンドでtemp_humテーブルを作成します。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。
sql
CREATE TABLE `temp_hum` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `up_timestamp` timestamp NULL DEFAULT NULL,
  `client_id` varchar(32) DEFAULT NULL,
  `temp` float unsigned DEFAULT NULL,
  `hum` float unsigned DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `up_timestamp_client_id` (`up_timestamp`,`client_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4;

MySQLコネクターの作成

データ統合ルールを作成する前に、MySQLサーバーにアクセスするためのMySQLコネクターを作成する必要があります。

  1. ご自身のデプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるMySQLを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリのMySQLを選択します。

  3. コネクター名:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • サーバーホスト:サーバーのIPアドレスとポート番号。
    • データベース名emqx_dataを入力。
    • ユーザー名rootを入力。
    • パスワードpublicを入力。
    • 暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをクリックします。
  5. 詳細設定(任意):詳細設定を参照してください。

  6. テストボタンをクリックします。MySQLサービスにアクセス可能であれば、成功メッセージが表示されます。

  7. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをMySQLに転送するためのアクションをルールに追加する必要があります。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、メッセージが報告された時間up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードから温度と湿度を読み取ります。

    sql
     SELECT
       timestamp as up_timestamp,
       clientid as client_id,
       payload.temp as temp,
       payload.hum as hum
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 利用する機能に応じてSQLテンプレートを設定します。

    注意:これは事前処理されたSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
     INSERT INTO temp_hum (up_timestamp, client_id, temp, hum)
     VALUES (
       FROM_UNIXTIME(${up_timestamp}/1000),
       ${client_id},
       ${temp},
       ${hum}
     )

    SQLテンプレート内でプレースホルダ変数が未定義の場合、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を設定できます。

    • 無効(デフォルト):ルールエンジンはundefinedという文字列をデータベースに挿入します。

    • 有効:変数が未定義の場合、ルールエンジンはNULLをデータベースに挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を確保する場合のみです。

  6. 詳細設定(任意):詳細設定を参照してください。

  7. 確認ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温度と湿度のデータ報告をシミュレートするには、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック: temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. データのダンプ結果を確認します。

bash
mysql> SELECT * FROM temp_hum ORDER BY up_timestamp DESC LIMIT 10;
+----+---------------------+-------------+------+------+
| id | up_timestamp        | client_id   | temp | hum  |
+----+---------------------+-------------+------+------+
| 26 | 2024-03-20 08:44:55 | test_client | 27.5 | 41.8 |
+----+---------------------+-------------+------+------+
1 row in set (0.00 sec)
  1. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールおよびルール内の全アクションの統計情報が表示されます。