MQTTデータをTablestoreに取り込む
Tablestoreは、IoTシナリオに最適化されたスケーラブルでサーバーレスなデータベースです。時系列データ、構造化データ、半構造化データを管理するためのワンストップソリューションであるIoTstoreを提供しています。IoT、車載ネットワーク、リスク管理、メッセージング、レコメンデーションシステムなどのシナリオに最適です。Tablestoreはコスト効率に優れ、高性能なデータストレージを提供し、ミリ秒単位のクエリや取得、柔軟なデータ分析機能を備えています。EMQXはTablestore Cloud、Tablestore OSS、Tablestore Enterpriseとシームレスに統合し、IoTユースケースにおける効率的なデータ管理を可能にします。
動作概要
EMQXにおけるTablestoreのデータ統合は、EMQXのリアルタイムデータキャプチャおよび送信機能と、Tablestoreの高性能なデータストレージ・分析機能を組み合わせています。組み込みのルールエンジンを活用することで、EMQXからTablestoreへのデータ取り込みと保存のプロセスを簡素化し、複雑なコーディングを不要にします。EMQXはルールエンジンとSinkを通じてIoTデバイスのデータをTablestoreに転送し、効率的な保存と分析を実現します。
データが保存されると、Tablestoreはレポートやチャート、その他の可視化を生成する強力なツールを提供し、Tablestoreの可視化機能を通じてユーザーに提示されます。
以下の図は、エネルギー貯蔵シナリオにおけるEMQXとTablestore間の典型的なデータ統合アーキテクチャを示しています。

EMQXとTablestoreは、エネルギー消費データをリアルタイムに効率的に収集・分析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイスのアクセス、メッセージ送信、データルーティングを担当するIoTプラットフォームとして機能し、Tablestoreがデータストレージおよび分析プラットフォームとしてデータの保存と分析を担当します。ワークフローは以下の通りです:
- メッセージのパブリッシュと受信:エネルギー貯蔵デバイスや産業用IoTデバイスはMQTTプロトコルを通じてEMQXに正常に接続し、電力消費量、入出力電力などの情報を含むエネルギー消費データを定期的にMQTTプロトコルでパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:組み込みのルールエンジンを使用して、特定のソースからのメッセージをトピックマッチングに基づいて処理できます。メッセージが到着すると、ルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。例えば、データフォーマットの変換、特定情報のフィルタリング、文脈情報でのメッセージの拡充などです。
- Tablestoreへのデータ取り込み:ルールエンジンで定義されたルールがメッセージをTablestoreに書き込む操作をトリガーします。Tablestore Sinkは設定可能なフィールドを提供し、書き込むデータフォーマットを柔軟に定義でき、メッセージの特定フィールドをTablestoreの対応するメジャメントやフィールドにマッピングします。
エネルギー消費データがTablestoreに書き込まれた後、以下のような分析が可能です:
- Grafanaなどの可視化ツールに接続し、データに基づくチャートを生成してエネルギー貯蔵データを表示する。
- ビジネスシステムに接続し、エネルギー貯蔵デバイスの状態監視やアラートを行う。
特長と利点
Tablestoreデータ統合は以下の特長と利点を提供します:
- 効率的なデータ処理:EMQXは大量のIoTデバイス接続とメッセージスループットを処理でき、Tablestoreはデータの書き込み、保存、クエリに優れた性能を発揮します。IoTシナリオのデータ処理ニーズを満たしつつ、システムに過度な負荷をかけません。
- メッセージ変換:メッセージはEMQXのルールを通じて大規模な処理や変換を経てからTablestoreに書き込まれます。
- スケーラビリティ:EMQXとTablestoreはどちらもクラスターのスケールアウトが可能で、ビジネスの成長に応じて柔軟に水平拡張できます。
- 豊富なクエリ機能:Tablestoreは最適化された関数、演算子、インデックス技術を提供し、タイムスタンプ付きデータの効率的なクエリと分析を可能にし、IoTの時系列データから価値ある洞察を正確に抽出します。
- 効率的なストレージ:Tablestoreは高圧縮率のエンコーディング方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズでき、不要なデータがストレージを占有するのを防ぎます。
はじめる前に
このセクションでは、Tablestoreデータ統合の作成を開始する前に必要な準備について説明します。データベースインスタンスの作成や時系列テーブルの作成・管理が含まれます。
TIP
現在、Tablestoreとのデータ統合はTimeSeriesモデルのみをサポートしています。したがって、以下の手順はTimeSeriesモデルに焦点を当てています。
前提条件
進める前に以下を確認してください:
時系列テーブルの作成
- Tablestoreコンソールにログインします。
- 時系列モデルのインスタンスを作成します。インスタンス名を
emqx-demoなど任意に指定します。インスタンス作成の詳細はTablestore公式ドキュメントを参照してください。 - インスタンス管理ページに移動します。
- インスタンス詳細タブで時系列テーブルを選択し、時系列テーブルの作成ボタンをクリックします。
- 時系列テーブル情報を設定し、テーブル名を
timeseries_demo_with_dataなどに指定して確認をクリックします。

時系列テーブルの管理
先ほど作成した時系列テーブルを管理するには、テーブル名をクリックして時系列テーブルの管理ページに入ります。ビジネス要件に応じて以下の手順を実行できます:
データのクエリタブをクリックします。
時系列の追加をクリックします。
TIP
この手順は任意です。時系列テーブルがまだ存在しない場合、データ書き込み時にTablestoreが自動的に作成します。そのため、この例では時系列に対する手動操作は示していません。

コネクターの作成
このセクションでは、SinkをTablestoreサーバーに接続するためのコネクターの作成方法を示します。
以下の手順はEMQXとTablestoreをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create ConnectorページでTablestoreを選択し、Nextをクリックします。
- Configurationステップで以下を設定します:
- コネクター名を入力します。英数字の組み合わせで、例:
my_tablestore - Tablestoreサーバー接続情報を入力します:
- Endpoint:TablestoreインスタンスのアクセスURLを入力します。Tablestoreコンソールのインスタンス詳細ページで確認可能です。デプロイ方法に応じてURLを入力してください。例:パブリックネットワークの場合は
https://emqx-demo.cn-hangzhou.ots.aliyuncs.com - Instance Name:接続するTablestoreインスタンス名。例:
emqx-demo - Access Key ID:Tablestore認証に使用するAlibaba Cloud発行のアクセスキーID
- Access Key Secret:アクセスキーIDに対応するシークレットキー
- Storage Model Type:現在は
TimeSeriesのみサポート
- Endpoint:TablestoreインスタンスのアクセスURLを入力します。Tablestoreコンソールのインスタンス詳細ページで確認可能です。デプロイ方法に応じてURLを入力してください。例:パブリックネットワークの場合は
- TLSパラメータを設定します。TablestoreはHTTPSエンドポイントを使用するためTLSはデフォルトで有効であり、追加設定は不要です。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
- コネクター名を入力します。英数字の組み合わせで、例:
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがTablestoreサーバーに接続できるかテストできます。
- ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成を続行できます。詳細はTablestore Sinkを使ったルールの作成を参照してください。
Tablestore Sinkを使ったルールの作成
このセクションでは、EMQXでソースMQTTトピックt/#からのメッセージを処理し、設定済みのSinkを通じてTablestoreに送信するルールの作成方法を示します。
EMQXダッシュボードにアクセスし、左ナビゲーションメニューからIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルール作成ページで、ルールIDに
my_ruleを入力します。SQL Editorでルールを設定します。例えば、トピック
t/#のMQTTメッセージをTablestoreに保存したい場合、以下のSQL構文を使用できます。TIP
独自のSQL構文を指定する場合は、後で設定するSinkのデータフォーマットに含まれるすべての変数が
SELECT部分に含まれていることを確認してください。sqlSELECT * FROM "t/#"注:初心者の場合はSQL Examplesをクリックし、Enable TestでSQLルールを学習・テストできます。
- Add Actionボタンをクリックして、ルールがトリガーするアクションを定義します。このアクションにより、EMQXはルールで処理したデータをTablestoreに送信します。
Type of Actionドロップダウンリストから
Alibaba Tablestoreを選択します。ActionはデフォルトのCreate Actionのままにします。すでに作成済みのSinkがあれば選択可能です。この例では新規にSinkを作成します。Sinkの名前を入力します。英数字の組み合わせで指定してください。
Connectorドロップダウンから先ほど作成した
my_tablestoreを選択します。新規コネクターを作成するにはドロップダウン横のボタンをクリックします。設定パラメータはコネクターの作成を参照してください。以下のフィールドを設定します。
Data Source:EMQXがメッセージを取得するデータソース。処理対象のデータの起点を示します。特定のトピックやデータストリームを指定します。
Table Name:データを保存するTablestoreのテーブル名。先ほど作成したテーブル名を入力します。
${table}のような変数を使って動的にテーブル名を割り当てることも可能です。Measurement:Tablestoreで使用するメジャメント名。通常は論理的なグループやカテゴリを示します。例:
temperature_readingsやsensor_data。${measurement}などの変数も利用可能です。Storage Model Type:Tablestoreで使用するデータストレージモデルの種類。現在は
timeseriesのみサポートされており、時間ベースのデータに最適化されています。Tags:Tablestoreの各データエントリに関連付けられるキーと値のペア。クエリやフィルタリングを容易にするためのメタデータやラベルとして利用します。Addをクリックして複数のタグを定義可能です。例:
Key Value locationoffice1devicesensor1Fields:Tablestoreに送信するデータのフィールドリスト。各フィールドはTablestoreテーブルのカラムにマッピングされます。Addをクリックして以下を追加します:
- Column:Tablestoreのカラム名。
${column_name}のような変数で定義可能で、後述のメッセージペイロードのフィールドと一致させます。 - Message value:カラムに割り当てる値。
${value}のような動的参照、trueなどのブール値、1.3などの数値、バイナリデータも指定可能です。 - Is Int:カラムが数値型の場合、EMQXはデフォルトで浮動小数点型としてTablestoreに挿入します。整数値として挿入する場合はこのフラグを
trueに設定します。設定ファイル経由では${isint}のような変数で動的に割り当て可能です。 - Is Binary:カラムがバイナリの場合、EMQXはデフォルトで文字列型としてTablestoreに挿入します。バイナリデータとして挿入する場合はこのフラグを
trueに設定します。設定ファイル経由では${isbinary}のような変数で動的に割り当て可能です。
- Column:Tablestoreのカラム名。
Timestamp:Tablestoreに記録されるタイムスタンプ。マイクロ秒単位の整数値で指定します。固定値、文字列の"NOW"(メッセージ処理時にEMQXが現在時刻を動的に埋め込む)、
${microsecond_timestamp}のような変数プレースホルダーで動的割り当てが可能です。Meta Update Model:Tablestoreのメタデータ更新戦略を定義します:
MUM_IGNORE:メタデータの更新を無視し、競合があってもメタデータは変更されません。MUM_NORMAL:通常のメタデータ更新を行います。メタデータが存在しない場合は動的に作成され、既存メタデータと競合があれば上書きされる可能性があります。
フォールバックアクション(オプション):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定(オプション):詳細は高度な設定を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてSinkがTablestoreサーバーに接続可能かテストできます。
CreateをクリックしてSink作成を完了します。ルール作成ページのAction Outputsタブに新しいSinkが表示されます。
Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。
これでルールが正常に作成され、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいTablestore Sinkが確認できます。
また、Integration -> Flow Designerをクリックしてトポロジーを確認できます。トピックt/#のメッセージがルールmy_ruleで解析され、Tablestoreに送信・保存されていることがわかります。
ルールのテスト
MQTTXを使用してトピック
t/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。bashmqttx pub -i emqx_c -t t/1 -m '{ "table": "timeseries_demo_with_data", "measurement": "foo", "microsecond_timestamp": 1734924039271024, "column_name": "cc", "value": 1}'Sinkの稼働状況を確認し、新しい受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。
Tablestoreコンソールにアクセスし、データがTablestoreに書き込まれているか確認します。
- Metric Nameにメジャメント名(本デモでは
foo)を入力します。 - Tagに
location=office1、device=sensor1をクエリ条件として入力し、Searchをクリックします。

- Metric Nameにメジャメント名(本デモでは
高度な設定
このセクションでは、TablestoreコネクターおよびSinkの高度な設定オプションについて詳述します。ダッシュボードでコネクターやSinkを設定する際、Advanced Settingsに移動して以下のパラメータをニーズに合わせて調整してください。
| フィールド | 説明 | 推奨値 |
|---|---|---|
| Buffer Pool Size | EMQXとTablestore間のエグレス型ブリッジでデータフローを管理するために割り当てられるバッファワーカープロセスの数を指定します。これらのワーカーは、データを送信前に一時的に保存・処理します。エグレス(送信)シナリオのパフォーマンス最適化とスムーズなデータ伝送に関連します。イングレス(受信)データのみを扱うSinkでは「0」に設定可能です。 | 16 |
| Request TTL | 「Request TTL」(Time To Live)は、リクエストがバッファに入ってから有効とみなされる最大秒数を指定します。このタイマーはリクエストがバッファに入った瞬間からカウントされます。TTLを超えてバッファに滞留するか、送信後にTablestoreからの応答やアックが期限内に得られない場合、リクエストは期限切れとみなされます。 | 45 |
| Health Check Interval | SinkがTablestoreとの接続の自動ヘルスチェックを実行する間隔(秒)を指定します。 | 15 |
| Max Buffer Queue Size | Tablestore Sinkの各バッファワーカーがバッファリングできる最大バイト数を指定します。バッファワーカーはデータを一時的に保存し、効率的なデータフローを実現します。システムの性能やデータ転送要件に応じて調整してください。 | 256 |
| Batch Size | EMQXからTablestoreへ単一の転送操作で送信できるデータバッチのサイズを指定します。サイズを調整することでデータ転送の効率と性能を最適化できます。 | 1 |
| Query Mode | メッセージ送信を最適化するために、asynchronousまたはsynchronousのクエリモードを選択できます。非同期モードでは、Tablestoreへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしません。ただし、クライアントがTablestoreに到達する前にメッセージを受信する可能性があります。 | Async |
| Inflight Window | 「インフライトクエリ」とは、開始されたがまだ応答やアックを受け取っていないクエリを指します。この設定はSinkがTablestoreと通信する際に同時に存在できるインフライトクエリの最大数を制御します。 Query Modeが asyncの場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密な順序で処理する必要がある場合は、この値を1に設定してください。 | 100 |