GreptimeDBへのMQTTデータ取り込み
GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの利点を享受できます。EMQXプラットフォームは、主流のGreptimeDB、GreptimeCloud、GreptimeDB Enterpriseとの接続をサポートしています。
本ページでは、EMQXプラットフォームとGreptimeDB間のデータ統合について、実践的な手順を交えて包括的に紹介します。
動作概要
GreptimeDBデータ統合はEMQXプラットフォームに組み込まれた機能であり、EMQXプラットフォームのリアルタイムデータ取得・送信能力とGreptimeDBのデータ保存・分析能力を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからGreptimeDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。ワークフローは以下の通りです。
以下の図は、EMQXプラットフォームとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。
- メッセージのパブリッシュと受信:産業機器はMQTTプロトコルを用いてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータフォーマットの変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
- GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをGreptimeDBに書き込む操作が実行されます。GreptimeDBアクションはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むデータフォーマット定義が可能です。
エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば:
- Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示する。
- ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
- ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を促進する。
特長と利点
GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。
- 使いやすさ:EMQXプラットフォームとGreptimeDBはどちらも開発者に優しい設計です。EMQXプラットフォームは標準のMQTTプロトコルに加え、多様な認証、認可、クラスタリング機能を標準で提供します。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を備えています。両者の統合により、ビジネス統合と開発のスピードアップが期待できます。
- 効率的なデータ処理:EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに負担をかけずに満たします。
- メッセージ変換:メッセージはEMQXプラットフォームのルール内で豊富な処理・変換を経てからGreptimeDBに書き込まれます。
- 効率的なストレージとスケーラビリティ:EMQXプラットフォームとGreptimeDBはどちらもクラスターのスケールアウト機能を備え、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
- 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリ・分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を抽出できます。
はじめる前に
このセクションでは、GreptimeDBデータ統合の作成を始める前に必要な準備、特にGreptimeDBサーバーのインストール方法について説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
GreptimeDBサーバーのインストール
Dockerを使ってGreptimeDBをインストールし、Dockerイメージを起動します。
bash# GreptimeDBのDockerイメージを起動する docker run -p 4000-4004:4000-4004 \ -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \ --name greptime --rm \ greptime/greptimedb standalone start \ --http-addr 0.0.0.0:4000 \ --rpc-addr 0.0.0.0:4001 \ --mysql-addr 0.0.0.0:4002 \ --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
user-provider
パラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれ
greptime_user
とgreptime_pwd
です。
コネクターの作成
データ統合ルールを作成する前に、GreptimeDBサーバーにアクセスするためのGreptimeDBコネクターを作成する必要があります。
デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるGreptimeDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリの下のGreptimeDBを選択します。
コネクター名はシステムが自動生成します。
接続情報を入力します:
- サーバーホスト:
{host}:4001
を入力します。GreptimeCloudに接続する場合はポートを443にして{url}:443
を入力してください。 - データベース:
public
を入力します。GreptimeCloudに接続する場合はサービス名を入力します。 - ユーザー名とパスワード:
greptime_user
とgreptime_pwd
を入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudの場合はサービスのユーザー名とパスワードを入力してください。 - TLSを有効化:暗号化接続を確立したい場合はトグルスイッチをオンにします。
- ビジネス要件に応じて詳細設定を行います(任意)。
- サーバーホスト:
テストボタンをクリックします。GreptimeDBサービスにアクセス可能であれば、connector availableのメッセージが返されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
このセクションでは、EMQXプラットフォームコンソールを使ってGreptimeDBルールを作成し、ルールにアクションを追加する方法を示します。
ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
利用したい機能に基づいてSQLエディターでルールを設定します。ここではクライアントが
temp_hum/emqx
トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目標とします。SQLは以下のように記述します。sqlSELECT timestamp, clientid, payload FROM "temp_hum/emqx"
TIP
初心者の方はSQL例をクリックし、テストを有効化してSQLルールの学習とテストを行うことを推奨します。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
書き込み構文を設定します。これはテキストベースのフォーマットで、データポイントの計測名、タグ、フィールド、タイムスタンプを指定し、InfluxDB line protocolの構文に準拠したプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータフォーマットをサポートしています。
本チュートリアルの例は以下の構文です。sqlmyMeasurement,tag1=${clientid} fieldKey=${payload}
TIP
- GreptimeDBに符号付き整数型の値を書き込む場合、プレースホルダーの後に
i
を型識別子として付加します。例:${payload.int}i
- 符号なし整数型の場合は
u
を付加します。例:${payload.int}u
- GreptimeDBに符号付き整数型の値を書き込む場合、プレースホルダーの後に
時間精度を指定します。デフォルトは
millisecond
です。詳細設定を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します(任意)。
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功ポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
温度・湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
ルールの稼働状況を確認し、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。
GreptimeDBダッシュボードで
SQL
を使い、メッセージがGreptimeDBに書き込まれているかを確認できます。