Skip to content

Microsoft SQL ServerへのMQTTデータ取り込み

SQL Serverは、企業や組織の規模や種類を問わず広く利用されている主要な商用リレーショナルデータベースソリューションの一つです。EMQXプラットフォームはSQL Serverとの連携をサポートしており、MQTTメッセージやクライアントイベントをSQL Serverに保存できます。これにより、複雑なデータパイプラインや分析プロセスの構築、データ管理・分析、デバイス接続の管理、ERPやCRM、BIなどの他の企業システムとの統合が容易になります。

本ページでは、EMQXプラットフォームとMicrosoft SQL Server間のデータ統合について、実践的な手順を交えて詳しく解説します。

動作概要

Microsoft SQL Serverとのデータ統合は、EMQXプラットフォームに標準搭載された機能であり、EMQXプラットフォームのデバイス接続およびメッセージ送受信機能とMicrosoft SQL Serverの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントを通じて、MQTTメッセージやクライアントイベントをMicrosoft SQL Serverに保存できます。さらに、イベントに応じてMicrosoft SQL Server内のデータ更新や削除をトリガーできるため、デバイスのオンライン状態や接続履歴の記録なども可能です。この統合により、EMQXプラットフォームからSQL Serverへのデータ取り込みが簡素化され、複雑なコーディングは不要となります。

以下の図は、EMQXとSQL Server間の典型的なデータ統合アーキテクチャを示しています。

EMQX Integration SQL Server

Microsoft SQL ServerへのMQTTデータ取り込みは以下のように動作します。

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを用いてEMQXプラットフォームに正常に接続し、機械やセンサー、製造ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQXプラットフォームにパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXプラットフォームに定義されたルールに従って処理されます。ルールは事前に定義された条件に基づき、Microsoft SQL Serverにルーティングすべきメッセージを判別します。ペイロード変換を指定するルールがあれば、データ形式の変換や特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. SQL Serverへのデータ取り込み:ルールがトリガーされると、メッセージをMicrosoft SQL Serverに書き込みます。SQLテンプレートを利用して、ルール処理結果からデータを抽出しSQL文を構築、SQL Serverに送信して実行することで、メッセージの特定フィールドをデータベースの対応テーブル・カラムに書き込んだり更新したりします。
  4. データの保存と活用:データがMicrosoft SQL Serverに保存されることで、企業はその強力なクエリ機能を活用し、さまざまなユースケースに対応できます。

特長とメリット

Microsoft SQL Serverとのデータ統合は、効率的なデータ送信、保存、活用を実現するために以下のような特長とメリットを備えています。

  • リアルタイムデータストリーミング:EMQXプラットフォームはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMicrosoft SQL Serverへの効率的かつ信頼性の高いデータ送信を保証します。即時の洞察やアクションが求められるユースケースに適しています。
  • 高性能かつスケーラブル:EMQXプラットフォームとMicrosoft SQL Serverは共に拡張性と信頼性を備えており、大規模なIoTデータの処理に対応可能です。需要の増加に応じて水平・垂直の拡張が途切れなく行え、IoTアプリケーションの継続性と信頼性を確保します。
  • 柔軟なデータ変換:EMQXプラットフォームの強力なSQLベースのルールエンジンにより、Microsoft SQL Serverに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、強化など多様なデータ変換機能をサポートし、ニーズに応じたデータ整形を実現します。
  • 高度な分析機能:Microsoft SQL Serverは、Analysis Servicesによる多次元データモデルの構築やデータマイニングを通じた高度な分析機能を提供します。Reporting Servicesを用いたレポート作成・公開も可能で、IoTデータの洞察や分析結果を関係者に提示できます。

はじめる前に

このセクションでは、Microsoft SQL Serverデータ統合を作成する前に必要な準備について説明します。Microsoft SQL Serverのインストールと接続、データベースおよびデータテーブルの作成、ODBCドライバーのインストールと設定方法が含まれます。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Microsoft SQL Serverのインストールと接続

このセクションでは、Dockerイメージを使ってLinux/MacOS上でMicrosoft SQL Server 2019を起動し、sqlcmdで接続する方法を説明します。その他のインストール方法については、Microsoft SQL Serverインストールガイドをご参照ください。

  1. Dockerを使ってMicrosoft SQL Serverをインストールし、以下のコマンドでDockerイメージを起動します。パスワードはmqtt_public1を使用します。Microsoft SQL Serverのパスワードポリシーについてはパスワードの複雑性を参照してください。

    注意:環境変数ACCEPT_EULA=Yを指定してDockerコンテナを起動することで、MicrosoftのEULAに同意したことになります。詳細はMICROSOFT SOFTWARE LICENSE TERMS MICROSOFT SQL SERVER 2019 STANDARD(EN_US)をご確認ください。

    bash
    # Microsoft SQL ServerのDockerイメージを起動し、パスワードを`mqtt_public1`に設定
    $ docker run --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_SA_PASSWORD=mqtt_public1 -d mcr.microsoft.com/mssql/server:2019-CU19-ubuntu-20.04
  2. コンテナにアクセスします。

    bash
    docker exec -it sqlserver bash
  3. コンテナ内のサーバーに接続するため、設定したパスワードを入力します。パスワード入力時は文字が表示されません。入力後、直接Enterを押してください。

    bash
    $ /opt/mssql-tools/bin/sqlcmd -S 127.0.0.1 -U sa
    $ Password:
    1>

    TIP

    Microsoft SQL Serverコンテナにはmssql-toolsがインストールされていますが、実行ファイルは$PATHに含まれていません。そのため、mssql-toolsの実行ファイルパスを指定する必要があります。本例のDocker環境ではパスは/optです。

    mssql-toolsの使用方法についてはsqlcmd-utilityをご参照ください。

ここまででMicrosoft SQL Server 2019のインスタンスがデプロイされ、接続可能な状態になりました。

データベースとデータテーブルの作成

このセクションでは、Microsoft SQL Serverでデータベースとデータテーブルを作成する方法を説明します。

  1. 前節で作成した接続を使い、Microsoft SQL Serverでemqxデータベースを作成します。

    bash
     ...
     Password:
     1> USE master
     2> GO
     Changed database context to 'master'.
     1> CREATE DATABASE emqx;
     2> GO
  2. 以下のSQL文でデータテーブルを作成します。

    • temp_humテーブルを作成します。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。

      sql
       CREATE TABLE temp_hum(
         client_id VARCHAR(64) NULL,
         temp NVARCHAR(100) NULL,
         hum NVARCHAR(100) NULL,
         up_timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
       );
       GO;

Microsoft SQL Serverコネクターの作成

データ統合ルールを作成する前に、Microsoft SQL Serverにアクセスするためのコネクターを作成する必要があります。

  1. デプロイメント画面に移動し、左ナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からMicrosoft SQL serverを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択してからMicrosoft SQL serverを選択してください。

  3. コネクター名はシステムが自動生成します。

  4. 接続情報を入力します。

    • サーバーホスト:サーバーのIPアドレスとポート番号
    • データベース名emqxと入力
    • ユーザー名saと入力
    • パスワード:事前に設定したパスワードmqtt_public1または実際のパスワードを入力
    • SQL Serverドライバー名:EMQXプラットフォームにデフォルトでインストールされているODBC Driver 17 for SQL Serverを入力
  5. テストボタンをクリックし、Microsoft SQL Serverサービスにアクセス可能であれば成功メッセージが表示されます。

  6. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータをMicrosoft SQL Serverに転送するためのアクションをルールに追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールマッチング用のSQL文を入力します。以下のルールでは、メッセージが報告された時刻up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードから温度と湿度を取得します。

    sql
     SELECT
       timestamp as up_timestamp,
       clientid as client_id,
       payload.temp as temp,
       payload.hum as hum
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方はSQL例をクリックし、テスト有効化を利用してSQLルールの学習とテストが可能です。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 利用する機能に応じてSQLテンプレートを設定します。注意:これは前処理済みのSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
     INSERT INTO temp_hum(client_id, temp, hum)
     VALUES (
       ${client_id},
       ${temp},
       ${hum}
     )

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQLテンプレート上部の未定義変数をNULLとして扱うスイッチでルールエンジンの動作を設定できます。

    • 無効(デフォルト):ルールエンジンは未定義変数に文字列undefinedを挿入します。

    • 有効:未定義変数の場合、ルールエンジンはNULLを挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。

  6. 詳細設定(任意)を行います。

  7. 確定ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

温度・湿度データの報告をシミュレートするために、MQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがMicrosoft SQL Serverに転送されているか確認します。

    bash
     1> SELECT * FROM temp_hum ORDER BY up_timestamp;
     2> GO
     client_id                                                        temp         hum          up_timestamp
     ---------------------------------------------------------------- ------------ ------------ -----------------------
     test_client                                                             27.50        41.80 2024-03-25 05:49:21.237
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、そのルールの統計情報およびルール配下の全アクションの統計情報を閲覧できます。