Skip to content

MQTTデータをTablestoreに取り込む

Tablestoreは、IoTシナリオに最適化されたスケーラブルでサーバーレスなデータベースです。時系列データ、構造化データ、半構造化データを管理するためのワンストップソリューションであるIoTstoreを提供しています。IoT、車両ネットワーク、リスク管理、メッセージング、レコメンデーションシステムなどのシナリオに最適です。Tablestoreはコスト効率が高く高性能なデータストレージを提供し、ミリ秒単位のクエリや検索、柔軟なデータ分析機能を備えています。EMQXはTablestore Cloud、Tablestore OSS、Tablestore Enterpriseとシームレスに統合し、IoTユースケースにおける効率的なデータ管理を実現します。

動作概要

EMQXにおけるTablestoreデータ統合は、EMQXのリアルタイムデータキャプチャおよび送信機能と、Tablestoreの高性能なデータストレージおよび分析機能をシームレスに組み合わせています。組み込みのルールエンジンを活用することで、EMQXからTablestoreへのデータ取り込みと保存のプロセスを簡素化し、複雑なコーディングを不要にします。EMQXはルールエンジンとSinkを介してIoTデバイスのデータをTablestoreに転送し、効率的な保存と分析を可能にします。

データが保存されると、Tablestoreはレポートやチャート、その他の可視化を生成する強力なツールを提供し、これらはTablestoreの可視化機能を通じてユーザーに提示されます。

以下の図は、エネルギー貯蔵シナリオにおけるEMQXとTablestoreの典型的なデータ統合アーキテクチャを示しています。

MQTT to Tablestore

EMQXとTablestoreは、エネルギー消費データをリアルタイムに効率よく収集・分析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがIoTプラットフォームとしてデバイスアクセス、メッセージ送信、データルーティングを担当し、Tablestoreがデータストレージおよび分析プラットフォームとしてデータの保存と分析機能を担います。ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:エネルギー貯蔵デバイスや産業用IoTデバイスはMQTTプロトコルを通じてEMQXに正常に接続し、電力消費量、入出力電力などの情報を含むエネルギー消費データを定期的にMQTTプロトコルでパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:組み込みのルールエンジンを使い、特定のソースからのメッセージをトピックマッチングに基づいて処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングして、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  3. Tablestoreへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTablestoreに書き込む操作をトリガーします。Tablestore Sinkは設定可能なフィールドを提供し、メッセージの特定フィールドをTablestoreの対応するメジャメントやフィールドにマッピングすることで、柔軟なデータフォーマット定義を可能にします。

エネルギー消費データがTablestoreに書き込まれた後、以下のような分析が可能です。

  • Grafanaなどの可視化ツールに接続し、データに基づくチャートを生成してエネルギー貯蔵データを表示する。
  • ビジネスシステムに接続し、エネルギー貯蔵デバイスの状態監視やアラートを実施する。

特長とメリット

Tablestoreデータ統合は以下の特長と利点を提供します。

  • 効率的なデータ処理:EMQXは膨大な数のIoTデバイス接続とメッセージスループットを処理可能であり、Tablestoreはデータの書き込み、保存、クエリに優れた性能を発揮します。IoTシナリオのデータ処理要件を満たしつつ、システムに過度の負荷をかけません。
  • メッセージ変換:メッセージはEMQXのルールを通じて多様な処理や変換を経てからTablestoreに書き込まれます。
  • スケーラビリティ:EMQXとTablestoreはどちらもクラスターのスケールアウトに対応し、ビジネスの成長に応じて柔軟に水平拡張が可能です。
  • 豊富なクエリ機能:Tablestoreは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと分析を実現し、IoT時系列データから価値ある洞察を正確に抽出します。
  • 効率的なストレージ:Tablestoreは高圧縮率のエンコーディング方式を採用し、ストレージコストを大幅に削減します。また、異なるデータタイプごとに保存期間をカスタマイズでき、不必要なデータがストレージを占有するのを防ぎます。

はじめる前に

このセクションでは、Tablestoreデータ統合を作成する前に必要な準備、すなわちデータベースインスタンスの作成、時系列テーブルの作成と管理について説明します。

TIP

現時点でTablestoreとのデータ統合はTimeSeriesモデルのみをサポートしています。したがって、以下の手順はTimeSeriesモデルを前提としています。

前提条件

以下を準備・理解していることを確認してください。

時系列テーブルの作成

  1. Tablestoreコンソールにログインします。
  2. 時系列モデルのインスタンスを作成します。インスタンス名として例としてemqx-demoを指定します。インスタンス作成の詳細はTablestore公式ドキュメントを参照してください。
  3. インスタンス管理ページに移動します。
  4. インスタンス詳細タブで時系列テーブルを選択し、時系列テーブルの作成ボタンをクリックします。
  5. 時系列テーブル情報を設定し、テーブル名(例:timeseries_demo_with_data)を入力して確認をクリックします。

img

時系列テーブルの管理

先ほど作成した時系列テーブルを管理するには、テーブル名をクリックして時系列テーブル管理ページに入ります。ここから、ビジネス要件に応じて以下の操作を行えます。

  1. データクエリタブをクリックします。

  2. 時系列の追加をクリックします。

    TIP

    このステップは任意です。時系列テーブルがまだ存在しない場合、Tablestoreはデータ書き込み時に自動的に作成します。そのため、この例では時系列の手動操作は示していません。

img

コネクターの作成

このセクションでは、SinkをTablestoreサーバーに接続するためのコネクター作成方法を説明します。

以下の手順は、EMQXとTablestoreをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでTablestoreを選択し、Nextをクリックします。
  4. Configurationステップで以下を設定します:
    • コネクター名を入力します。英数字の組み合わせとしてください。例:my_tablestore
    • Tablestoreサーバー接続情報を入力します:
      • Endpoint:TablestoreインスタンスのアクセスURLを入力します。これはTablestoreコンソールのインスタンス詳細ページで確認可能です。デプロイ方法に応じてURLを入力してください。例:パブリックネットワークの場合はhttps://emqx-demo.cn-hangzhou.ots.aliyuncs.com
      • Instance Name:接続するTablestoreインスタンス名。例:emqx-demo
      • Access Key ID:Tablestore認証に使用するアクセスキーID。Alibaba Cloudが発行したキーです。
      • Access Key Secret:アクセスキーIDに対応するシークレットキー。
      • Storage Model Type:現在はTimeSeriesのみサポート。
    • TLSパラメータの設定。TablestoreはHTTPSエンドポイントを使用するためTLSはデフォルトで有効です。追加設定は不要です。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
  5. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTablestoreサーバーに接続できるかテストできます。
  6. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成を続けてください。詳細はTablestore Sink付きルールの作成を参照してください。

Tablestore Sink付きルールの作成

このセクションでは、EMQXでソースMQTTトピックt/#のメッセージを処理し、処理結果を設定済みのSinkを通じてTablestoreに送信するルールの作成方法を説明します。

  1. EMQXダッシュボードに入り、左ナビゲーションメニューからIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルール作成ページでルールIDにmy_ruleを入力します。

  4. SQL Editorでルールを設定します。例えば、トピックt/#のMQTTメッセージをTablestoreに保存したい場合、以下のSQL文を使用します。

    TIP

    独自のSQL文を指定する場合、後で設定するSinkのデータフォーマットに含まれるすべての変数がSELECT句に含まれていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    注:初心者の方はSQL ExamplesEnable TestをクリックしてSQLルールの学習・テストが可能です。

    • Add Actionボタンをクリックして、ルールがトリガーするアクションを定義します。このアクションでEMQXはルールで処理したデータをTablestoreに送信します。
  5. Type of ActionドロップダウンリストからAlibaba Tablestoreを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能です。この例では新規Sinkを作成します。

  6. Sink名を入力します。英数字の組み合わせとしてください。

  7. Connectorドロップダウンから先ほど作成したmy_tablestoreを選択します。新規コネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメータはコネクターの作成を参照。

  8. 以下のフィールドを設定します:

    • Data Source:EMQXがメッセージを取得するデータソース。処理対象のデータの起点を示します。特定のトピックやデータストリームを指定可能です。

    • Table Name:データを保存するTablestoreのテーブル名。先に作成したテーブル名を入力します。${table}などの変数を使って動的に指定することも可能です。

    • Measurement:Tablestoreで使用するメジャメント名。通常は論理的なデータのグループやカテゴリを示します。例:temperature_readingssensor_data。変数(例:${measurement})を使って動的に指定可能です。

    • Storage Model Type:Tablestoreで使用するデータストレージモデル。現在はtimeseriesのみサポートされており、時系列データに最適化されています。

    • Tags:Tablestoreの各データエントリに付与されるキーと値のペア。メタデータやラベルとして利用し、クエリやフィルタリングを容易にします。Addをクリックして複数のタグを定義可能です。例:

      KeyValue
      locationoffice1
      devicesensor1
    • Fields:Tablestoreに送信するデータのフィールドリスト。各フィールドはTablestoreテーブルのカラムにマッピングされます。Addをクリックして以下を追加します:

      • Column:Tablestoreのカラム名。${column_name}などの変数を使って定義可能で、後述のメッセージペイロードのフィールドと一致させます。
      • Message value:カラムに割り当てる値。動的参照(${value})、真偽値(true)、数値(1.3)、バイナリデータが指定可能です。
      • Is Int:カラムが数値型の場合、デフォルトでEMQXは浮動小数点型として挿入します。整数値として挿入したい場合はこのフラグをtrueに設定します。設定ファイル経由では変数(${isint})で動的指定可能です。
      • Is Binary:カラムがバイナリの場合、デフォルトでEMQXは文字列型として挿入します。バイナリデータとして挿入したい場合はこのフラグをtrueに設定します。設定ファイル経由では変数(${isbinary})で動的指定可能です。
    • Timestamp:Tablestoreに記録されるタイムスタンプ。マイクロ秒単位の整数値で指定します。固定値、文字列"NOW"(メッセージ処理時にEMQXが現在時刻を動的に埋め込み)、変数プレースホルダー(例:${microsecond_timestamp})で動的指定が可能です。

    • Meta Update Model:Tablestoreのメタデータ更新戦略を定義します:

      • MUM_IGNORE:メタデータ更新を無視し、競合があっても変更しません。
      • MUM_NORMAL:通常のメタデータ更新を行います。メタデータが存在しない場合は動的に作成し、既存のメタデータと競合した場合は上書きされる可能性があります。
  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。

  10. 詳細設定(任意)詳細設定を参照してください。

  11. Createをクリックする前に、Test ConnectivityをクリックしてSinkがTablestoreサーバーに接続できるかテスト可能です。

  12. CreateをクリックしてSink作成を完了します。ルール作成ページに戻ると、Action Outputsタブに新しいSinkが表示されます。

  13. ルール作成ページで設定内容を確認し、Createボタンをクリックしてルールを生成します。

これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいTablestore Sinkが確認できます。

また、Integration -> Flow Designerをクリックしてトポロジーを確認できます。トピックt/#のメッセージがルールmy_ruleで解析され、Tablestoreに送信・保存されている様子が確認できます。

ルールのテスト

  1. MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

    bash
    mqttx pub -i emqx_c -t t/1 -m '{ "table": "timeseries_demo_with_data", "measurement": "foo", "microsecond_timestamp": 1734924039271024, "column_name": "cc", "value": 1}'
  2. Sinkの稼働状況を確認し、新規の受信メッセージと送信メッセージが1件ずつあることを確認します。

  3. Tablestoreコンソールにアクセスし、データがTablestoreに書き込まれているか確認します。

    • Metric Nameにメジャメント名(このデモではfoo)を入力します。
    • Taglocation=office1およびdevice=sensor1をクエリ条件として入力し、Searchをクリックします。

    tablestore_query_data

詳細設定

このセクションでは、TablestoreコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードでコネクターやSinkを設定する際、Advanced Settingsに進み、以下のパラメータをニーズに合わせて調整できます。

フィールド説明推奨値
Buffer Pool SizeEMQXとTablestore間の出口タイプのブリッジでデータフローを管理するために割り当てられるバッファワーカープロセス数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへの送信を担います。出口(アウトバウンド)シナリオのパフォーマンス最適化とスムーズなデータ送信に重要です。Ingress(インバウンド)データのみ扱うSinkでは「0」に設定可能です。16
Request TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストはバッファリング開始時からこのTTLをカウントし、TTLを超えるか、送信後にTablestoreからの応答やアックを受け取れなかった場合、リクエストは期限切れと判断されます。45
Health Check IntervalSinkがTablestore接続の自動ヘルスチェックを行う間隔(秒)を指定します。15
Max Buffer Queue SizeTablestore Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的なデータフローを実現します。システム性能やデータ転送要件に応じて調整してください。256
Batch SizeEMQXからTablestoreへ一度に転送可能なデータバッチのサイズを指定します。サイズを調整することでデータ転送の効率とパフォーマンスを最適化できます。1
Query Modeメッセージ送信の最適化のため、asynchronous(非同期)またはsynchronous(同期)クエリモードを選択可能です。非同期モードではTablestoreへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがTablestore到着前にメッセージを受け取る可能性があります。Async
Inflight Window「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。SinkがTablestoreと通信する際に同時に存在可能な最大インフライトクエリ数を制御します。
Query Modeasyncの場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密な順序で処理する必要がある場合は、この値を1に設定してください。
100