Skip to content

クイックスタート:MQTTデータをEMQX Tablesに取り込む

本ガイドでは、EMQX ブローカーとEMQX Tablesを使って、外部データベースに依存せずにMQTTからデータベースへのパイプラインを構築する方法を説明します。EMQX Tablesは、EMQX Cloudプラットフォームに組み込まれた完全マネージドのネイティブ時系列データベースで、リアルタイムのIoTデータ処理に最適化されています。

本ガイドで学べる内容:

  • EMQX ブローカーとEMQX Tablesのデプロイメント作成
  • データ統合を使ったMQTTデータのEMQX Tablesへの取り込み
  • 組み込みのData Explorerを使ったSQLによる時系列データのクエリ
  • 必要に応じたカスタムデータベースとテーブルの作成

注意

本ドキュメントのスクリーンショットは参考用です。

EMQX Cloudは継続的に進化しており、コンソールのUIは更新や改善が行われるため、スクリーンショットが最新のインターフェースと完全に一致しない場合があります。ただし、全体のワークフローや機能は一貫しています。

EMQX Tables 無料トライアル

EMQX Tablesでは、ネイティブなMQTTからデータベースへの取り込みと時系列分析を無料で評価できるトライアルを提供しています。

クォータと期間

EMQX Tables 無料トライアルには以下が含まれます:

  • 14日間の無料トライアル期間
  • 100 GBのアウトバウンドトラフィック
  • 100 GBのストレージ容量

有効期限

  • 3日間連続でアクティブな接続がないトライアルデプロイメントは自動的に停止されます。
  • トライアル終了時にデプロイメントは即時停止します。
  • インスタンスは有効期限の3日後に削除されます。
  • 削除後、すべてのデータは完全に消去されます。

トライアル終了後もEMQX Tablesを継続利用するには、トライアル期間内に有料プランへアップグレードしてください。

ユースケース:スマートファクトリーモニタリング

実際の例として、工場のデバイスが定期的に以下の時系列テレメトリデータを報告するとします:

  • machine_id:デバイス識別子
  • production_line:関連する生産ライン
  • temperature:温度測定値
  • vibration:振動強度
  • machine_status:稼働状態(例:稼働中、警告、エラー)
  • ts:測定時刻のタイムスタンプ

このデータをMQTT経由で取り込み、EMQX Tablesにネイティブに保存して監視、分析、アラートに活用します。

注意

サーバレスデプロイメントでもEMQX Tablesへのデータ取り込みをサポートしています。本ガイドの手順はDedicated Flexを例にしていますが、サーバレスの場合もデータ統合の手順は同じです。ただし、サーバレスはEMQX TablesへTLSを使ってパブリックインターネット経由で接続するため、ネットワークアソシエーションやNATゲートウェイは適用されません。

EMQX ブローカーとEMQX Tablesのデプロイメント作成

  1. EMQX Cloudコンソールにログインします。

  2. 新規または既存のプロジェクトを作成または選択します。

  3. EMQX Brokersの下で、+ New Deploymentをクリックします。

  4. Dedicated Flexプランを選択し、仕様を設定します。

    • 必要に応じてクラウドプロバイダーとリージョンを選択します。
    • 以下のオプションはデフォルトのままにして、簡単にデモを進められます。
      • Tier
      • Smart Data Hubのサブスクライブ(任意)
      • デプロイメント名とプロジェクト
      • EMQXバージョン

    右下のDeployをクリックします。

    create_broker_deployment

    詳細はDedicated Flexデプロイメントの作成を参照してください。

  5. EMQX Tablesの下で、+ New Deploymentをクリックします。

  6. デフォルトのStarterプランを選択し、ブローカーのデプロイメントと同じクラウドプロバイダーとリージョンを選択します。

  7. (任意)Network Associationで既存のネットワークをドロップダウンから選択します。同じクラウドプラットフォームかつ同じリージョンにあるため、ブローカーのネットワークがリストに表示されます。これを選択すると両デプロイメントが同じネットワークを共有し、プライベート接続で通信できます。

  8. Tierを選択します。

  9. (任意)Deployment Nameを入力します。

  10. Deployをクリックします。詳細はEMQX Tablesデプロイメントの作成を参照してください。

new_table_deployment

デプロイメントが作成されたら、プロジェクト内のデプロイメントカードをクリックしてデプロイメントに入れます。

EMQX Tablesのデプロイメントに入り、左メニューのData Explorerをクリックすると、publicというデフォルトデータベースが利用可能なことが確認できます。

ブローカーとTables間の接続

接続方法はブローカーのデプロイメントタイプとネットワーク設定によって異なります:

デプロイメントタイプ条件接続方法必要な対応備考
Dedicated / Dedicated FlexTablesと同じクラウドプラットフォーム、リージョン、ネットワークプライベート(安全で低レイテンシ)2つ目のデプロイメント作成時にNetwork Associationで既存ネットワークを選択。プロジェクトのネットワーク管理で共有ネットワークを確認可能。各ネットワークは最大1つのブローカーと1つのTablesデプロイメントをサポート。
Dedicated / Dedicated FlexTablesと異なるリージョンまたはネットワークTLSを使ったパブリックインターネットブローカーのネットワーク管理設定でNAT Gatewayを有効化。
ServerlessTLSを使ったパブリックインターネット対応不要ネットワークアソシエーションとNAT Gatewayは適用外。

データ取り込み用ルールの作成

ルールエンジンを使ってMQTTメッセージをEMQX Tablesに取り込み、永続化します。

  1. Dedicated Flex(またはServerless)デプロイメントに入り、Data Integrationに移動します。

  2. 初めてコネクターを作成する場合は、コネクター一覧からEMQX Tablesを選択します。すでにコネクターがある場合は、+ New ConnectorをクリックしてEMQX Tablesを選択します。

    create_connector

  3. Quick Setup(デフォルト)を選び、プロジェクト内のTablesデプロイメントを選択します。

    connector_quick_setup

  4. Testをクリックして接続を検証します。成功メッセージが表示されます。

  5. Newをクリックし、New Ruleを選択してルール作成に進みます。

  6. SQL EditorでSQLルールを定義します。

    ルールSQL例

    sql
    SELECT
      timestamp as ts,
      payload.machine_id as machine_id,
      payload.production_line as production_line,
      payload.temperature as temperature,
      payload.vibration as vibration,
      payload.machine_status as machine_status
    FROM "factory/+/metrics"

    このルールは、トピック factory/+/metrics にマッチするすべてのMQTTメッセージのペイロードからフィールドを抽出します。抽出した値には別名を付けて、ルールアクションのWrite Syntax設定で参照可能にします。timestampフィールドはtsにマッピングされ、EMQX Tablesに記録する際の時間インデックスになります。

  7. Nextをクリックしてルールにアクションを追加します。アクション設定を行います:

    • Connector:作成したEMQX Tablesコネクターを選択

    • Time Precisionmillisecond

    • Write Syntax

      text
      machine_metrics,production_line=${production_line},machine_id=${machine_id} temperature=${temperature},vibration=${vibration},machine_status=${machine_status} ${ts}

      この構文は、machine_metricsテーブルが存在しない場合は自動作成し、InfluxDB Line Protocol形式でデータを書き込みます。

      • タグproduction_linemachine_id(ディメンションかつ主キー)
      • フィールドtemperaturevibrationmachine_status(実際のメトリクス値)
      • タイムスタンプ${ts} はメッセージから抽出したtimestampを使い時系列を整合
  8. Confirmをクリックしてルールを保存します。

    Data Integrationページに戻ると、作成したコネクター、ルール、アクションが表示されます。

    create_rule

MQTTメッセージのパブリッシュ

簡単にテストするには、Dedicated Flex(またはServerless)デプロイメント内の組み込み診断ツールで、左メニューのOnline Testを使います。

  1. Online Testでユーザー名とパスワード、または自動生成された認証情報でデプロイメントに接続します。

  2. Messagesセクションで以下の2つのメッセージを送信します:

TIP

テーブルが事前に作成されていない場合、EMQX Cloudは最初に正常に書き込まれたメッセージのデータ型を基に自動でテーブルを作成します。一度テーブルとカラムが作成されると、以降の書き込みは同じデータ型を使う必要があり、異なる型の場合は書き込みが失敗します。

  • トピックfactory/A/metrics

  • ペイロード

    json
    {
      "machine_id": "M001",
      "production_line": "A",
      "temperature": 36.5,
      "vibration": 0.03,
      "machine_status": "running"
    }
    json
    {
      "machine_id": "M002",
      "production_line": "A",
      "temperature": 39.1,
      "vibration": 0.06,
      "machine_status": "warning"
    }

publish_messages

EMQX Tablesでのデータクエリ

  1. EMQX Tablesデプロイメントに入ります。

  2. 左メニューのData Explorerをクリックします。

  3. エディターに以下のSQLを入力し、Run Queryをクリックします:

    sql
    SELECT * FROM machine_metrics;

    取り込んだメッセージが表示されます。

    query_test_data

完成:ネイティブMQTTからDBへのパイプライン

これでライブパイプラインが完成しました:

MQTTクライアント -> EMQX ブローカー -> ルールエンジン -> EMQX Tables -> SQL分析

サードパーティのインフラは不要で、完全マネージドかつ時系列IoTワークロードに最適化されています。

次はGrafanaやStreamlitでメトリクスを可視化できます。詳細は統合ガイドを参照してください。

データベース機能のクイックガイド

EMQX Brokerからデフォルトのpublicデータベースへのデータ取り込みに加え、EMQX Tablesではカスタムデータベースやテーブルを定義し、SQLで手動挿入やクエリが可能です。これによりテストや開発の柔軟性が向上します。

カスタムデータベースの作成

デフォルトのpublicとは別にカスタムデータベースを作成できます。

  1. デプロイメントのData Explorerページに移動します。

  2. 以下のSQLを入力し、Run Queryをクリックします:

    sql
    CREATE DATABASE factory WITH (ttl='7d');

これで、データ保持期間(TTL)が7日のfactoryという新しいデータベースが作成されます。

create_custom_database

テーブルの作成

新しいデータベース内に、工場のメトリクスを保存する時系列テーブルを定義します。

Data Explorerで以下のSQLを入力し、Run Queryをクリックします:

sql
CREATE TABLE factory.machine_metrics (
    ts TIMESTAMP NOT NULL,
    production_line STRING,
    machine_id STRING,
    temperature DOUBLE,
    vibration DOUBLE,
    machine_status STRING DEFAULT 'running',
    TIME INDEX (ts),
    PRIMARY KEY (production_line, machine_id)
) WITH (
    ttl='7d'
);

このテーブルはtsを時間インデックスに、production_linemachine_idの複合主キーを持ちます。

SQLを使ったデータ挿入

EMQX TablesはSQLベースとLine Protocolベースの両方のデータ取り込みをサポートします。.txt.lpファイルのアップロードによる書き込みも可能です。

Data Explorerで以下のコマンドを実行してサンプルデータを挿入します:

sql
INSERT INTO factory.machine_metrics (ts, production_line, machine_id, temperature, vibration, machine_status)
VALUES
    (now(), 'A', 'M001', 36.5, 0.03, 'running'),
    (now(), 'A', 'M002', 39.1, 0.06, 'warning'),
    (now(), 'B', 'M010', 37.2, 0.02, 'running'),
    (now(), 'B', 'M011', 45.6, 0.12, 'error');

now()は現在のタイムスタンプを挿入します。

データのクエリ

データを確認するにはData Explorerを使用します。

クエリ例

すべてのレコードを表示:

sql
SELECT * FROM factory.machine_metrics;

query_all_records

直近60分の平均温度をラインと状態別に集計:

sql
SELECT production_line, machine_status, AVG(temperature) AS avg_temp
FROM factory.machine_metrics
WHERE ts > now() - INTERVAL '60 minute'
GROUP BY production_line, machine_status;

query_by_time_limits

特定デバイスでフィルタリング:

sql
SELECT ts, temperature
FROM machine_metrics
WHERE machine_id = 'M001'
ORDER BY ts DESC
LIMIT 10;

query_filter_by_device

クイッククエリで高速アクセス

テーブルスキーマからSQLスニペットを素早く生成できます:

  1. Data Explorerの左側スキーマパネルでカラムにカーソルを合わせます。
  2. カラム横の**縦三点リーダー(︙)**をクリックします。
  3. Quick Queryを選択し、
    • Query columnQuery maxQuery minなどのオプションを使います。
  4. 生成されたSQLが自動的にエディターに表示されます。

quick_query

リソース

サポートされているSQL文と句についてはGreptimeのドキュメントを参照してください。

より詳細なデータクエリ手順はEMQX Tablesでのデータクエリをご覧ください。