Microsoft SQL ServerへのMQTTデータ取り込み
SQL Serverは、企業や組織の規模や種類を問わず広く利用されている主要なリレーショナル商用データベースソリューションの一つです。EMQX CloudはSQL Serverとの統合をサポートしており、MQTTメッセージやクライアントイベントをSQL Serverに保存できます。これにより、データ管理や分析のための複雑なデータパイプラインや分析プロセスの構築、デバイス接続の管理、ERP、CRM、BIなどの他の企業システムとの統合が可能になります。
本ページでは、EMQX CloudとMicrosoft SQL Server間のデータ統合の詳細な概要と、データ統合の作成および検証に関する実践的な手順を提供します。
動作の仕組み
Microsoft SQL Serverとのデータ統合は、EMQX Cloudの標準機能として提供されており、EMQX Cloudのデバイス接続およびメッセージ送信機能とMicrosoft SQL Serverの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントを通じて、MQTTメッセージやクライアントイベントをMicrosoft SQL Serverに保存できます。さらに、イベントによりMicrosoft SQL Server内のデータの更新や削除をトリガーでき、デバイスのオンライン状態や接続履歴などの情報を記録可能です。この統合により、EMQX CloudからSQL Serverへのデータ取り込みが簡素化され、複雑なコーディングを必要としません。
以下の図は、EMQXとSQL Server間の典型的なデータ統合アーキテクチャを示しています。

Microsoft SQL ServerへのMQTTデータ取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、機械、センサー、製品ラインの稼働状態、計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQX Cloudにパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でのマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloudで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、どのメッセージをMicrosoft SQL Serverにルーティングするかを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
- SQL Serverへのデータ取り込み:ルールはメッセージのMicrosoft SQL Serverへの書き込みをトリガーします。SQLテンプレートを用いて、ルール処理結果からデータを抽出しSQLを構築、SQL Serverに送信して実行することで、メッセージの特定フィールドをデータベースの対応するテーブルやカラムに書き込んだり更新したりします。
- データの保存と活用:データがMicrosoft SQL Serverに保存された後、企業はそのクエリ機能を活用してさまざまなユースケースに利用できます。
特長とメリット
Microsoft SQL Serverとのデータ統合は、効率的なデータ送信、保存、活用を実現するための多様な特長とメリットを提供します:
- リアルタイムデータストリーミング:EMQX Cloudはリアルタイムデータストリームの処理に最適化されており、ソースシステムからMicrosoft SQL Serverへの効率的かつ信頼性の高いデータ送信を保証します。即時の洞察やアクションが必要なユースケースに理想的です。
- 高性能かつスケーラブル:EMQX CloudとMicrosoft SQL Serverは共に拡張性と信頼性を備えており、大規模なIoTデータの処理に適しています。需要の増加に応じて水平・垂直の拡張を途切れなく行い、IoTアプリケーションの継続性と信頼性を確保します。
- 柔軟なデータ変換:EMQX Cloudは強力なSQLベースのルールエンジンを提供し、Microsoft SQL Serverに保存する前にデータの前処理が可能です。フィルタリング、ルーティング、集約、強化など多様なデータ変換機構をサポートし、ニーズに応じたデータ整形を実現します。
- 高度な分析機能:Microsoft SQL ServerはAnalysis Servicesによる多次元データモデル構築など強力な分析機能を提供し、複雑なデータ分析やデータマイニングを支援します。また、Reporting Servicesを通じてIoTデータの洞察や分析結果をレポートとして作成・公開し、関係者に提供できます。
はじめる前に
本セクションでは、Microsoft SQL Serverデータ統合の作成を開始する前に必要な準備について説明します。Microsoft SQL Serverのインストールと接続、データベースおよびデータテーブルの作成、ODBCドライバーのインストールと設定方法を含みます。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
Microsoft SQL Serverのインストールと接続
本セクションでは、Dockerイメージを使用してLinux/MacOS上でMicrosoft SQL Server 2019を起動し、sqlcmdを使ってMicrosoft SQL Serverに接続する方法を説明します。その他のインストール方法については、Microsoft SQL Serverインストールガイドをご参照ください。
Docker経由でMicrosoft SQL Serverをインストールし、以下のコマンドでDockerイメージを起動します。パスワードは
mqtt_public1を使用します。Microsoft SQL Serverのパスワードポリシーについてはパスワードの複雑性をご覧ください。注意:環境変数
ACCEPT_EULA=Yを指定してDockerコンテナを起動することで、MicrosoftのEULAに同意したことになります。詳細はMICROSOFT SOFTWARE LICENSE TERMS MICROSOFT SQL SERVER 2019 STANDARD(EN_US)をご参照ください。bash# Microsoft SQL ServerのDockerイメージを起動し、パスワードを`mqtt_public1`に設定 $ docker run --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_SA_PASSWORD=mqtt_public1 -d mcr.microsoft.com/mssql/server:2019-CU19-ubuntu-20.04コンテナにアクセスします。
bashdocker exec -it sqlserver bashコンテナ内のサーバーに接続するために、設定したパスワードを入力します。パスワード入力時は文字が表示されません。入力後はそのまま
Enterを押してください。bash$ /opt/mssql-tools/bin/sqlcmd -S 127.0.0.1 -U sa $ Password: 1>TIP
Microsoftが提供するMicrosoft SQL Serverコンテナには
mssql-toolsがインストールされていますが、実行ファイルは$PATHに含まれていません。そのため、mssql-toolsの実行ファイルパスを指定する必要があります。本例のDocker環境ではパスはoptとなります。mssql-toolsの使い方の詳細はsqlcmd-utilityをご参照ください。
これでMicrosoft SQL Server 2019のインスタンスがデプロイされ、接続可能な状態になりました。
データベースおよびデータテーブルの作成
本セクションでは、Microsoft SQL Serverでのデータベースおよびデータテーブルの作成方法を説明します。
前節で作成した接続を使い、Microsoft SQL Serverで
emqxデータベースを作成します。bash... Password: 1> USE master 2> GO Changed database context to 'master'. 1> CREATE DATABASE emqx; 2> GO以下のSQL文でデータテーブルを作成します。
temp_humテーブルを作成するSQLコマンドです。このテーブルはデバイスから報告される温度と湿度のデータを保存するために使用します。sqlCREATE TABLE temp_hum( client_id VARCHAR(64) NULL, temp NVARCHAR(100) NULL, hum NVARCHAR(100) NULL, up_timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ); GO;
Microsoft SQL Serverコネクターの作成
データ統合ルールを作成する前に、Microsoft SQL Serverにアクセスするためのコネクターを作成する必要があります。
デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるMicrosoft SQL serverを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリの下のMicrosoft SQL serverを選択します。
コネクター名:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- サーバーホスト:サーバーのIPアドレスとポート番号
- データベース名:
emqxを入力 - ユーザー名:
saを入力 - パスワード:設定したパスワード
mqtt_public1または実際のパスワードを入力 - SQL Serverドライバー名:EMQX Cloudにデフォルトでインストールされている
ODBC Driver 17 for SQL Serverを入力
テストボタンをクリックします。Microsoft SQL Serverサービスにアクセス可能であれば、成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをMicrosoft SQL Serverに転送するためのアクションをルールに追加する必要があります。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールマッチング用のSQL文を入力します。以下のルールでは、メッセージが報告された時間
up_timestamp、クライアントID、temp_hum/emqxトピックのペイロードから温度と湿度を取得します。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL ExamplesとTry It OutをクリックしてSQLルールの学習とテストを行うことをおすすめします。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
利用する機能に基づいてSQLテンプレートを設定します。注意:これは前処理済みのSQLのため、フィールドは引用符で囲まず、文末にセミコロンを記述しないでください。
sqlINSERT INTO temp_hum(client_id, temp, hum) VALUES ( ${client_id}, ${temp}, ${hum} )SQLテンプレート内でプレースホルダー変数が未定義の場合、SQLテンプレート上の未定義変数をNULLとして扱うスイッチを切り替えてルールエンジンの挙動を設定できます:
無効(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLをデータベースに挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。
詳細設定(任意)。
確認ボタンをクリックしてルール作成を完了します。
ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントも使用可能です。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがMicrosoft SQL Serverに転送されているか確認します。
bash1> SELECT * FROM temp_hum ORDER BY up_timestamp; 2> GO client_id temp hum up_timestamp ---------------------------------------------------------------- ------------ ------------ ----------------------- test_client 27.50 41.80 2024-03-25 05:49:21.237コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。