Skip to content

EMQX TablesへのMQTTデータ取り込み

EMQX Tablesは、EMQX Cloudに組み込まれたネイティブでフルマネージドの時系列データストレージサービスです。MQTTデータの高スループットかつ低レイテンシな取り込みと分析に最適化されており、IoTユースケースに理想的です。

GreptimeDBを基盤としており、EMQX ブローカーとシームレスに統合され、InfluxDB Line Protocolをサポートすることで、テレメトリデータの効率的な保存、クエリ、可視化を可能にします。

詳細はEMQX Tables概要をご覧ください。

本ガイドでは、以下の手順でMQTTデータをEMQX Tablesに取り込む方法を説明します。

  • コネクターの作成
  • Line Protocolライターを用いたルールの作成
  • データ取り込みのテストと結果のクエリ

前提条件

以下のデプロイメントを作成済みであること:

以下に習熟していること:

ブローカーとTables間の接続性

接続方法はブローカーのデプロイメントタイプとネットワーク構成によって異なります。

デプロイメントタイプ条件接続方法必要な対応
専用 / 専用FlexTablesと同じクラウドプラットフォーム、リージョン、ネットワークプライベート(セキュアかつ低レイテンシ)プロジェクトレベルネットワーク管理でネットワーク共有を設定
専用 / 専用FlexTablesと異なるリージョンまたはネットワークTLS経由のパブリックインターネットブローカーのネットワーク管理設定でNATゲートウェイを有効化
サーバレスTLS経由のパブリックインターネット対応不要。ネットワーク関連設定やNATゲートウェイは不要

EMQX Tablesコネクターの作成

データを書き込む前に、EMQX Tablesへのコネクターを作成します。

  1. EMQX ブローカーのデプロイメントにアクセスし、左メニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合はEMQX Tablesを探します。既にコネクターを作成済みの場合は、+ 新規コネクターをクリックし、EMQX Tablesを選択します。

  3. 新規コネクターページで、コネクター名は自動生成されます。以下の2つのセットアップモードから選択します。

  4. テストをクリックして接続を検証します。EMQX Tablesサービスにアクセス可能であれば成功メッセージが表示されます。

  5. 新規作成をクリックして作成を完了します。これでこのコネクターを使ったルール作成に進めます。

EMQX Tablesへのデータ取り込み用ルールの作成

次に、書き込むデータを指定し、EMQX Tablesへ書き込むアクションを追加するルールを作成します。

  1. ルールセクションで新規ルールをクリックするか、コネクター横のアクションアイコンを使います。

  2. SQLエディターでSQLルールを定義します。この例では、クライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目的としています。以下のようにSQLを設定します。

    sql
      SELECT
        timestamp,
        payload.location as location,
        payload.temp as temp,
        payload.hum as hum
      FROM "temp_hum/emqx"

    TIP

    初心者の方はTry It OutをクリックしてSQLルールの学習とテストが可能です。

  3. 次へをクリックしてルールにアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 時間精度millisecond(デフォルト)に設定します。

  6. 書き込み構文を設定し、EMQX Tablesにデータを解析・書き込みするためのLine Protocolフォーマットを定義します。

    測定値、タグセット、フィールドセット、タイムスタンプを含むテキストベースのフォーマットを指定し、サポートされるプレースホルダーを利用します。詳細はInfluxDB 2.3 Line ProtocolおよびInfluxDB 1.8 Line Protocolを参照してください。

    例:

     temp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}

    TIP

    • 符号付き整数型の値を書き込む場合は、プレースホルダーの後にiを付けます。例:${payload.int}i。詳細はInfluxDB 1.8 整数値の書き込みを参照してください。
    • 符号なし整数型の値を書く場合は、プレースホルダーの後にuを付けます。例:${payload.int}u。詳細は同上リンクを参照してください。
  7. 確定をクリックしてルールを保存します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、ルール作成を完了します。

emqx_tables_rule_action

ルールのテストとデータのクエリ

MQTTXなどのクライアントツールを使って温度・湿度データの送信をシミュレートすることを推奨します。簡単なデモとしては、ブローカーの左メニューにあるオンラインテスト機能を使うことも可能です。

  1. オンラインテストで、ユーザー名とパスワードまたは自動生成された認証情報を使ってデプロイメントに接続します。

  2. メッセージ欄で以下のメッセージを送信します。

    • トピックtemp_hum/emqx

    • ペイロード

      json
      {
        "temp": 27.5,
        "hum": 41.8,
        "location": "Prague"
      }

    emqx_tables_online_test

  3. EMQX Tablesのデプロイメントに移動し、左メニューからデータエクスプローラーをクリックします。

  4. 以下のSQLを実行して、publicテーブルに取り込まれたデータをクエリします。

    sql
    select * from "temp_hum"

    クエリ結果テーブルに1件のレコードが表示されるはずです。

    emqx_tables_query

  5. EMQX ブローカーのデプロイメントでルール統計を確認します。ルール一覧のルールIDをクリックすると、そのルールおよび関連アクションの統計情報が表示されます。

    emqx_tables_rule_statistics