クイックスタート:MQTTデータをEMQX Tablesに取り込む
このガイドでは、外部データベースに依存せずに、EMQX ブローカーとEMQX Tablesを使って完全なMQTTからデータベースへのパイプラインを作成する手順を説明します。EMQX Tablesは、EMQX Cloudプラットフォームに組み込まれた完全マネージドのネイティブ時系列データベースで、リアルタイムのIoTデータ処理に最適化されています。
本ガイドで学べること:
- EMQX ブローカーとEMQX Tablesのデプロイメント作成
- カスタムEMQX Tablesユーザーの作成
- データ統合を使ったMQTTデータのEMQX Tablesへの取り込み
- 組み込みのData ExplorerでSQLによる時系列データのクエリ
- 必要に応じたカスタムデータベースとテーブルの作成
注意
本ドキュメントのスクリーンショットは参考用です。
EMQX Cloudは継続的に進化しており、コンソールUIが更新・改善されることがあります。そのため、一部のスクリーンショットが最新のインターフェースと完全に一致しない場合がありますが、全体のワークフローや機能は変わりません。
EMQX Tables 無料トライアル
EMQX Tablesは、ネイティブなMQTTからデータベースへの取り込みと時系列分析を無料で評価できるトライアルを提供しています。
クォータと期間
EMQX Tables 無料トライアルには以下が含まれます:
- 14日間の無料トライアル期間
- 100 GBのアウトバウンドトラフィック
- 100 GBのストレージ容量
有効期限
- 3日間連続でアクティブな接続がないトライアルデプロイメントは自動的に停止されます。
- トライアル終了時にデプロイメントは即時停止します。
- インスタンスは期限切れから3日後に削除されます。
- 削除後、すべてのデータは完全に消去されます。
トライアル終了後もEMQX Tablesを継続利用するには、期限切れ前に有料プランにアップグレードしてください。
ユースケース:スマートファクトリーモニタリング
具体例として、工場のデバイスが定期的に以下の時系列テレメトリデータを報告するケースを考えます:
machine_id:デバイス識別子production_line:所属する生産ラインtemperature:温度計測値vibration:振動強度machine_status:稼働状態(例:稼働中、警告、エラー)ts:計測のタイムスタンプ
このデータをMQTT経由で取り込み、EMQX Tablesにネイティブに保存してモニタリングや分析、アラートに活用したい場合の例です。
注意
サーバレスデプロイメントでもEMQX Tablesへのデータ取り込みが可能です。本ガイドではDedicated Flexを例にしています。サーバレスの場合、データ統合の手順は同じですが、EMQX Tablesへの接続はTLSを用いたパブリックインターネット経由となり、ネットワークアソシエーションやNATゲートウェイは適用されません。
EMQX ブローカーとEMQX Tablesのデプロイメント作成
EMQX Cloudコンソールにログインします。
既存のプロジェクトを選択するか、新規作成します。
EMQX Brokersの下で、+ New Deploymentをクリックします。
Dedicated Flexプランを選択し、仕様を設定します。
- 必要に応じてクラウドプロバイダーとリージョンを選択します。
- 以下のオプションはデフォルトのままで構いません(クイックデモ用)。
- Tier
- Subscribe to Smart Data Hub(任意)
- Deployment Name & Project
- EMQX Version
右下のDeployをクリックします。

詳細はDedicated Flexデプロイメントの作成を参照してください。
EMQX Tablesの下で、+ New Deploymentをクリックします。
デフォルトのStarterプランを選択し、ブローカーと同じクラウドプロバイダーとリージョンを選びます。
(任意)Network Associationで既存のネットワークをドロップダウンから選択します。同じクラウドプラットフォームかつ同リージョンのため、ブローカーのネットワークがリストに表示されます。これを選択すると両デプロイメントが同じネットワークを共有し、プライベート接続で通信可能になります。
Tierを選択します。
(任意)Deployment Nameを入力します。
Deployをクリックします。詳細はEMQX Tablesデプロイメントの作成を参照してください。

デプロイメント作成後、プロジェクト内のデプロイメントカードをクリックしてデプロイメントに入れます。
EMQX Tablesのデプロイメントに入り、左メニューのData Explorerをクリックすると、デフォルトのpublicデータベースが利用可能であることが確認できます。
データ取り込み用ユーザーの作成
新しいEMQX Tablesのデプロイメントはカスタムユーザーと権限をサポートしています。デプロイメント詳細にデフォルトのユーザー名やパスワードは表示されません。ブローカーデータ統合を作成する前に、対象データベースに書き込み可能なユーザーを作成してください。
注意
古いEMQX Tablesデプロイメントはカスタムユーザーや権限設定をサポートしていません。古いデプロイメントの場合、利用可能であればデプロイメント詳細に表示されるデフォルト認証情報を使用してください。
EMQX Tablesデプロイメントで、左メニューのUser Managementをクリックします。
+ Add Userをクリックします。
ユーザー名とパスワードを入力します。
PrivilegesでRead & Writeを選択します。
EMQX ブローカーからのデータ取り込みには最低でも
SqlInsert権限が必要です。Read & WriteはSqlInsertとSqlSelectの両方を含むため、同じユーザーで書き込みと検証用のクエリが可能です。Access Controlでこのユーザーがアクセス可能なデータベースを選択します。クイックスタートでは
publicデータベースを選択します。カスタムデータベースを作成している場合はそちらを選択してください。Confirmをクリックします。

作成したユーザーはUser Managementページから管理できます。

権限やアクセス制御モードの詳細はユーザー管理を参照してください。
ブローカーとTables間の接続
接続方法はブローカーのデプロイメントタイプとネットワーク設定によって異なります:
| デプロイメントタイプ | 条件 | 接続方法 | 必要な操作 | 備考 |
|---|---|---|---|---|
| Dedicated / Dedicated Flex | Tablesと同じクラウドプラットフォーム、リージョン、ネットワーク | プライベート(安全・低レイテンシ) | 2つ目のデプロイメント作成時にNetwork Associationで既存ネットワークを選択。プロジェクトのネットワーク管理で共有ネットワークを確認可能。 | 各ネットワークは最大1つのブローカーと1つのTablesデプロイメントをサポート。 |
| Dedicated / Dedicated Flex | Tablesと異なるリージョンまたはネットワーク | TLSを用いたパブリックインターネット | ブローカーのNetwork Management設定でNAT Gatewayを有効化。 | — |
| Serverless | — | TLSを用いたパブリックインターネット | 操作不要。 | ネットワークアソシエーションとNAT Gatewayは適用外。 |
データ取り込み用ルールの作成
ルールエンジンを使ってMQTTメッセージをEMQX Tablesに取り込み、永続化します。
Dedicated Flex(またはServerless)デプロイメントに入り、Data Integrationに移動します。
初めてのコネクターの場合は、コネクター一覧からEMQX Tablesを選択します。既にコネクターがある場合は**+ New ConnectorをクリックしてEMQX Tables**を選択します。

Quick Setup(デフォルト)を選択し、プロジェクト内のTablesデプロイメントを選びます。
接続設定を入力します:
- Database Name:
publicを使用 - Username:データ取り込み用ユーザーの作成で作成したユーザーを選択
- Password:選択したユーザーのパスワードを入力

- Database Name:
Testをクリックして接続を検証します。成功メッセージが表示されます。
Newをクリックし、New Ruleを選択してこのコネクターを使ったルール作成に進みます。
SQL EditorでSQLルールを定義します。
ルールSQL例:
sqlSELECT timestamp as ts, payload.machine_id as machine_id, payload.production_line as production_line, payload.temperature as temperature, payload.vibration as vibration, payload.machine_status as machine_status FROM "factory/+/metrics"このルールは、トピック
factory/+/metricsにマッチするすべてのMQTTメッセージのペイロードからフィールドを抽出します。抽出した値にはエイリアスを付けて、ルールアクションの書き込み構文で参照可能にしています。timestampフィールドはtsにマッピングされ、EMQX Tablesに記録する際の時刻インデックスとなります。Nextをクリックしてルールにアクションを追加します。アクション設定を行います:
Connector:作成したEMQX Tablesコネクターを選択
Time Precision:
millisecondWrite Syntax:
textmachine_metrics,production_line=${production_line},machine_id=${machine_id} temperature=${temperature},vibration=${vibration},machine_status=${machine_status} ${ts}この構文は、
machine_metricsテーブルが存在しない場合は自動作成し、InfluxDB Line Protocol形式でデータを書き込みます。- タグ:
production_line,machine_id(ディメンションおよび主キーとして使用) - フィールド:
temperature,vibration,machine_status(実際のメトリクス値) - タイムスタンプ:
${ts}はメッセージから抽出したtimestampを使い時系列を整合
- タグ:
Confirmをクリックしてルールを保存します。
Data Integrationページに戻ると、作成したコネクター、ルール、アクションが表示されます。

MQTTメッセージのパブリッシュ
クイックテストには、Dedicated Flex(またはServerless)デプロイメント内蔵の診断ツールのOnline Testを使えます。
Online Testでユーザー名とパスワード、または自動生成認証でデプロイメントに接続します。
Messagesセクションで以下の2つのメッセージを送信します:
TIP
テーブルが事前に作成されていない場合、EMQX Cloudは最初に正常に書き込まれたメッセージのデータ型に基づいて自動的にテーブルを作成します。テーブルとカラムが作成された後は、すべての書き込みで同じデータ型を使用する必要があり、異なる場合は書き込みが失敗します。
Topic:
factory/A/metricsPayload:
json{ "machine_id": "M001", "production_line": "A", "temperature": 36.5, "vibration": 0.03, "machine_status": "running" }json{ "machine_id": "M002", "production_line": "A", "temperature": 39.1, "vibration": 0.06, "machine_status": "warning" }

EMQX Tablesでのデータクエリ
EMQX Tablesデプロイメントにアクセスします。
左メニューのData Explorerをクリックします。
エディターに以下のSQLを入力し、Run Queryをクリックします:
sqlSELECT * FROM machine_metrics;取り込まれたメッセージが表示されます。

完成:ネイティブMQTTからDBへのパイプライン
これで以下のライブパイプラインが完成しました:
MQTTクライアント -> EMQX ブローカー -> ルールエンジン -> EMQX Tables -> SQL分析
サードパーティのインフラは不要で、完全マネージドかつ時系列IoTワークロード向けに設計されています。
次はGrafanaやStreamlitでメトリクスを可視化できます。詳細は統合ガイドを参照してください。
データベース機能のクイックガイド
EMQX Brokerからデフォルトのpublicデータベースへのデータ取り込みに加え、EMQX Tablesではカスタムデータベースやテーブルの定義、SQLによる手動データ挿入、組み込みのData Explorerでのクエリが可能です。これによりテストや開発の柔軟性が向上します。
カスタムデータベースの作成
デフォルトのpublicデータベースとは別にカスタムデータベースを作成できます。
デプロイメントのData Explorerページに移動します。
以下のSQLを入力し、Run Queryをクリックします:
sqlCREATE DATABASE factory WITH (ttl='7d');
これにより、データ保持期間(TTL)が7日のfactoryという名前の新しいデータベースが作成されます。

テーブルの作成
新しいデータベース内に工場のメトリクスを格納する時系列テーブルを定義します。
Data Explorerで以下のSQLを入力し、Run Queryをクリックします:
CREATE TABLE factory.machine_metrics (
ts TIMESTAMP NOT NULL,
production_line STRING,
machine_id STRING,
temperature DOUBLE,
vibration DOUBLE,
machine_status STRING DEFAULT 'running',
TIME INDEX (ts),
PRIMARY KEY (production_line, machine_id)
) WITH (
ttl='7d'
);このテーブルはtsを時刻インデックスに、production_lineとmachine_idの複合主キーを持ちます。
SQLによるデータ挿入
EMQX TablesはSQLベースとLine Protocolベースの両方のデータ取り込みをサポートし、.txtや.lpファイルのアップロードも可能です。
Data Explorerで以下のコマンドを実行してサンプルデータを挿入します:
INSERT INTO factory.machine_metrics (ts, production_line, machine_id, temperature, vibration, machine_status)
VALUES
(now(), 'A', 'M001', 36.5, 0.03, 'running'),
(now(), 'A', 'M002', 39.1, 0.06, 'warning'),
(now(), 'B', 'M010', 37.2, 0.02, 'running'),
(now(), 'B', 'M011', 45.6, 0.12, 'error');
now()は現在のタイムスタンプを挿入します。
データのクエリ
データ検証にはData Explorerを使用します。
クエリ例
すべてのレコードを表示:
SELECT * FROM factory.machine_metrics;
直近60分の平均温度をラインと状態別に集計:
SELECT production_line, machine_status, AVG(temperature) AS avg_temp
FROM factory.machine_metrics
WHERE ts > now() - INTERVAL '60 minute'
GROUP BY production_line, machine_status;
特定デバイスで絞り込み:
SELECT ts, temperature
FROM machine_metrics
WHERE machine_id = 'M001'
ORDER BY ts DESC
LIMIT 10;
クイッククエリで高速アクセス
テーブルスキーマからSQLスニペットを素早く生成できます:
- Data Explorerの左側スキーマパネルでカラムにカーソルを合わせます。
- カラム横の縦三点リーダー(︙)をクリックします。
- Quick Queryを選択し、Query column、Query max、Query minなどを選びます。
- 生成されたSQLが自動的にエディターに表示されます。

リソース
サポートされているSQL文と句についてはGreptimeのドキュメントを参照してください。
より詳細なクエリ方法はEMQX Tablesでのデータクエリをご覧ください。