EMQX Tables に MQTT データを取り込む
EMQX Tables は、EMQX Cloud に組み込まれたネイティブで完全管理された時系列データストレージサービスです。MQTT データの高スループットかつ低レイテンシの取り込みと分析に最適化されており、IoT(Internet of Things)ユースケースに理想的です。
GreptimeDB を基盤としており、EMQX ブローカーとシームレスに統合され、InfluxDB Line Protocol をサポートすることで、テレメトリデータの効率的な保存、クエリ、および可視化を可能にします。
詳細は、EMQX Tables 概要をご覧ください。
本ガイドでは、以下の手順で MQTT データを EMQX Tables に取り込む方法を説明します。
- コネクターの作成
- Line Protocol ライターを使ったルールの作成
- データ取り込みのテストと結果のクエリ
前提条件
以下のデプロイメントを作成していること:
- EMQX ブローカーのデプロイメント:サーバレス、[専用] または 専用 Flex
- EMQX Tables デプロイメント
以下に習熟していること:
ブローカーと Tables 間の接続性
接続方法は、ブローカーのデプロイメントタイプとネットワーク構成によって異なります。
| デプロイメントタイプ | 条件 | 接続方法 | 必要な対応 |
|---|---|---|---|
| 専用 / 専用 Flex | Tables と同じクラウドプラットフォーム、リージョン、ネットワーク | プライベート(安全で低レイテンシ) | プロジェクトレベルネットワーク管理 によるネットワーク共有設定を行う。 |
| 専用 / 専用 Flex | Tables と異なるリージョンまたはネットワーク | TLS 上のパブリックインターネット | ブローカーのデプロイメントの「ネットワーク管理」設定で NAT Gateway を有効にする。 |
| サーバレス | — | TLS 上のパブリックインターネット | 対応不要。ネットワーク関連の設定や NAT Gateway は不要。 |
EMQX Tables コネクターの作成
データを書き込む前に、EMQX Tables へのコネクターを作成します。新しい EMQX Tables デプロイメントの場合は、コネクター設定前に EMQX Tables ユーザーを作成してください。新しいデプロイメントでは、デプロイメント詳細にデフォルトのユーザー名やパスワードは返されません。詳細は ユーザー管理 を参照してください。
EMQX ブローカーのデプロイメントに移動し、左メニューから データ統合 をクリックします。
初めてコネクターを作成する場合は EMQX Tables を探します。既にコネクターを作成済みの場合は、+ 新規コネクター をクリックし、EMQX Tables を選択します。
新規コネクター ページで、コネクター名 は自動生成されます。以下の2つのセットアップモードから選択してください。


適切なユーザーが存在しない場合は、ユーザー名ドロップダウンの ユーザー追加 をクリックして EMQX Tables の ユーザー管理 ページを開き、ユーザーを作成してください。詳細は ユーザー管理 を参照してください。
:::
テスト をクリックして接続性を検証します。EMQX Tables サービスにアクセス可能で、選択したユーザーに必要な書き込み権限があれば、成功メッセージが表示されます。
新規作成 をクリックして作成を完了します。これで、このコネクターを使ったルール作成に進めます。
EMQX Tables へのデータ取り込み用ルールの作成
次に、書き込むデータを指定し、EMQX Tables に書き込むためのアクションをルールに追加します。
ルール セクションで 新規ルール をクリックするか、コネクター横の アクション アイコンを使用します。
SQL エディターで SQL ルールを定義します。この例では、クライアントが
temp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目的としています。以下のように SQL を設定します。sqlSELECT timestamp, payload.location as location, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は Try It Out をクリックして、SQL ルールの学習とテストを行ってください。
次へ をクリックして、ルールにアクションを追加します。
コネクター ドロップダウンから先ほど作成したコネクターを選択します。
時間精度 を
millisecond(デフォルト)に設定します。書き込み構文 を設定し、EMQX Tables へのデータ解析と書き込みに使う Line Protocol フォーマットを定義します。
測定値、タグセット、フィールドセット、データポイントのタイムスタンプ、およびサポートされるプレースホルダーを指定するテキストベースのフォーマットを指定してください。詳細は InfluxDB 2.3 Line Protocol および InfluxDB 1.8 Line Protocol を参照してください。
例:
temp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}TIP
- 符号付き整数型の値を書き込む場合は、プレースホルダーの後に型識別子
iを付けます。例:${payload.int}i。詳細は InfluxDB 1.8 整数値の書き込み を参照してください。 - 符号なし整数型の値を書き込む場合は、プレースホルダーの後に型識別子
uを付けます。例:${payload.int}u。詳細は同上リンクを参照してください。
- 符号付き整数型の値を書き込む場合は、プレースホルダーの後に型識別子
確定 をクリックしてルールを保存します。
新規ルール作成成功 のポップアップで ルールに戻る をクリックし、ルール作成を完了します。

ルールのテストとデータのクエリ
MQTTX などのクライアントツールを使って温度と湿度のデータ送信をシミュレートすることを推奨します。簡単なデモとしては、ブローカーのデプロイメント内にある組み込みの診断ツールを使い、左メニューの オンラインテスト をクリックして利用できます。
オンラインテスト で、ユーザー名とパスワード、または自動生成された認証情報を使ってデプロイメントに接続します。
メッセージ セクションで以下のメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": 27.5, "hum": 41.8, "location": "Prague" }

EMQX Tables のデプロイメントに移動し、左メニューから データエクスプローラー をクリックします。
以下の SQL を実行して、
publicテーブルに取り込まれたデータをクエリします。sqlselect * from "temp_hum"クエリ結果 テーブルに1件のレコードが表示されるはずです。

EMQX ブローカーのデプロイメントでルール統計情報を確認します。ルール一覧のルール ID をクリックすると、そのルールおよび関連アクションの統計情報が表示されます。

