クイックスタート:MQTTデータをEMQX Tablesに取り込む
本ガイドでは、EMQX ブローカーとEMQX Tablesを使って、外部データベースに依存せずにMQTTからデータベースへのパイプラインを構築する方法を説明します。EMQX Tablesは、EMQX Cloudプラットフォームに組み込まれた完全マネージドのネイティブ時系列データベースで、リアルタイムのIoTデータ処理に最適化されています。
本ガイドで学べる内容:
- EMQX ブローカーとEMQX Tablesのデプロイメント作成
- データ統合を使ったMQTTデータのEMQX Tablesへの取り込み
- 組み込みのData Explorerを使ったSQLによる時系列データのクエリ
- 必要に応じたカスタムデータベースとテーブルの作成
注意
本ドキュメントのスクリーンショットは参考用です。
EMQX Cloudは継続的に進化しており、コンソールのUIは更新や改善が行われるため、スクリーンショットが最新のインターフェースと完全に一致しない場合があります。ただし、全体のワークフローや機能は一貫しています。
EMQX Tables 無料トライアル
EMQX Tablesでは、ネイティブなMQTTからデータベースへの取り込みと時系列分析を無料で評価できるトライアルを提供しています。
クォータと期間
EMQX Tables 無料トライアルには以下が含まれます:
- 14日間の無料トライアル期間
- 100 GBのアウトバウンドトラフィック
- 100 GBのストレージ容量
有効期限
- 3日間連続でアクティブな接続がないトライアルデプロイメントは自動的に停止されます。
- トライアル終了時にデプロイメントは即時停止します。
- インスタンスは有効期限の3日後に削除されます。
- 削除後、すべてのデータは完全に消去されます。
トライアル終了後もEMQX Tablesを継続利用するには、トライアル期間内に有料プランへアップグレードしてください。
ユースケース:スマートファクトリーモニタリング
実際の例として、工場のデバイスが定期的に以下の時系列テレメトリデータを報告するとします:
machine_id:デバイス識別子production_line:関連する生産ラインtemperature:温度測定値vibration:振動強度machine_status:稼働状態(例:稼働中、警告、エラー)ts:測定時刻のタイムスタンプ
このデータをMQTT経由で取り込み、EMQX Tablesにネイティブに保存して監視、分析、アラートに活用します。
注意
サーバレスデプロイメントでもEMQX Tablesへのデータ取り込みをサポートしています。本ガイドの手順はDedicated Flexを例にしていますが、サーバレスの場合もデータ統合の手順は同じです。ただし、サーバレスはEMQX TablesへTLSを使ってパブリックインターネット経由で接続するため、ネットワークアソシエーションやNATゲートウェイは適用されません。
EMQX ブローカーとEMQX Tablesのデプロイメント作成
EMQX Cloudコンソールにログインします。
新規または既存のプロジェクトを作成または選択します。
EMQX Brokersの下で、+ New Deploymentをクリックします。
Dedicated Flexプランを選択し、仕様を設定します。
- 必要に応じてクラウドプロバイダーとリージョンを選択します。
- 以下のオプションはデフォルトのままにして、簡単にデモを進められます。
- Tier
- Smart Data Hubのサブスクライブ(任意)
- デプロイメント名とプロジェクト
- EMQXバージョン
右下のDeployをクリックします。

詳細はDedicated Flexデプロイメントの作成を参照してください。
EMQX Tablesの下で、+ New Deploymentをクリックします。
デフォルトのStarterプランを選択し、ブローカーのデプロイメントと同じクラウドプロバイダーとリージョンを選択します。
(任意)Network Associationで既存のネットワークをドロップダウンから選択します。同じクラウドプラットフォームかつ同じリージョンにあるため、ブローカーのネットワークがリストに表示されます。これを選択すると両デプロイメントが同じネットワークを共有し、プライベート接続で通信できます。
Tierを選択します。
(任意)Deployment Nameを入力します。
Deployをクリックします。詳細はEMQX Tablesデプロイメントの作成を参照してください。

デプロイメントが作成されたら、プロジェクト内のデプロイメントカードをクリックしてデプロイメントに入れます。
EMQX Tablesのデプロイメントに入り、左メニューのData Explorerをクリックすると、publicというデフォルトデータベースが利用可能なことが確認できます。
ブローカーとTables間の接続
接続方法はブローカーのデプロイメントタイプとネットワーク設定によって異なります:
| デプロイメントタイプ | 条件 | 接続方法 | 必要な対応 | 備考 |
|---|---|---|---|---|
| Dedicated / Dedicated Flex | Tablesと同じクラウドプラットフォーム、リージョン、ネットワーク | プライベート(安全で低レイテンシ) | 2つ目のデプロイメント作成時にNetwork Associationで既存ネットワークを選択。プロジェクトのネットワーク管理で共有ネットワークを確認可能。 | 各ネットワークは最大1つのブローカーと1つのTablesデプロイメントをサポート。 |
| Dedicated / Dedicated Flex | Tablesと異なるリージョンまたはネットワーク | TLSを使ったパブリックインターネット | ブローカーのネットワーク管理設定でNAT Gatewayを有効化。 | — |
| Serverless | — | TLSを使ったパブリックインターネット | 対応不要 | ネットワークアソシエーションとNAT Gatewayは適用外。 |
データ取り込み用ルールの作成
ルールエンジンを使ってMQTTメッセージをEMQX Tablesに取り込み、永続化します。
Dedicated Flex(またはServerless)デプロイメントに入り、Data Integrationに移動します。
初めてコネクターを作成する場合は、コネクター一覧からEMQX Tablesを選択します。すでにコネクターがある場合は、+ New ConnectorをクリックしてEMQX Tablesを選択します。

Quick Setup(デフォルト)を選び、プロジェクト内のTablesデプロイメントを選択します。

Testをクリックして接続を検証します。成功メッセージが表示されます。
Newをクリックし、New Ruleを選択してルール作成に進みます。
SQL EditorでSQLルールを定義します。
ルールSQL例:
sqlSELECT timestamp as ts, payload.machine_id as machine_id, payload.production_line as production_line, payload.temperature as temperature, payload.vibration as vibration, payload.machine_status as machine_status FROM "factory/+/metrics"このルールは、トピック
factory/+/metricsにマッチするすべてのMQTTメッセージのペイロードからフィールドを抽出します。抽出した値には別名を付けて、ルールアクションのWrite Syntax設定で参照可能にします。timestampフィールドはtsにマッピングされ、EMQX Tablesに記録する際の時間インデックスになります。Nextをクリックしてルールにアクションを追加します。アクション設定を行います:
Connector:作成したEMQX Tablesコネクターを選択
Time Precision:
millisecondWrite Syntax:
textmachine_metrics,production_line=${production_line},machine_id=${machine_id} temperature=${temperature},vibration=${vibration},machine_status=${machine_status} ${ts}この構文は、
machine_metricsテーブルが存在しない場合は自動作成し、InfluxDB Line Protocol形式でデータを書き込みます。- タグ:
production_line、machine_id(ディメンションかつ主キー) - フィールド:
temperature、vibration、machine_status(実際のメトリクス値) - タイムスタンプ:
${ts}はメッセージから抽出したtimestampを使い時系列を整合
- タグ:
Confirmをクリックしてルールを保存します。
Data Integrationページに戻ると、作成したコネクター、ルール、アクションが表示されます。

MQTTメッセージのパブリッシュ
簡単にテストするには、Dedicated Flex(またはServerless)デプロイメント内の組み込み診断ツールで、左メニューのOnline Testを使います。
Online Testでユーザー名とパスワード、または自動生成された認証情報でデプロイメントに接続します。
Messagesセクションで以下の2つのメッセージを送信します:
TIP
テーブルが事前に作成されていない場合、EMQX Cloudは最初に正常に書き込まれたメッセージのデータ型を基に自動でテーブルを作成します。一度テーブルとカラムが作成されると、以降の書き込みは同じデータ型を使う必要があり、異なる型の場合は書き込みが失敗します。
トピック:
factory/A/metricsペイロード:
json{ "machine_id": "M001", "production_line": "A", "temperature": 36.5, "vibration": 0.03, "machine_status": "running" }json{ "machine_id": "M002", "production_line": "A", "temperature": 39.1, "vibration": 0.06, "machine_status": "warning" }

EMQX Tablesでのデータクエリ
EMQX Tablesデプロイメントに入ります。
左メニューのData Explorerをクリックします。
エディターに以下のSQLを入力し、Run Queryをクリックします:
sqlSELECT * FROM machine_metrics;取り込んだメッセージが表示されます。

完成:ネイティブMQTTからDBへのパイプライン
これでライブパイプラインが完成しました:
MQTTクライアント -> EMQX ブローカー -> ルールエンジン -> EMQX Tables -> SQL分析
サードパーティのインフラは不要で、完全マネージドかつ時系列IoTワークロードに最適化されています。
次はGrafanaやStreamlitでメトリクスを可視化できます。詳細は統合ガイドを参照してください。
データベース機能のクイックガイド
EMQX Brokerからデフォルトのpublicデータベースへのデータ取り込みに加え、EMQX Tablesではカスタムデータベースやテーブルを定義し、SQLで手動挿入やクエリが可能です。これによりテストや開発の柔軟性が向上します。
カスタムデータベースの作成
デフォルトのpublicとは別にカスタムデータベースを作成できます。
デプロイメントのData Explorerページに移動します。
以下のSQLを入力し、Run Queryをクリックします:
sqlCREATE DATABASE factory WITH (ttl='7d');
これで、データ保持期間(TTL)が7日のfactoryという新しいデータベースが作成されます。

テーブルの作成
新しいデータベース内に、工場のメトリクスを保存する時系列テーブルを定義します。
Data Explorerで以下のSQLを入力し、Run Queryをクリックします:
CREATE TABLE factory.machine_metrics (
ts TIMESTAMP NOT NULL,
production_line STRING,
machine_id STRING,
temperature DOUBLE,
vibration DOUBLE,
machine_status STRING DEFAULT 'running',
TIME INDEX (ts),
PRIMARY KEY (production_line, machine_id)
) WITH (
ttl='7d'
);このテーブルはtsを時間インデックスに、production_lineとmachine_idの複合主キーを持ちます。
SQLを使ったデータ挿入
EMQX TablesはSQLベースとLine Protocolベースの両方のデータ取り込みをサポートします。.txtや.lpファイルのアップロードによる書き込みも可能です。
Data Explorerで以下のコマンドを実行してサンプルデータを挿入します:
INSERT INTO factory.machine_metrics (ts, production_line, machine_id, temperature, vibration, machine_status)
VALUES
(now(), 'A', 'M001', 36.5, 0.03, 'running'),
(now(), 'A', 'M002', 39.1, 0.06, 'warning'),
(now(), 'B', 'M010', 37.2, 0.02, 'running'),
(now(), 'B', 'M011', 45.6, 0.12, 'error');
now()は現在のタイムスタンプを挿入します。
データのクエリ
データを確認するにはData Explorerを使用します。
クエリ例
すべてのレコードを表示:
SELECT * FROM factory.machine_metrics;
直近60分の平均温度をラインと状態別に集計:
SELECT production_line, machine_status, AVG(temperature) AS avg_temp
FROM factory.machine_metrics
WHERE ts > now() - INTERVAL '60 minute'
GROUP BY production_line, machine_status;
特定デバイスでフィルタリング:
SELECT ts, temperature
FROM machine_metrics
WHERE machine_id = 'M001'
ORDER BY ts DESC
LIMIT 10;
クイッククエリで高速アクセス
テーブルスキーマからSQLスニペットを素早く生成できます:
- Data Explorerの左側スキーマパネルでカラムにカーソルを合わせます。
- カラム横の**縦三点リーダー(︙)**をクリックします。
- Quick Queryを選択し、
- Query column、Query max、Query minなどのオプションを使います。
- 生成されたSQLが自動的にエディターに表示されます。

リソース
サポートされているSQL文と句についてはGreptimeのドキュメントを参照してください。
より詳細なデータクエリ手順はEMQX Tablesでのデータクエリをご覧ください。