Skip to content

GreptimeDBへのMQTTデータ取り込み

GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの利点を享受できます。EMQXは現在、GreptimeDBの主流バージョンであるGreptimeCloudやGreptimeDB Enterpriseとの接続をサポートしています。

本ページでは、EMQXとGreptimeDB間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。

動作の仕組み

GreptimeDBデータ統合は、EMQXに組み込まれた機能であり、EMQXのリアルタイムデータキャプチャおよび送信機能とGreptimeDBのデータ保存・分析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからGreptimeDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。ワークフローは以下の通りです。

以下の図は、EMQXとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Integration GreptimeDB

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ラインの識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
  3. GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールは、メッセージをGreptimeDBに書き込む操作をトリガーします。GreptimeDB SinkはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むデータフォーマットの定義を可能にします。

エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示する。
  • ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を実施してデータ駆動型のエネルギー管理を支援する。

特長と利点

GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。

  • 使いやすさ:EMQXとGreptimeDBは共に開発者に優しい設計を提供します。EMQXは標準的なMQTTプロトコルに加え、多様な認証・認可・クラスタリング機能を備えています。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を持ちます。両者の統合により、ビジネス統合と開発のスピードアップが期待できます。
  • 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータの書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQXのルール内で豊富な処理・変換を経てからGreptimeDBに書き込まれます。
  • 効率的なストレージとスケーラビリティ:EMQXとGreptimeDBは共にクラスター拡張機能を持ち、ビジネスの成長に応じて柔軟な水平スケーリングが可能です。
  • 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリ・分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を引き出せます。

はじめる前に

このセクションでは、GreptimeDBデータ統合の作成を開始する前に完了すべき準備事項を説明します。GreptimeDBサーバーのインストール方法も含みます。

前提条件

GreptimeDBサーバーのインストール

  1. Docker経由でGreptimeDBをインストールし、Dockerイメージを起動します。

    bash
    # GreptimeDBのDockerイメージを起動する
    docker run -p 127.0.0.1:4000-4003:4000-4003 \
      -v "$(pwd)/greptimedb_data:/greptimedb_data" \
      --name greptime --rm \
      greptime/greptimedb:latest standalone start \
      --http-addr 0.0.0.0:4000 \
      --rpc-bind-addr 0.0.0.0:4001 \
      --mysql-addr 0.0.0.0:4002 \
      --postgres-addr 0.0.0.0:4003 \
      --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
  2. user-providerパラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。

  3. GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてダッシュボードを利用できます。ユーザー名とパスワードはそれぞれgreptime_usergreptime_pwdです。

コネクターの作成

このセクションでは、SinkをGreptimeDBサーバーに接続するためのコネクター作成方法を示します。

以下の手順は、EMQXとGreptimeDBをローカルマシンで実行していることを前提としています。リモート環境で実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでGreptimeDBを選択し、Nextをクリックします。
  4. Configurationステップで以下の情報を設定します:
    • コネクター名を入力します。大文字・小文字の英数字の組み合わせで、例:my_greptimedb
    • Server Host127.0.0.1:4001を入力します。GreptimeCloudに接続する場合はポートを443にして{url}:443と入力してください。
    • Databasepublicを入力します。GreptimeCloudに接続する場合はサービス名を入力します。
    • UsernamePasswordgreptime_usergreptime_pwdを入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudの場合はサービスのユーザー名とパスワードを入力してください。
  5. Advanced Settingsを展開し、必要に応じて詳細設定を行います(任意)。詳細は高度な設定を参照してください。
  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがGreptimeDBサーバーに接続できるかテストできます。
  7. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてGreptimeDB Sinkを用いたルール作成に進むことができます。詳細はGreptimeDB Sinkを用いたルール作成を参照してください。

GreptimeDB Sinkを用いたルール作成

このセクションでは、EMQXでソースMQTTトピックt/#からのメッセージを処理し、設定済みのSinkを通じてGreptimeDBに送信するルールの作成方法を示します。

  1. EMQXダッシュボードでIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力し、SQL Editorでルールを設定します。ここではトピックt/#のMQTTメッセージをGreptimeDBに保存するため、以下のSQL文を使用します。

    注意:独自のSQL文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT部分に含めてください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。

    • Add Actionボタンをクリックし、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをGreptimeDBに送信します。
  4. Type of ActionドロップダウンリストからGreptimeDBを選択します。ActionドロップダウンはデフォルトのCreate Actionのままにします。既にSinkを作成していれば選択可能です。本例では新規Sinkを作成します。

  5. Sinkの名前を入力します。名前は大文字・小文字の英数字の組み合わせにしてください。

  6. Connectorドロップダウンから先に作成したmy_greptimedbを選択します。新規コネクターはドロップダウン横のボタンから作成可能です。設定パラメータはコネクターの作成を参照してください。

  7. Write Syntaxを設定します。データポイントのmeasurement、タグ、フィールド、タイムスタンプを指定するテキストベースのフォーマットをInfluxDBのLine Protocol仕様に準拠して指定します。GreptimeDBはInfluxDB互換のデータフォーマットをサポートしています。

    TIP

    • GreptimeDBに符号付き整数型の値を書き込む場合は、プレースホルダーの後にiを付けます。例:${payload.int}i
    • 符号なし整数型の場合はuを付けます。例:${payload.int}u
  8. Time Precisionを指定します。デフォルトはmillisecondです。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。

  10. 高度な設定(任意):同期(sync)または非同期(async)クエリモードの選択、キューやバッチの有効化設定が可能です。詳細はSinkの機能を参照してください。

  11. Createをクリックする前に、Test ConnectivityをクリックしてSinkがGreptimeDBサーバーに接続できるかテストできます。

  12. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  13. Create Ruleページに戻り、設定内容を確認します。Createボタンをクリックしてルールを生成します。

これで、GreptimeDB Sinkを通じてデータを転送するルールの作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると、新しいGreptimeDB Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#のメッセージがルールmy_ruleによって解析され、GreptimeDBに送信・保存されていることが確認できます。

ルールのテスト

MQTTXを使用してトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello GreptimeDB" }'

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

GreptimeDBダッシュボードでSQLを用いて、メッセージがGreptimeDBに書き込まれていることを確認できます。

高度な設定

このセクションでは、コネクターのパフォーマンス最適化や特定シナリオに応じたカスタマイズに役立つ高度な設定オプションを説明します。コネクター作成時にAdvanced Settingsを展開し、ビジネスニーズに応じて以下の設定を行えます。

フィールド名説明デフォルト値
Time-To-Live (TTL)GreptimeDBで自動作成されるテーブルの有効期限設定。-
Custom Timestamp Column Name定義すると、クエリ時に表示されるカスタムタイムスタンプカラム名を指定。-
Start Timeoutコネクターが自動起動したリソースの正常状態到達を待つ最大秒数。リソース作成要求に応答する前に、接続先リソースが完全に稼働しデータ処理準備が整っていることを保証するための設定。5
Health Check Intervalコネクターの稼働状態をチェックする間隔。15
Health Check TimeoutGreptimeDBサーバーとの接続に対する自動ヘルスチェックのタイムアウト時間。60