Microsoft SQL Server への MQTT データ取り込み
SQL Serverは、企業や組織の規模や種類を問わず広く利用されている主要なリレーショナル商用データベースソリューションの一つです。EMQXはSQL Serverとの連携をサポートしており、MQTTメッセージやクライアントイベントをSQL Serverに保存できます。これにより、複雑なデータパイプラインや分析処理を構築し、データ管理・分析やデバイス接続管理、ERP、CRM、BIなどの他の企業システムとの統合が可能になります。
本ページでは、EMQXとMicrosoft SQL Server間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。
TIP
Microsoft SQL Server とのデータ統合は EMQX Enterprise 5.0.3 以降でサポートされています。
動作概要
Microsoft SQL Serverとのデータ統合はEMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送受信機能とMicrosoft SQL Serverの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントとSinkを利用して、MQTTメッセージやクライアントイベントをMicrosoft SQL Serverに保存できます。さらに、イベントによりMicrosoft SQL Server内のデータの更新や削除をトリガーし、デバイスのオンライン状態や接続履歴などの情報を記録可能です。この統合により、EMQXからSQL Serverへのデータ取り込みが簡素化され、複雑なコーディングを必要としません。
以下の図は、EMQXとSQL Server間の典型的なデータ統合アーキテクチャを示しています。

Microsoft SQL ServerへのMQTTデータ取り込みは以下のように動作します。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルでEMQXに正常に接続し、機械、センサー、製造ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQXにパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQXに定義されたルールで処理されます。ルールは事前定義された条件に基づき、どのメッセージをMicrosoft SQL Serverにルーティングするかを判断します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
- SQL Serverへのデータ取り込み:ルールはメッセージをMicrosoft SQL Serverに書き込む処理をトリガーします。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQLを構築してSQL Serverに送信し、メッセージの特定フィールドを対応するデータベースのテーブルやカラムに書き込んだり更新したりします。
- データ保存と活用:データがMicrosoft SQL Serverに保存されることで、企業はそのクエリ機能を活用し、多様なユースケースに対応できます。
特長と利点
Microsoft SQL Serverとのデータ統合は、効率的なデータ送信、保存、活用を実現するために以下の特長と利点を備えています。
- リアルタイムデータストリーミング:EMQXはリアルタイムデータストリーム処理に最適化されており、ソースシステムからMicrosoft SQL Serverへの効率的かつ信頼性の高いデータ送信を実現します。リアルタイムのデータ取得と分析が可能で、即時の洞察やアクションが必要なユースケースに適しています。
- 高いパフォーマンスとスケーラビリティ:EMQXとMicrosoft SQL Serverはともに拡張性と信頼性を備え、大規模なIoTデータ処理に対応可能です。需要の増加に応じて水平・垂直の拡張が途切れることなく行え、IoTアプリケーションの継続性と信頼性を保証します。
- 柔軟なデータ変換:EMQXは強力なSQLベースのルールエンジンを提供し、Microsoft SQL Serverに保存する前にデータを前処理できます。フィルタリング、ルーティング、集約、エンリッチメントなど多様なデータ変換機構をサポートし、組織のニーズに応じてデータを整形可能です。
- 高度な分析機能:Microsoft SQL ServerはAnalysis Servicesによる多次元データモデル構築など強力な分析機能を提供し、複雑なデータ分析やデータマイニングを支援します。また、Reporting Servicesを通じてIoTデータの洞察や分析結果をレポートとして作成・公開し、関係者に提示できます。
はじめる前に
本節では、Microsoft SQL Serverデータ統合の作成を始める前に必要な準備について説明します。ODBCドライバーのインストールと設定、Microsoft SQL Serverのインストールと接続、データベースおよびデータテーブルの作成方法を解説します。
前提条件
ODBC ドライバーのインストールと設定
Microsoft SQL ServerデータベースにアクセスするためにODBCドライバーを設定する必要があります。ODBCドライバーとしては、FreeTDSまたはMicrosoftが提供するmsodbcsql18ドライバーのいずれかを使用できます。
EMQXはodbcinst.ini設定に指定されたDSN名を使ってドライバーの動的ライブラリのパスを判別します。以下の例ではDSN名をms-sqlとしています。詳細は接続プロパティを参照してください。
補足
DSN 名は任意に設定可能ですが、英字のみの使用を推奨します。また、DSN 名は大文字・小文字を区別します。
msodbcsql18ドライバーをODBCドライバーとしてインストール・設定する方法
msodbcsql18ドライバーを使用する場合は、Microsoftの公式手順を参照してください。
MicrosoftのEULA条項により、EMQXが提供するDockerイメージにはmsodbcsql18ドライバーは含まれていません。DockerやKubernetesで使用する場合は、EMQX EnterpriseのイメージをベースにODBCドライバーをインストールした新しいイメージを作成する必要があります。新しいイメージを使用することは、Microsoft SQL Server EULAに同意することを意味します。
以下の手順で新しいイメージをビルドしてください。
以下のDockerfileを用いて新しいイメージをビルドします。
例のベースイメージバージョンは
emqx/emqx-enterprise:5.8.1です。必要なEMQX Enterpriseバージョンに応じてビルドするか、最新のemqx/emqx-enterprise:latestを使用してください。
FROM emqx/emqx-enterprise:5.8.1
USER root
RUN apt-get -qq update && apt-get install -yqq curl gpg && \
. /etc/os-release && \
curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg && \
curl -fsSL "https://packages.microsoft.com/config/${ID}/${VERSION_ID}/prod.list" > /etc/apt/sources.list.d/mssql-release.list && \
apt-get -qq update && \
ACCEPT_EULA=Y apt-get install -yqq msodbcsql18 unixodbc-dev && \
sed -i 's/ODBC Driver 18 for SQL Server/ms-sql/g' /etc/odbcinst.ini && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
USER emqxdocker build -t emqx/emqx-enterprise:5.8.1-msodbcコマンドでイメージをビルドします。ビルド後、
docker image lsでローカルイメージ一覧を確認できます。必要に応じてイメージをアップロードまたは保存してください。
補足
この例でmsodbcsql18ドライバーをインストールした場合、odbcinst.iniのDSN名はms-sqlになっていることを確認してください。必要に応じてDSN名を変更可能です。
FreeTDSをODBCドライバーとしてインストール・設定する方法
ここでは主要なディストリビューションでのFreeTDSのインストールと設定例を示します。
MacOSでのFreeTDS ODBCドライバーのインストールと設定例:
$ brew install unixodbc freetds
$ vim /usr/local/etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver = /usr/local/lib/libtdsodbc.so
Setup = /usr/local/lib/libtdsodbc.so
FileUsage = 1CentOSでのFreeTDS ODBCドライバーのインストールと設定例:
$ yum install unixODBC unixODBC-devel freetds freetds-devel perl-DBD-ODBC perl-local-lib
$ vim /etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver = /usr/lib64/libtdsodbc.so
Setup = /usr/lib64/libtdsS.so.2
Driver64 = /usr/lib64/libtdsodbc.so
Setup64 = /usr/lib64/libtdsS.so.2
FileUsage = 1UbuntuでのFreeTDS ODBCドライバーのインストールと設定例(Ubuntu20.04の場合。他バージョンは公式ODBCドキュメントを参照):
$ apt-get install unixodbc unixodbc-dev tdsodbc freetds-bin freetds-common freetds-dev libdbd-odbc-perl liblocal-lib-perl
$ vim /etc/odbcinst.ini
# 以下の内容を追加
[ms-sql]
Description = ODBC for FreeTDS
Driver = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
Setup = /usr/lib/x86_64-linux-gnu/odbc/libtdsS.so
FileUsage = 1Microsoft SQL Server のインストールと接続
本節では、Dockerイメージを用いてLinux/MacOS上でMicrosoft SQL Server 2019を起動し、sqlcmdで接続する方法を説明します。その他のインストール方法はMicrosoft SQL Serverインストールガイドを参照してください。
DockerでMicrosoft SQL Serverをインストールし、以下のコマンドで起動します。パスワードは
mqtt_public1を使用します。Microsoft SQL Serverのパスワードポリシーはパスワードの複雑さを参照してください。注意:環境変数
ACCEPT_EULA=Yを指定してDockerコンテナを起動することで、MicrosoftのEULAに同意したことになります。詳細はエンドユーザー使用許諾契約を参照してください。bash# Microsoft SQL Server Docker イメージを起動し、パスワードを `mqtt_public1` に設定 $ docker run --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_SA_PASSWORD=mqtt_public1 -d mcr.microsoft.com/mssql/server:2022-CU15-ubuntu-22.04コンテナにアクセスします。
bashdocker exec -it sqlserver bashコンテナ内で設定したパスワードを使ってサーバーに接続します。パスワード入力時は文字が表示されません。入力後はEnterを押してください。
bash$ /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P mqtt_public1 -N -C 1>TIP
Microsoftが提供するMicrosoft SQL Serverコンテナには
mssql-tools18パッケージがインストールされていますが、実行ファイルは$PATHに含まれていません。そのため、sqlcmdを使用する際は実行ファイルのパスを指定する必要があります。今回のDocker環境では/opt配下にあります。mssql-tools18の使い方の詳細はsqlcmdユーティリティを参照してください。
これでMicrosoft SQL Server 2022インスタンスのデプロイと接続が完了しました。
データベースとデータテーブルの作成
前節で作成した接続を用いて、以下のSQL文でデータテーブルを作成します。
TIP
ODBCインターフェースの制約により、CJK文字やEmojiなどのUnicode文字を書き込む場合は、挿入前にバイナリ形式に変換する関数を使用する必要があります。テーブル作成時はUnicode文字を格納するカラムの型をNVARCHARに設定してください。
MQTTメッセージを保存するためのデータテーブルを作成します。メッセージID、トピック、QoS、ペイロード、パブリッシュ時間を含みます。
sqlCREATE TABLE dbo.t_mqtt_msg (id int PRIMARY KEY IDENTITY(1000000001,1) NOT NULL, msgid VARCHAR(64) NULL, topic VARCHAR(100) NULL, qos tinyint NOT NULL DEFAULT 0, payload VARCHAR(100) NULL, arrived DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP); GOクライアントのオンライン/オフライン状態を記録するデータテーブルを作成します。
sqlCREATE TABLE dbo.t_mqtt_events (id int PRIMARY KEY IDENTITY(1000000001,1) NOT NULL, clientid VARCHAR(255) NULL, event_type VARCHAR(255) NULL, event_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP); GO
コネクターの作成
本節では、SinkをMicrosoft SQL Serverに接続するためのコネクターの作成方法を説明します。
以下の手順は、EMQXとMicrosoft SQL Serverの両方をローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。
EMQX ダッシュボードに入り、Integration -> Connectors をクリックします。
ページ右上の Create をクリックします。
Create Connector ページで Microsoft SQL Server を選択し、Next をクリックします。
Configurationステップで以下の情報を設定します:
Connector name:コネクター名を入力します。英大文字・小文字と数字の組み合わせが望ましく、例:
my_sqlserverServer Host:
127.0.0.1:1433を入力、またはMicrosoft SQL Serverがリモートの場合はそのURLを入力します。TIP
Named Instanceを使用している場合は、インスタンスが動作するポート番号を明示的に指定する必要があります。ドライバーは指定ポートでインスタンスに接続し、ヘルスチェック時にEMQXがインスタンス名を推測します。
Server Host欄にインスタンス名のみ(例:
MYSERVER\SQL2022)を指定しても正しいインスタンスに接続できる保証はありません。ポート設定を必ず確認してください。Database Name:
masterを入力します。Username:
saを入力します。Password:事前設定したパスワード
mqtt_public1を入力、または実際のパスワードを使用します。SQL Server Driver Name:
ms-sqlを入力します。これはodbcinst.iniで設定した DSN 名です。
詳細設定(任意):詳細はSinkの機能を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてコネクターがMicrosoft SQL Serverに接続できるか確認できます。
ページ下部のCreateをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSink付きルールの作成を続行できます。ルール作成の詳細はメッセージ保存用Microsoft SQL Server Sink付きルールの作成およびイベント記録用Microsoft SQL Server Sink付きルールの作成を参照してください。
メッセージ保存用Microsoft SQL Server Sink付きルールの作成
本節では、ソースMQTTトピックt/#からのメッセージを処理し、処理済みデータを設定済みSink経由でMicrosoft SQL Serverのテーブルdbo.t_mqtt_msgに保存するルールの作成方法をダッシュボードで説明します。
EMQX Dashboard で Integration -> Rules をクリックします。
ページ右上の Create をクリックします。
ルールIDに
my_ruleを入力します。メッセージ保存用ルールを作成するため、SQL Editorに以下の文を入力します。これはトピックt/#配下のMQTTメッセージをMicrosoft SQL Serverに保存することを意味します。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT句に含めてください。sqlSELECT * FROM "t/#"TIP
ODBCインターフェースの制約により、CJK文字やEmojiなどのUnicode文字を書き込む場合は、挿入前にバイナリ形式に変換する関数を使用する必要があります。
ルール作成時に組み込み関数を使って文字列をUTF-16リトルエンディアンエンコードのバイナリ文字列に変換可能です。例:
sqlSELECT sqlserver_bin2hexstr(str_utf16_le(payload)) as payload, * FROM "t/#"TIP
初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。
- Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをMicrosoft SQL Serverに送信します。
Type of Actionドロップダウンリストから
Microsoft SQL Serverを選択します。ActionドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのMicrosoft SQL Server Sinkを選択することも可能ですが、本例では新規Sinkを作成します。Sink の名前を入力します。名前は英大文字・小文字と数字の組み合わせとしてください。
Connectorドロップダウンから前に作成した
my_sqlserverを選択します。新規コネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメーターはコネクターの作成を参照してください。メッセージ保存用の SQL Template を以下の SQL 文で設定します。
注意:これは前処理済みSQLのため、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。
sqlinsert into dbo.t_mqtt_msg(msgid, topic, qos, payload) values ( ${id}, ${topic}, ${qos}, ${payload} )TIP
ODBCインターフェースの制約により、CJK文字やEmojiなどのUnicode文字を書き込む場合は、挿入前にバイナリ形式に変換する関数を使用する必要があります。
SQL テンプレート内で
CONVERT関数を使い、Microsoft SQL Server 側で対応するバイナリデータを文字列に変換可能です。sqlinsert into dbo.t_mqtt_msg(msgid, topic, qos, payload) values ( ${id}, ${topic}, ${qos}, CONVERT(NVARCHAR(100), ${payload}) )SQLテンプレート内でプレースホルダー変数が未定義の場合、SQL template上部のUndefined Vars as Nullスイッチでルールエンジンの動作を切り替えられます:
Disabled(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。Enabled:変数が未定義の場合、
NULLを挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):詳細はSinkの機能を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてSinkがMicrosoft SQL Serverに接続できるか確認できます。
CreateをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。
Create Ruleページに戻り、設定内容を確認してCreateをクリックしルールを生成します。
これでMicrosoft SQL Server Sink付きルールの作成が完了しました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいMicrosoft SQL Server Sinkが表示されます。
また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#配下のメッセージがルールmy_ruleで解析されMicrosoft SQL Serverに送信・保存されていることが確認できます。
イベント記録用Microsoft SQL Server Sink付きルールの作成
本節では、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みSink経由でMicrosoft SQL Serverのテーブルdbo.t_mqtt_eventsに保存するルールの作成方法を説明します。
手順はメッセージ保存用Microsoft SQL Server Sink付きルールの作成とほぼ同様ですが、SQLテンプレートとSQLルール文が異なります。
オンライン/オフライン状態記録用のルール SQL 文は以下の通りです。
SELECT
*,
floor(timestamp / 1000) as s_shift,
timestamp div 1000 as ms_shift
FROM
"$events/client_connected", "$events/client_disconnected"イベント記録用の SQL テンプレートは以下の通りです。
insert into dbo.t_mqtt_events(clientid, event_type, event_time) values ( ${clientid}, ${event}, DATEADD(MS, ${ms_shift}, DATEADD(S, ${s_shift}, '19700101 00:00:00:000') ) )ルールのテスト
MQTT Xを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello SQL Server" }'Microsoft SQL Server Sinkの稼働状況を確認します。
- メッセージ保存用Sinkでは、新たに1件のマッチングと1件の送信メッセージがあるはずです。
dbo.t_mqtt_msgデータテーブルにデータが書き込まれているか確認してください。
1> SELECT * from dbo.t_mqtt_msg
2> GO
id msgid topic qos payload arrived
----------- ---------------------------------------------------------------- ---------------------------------------------------------------------------------------------------- --- ---------------------------------------------------------------------------------------------------- -----------------------
1000000001 0005F995096D9466F442000010520002 t/1 0 { "msg": "Hello SQL Server" } 2023-04-18 04:49:47.170
(1 rows affected)
1>- オンライン/オフライン状態記録用Sinkでは、新たに2件のイベント(クライアント接続・切断)が記録されているはずです。
dbo.t_mqtt_eventsデータテーブルに状態記録が書き込まれているか確認してください。
1> SELECT * from dbo.t_mqtt_events
2> GO
id clientid event_type event_time
----------- ---------------------------------------------------------------- ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -----------------------
1000000001 emqx_c client.connected 2023-04-18 04:49:47.140
1000000002 emqx_c client.disconnected 2023-04-18 04:49:47.180
(2 rows affected)
1>