Skip to content

クイックスタート:MQTTデータをEMQX Tablesに取り込む

このガイドでは、外部データベースに依存せずに、EMQX ブローカーとEMQX Tablesを使って完全なMQTTからデータベースへのパイプラインを作成する手順を説明します。EMQX Tablesは、EMQX Cloudプラットフォームに組み込まれた完全マネージドのネイティブ時系列データベースで、リアルタイムのIoTデータ処理に最適化されています。

本ガイドで学べること:

  • EMQX ブローカーとEMQX Tablesのデプロイメント作成
  • カスタムEMQX Tablesユーザーの作成
  • データ統合を使ったMQTTデータのEMQX Tablesへの取り込み
  • 組み込みのData ExplorerでSQLによる時系列データのクエリ
  • 必要に応じたカスタムデータベースとテーブルの作成

注意

本ドキュメントのスクリーンショットは参考用です。

EMQX Cloudは継続的に進化しており、コンソールUIが更新・改善されることがあります。そのため、一部のスクリーンショットが最新のインターフェースと完全に一致しない場合がありますが、全体のワークフローや機能は変わりません。

EMQX Tables 無料トライアル

EMQX Tablesは、ネイティブなMQTTからデータベースへの取り込みと時系列分析を無料で評価できるトライアルを提供しています。

クォータと期間

EMQX Tables 無料トライアルには以下が含まれます:

  • 14日間の無料トライアル期間
  • 100 GBのアウトバウンドトラフィック
  • 100 GBのストレージ容量

有効期限

  • 3日間連続でアクティブな接続がないトライアルデプロイメントは自動的に停止されます。
  • トライアル終了時にデプロイメントは即時停止します。
  • インスタンスは期限切れから3日後に削除されます。
  • 削除後、すべてのデータは完全に消去されます。

トライアル終了後もEMQX Tablesを継続利用するには、期限切れ前に有料プランにアップグレードしてください。

ユースケース:スマートファクトリーモニタリング

具体例として、工場のデバイスが定期的に以下の時系列テレメトリデータを報告するケースを考えます:

  • machine_id:デバイス識別子
  • production_line:所属する生産ライン
  • temperature:温度計測値
  • vibration:振動強度
  • machine_status:稼働状態(例:稼働中、警告、エラー)
  • ts:計測のタイムスタンプ

このデータをMQTT経由で取り込み、EMQX Tablesにネイティブに保存してモニタリングや分析、アラートに活用したい場合の例です。

注意

サーバレスデプロイメントでもEMQX Tablesへのデータ取り込みが可能です。本ガイドではDedicated Flexを例にしています。サーバレスの場合、データ統合の手順は同じですが、EMQX Tablesへの接続はTLSを用いたパブリックインターネット経由となり、ネットワークアソシエーションやNATゲートウェイは適用されません。

EMQX ブローカーとEMQX Tablesのデプロイメント作成

  1. EMQX Cloudコンソールにログインします。

  2. 既存のプロジェクトを選択するか、新規作成します。

  3. EMQX Brokersの下で、+ New Deploymentをクリックします。

  4. Dedicated Flexプランを選択し、仕様を設定します。

    • 必要に応じてクラウドプロバイダーとリージョンを選択します。
    • 以下のオプションはデフォルトのままで構いません(クイックデモ用)。
      • Tier
      • Subscribe to Smart Data Hub(任意)
      • Deployment Name & Project
      • EMQX Version

    右下のDeployをクリックします。

    create_broker_deployment

    詳細はDedicated Flexデプロイメントの作成を参照してください。

  5. EMQX Tablesの下で、+ New Deploymentをクリックします。

  6. デフォルトのStarterプランを選択し、ブローカーと同じクラウドプロバイダーとリージョンを選びます。

  7. (任意)Network Associationで既存のネットワークをドロップダウンから選択します。同じクラウドプラットフォームかつ同リージョンのため、ブローカーのネットワークがリストに表示されます。これを選択すると両デプロイメントが同じネットワークを共有し、プライベート接続で通信可能になります。

  8. Tierを選択します。

  9. (任意)Deployment Nameを入力します。

  10. Deployをクリックします。詳細はEMQX Tablesデプロイメントの作成を参照してください。

new_table_deployment

デプロイメント作成後、プロジェクト内のデプロイメントカードをクリックしてデプロイメントに入れます。

EMQX Tablesのデプロイメントに入り、左メニューのData Explorerをクリックすると、デフォルトのpublicデータベースが利用可能であることが確認できます。

データ取り込み用ユーザーの作成

新しいEMQX Tablesのデプロイメントはカスタムユーザーと権限をサポートしています。デプロイメント詳細にデフォルトのユーザー名やパスワードは表示されません。ブローカーデータ統合を作成する前に、対象データベースに書き込み可能なユーザーを作成してください。

注意

古いEMQX Tablesデプロイメントはカスタムユーザーや権限設定をサポートしていません。古いデプロイメントの場合、利用可能であればデプロイメント詳細に表示されるデフォルト認証情報を使用してください。

  1. EMQX Tablesデプロイメントで、左メニューのUser Managementをクリックします。

  2. + Add Userをクリックします。

  3. ユーザー名とパスワードを入力します。

  4. PrivilegesRead & Writeを選択します。

    EMQX ブローカーからのデータ取り込みには最低でもSqlInsert権限が必要です。Read & WriteSqlInsertSqlSelectの両方を含むため、同じユーザーで書き込みと検証用のクエリが可能です。

  5. Access Controlでこのユーザーがアクセス可能なデータベースを選択します。クイックスタートではpublicデータベースを選択します。カスタムデータベースを作成している場合はそちらを選択してください。

  6. Confirmをクリックします。

emqx_tables_add_user

作成したユーザーはUser Managementページから管理できます。

emqx_tables_user_entry

権限やアクセス制御モードの詳細はユーザー管理を参照してください。

ブローカーとTables間の接続

接続方法はブローカーのデプロイメントタイプとネットワーク設定によって異なります:

デプロイメントタイプ条件接続方法必要な操作備考
Dedicated / Dedicated FlexTablesと同じクラウドプラットフォーム、リージョン、ネットワークプライベート(安全・低レイテンシ)2つ目のデプロイメント作成時にNetwork Associationで既存ネットワークを選択。プロジェクトのネットワーク管理で共有ネットワークを確認可能。各ネットワークは最大1つのブローカーと1つのTablesデプロイメントをサポート。
Dedicated / Dedicated FlexTablesと異なるリージョンまたはネットワークTLSを用いたパブリックインターネットブローカーのNetwork Management設定でNAT Gatewayを有効化。
ServerlessTLSを用いたパブリックインターネット操作不要。ネットワークアソシエーションとNAT Gatewayは適用外。

データ取り込み用ルールの作成

ルールエンジンを使ってMQTTメッセージをEMQX Tablesに取り込み、永続化します。

  1. Dedicated Flex(またはServerless)デプロイメントに入り、Data Integrationに移動します。

  2. 初めてのコネクターの場合は、コネクター一覧からEMQX Tablesを選択します。既にコネクターがある場合は**+ New ConnectorをクリックしてEMQX Tables**を選択します。

    create_connector

  3. Quick Setup(デフォルト)を選択し、プロジェクト内のTablesデプロイメントを選びます。

  4. 接続設定を入力します:

    connector_quick_setup

  5. Testをクリックして接続を検証します。成功メッセージが表示されます。

  6. Newをクリックし、New Ruleを選択してこのコネクターを使ったルール作成に進みます。

  7. SQL EditorでSQLルールを定義します。

    ルールSQL例

    sql
    SELECT
      timestamp as ts,
      payload.machine_id as machine_id,
      payload.production_line as production_line,
      payload.temperature as temperature,
      payload.vibration as vibration,
      payload.machine_status as machine_status
    FROM "factory/+/metrics"

    このルールは、トピックfactory/+/metricsにマッチするすべてのMQTTメッセージのペイロードからフィールドを抽出します。抽出した値にはエイリアスを付けて、ルールアクションの書き込み構文で参照可能にしています。timestampフィールドはtsにマッピングされ、EMQX Tablesに記録する際の時刻インデックスとなります。

  8. Nextをクリックしてルールにアクションを追加します。アクション設定を行います:

    • Connector:作成したEMQX Tablesコネクターを選択

    • Time Precisionmillisecond

    • Write Syntax

      text
      machine_metrics,production_line=${production_line},machine_id=${machine_id} temperature=${temperature},vibration=${vibration},machine_status=${machine_status} ${ts}

      この構文は、machine_metricsテーブルが存在しない場合は自動作成し、InfluxDB Line Protocol形式でデータを書き込みます。

      • タグproduction_line, machine_id(ディメンションおよび主キーとして使用)
      • フィールドtemperature, vibration, machine_status(実際のメトリクス値)
      • タイムスタンプ${ts}はメッセージから抽出したtimestampを使い時系列を整合
  9. Confirmをクリックしてルールを保存します。

    Data Integrationページに戻ると、作成したコネクター、ルール、アクションが表示されます。

    create_rule

MQTTメッセージのパブリッシュ

クイックテストには、Dedicated Flex(またはServerless)デプロイメント内蔵の診断ツールのOnline Testを使えます。

  1. Online Testでユーザー名とパスワード、または自動生成認証でデプロイメントに接続します。

  2. Messagesセクションで以下の2つのメッセージを送信します:

TIP

テーブルが事前に作成されていない場合、EMQX Cloudは最初に正常に書き込まれたメッセージのデータ型に基づいて自動的にテーブルを作成します。テーブルとカラムが作成された後は、すべての書き込みで同じデータ型を使用する必要があり、異なる場合は書き込みが失敗します。

  • Topicfactory/A/metrics

  • Payload

    json
    {
      "machine_id": "M001",
      "production_line": "A",
      "temperature": 36.5,
      "vibration": 0.03,
      "machine_status": "running"
    }
    json
    {
      "machine_id": "M002",
      "production_line": "A",
      "temperature": 39.1,
      "vibration": 0.06,
      "machine_status": "warning"
    }

publish_messages

EMQX Tablesでのデータクエリ

  1. EMQX Tablesデプロイメントにアクセスします。

  2. 左メニューのData Explorerをクリックします。

  3. エディターに以下のSQLを入力し、Run Queryをクリックします:

    sql
    SELECT * FROM machine_metrics;

    取り込まれたメッセージが表示されます。

    query_test_data

完成:ネイティブMQTTからDBへのパイプライン

これで以下のライブパイプラインが完成しました:

MQTTクライアント -> EMQX ブローカー -> ルールエンジン -> EMQX Tables -> SQL分析

サードパーティのインフラは不要で、完全マネージドかつ時系列IoTワークロード向けに設計されています。

次はGrafanaやStreamlitでメトリクスを可視化できます。詳細は統合ガイドを参照してください。

データベース機能のクイックガイド

EMQX Brokerからデフォルトのpublicデータベースへのデータ取り込みに加え、EMQX Tablesではカスタムデータベースやテーブルの定義、SQLによる手動データ挿入、組み込みのData Explorerでのクエリが可能です。これによりテストや開発の柔軟性が向上します。

カスタムデータベースの作成

デフォルトのpublicデータベースとは別にカスタムデータベースを作成できます。

  1. デプロイメントのData Explorerページに移動します。

  2. 以下のSQLを入力し、Run Queryをクリックします:

    sql
    CREATE DATABASE factory WITH (ttl='7d');

これにより、データ保持期間(TTL)が7日のfactoryという名前の新しいデータベースが作成されます。

create_custom_database

テーブルの作成

新しいデータベース内に工場のメトリクスを格納する時系列テーブルを定義します。

Data Explorerで以下のSQLを入力し、Run Queryをクリックします:

sql
CREATE TABLE factory.machine_metrics (
    ts TIMESTAMP NOT NULL,
    production_line STRING,
    machine_id STRING,
    temperature DOUBLE,
    vibration DOUBLE,
    machine_status STRING DEFAULT 'running',
    TIME INDEX (ts),
    PRIMARY KEY (production_line, machine_id)
) WITH (
    ttl='7d'
);

このテーブルはtsを時刻インデックスに、production_linemachine_idの複合主キーを持ちます。

SQLによるデータ挿入

EMQX TablesはSQLベースとLine Protocolベースの両方のデータ取り込みをサポートし、.txt.lpファイルのアップロードも可能です。

Data Explorerで以下のコマンドを実行してサンプルデータを挿入します:

sql
INSERT INTO factory.machine_metrics (ts, production_line, machine_id, temperature, vibration, machine_status)
VALUES
    (now(), 'A', 'M001', 36.5, 0.03, 'running'),
    (now(), 'A', 'M002', 39.1, 0.06, 'warning'),
    (now(), 'B', 'M010', 37.2, 0.02, 'running'),
    (now(), 'B', 'M011', 45.6, 0.12, 'error');

now()は現在のタイムスタンプを挿入します。

データのクエリ

データ検証にはData Explorerを使用します。

クエリ例

すべてのレコードを表示:

sql
SELECT * FROM factory.machine_metrics;

query_all_records

直近60分の平均温度をラインと状態別に集計:

sql
SELECT production_line, machine_status, AVG(temperature) AS avg_temp
FROM factory.machine_metrics
WHERE ts > now() - INTERVAL '60 minute'
GROUP BY production_line, machine_status;

query_by_time_limits

特定デバイスで絞り込み:

sql
SELECT ts, temperature
FROM machine_metrics
WHERE machine_id = 'M001'
ORDER BY ts DESC
LIMIT 10;

query_filter_by_device

クイッククエリで高速アクセス

テーブルスキーマからSQLスニペットを素早く生成できます:

  1. Data Explorerの左側スキーマパネルでカラムにカーソルを合わせます。
  2. カラム横の縦三点リーダー(︙)をクリックします。
  3. Quick Queryを選択し、Query columnQuery maxQuery minなどを選びます。
  4. 生成されたSQLが自動的にエディターに表示されます。

quick_query

リソース

サポートされているSQL文と句についてはGreptimeのドキュメントを参照してください。

より詳細なクエリ方法はEMQX Tablesでのデータクエリをご覧ください。