GreptimeDBへのMQTTデータ取り込み
GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの恩恵を受けられます。EMQX Cloudは現在、GreptimeDBの主流バージョンであるGreptimeCloudやGreptimeDB Enterpriseとの接続をサポートしています。
本ページでは、EMQX CloudとGreptimeDB間のデータ統合について、作成方法と検証方法を含めて包括的に解説します。
動作概要
GreptimeDBデータ統合は、EMQX Cloudに組み込まれた機能であり、EMQX Cloudのリアルタイムデータキャプチャおよび送信機能と、GreptimeDBのデータ保存・分析機能を組み合わせたものです。組み込みのルールエンジンコンポーネントにより、EMQX CloudからGreptimeDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。ワークフローは以下の通りです。
以下の図は、EMQX CloudとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。

- メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着すると、ルールエンジンを通過し、対応するルールと照合してメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
- GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーされると、メッセージをGreptimeDBに書き込む操作が実行されます。GreptimeDBアクションはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むためのデータ形式を定義できます。
エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば:
- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフ化・表示を行う。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整に活用する。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を促進する。
特長とメリット
GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。
- 使いやすさ:EMQX CloudとGreptimeDBは共に開発者に優しい設計です。EMQX Cloudは標準のMQTTプロトコルに加え、多様な認証・認可・クラスタリング機能を提供します。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を持ちます。両者の統合により、ビジネス統合や開発のスピードを加速できます。
- 効率的なデータ処理:EMQX Cloudは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータの書き込み、保存、クエリに優れ、IoTシナリオのデータ処理要件をシステム負荷をかけずに満たします。
- メッセージ変換:メッセージはEMQX Cloudのルール内で多彩な処理や変換を経てからGreptimeDBに書き込まれます。
- 効率的なストレージとスケーラビリティ:EMQX CloudとGreptimeDBは共にクラスターのスケールアウト機能を持ち、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
- 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻なインサイトを抽出できます。
はじめる前に
このセクションでは、GreptimeDBデータ統合の作成に先立ち必要な準備について説明します。GreptimeDBサーバーのインストール方法も含みます。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
GreptimeDBサーバーのインストール
Docker経由でGreptimeDBをインストールし、Dockerイメージを起動します。
bash# GreptimeDB Dockerイメージの起動 docker run -p 4000-4004:4000-4004 \ -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \ --name greptime --rm \ greptime/greptimedb standalone start \ --http-addr 0.0.0.0:4000 \ --rpc-addr 0.0.0.0:4001 \ --mysql-addr 0.0.0.0:4002 \ --user-provider=static_user_provider:cmd:greptime_user=greptime_pwduser-providerパラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれ
greptime_userとgreptime_pwdです。
コネクターの作成
データ統合ルールを作成する前に、GreptimeDBサーバーへアクセスするためのGreptimeDBコネクターを作成する必要があります。
デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるGreptimeDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリのGreptimeDBを選択します。
コネクター名はシステムが自動生成します。
接続情報を入力します:
- サーバーホスト:
{host}:4001を入力します。GreptimeCloudに接続する場合はポートを443にして{url}:443を入力してください。 - データベース:
publicを入力します。GreptimeCloudに接続する場合はサービス名を入力してください。 - ユーザー名とパスワード:
greptime_userとgreptime_pwdを入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudに接続する場合はサービスのユーザー名とパスワードを入力してください。 - TLSを有効にする:暗号化接続を確立したい場合はトグルスイッチをオンにします。
- ビジネス要件に応じて詳細設定を行います(任意)。
- サーバーホスト:
テストボタンをクリックします。GreptimeDBサービスにアクセス可能であれば、コネクター利用可能のメッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
このセクションでは、GreptimeDBルールの作成とEMQX Cloudコンソールを通じたルールへのアクション追加方法を示します。
ルールエリアの新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
使用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqxトピックに温度・湿度メッセージを送信した際にエンジンをトリガーすることを目標とします。SQLは以下のように記述します。sqlSELECT timestamp, clientid, payload FROM "temp_hum/emqx"TIP
初心者の方はSQL例をクリックし、Try It OutでSQLルールの学習とテストを行うことを推奨します。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。
書き込み構文を設定します。これは、データポイントの計測名、タグ、フィールド、タイムスタンプをテキストベースで指定する形式で、InfluxDBのLine Protocolの構文に準拠しプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータ形式をサポートしています。
本チュートリアルの例は以下の構文です。sqlmyMeasurement,tag1=${clientid} fieldKey=${payload}TIP
- GreptimeDBに符号付き整数型の値を書き込む場合は、プレースホルダーの後に
iをタイプ識別子として付加します。例:${payload.int}i - 符号なし整数型の値を書き込む場合は、
uを付加します。例:${payload.int}u
- GreptimeDBに符号付き整数型の値を書き込む場合は、プレースホルダーの後に
時間精度を指定します。デフォルトは
millisecondです。詳細設定を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します(任意)。
確定ボタンをクリックしてルール作成を完了します。
ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
温度・湿度データの送信シミュレーションにはMQTTXの使用を推奨しますが、他の任意のクライアントでも可能です。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxクライアントID:
test_clientペイロード:
json{ "temp": "27.5", "hum": "41.8" }
ルールの稼働状況を確認し、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。
GreptimeDBダッシュボードで
SQLを用いて、メッセージがGreptimeDBに書き込まれているかを確認できます。