Skip to content

Microsoft SQL ServerへのMQTTデータ取り込み

SQL Serverは、企業や組織のさまざまな規模や種類で広く利用されている主要なリレーショナル商用データベースソリューションの一つです。EMQXはSQL Serverとの統合をサポートしており、MQTTメッセージやクライアントイベントをSQL Serverに保存できます。これにより、複雑なデータパイプラインや分析プロセスの構築、データ管理・分析、デバイス接続の管理、ERP、CRM、BIなどの他の企業システムとの統合が容易になります。

本ページでは、EMQXとMicrosoft SQL Server間のデータ統合について詳細に解説し、データ統合の作成および検証方法を実践的に説明します。

TIP

Microsoft SQL Serverとのデータ統合は、EMQX Enterprise 5.0.3以降でサポートされています。

動作概要

Microsoft SQL Serverとのデータ統合はEMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送信機能とMicrosoft SQL Serverの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントとSinkを通じて、MQTTメッセージやクライアントイベントをMicrosoft SQL Serverに保存できます。さらに、イベントによりMicrosoft SQL Server内のデータの更新や削除をトリガーでき、デバイスのオンライン状態や接続履歴などの情報を記録可能です。この統合により、EMQXからSQL Serverへのデータ取り込みが簡素化され、複雑なコーディングを必要としません。

下図はEMQXとSQL Server間の典型的なデータ統合アーキテクチャを示しています。

EMQX Integration SQL Server

Microsoft SQL ServerへのMQTTデータ取り込みは以下のように動作します:

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、機械、センサー、製品ラインの稼働状態、計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、どのメッセージをMicrosoft SQL Serverにルーティングするかを判断します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. SQL Serverへのデータ取り込み:ルールがメッセージのMicrosoft SQL Serverへの書き込みをトリガーします。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQL文を構築し、SQL Serverで実行することで、メッセージの特定フィールドを対応するテーブルやカラムに書き込んだり更新したりします。
  4. データの保存と活用:データがMicrosoft SQL Serverに保存された後、企業はそのクエリ機能を活用してさまざまなユースケースに対応できます。

特長とメリット

Microsoft SQL Serverとのデータ統合は、効率的なデータ送信、保存、活用を実現するための多彩な特長とメリットを提供します:

  • リアルタイムデータストリーミング:EMQXはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMicrosoft SQL Serverへの効率的かつ信頼性の高いデータ送信を保証します。リアルタイムのデータ取得と分析を可能にし、即時の洞察やアクションが必要なユースケースに最適です。
  • 高性能かつスケーラブル:EMQXとMicrosoft SQL Serverは共に拡張性と信頼性を備え、大規模なIoTデータの処理に適しています。需要の増加に応じて水平・垂直の拡張が途切れなく可能であり、IoTアプリケーションの継続性と信頼性を確保します。
  • 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを提供し、Microsoft SQL Serverに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、強化など多様なデータ変換機構をサポートし、組織のニーズに合わせてデータを整形可能です。
  • 高度な分析機能:Microsoft SQL ServerはAnalysis Servicesを通じた多次元データモデル構築など強力な分析機能を備え、複雑なデータ分析やデータマイニングを支援します。Reporting Servicesによるレポート作成・公開も可能で、IoTデータの洞察や分析結果を関係者に提示できます。

はじめる前に

このセクションでは、Microsoft SQL Serverデータ統合の作成を始める前に必要な準備について説明します。ODBCドライバーのインストールと設定、Microsoft SQL Serverのインストールと接続、データベースおよびデータテーブルの作成方法を含みます。

前提条件

ODBCドライバーのインストールと設定

Microsoft SQL ServerデータベースにアクセスするためにODBCドライバーを設定する必要があります。ODBCドライバーとしては、FreeTDSまたはMicrosoft提供のmsodbcsql18ドライバーのいずれかを使用できます。

EMQXはodbcinst.ini設定で指定されたDSN名を使ってドライバーの動的ライブラリのパスを判別します。以下の例ではDSN名をms-sqlとしています。詳細は接続プロパティを参照してください。

注意

DSN名は任意に設定可能ですが、英字のみの使用を推奨します。またDSN名は大文字・小文字を区別します。

msodbcsql18ドライバーのODBCドライバーとしてのインストールと設定

msodbcsql18ドライバーを使用する場合は、Microsoftの公式手順を参照してください:

MicrosoftのEULA条件により、EMQXが提供するDockerイメージにはmsodbcsql18ドライバーは含まれていません。DockerやKubernetesで使用する場合は、EMQX Enterpriseが提供するイメージをベースにODBCドライバーをインストールした新しいイメージを作成する必要があります。新しいイメージの使用はMicrosoft SQL Server EULAへの同意を意味します。

以下の手順で新しいイメージをビルドしてください:

  1. 以下のDockerfileを使用して新しいイメージをビルドします。

    この例のベースイメージバージョンはemqx/emqx-enterprise:5.8.1です。必要に応じて使用したいEMQX Enterpriseバージョンを指定するか、最新バージョンのemqx/emqx-enterprise:latestを使用してください。

dockerfile
FROM emqx/emqx-enterprise:5.8.1

USER root

RUN apt-get -qq update && apt-get install -yqq curl gpg && \
    . /etc/os-release && \
    curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg && \
    curl -fsSL "https://packages.microsoft.com/config/${ID}/${VERSION_ID}/prod.list" > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get -qq update && \
    ACCEPT_EULA=Y apt-get install -yqq msodbcsql18 unixodbc-dev && \
    sed -i 's/ODBC Driver 18 for SQL Server/ms-sql/g' /etc/odbcinst.ini && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

USER emqx
  1. docker build -t emqx/emqx-enterprise:5.8.1-msodbcコマンドで新しいイメージをビルドします。

  2. ビルド後、docker image lsでローカルイメージ一覧を確認できます。必要に応じてイメージのアップロードや保存も可能です。

注意

この例でmsodbcsql18ドライバーをインストールした場合、odbcinst.iniのDSN名はms-sqlとなっています。必要に応じてDSN名は変更可能です。

FreeTDSのODBCドライバーとしてのインストールと設定

ここでは主要なディストリビューションでFreeTDSをODBCドライバーとしてインストール・設定する方法を紹介します。

MacOSでのFreeTDS ODBCドライバーのインストールと設定:

bash
$ brew install unixodbc freetds
$ vim /usr/local/etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver      = /usr/local/lib/libtdsodbc.so
Setup       = /usr/local/lib/libtdsodbc.so
FileUsage   = 1

CentOSでのFreeTDS ODBCドライバーのインストールと設定:

bash
$ yum install unixODBC unixODBC-devel freetds freetds-devel perl-DBD-ODBC perl-local-lib
$ vim /etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver      = /usr/lib64/libtdsodbc.so
Setup       = /usr/lib64/libtdsS.so.2
Driver64    = /usr/lib64/libtdsodbc.so
Setup64     = /usr/lib64/libtdsS.so.2
FileUsage   = 1

UbuntuでのFreeTDS ODBCドライバーのインストールと設定(Ubuntu20.04を例に、他バージョンは公式ODBCドキュメント参照):

bash
$ apt-get install unixodbc unixodbc-dev tdsodbc freetds-bin freetds-common freetds-dev libdbd-odbc-perl liblocal-lib-perl
$ vim /etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver      = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
Setup       = /usr/lib/x86_64-linux-gnu/odbc/libtdsS.so
FileUsage   = 1

Microsoft SQL Serverのインストールと接続

このセクションでは、Dockerイメージを用いてLinux/MacOS上でMicrosoft SQL Server 2019を起動し、sqlcmdで接続する方法を説明します。その他のインストール方法はMicrosoft SQL Serverインストールガイドを参照してください。

  1. DockerでMicrosoft SQL Serverをインストールし、以下のコマンドでDockerイメージを起動します。パスワードはmqtt_public1を使用します。Microsoft SQL Serverのパスワードポリシーはパスワードの複雑性を参照してください。

    注意:環境変数ACCEPT_EULA=Yを設定してDockerコンテナを起動することで、MicrosoftのEULAに同意したことになります。詳細はエンドユーザー使用許諾契約を参照してください。

    bash
    # Microsoft SQL ServerのDockerイメージを起動し、パスワードをmqtt_public1に設定
    $ docker run --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_SA_PASSWORD=mqtt_public1 -d mcr.microsoft.com/mssql/server:2022-CU15-ubuntu-22.04
  2. コンテナにアクセスします。

    bash
    docker exec -it sqlserver bash
  3. コンテナ内で設定したパスワードを入力してサーバーに接続します。パスワード入力時は文字が表示されません。入力後はそのままEnterを押してください。

    bash
    $ /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P mqtt_public1 -N -C
    1>

    TIP

    Microsoftが提供するMicrosoft SQL Serverコンテナにはmssql-tools18パッケージがインストールされていますが、実行ファイルは$PATHに含まれていません。そのため、sqlcmdを使用する際は実行ファイルのパスを指定する必要があります。この例のDocker環境ではパスは/optです。

    mssql-tools18の詳細な使い方はsqlcmdユーティリティを参照してください。

これでMicrosoft SQL Server 2022インスタンスがデプロイされ、接続可能な状態になりました。

データベースとデータテーブルの作成

前節で作成した接続を使い、以下のSQL文でデータテーブルを作成します。

TIP

ODBCインターフェースの制限により、CJK文字や絵文字などのUnicode文字を書き込む場合は、挿入前にバイナリ形式に変換する関数を使用する必要があります。テーブル作成時にはUnicode文字を格納するカラムの型をNVARCHARに設定してください。

  • MQTTメッセージを保存するためのデータテーブルを作成します。メッセージID、トピック、QoS、ペイロード、パブリッシュ時刻を含みます。

    sql
    CREATE TABLE dbo.t_mqtt_msg (id int PRIMARY KEY IDENTITY(1000000001,1) NOT NULL,
                                 msgid   VARCHAR(64) NULL,
                                 topic   VARCHAR(100) NULL,
                                 qos     tinyint NOT NULL DEFAULT 0,
                                 payload VARCHAR(100) NULL,
                                 arrived DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP);
    GO
  • クライアントのオンライン/オフライン状態を記録するためのデータテーブルを作成します。

    sql
    CREATE TABLE dbo.t_mqtt_events (id int PRIMARY KEY IDENTITY(1000000001,1) NOT NULL,
                                    clientid VARCHAR(255) NULL,
                                    event_type VARCHAR(255) NULL,
                                    event_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP);
    GO

コネクターの作成

このセクションでは、SinkをMicrosoft SQL Serverに接続するためのコネクターの作成方法を示します。

以下の手順は、EMQXとMicrosoft SQL Serverをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。

  2. ページ右上のCreateをクリックします。

  3. Create ConnectorページでMicrosoft SQL Serverを選択し、Nextをクリックします。

  4. Configurationステップで以下の情報を設定します:

    • Connector name:コネクター名を入力します。英大文字・小文字と数字の組み合わせで、例:my_sqlserver

    • Server Host127.0.0.1:1433を入力するか、Microsoft SQL Serverがリモートの場合はそのURLを入力します。

      TIP

      Named Instanceを使用している場合は、インスタンスが動作するポート番号を明示的に指定する必要があります。ドライバーは指定されたポートでインスタンスに接続し、ヘルスチェック時にEMQXがインスタンス名を推測します。

      Server Hostフィールドにインスタンス名のみ(例:MYSERVER\SQL2022)を指定しても正しいインスタンスに接続できる保証はありません。必ずポート設定を確認してください。

    • Database Namemasterを入力します。

    • Usernamesaを入力します。

    • Password:事前設定したパスワードmqtt_public1または実際のパスワードを入力します。

    • SQL Server Driver Namems-sqlを入力します。これはodbcinst.iniで設定したDSN名です。

  5. 高度な設定(任意):詳細はSinkの特長を参照してください。

  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがMicrosoft SQL Serverに接続できるかテストできます。

  7. ページ下部のCreateボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSinkを指定したルールの作成を続行できます。詳細はメッセージ保存用Microsoft SQL Server Sinkのルール作成およびイベント記録用Microsoft SQL Server Sinkのルール作成を参照してください。

メッセージ保存用Microsoft SQL Server Sinkのルール作成

このセクションでは、ダッシュボードでソースMQTTトピックt/#からのメッセージを処理し、処理済みデータを設定済みSinkを通じてMicrosoft SQL Serverのテーブルdbo.t_mqtt_msgに保存するルールの作成方法を示します。

  1. EMQXダッシュボードでIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力します。メッセージ保存用ルールを作成するには、SQL Editorに以下の文を入力します。これはトピックt/#配下のMQTTメッセージをMicrosoft SQL Serverに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT部分に含めてください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    ODBCインターフェースの制限により、CJK文字や絵文字などのUnicode文字を書き込む場合は挿入前にバイナリ形式に変換する関数を使用する必要があります。

    ルール作成時に組み込み関数を使い、文字列をUTF-16リトルエンディアンエンコードのバイナリ文字列に変換可能です。例:

    sql
    SELECT
      sqlserver_bin2hexstr(str_utf16_le(payload)) as payload,
      *
    FROM
      "t/#"

    TIP

    初心者ユーザーはSQL ExamplesEnable TestをクリックしてSQLルールの学習やテストが可能です。

    • Add Actionボタンをクリックし、ルールがトリガーされた際のアクションを定義します。このアクションにより、EMQXはルールで処理したデータをMicrosoft SQL Serverに送信します。
  4. Type of ActionのドロップダウンからMicrosoft SQL Serverを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのMicrosoft SQL Server Sinkを選択することも可能ですが、この例では新規Sinkを作成します。

  5. Sinkの名前を入力します。名前は英大文字・小文字と数字の組み合わせにしてください。

  6. Connectorのドロップダウンから先に作成したmy_sqlserverを選択します。隣のボタンで新規コネクター作成も可能です。設定パラメーターはコネクター作成を参照してください。

  7. メッセージ保存用のSQL Templateを以下のSQL文で設定します。

    注意:これは前処理済みのSQLなので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
    insert into dbo.t_mqtt_msg(msgid, topic, qos, payload) values ( ${id}, ${topic}, ${qos}, ${payload} )

    TIP

    ODBCインターフェースの制限により、Unicode文字を書き込む場合は挿入前にバイナリ形式に変換する関数を使用してください。

    SQLテンプレート内でCONVERT関数を使い、Microsoft SQL Server側で対応するバイナリデータを文字列に変換可能です。

    sql
    insert into dbo.t_mqtt_msg(msgid, topic, qos, payload) values ( ${id}, ${topic}, ${qos}, CONVERT(NVARCHAR(100), ${payload}) )

    SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの挙動を設定できます:

    • Disabled(デフォルト):ルールエンジンはundefined文字列をデータベースに挿入します。

    • Enabled:未定義変数の場合、ルールエンジンはNULLをデータベースに挿入します。

      TIP

      可能な限りこのオプションは有効にしてください。無効化は後方互換性確保のためのみ推奨されます。

  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照してください。

  9. 高度な設定(任意):詳細はSinkの特長を参照してください。

  10. Createをクリックする前に、Test ConnectivityをクリックしてSinkがMicrosoft SQL Serverに接続できるかテストできます。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認します。Createボタンをクリックしてルールを生成します。

これでMicrosoft SQL Server Sink用のルール作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいMicrosoft SQL Server Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleで解析されMicrosoft SQL Serverに送信・保存されている様子が確認できます。

イベント記録用Microsoft SQL Server Sinkのルール作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSinkを通じてMicrosoft SQL Serverのテーブルdbo.t_mqtt_eventsに保存するルール作成方法を示します。

手順はメッセージ保存用Microsoft SQL Server Sinkのルール作成とほぼ同じで、SQLテンプレートとSQLルールのみ異なります。

オンライン/オフライン状態記録用のルールSQL文は以下の通りです。

sql
SELECT
  *,
  floor(timestamp / 1000) as s_shift,
  timestamp div 1000 as ms_shift
FROM
  "$events/client_connected", "$events/client_disconnected"

イベント記録用のSQLテンプレートは以下の通りです。

sql
insert into dbo.t_mqtt_events(clientid, event_type, event_time) values ( ${clientid}, ${event}, DATEADD(MS, ${ms_shift}, DATEADD(S, ${s_shift}, '19700101 00:00:00:000') ) )

ルールのテスト

MQTT Xを使い、トピックt/1にメッセージを送信してオンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello SQL Server" }'

Microsoft SQL Server Sinkの実行統計を確認します。

  • メッセージ保存用Sinkでは、新たに1件のマッチングと1件の送信済みメッセージがあるはずです。dbo.t_mqtt_msgデータテーブルにデータが書き込まれているか確認してください。
bash
1> SELECT * from dbo.t_mqtt_msg
2> GO
id          msgid                                                            topic                                                                                                qos payload                                                                                              arrived
----------- ---------------------------------------------------------------- ---------------------------------------------------------------------------------------------------- --- ---------------------------------------------------------------------------------------------------- -----------------------
 1000000001 0005F995096D9466F442000010520002                                 t/1                                                                                                    0 { "msg": "Hello SQL Server" }                                                                        2023-04-18 04:49:47.170

(1 rows affected)
1>
  • オンライン/オフライン状態記録用Sinkでは、新たに2件のイベント(クライアント接続と切断)が記録されているはずです。dbo.t_mqtt_eventsデータテーブルに状態記録が書き込まれているか確認してください。
bash
1> SELECT * from dbo.t_mqtt_events
2> GO
id          clientid                                                         event_type                                                                                                                                                                                                    event_time
----------- ---------------------------------------------------------------- ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -----------------------
 1000000001 emqx_c                                                           client.connected                                                                                                                                                                                              2023-04-18 04:49:47.140
 1000000002 emqx_c                                                           client.disconnected                                                                                                                                                                                           2023-04-18 04:49:47.180

(2 rows affected)
1>