Skip to content

GreptimeDBへのMQTTデータ取り込み

GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの恩恵を受けられます。EMQX Cloudは現在、GreptimeDBの主流バージョンであるGreptimeCloudやGreptimeDB Enterpriseとの接続をサポートしています。

本ページでは、EMQX CloudとGreptimeDB間のデータ統合について、作成方法と検証方法を含めて包括的に解説します。

動作概要

GreptimeDBデータ統合は、EMQX Cloudに組み込まれた機能であり、EMQX Cloudのリアルタイムデータキャプチャおよび送信機能と、GreptimeDBのデータ保存・分析機能を組み合わせたものです。組み込みのルールエンジンコンポーネントにより、EMQX CloudからGreptimeDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。ワークフローは以下の通りです。

以下の図は、EMQX CloudとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Cloud-Integration GreptimeDB

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQX Cloudに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着すると、ルールエンジンを通過し、対応するルールと照合してメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などが含まれます。
  3. GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーされると、メッセージをGreptimeDBに書き込む操作が実行されます。GreptimeDBアクションはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むためのデータ形式を定義できます。

エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフ化・表示を行う。
  • ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整に活用する。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を促進する。

特長とメリット

GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。

  • 使いやすさ:EMQX CloudとGreptimeDBは共に開発者に優しい設計です。EMQX Cloudは標準のMQTTプロトコルに加え、多様な認証・認可・クラスタリング機能を提供します。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を持ちます。両者の統合により、ビジネス統合や開発のスピードを加速できます。
  • 効率的なデータ処理:EMQX Cloudは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータの書き込み、保存、クエリに優れ、IoTシナリオのデータ処理要件をシステム負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQX Cloudのルール内で多彩な処理や変換を経てからGreptimeDBに書き込まれます。
  • 効率的なストレージとスケーラビリティ:EMQX CloudとGreptimeDBは共にクラスターのスケールアウト機能を持ち、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
  • 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリと分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻なインサイトを抽出できます。

はじめる前に

このセクションでは、GreptimeDBデータ統合の作成に先立ち必要な準備について説明します。GreptimeDBサーバーのインストール方法も含みます。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

GreptimeDBサーバーのインストール

  1. Docker経由でGreptimeDBをインストールし、Dockerイメージを起動します。

    bash
    # GreptimeDB Dockerイメージの起動
    docker run -p 4000-4004:4000-4004 \
    -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \
    --name greptime --rm \
    greptime/greptimedb standalone start \
    --http-addr 0.0.0.0:4000 \
    --rpc-addr 0.0.0.0:4001 \
    --mysql-addr 0.0.0.0:4002 \
    --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
  2. user-providerパラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。

  3. GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれgreptime_usergreptime_pwdです。

コネクターの作成

データ統合ルールを作成する前に、GreptimeDBサーバーへアクセスするためのGreptimeDBコネクターを作成する必要があります。

  1. デプロイメントに移動し、左ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるGreptimeDBを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、同じくデータ永続化カテゴリのGreptimeDBを選択します。

  2. コネクター名はシステムが自動生成します。

  3. 接続情報を入力します:

    • サーバーホスト{host}:4001を入力します。GreptimeCloudに接続する場合はポートを443にして{url}:443を入力してください。
    • データベースpublicを入力します。GreptimeCloudに接続する場合はサービス名を入力してください。
    • ユーザー名パスワードgreptime_usergreptime_pwdを入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudに接続する場合はサービスのユーザー名とパスワードを入力してください。
    • TLSを有効にする:暗号化接続を確立したい場合はトグルスイッチをオンにします。
    • ビジネス要件に応じて詳細設定を行います(任意)。
  4. テストボタンをクリックします。GreptimeDBサービスにアクセス可能であれば、コネクター利用可能のメッセージが表示されます。

  5. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

このセクションでは、GreptimeDBルールの作成とEMQX Cloudコンソールを通じたルールへのアクション追加方法を示します。

  1. ルールエリアの新規ルールをクリックするか、先ほど作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づき、SQLエディターでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度・湿度メッセージを送信した際にエンジンをトリガーすることを目標とします。SQLは以下のように記述します。

    sql
     SELECT
      timestamp, clientid, payload
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方はSQL例をクリックし、Try It OutでSQLルールの学習とテストを行うことを推奨します。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンボックスから先ほど作成したコネクターを選択します。

  5. 書き込み構文を設定します。これは、データポイントの計測名、タグ、フィールド、タイムスタンプをテキストベースで指定する形式で、InfluxDBのLine Protocolの構文に準拠しプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータ形式をサポートしています。
    本チュートリアルの例は以下の構文です。

    sql
     myMeasurement,tag1=${clientid} fieldKey=${payload}

    TIP

    • GreptimeDBに符号付き整数型の値を書き込む場合は、プレースホルダーの後にiをタイプ識別子として付加します。例:${payload.int}i
    • 符号なし整数型の値を書き込む場合は、uを付加します。例:${payload.int}u
  6. 時間精度を指定します。デフォルトはmillisecondです。

  7. 詳細設定を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します(任意)。

  8. 確定ボタンをクリックしてルール作成を完了します。

  9. ルール作成成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

温度・湿度データの送信シミュレーションにはMQTTXの使用を推奨しますが、他の任意のクライアントでも可能です。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. ルールの稼働状況を確認し、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。

  3. GreptimeDBダッシュボードでSQLを用いて、メッセージがGreptimeDBに書き込まれているかを確認できます。