Skip to content

データ統合の概要

EMQX Platformは、完全マネージド型のMQTTメッセージクラウドサービスとして、MQTTプロトコルを介してIoTデバイスを接続し、リアルタイムでメッセージを配信します。この基盤をもとに、データ統合はEMQX Platformの他のクラウドリソースとの接続機能を強化し、デバイスと他の業務システムとのシームレスな統合を可能にします。EMQX Platformのデータ統合は、明確かつ柔軟な「設定可能」なアーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化します。これによりユーザーの利用可能性が向上し、業務システムとEMQX Platform間の結合度を低減し、データ転送のためのより良いインフラを提供します。

data_integration_intro

動作原理

デバイスやアプリケーションが接続を確立すると、MQTTブローカーはメッセージをルーティングします。到着したメッセージは、SQL文を用いてデータ操作を行う強力なコンポーネントであるルールエンジンによって処理されます。処理されたデータは「アクション」によってターゲットのサービスへ転送されます。アクションは「Sink」(サービスへデータを送信)と「Source」(サービスからデータを受信)の2種類に分類されます。現在、データ統合は「Sink」モードのみで動作し、様々なクラウドサービスへのデータのシームレスな統合を実現しています。

コネクター

コネクターはSink/Sourceの基盤となる接続チャネルであり、クラウドプラットフォームから購入したクラウドサービス製品に接続するために使用されます。クラウドサービス製品にはKafkaのようなメッセージキューサービスやRDSのようなストレージサービスがあります。

ルール

ルールは「データの発生元」と「データのフィルタリングおよび処理方法」を記述します。ルールはSQLライクな文でカスタムデータを記述でき、SQLテストを用いてエクスポートされるデータのシミュレーションも可能です。ルールSQLの書き方についてはルールSQLリファレンスをご参照ください。

アクション

アクションは「処理済みデータの送信先」を決定します。ルールは1つ以上のアクションに対応でき、アクションは定義済みのコネクターを設定する必要があります。つまり、データの送信先を指定します。

Sink

Sinkはルールのアクションに追加されるデータ出力コンポーネントです。デバイスがイベントをトリガーしたり、メッセージがEMQXデプロイメントに到着すると、システムは対応するルールをマッチングして実行し、データをフィルタリングおよび処理します。ルールエンジンで処理されたデータは指定されたSinkに転送されます。Sinkでは、${var}${.var}構文を使ってデータから変数を抽出し、SQL文やデータテンプレートを動的に生成するなど、データの取り扱い方法を設定できます。その後、対応するコネクターを通じて外部データシステムにデータを送信し、メッセージの保存、データ更新、イベント通知などの操作を可能にします。

Sinkでサポートされる変数抽出構文は以下の通りです:

  • ${var}:ルールの出力結果から変数を抽出する構文です。例として${topic}があります。ネストされた変数を抽出したい場合はドット.を使い、${payload.temp}のように記述します。抽出したい変数が出力結果に含まれていない場合は、文字列undefinedが返されます。
  • ${.var}:まずルールの出力結果から変数を抽出しようとし、存在しない場合は対応するイベントデータから抽出を試みます。例として${.topic}があります。こちらもドット.を使ったネスト変数の抽出をサポートし、${.payload.temp}のように記述できます。ルールの出力結果とイベントデータの両方に変数が存在しない場合は、文字列undefinedが返されます。${.}を使うと、ルールの出力結果とイベントデータをマージしたすべての変数を抽出できます。

Source

Sourceはデータ入力コンポーネントであり、ルールのデータソースとして機能し、ルールSQLを通じて選択されます。

SourceはMQTTやKafkaなどの外部データシステムからメッセージをサブスクライブまたはコンシュームします。コネクター経由で新しいメッセージが到着すると、ルールエンジンは対応するルールをマッチングして実行し、データをフィルタリングおよび処理します。処理されたデータは指定されたEMQXトピックにパブリッシュされ、クラウドコマンドの配信などの操作を可能にします。

ワークフロー

以下はデータ統合を作成する基本的な手順です:

data_integration_intro

  1. コネクターを作成します。デプロイメントのデータ統合初期ページから接続したいサービスを選択し、コネクターを設定します。
  2. デバイスから収集したデータを処理するルールを作成します。ルールはSQL文を使って任意の方法でデータを収集・処理できます。
  3. ルールにアクションを追加します。ルールがトリガーされると、処理されたデータは設定されたコネクターを通じてクラウドサービスに転送されます。
  4. 作成したデータ統合が正しく動作するかテストします。

デプロイメントに必要なネットワーク設定

データ統合機能はデプロイメントの種類によって、データソースへのアクセスレベルやネットワーク構成が異なります。

サーバーレスデプロイメント

  • データソースはパブリックネットワークアクセスのみをサポートします。そのため、データソース作成前にパブリックネットワークアクセスが可能であることと、セキュリティグループが開放されていることを確認してください。

  • KafkaおよびHTTPサーバーコネクターのみサポートします。

  • サーバーレスデプロイメントのデータ統合は従量課金制で、以下の通りです:

    EMQXサーバーレスはユーザーに月間最大100万回のルールアクション実行無料枠を提供します。これを超えると、追加の100万回のルールアクション実行ごとに0.25ドルの料金が発生します。

    最適なパフォーマンスと管理性を維持するため、EMQX Platformは各デプロイメント内のコネクター、ルール、アクションの作成に以下の制限を設けています:

    カテゴリー最大許容数
    コネクター合計2
    ルール合計4
    ルールあたりのアクション数1

専用デプロイメント

  • データソースへは内部ネットワーク経由でのアクセスを推奨します。そのため、作成前にVPCピアリングを設定し、セキュリティグループを開放してください。
  • パブリックネットワーク経由でアクセスする場合はNATゲートウェイを有効にできます。

BYOCデプロイメント

注意

BYOCデプロイメントではコンソールからのデータ統合設定はできません。設定にはEMQX Management Consoleをご利用ください。

  • ネットワークセキュリティとパフォーマンス向上のため、データソースへは内部ネットワーク経由でのアクセスを推奨します。作成前に、リソースが存在するVPCとBYOCデプロイメントが存在するパブリッククラウドコンソールのVPC間でピアリング接続を設定し、関連するセキュリティグループを開放してください。詳細はCreate VPC Peering Connectionsをご参照ください。
  • パブリックネットワーク経由でリソースにアクセスする場合は、BYOCデプロイメントが存在するVPCに対してパブリッククラウドコンソールでNATゲートウェイを設定してください。