Skip to content

MQTTデータをMySQLに取り込む

MySQLは、高い信頼性と安定性を持つ広く使われているリレーショナルデータベースであり、迅速にインストール、設定、利用が可能です。MySQLデータ統合は、MQTTメッセージをMySQLデータベースに効率的に保存できるだけでなく、イベントトリガーを通じてMySQL内のデータをリアルタイムに更新または削除することもサポートしています。MySQLデータ統合を活用することで、メッセージ保存、デバイスのオンライン/オフライン状態更新、デバイス動作記録などの機能を簡単に実装し、柔軟なIoTデータストレージおよびデバイス管理機能を実現できます。

本ページでは、EMQX PlatformとMySQL間のデータ統合について、実践的な作成および検証手順を紹介します。

動作概要

MySQLデータ統合はEMQX Platformに標準搭載されている機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的なIoTアプリケーションでは、EMQX PlatformがIoTプラットフォームとしてデバイス接続とメッセージの中継を担当し、MySQLがデータストレージプラットフォームとしてデバイス状態やメタデータ、メッセージデータの保存およびデータ分析を担います。

EMQX Platform MySQLデータ統合

EMQX Platformはルールエンジンを通じてデバイスイベントやデータをMySQLに転送します。アプリケーションはMySQL内のデータを読み取り、デバイス状態を把握したり、デバイスのオンライン・オフライン記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです:

  • IoTデバイスがEMQX Platformに接続:IoTデバイスがMQTTプロトコルを介して正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイスID、送信元IPアドレスなどの属性情報が含まれます。
  • メッセージのパブリッシュと受信:デバイスはテレメトリおよび状態データを特定のトピックにパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  • ルールエンジンによるメッセージ処理:組み込みのルールエンジンにより、特定のトピックに基づくメッセージやイベントを処理できます。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報付加などの処理を行います。
  • MySQLへの書き込み:ルールによりメッセージのMySQLへの書き込みがトリガーされます。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQLを構築、MySQLに送信して実行することで、メッセージの特定フィールドを対応するデータベースのテーブルやカラムに書き込んだり更新したりできます。

イベントおよびメッセージデータがMySQLに書き込まれた後は、MySQLに接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です:

  • Grafanaなどの可視化ツールに接続し、データに基づくグラフを生成してデータ変化を表示する。
  • デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題をタイムリーに解消する。

特長とメリット

MySQLとのデータ統合は、以下の特長とメリットをビジネスにもたらします:

  • 柔軟なイベント処理:EMQX Platformのルールエンジンを通じて、MySQLはデバイスのライフサイクルイベントを処理でき、IoTアプリケーションに必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイスの故障や異常動作、傾向変化を迅速に検知し、適切な対策を講じることが可能です。
  • メッセージ変換:メッセージはEMQX Platformのルールを通じて多様な処理や変換が可能であり、MySQLへの保存や利用がより便利になります。
  • 柔軟なデータ操作:EMQX Platformが提供するSQLテンプレートを活用して、特定フィールドのデータをMySQLの対応テーブル・カラムに簡単に書き込み・更新でき、柔軟なデータ保存・管理が実現します。
  • ビジネスプロセスの統合:データ統合により、デバイスデータをMySQLの豊富なエコシステムアプリケーションと連携可能にし、ERP、CRM、その他カスタムビジネスシステムとの統合を促進して高度な業務プロセスや自動化を実現します。
  • ランタイムメトリクス:各ルールの総メッセージ数、成功/失敗数、現在の処理レートなどのランタイムメトリクスの閲覧をサポートします。

柔軟なイベント処理、多様なメッセージ変換、柔軟なデータ操作、リアルタイムの監視・分析機能を通じて、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

はじめる前に

ここでは、EMQX PlatformでMySQLデータ統合を作成する前に必要な準備、MySQLサーバーのインストールやデータテーブルの作成について説明します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

MySQLサーバーのインストール

  1. Dockerを使ってMySQLサーバーをインストールし、Dockerイメージを起動します。
bash
# MySQLのDockerイメージを起動し、パスワードをpublicに設定
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=public -d mysql

# コンテナにアクセス
docker exec -it mysql bash

# コンテナ内でMySQLサーバーに接続し、設定したパスワードを入力
mysql -u root -p

# データベースを作成し、選択
CREATE DATABASE emqx_data CHARACTER SET utf8mb4;
use emqx_data;
  1. テーブルを作成します。以下のSQLコマンドを使ってtemp_humテーブルを作成します。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。
sql
CREATE TABLE `temp_hum` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `up_timestamp` timestamp NULL DEFAULT NULL,
  `client_id` varchar(32) DEFAULT NULL,
  `temp` float unsigned DEFAULT NULL,
  `hum` float unsigned DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `up_timestamp_client_id` (`up_timestamp`,`client_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4;

MySQLコネクターの作成

データ統合ルールを作成する前に、MySQLサーバーにアクセスするためのMySQLコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるMySQLを選択します。すでにコネクターを作成済みの場合は、新しいコネクターを選択し、続いてデータ永続化カテゴリのMySQLを選択します。

  3. コネクター名:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • サーバーホスト:サーバーのIPアドレスとポート。
    • データベース名emqx_dataと入力。
    • ユーザー名rootと入力。
    • パスワードpublicと入力。
    • 暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをクリックします。
  5. 詳細設定(任意):詳細設定を参照してください。

  6. テストボタンをクリックします。MySQLサービスにアクセス可能であれば、成功のメッセージが表示されます。

  7. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをMySQLに転送するためのアクションをルールに追加します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールマッチング用のSQL文を入力します。以下のルールでは、メッセージが報告された時間up_timestamp、クライアントID、ペイロードをtemp_hum/emqxトピックから読み取ります。また、このトピックから温度と湿度も取得します。

    sql
     SELECT
       timestamp as up_timestamp,
       clientid as client_id,
       payload.temp as temp,
       payload.hum as hum
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、テストを有効にするを使ってSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 利用する機能に応じてSQLテンプレートを設定します:

    注意:これは前処理済みのSQLのため、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
     INSERT INTO temp_hum (up_timestamp, client_id, temp, hum)
     VALUES (
       FROM_UNIXTIME(${up_timestamp}/1000),
       ${client_id},
       ${temp},
       ${hum}
     )

    SQLテンプレート内でプレースホルダー変数が未定義の場合は、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を設定できます:

    • 無効(デフォルト):ルールエンジンは文字列undefinedをデータベースに挿入します。

    • 有効:変数が未定義の場合、NULLをデータベースに挿入します。

      TIP

      可能な限りこのオプションは有効にすることを推奨します。無効にするのは後方互換性を確保する場合のみです。

  6. 詳細設定(任意):詳細設定を参照してください。

  7. 確認ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンが完了します。

ルールのテスト

MQTTXを使って温度と湿度のデータ報告をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. データダンプ結果を確認します。

bash
mysql> SELECT * FROM temp_hum ORDER BY up_timestamp DESC LIMIT 10;
+----+---------------------+-------------+------+------+
| id | up_timestamp        | client_id   | temp | hum  |
+----+---------------------+-------------+------+------+
| 26 | 2024-03-20 08:44:55 | test_client | 27.5 | 41.8 |
+----+---------------------+-------------+------+------+
1 row in set (0.00 sec)
  1. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルール配下のすべてのアクションの統計情報を閲覧できます。