Skip to content

GreptimeDBへのMQTTデータ取り込み

GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上で動作するよう設計されており、ユーザーはその弾力性と汎用ストレージの利点を享受できます。EMQXプラットフォームは、主流のGreptimeDB、GreptimeCloud、GreptimeDB Enterpriseとの接続をサポートしています。

本ページでは、EMQXプラットフォームとGreptimeDB間のデータ統合について、実践的な手順を交えて包括的に紹介します。

動作概要

GreptimeDBデータ統合はEMQXプラットフォームに組み込まれた機能であり、EMQXプラットフォームのリアルタイムデータ取得・送信能力とGreptimeDBのデータ保存・分析能力を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからGreptimeDBへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。ワークフローは以下の通りです。

以下の図は、EMQXプラットフォームとGreptimeDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Platform-Integration GreptimeDB

  1. メッセージのパブリッシュと受信:産業機器はMQTTプロトコルを用いてEMQXプラットフォームに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ライン識別子やエネルギー消費値が含まれます。EMQXプラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づいて特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータフォーマットの変換、特定情報のフィルタリング、コンテキスト情報の付加などが含まれます。
  3. GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールがトリガーとなり、メッセージをGreptimeDBに書き込む操作が実行されます。GreptimeDBアクションはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むデータフォーマット定義が可能です。

エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟にデータ分析が可能です。例えば:

  • Grafanaなどの可視化ツールに接続し、エネルギー消費データのグラフを生成・表示する。
  • ERPなどのアプリケーションシステムに接続し、生産分析や生産計画の調整を行う。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を実施し、データ駆動型のエネルギー管理を促進する。

特長と利点

GreptimeDBとのデータ統合は、以下の特長と利点をビジネスにもたらします。

  • 使いやすさ:EMQXプラットフォームとGreptimeDBはどちらも開発者に優しい設計です。EMQXプラットフォームは標準のMQTTプロトコルに加え、多様な認証、認可、クラスタリング機能を標準で提供します。GreptimeDBは時系列テーブルやスキーマレスアーキテクチャなどユーザーフレンドリーな設計を備えています。両者の統合により、ビジネス統合と開発のスピードアップが期待できます。
  • 効率的なデータ処理:EMQXプラットフォームは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに負担をかけずに満たします。
  • メッセージ変換:メッセージはEMQXプラットフォームのルール内で豊富な処理・変換を経てからGreptimeDBに書き込まれます。
  • 効率的なストレージとスケーラビリティ:EMQXプラットフォームとGreptimeDBはどちらもクラスターのスケールアウト機能を備え、ビジネスの成長に応じて柔軟に水平スケーリングが可能です。
  • 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリ・分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を抽出できます。

はじめる前に

このセクションでは、GreptimeDBデータ統合の作成を始める前に必要な準備、特にGreptimeDBサーバーのインストール方法について説明します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

GreptimeDBサーバーのインストール

  1. Dockerを使ってGreptimeDBをインストールし、Dockerイメージを起動します。

    bash
    # GreptimeDBのDockerイメージを起動する
    docker run -p 4000-4004:4000-4004 \
    -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \
    --name greptime --rm \
    greptime/greptimedb standalone start \
    --http-addr 0.0.0.0:4000 \
    --rpc-addr 0.0.0.0:4001 \
    --mysql-addr 0.0.0.0:4002 \
    --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
  2. user-providerパラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。

  3. GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれgreptime_usergreptime_pwdです。

コネクターの作成

データ統合ルールを作成する前に、GreptimeDBサーバーにアクセスするためのGreptimeDBコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるGreptimeDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリの下のGreptimeDBを選択します。

  2. コネクター名はシステムが自動生成します。

  3. 接続情報を入力します:

    • サーバーホスト{host}:4001を入力します。GreptimeCloudに接続する場合はポートを443にして{url}:443を入力してください。
    • データベースpublicを入力します。GreptimeCloudに接続する場合はサービス名を入力します。
    • ユーザー名パスワードgreptime_usergreptime_pwdを入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudの場合はサービスのユーザー名とパスワードを入力してください。
    • TLSを有効化:暗号化接続を確立したい場合はトグルスイッチをオンにします。
    • ビジネス要件に応じて詳細設定を行います(任意)。
  4. テストボタンをクリックします。GreptimeDBサービスにアクセス可能であれば、connector availableのメッセージが返されます。

  5. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

このセクションでは、EMQXプラットフォームコンソールを使ってGreptimeDBルールを作成し、ルールにアクションを追加する方法を示します。

  1. ルールエリアの新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 利用したい機能に基づいてSQLエディターでルールを設定します。ここではクライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーすることを目標とします。SQLは以下のように記述します。

    sql
     SELECT
      timestamp, clientid, payload
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方はSQL例をクリックし、テストを有効化してSQLルールの学習とテストを行うことを推奨します。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. 書き込み構文を設定します。これはテキストベースのフォーマットで、データポイントの計測名、タグ、フィールド、タイムスタンプを指定し、InfluxDB line protocolの構文に準拠したプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータフォーマットをサポートしています。
    本チュートリアルの例は以下の構文です。

    sql
     myMeasurement,tag1=${clientid} fieldKey=${payload}

    TIP

    • GreptimeDBに符号付き整数型の値を書き込む場合、プレースホルダーの後にiを型識別子として付加します。例:${payload.int}i
    • 符号なし整数型の場合はuを付加します。例:${payload.int}u
  6. 時間精度を指定します。デフォルトはmillisecondです。

  7. 詳細設定を展開し、同期/非同期モード、キューやバッチ処理などのパラメータを必要に応じて設定します(任意)。

  8. 確定ボタンをクリックしてルール作成を完了します。

  9. 新規ルール作成成功ポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

温度・湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他のクライアントでも構いません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. ルールの稼働状況を確認し、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあることを確認します。

  3. GreptimeDBダッシュボードでSQLを使い、メッセージがGreptimeDBに書き込まれているかを確認できます。