Skip to content

GreptimeDBへのMQTTデータ取り込み

GreptimeDBは、スケーラビリティ、分析機能、効率性に特化したオープンソースの時系列データベースです。クラウド時代のインフラ上での動作を想定して設計されており、ユーザーはその弾力性と汎用ストレージの恩恵を受けられます。EMQXは現在、GreptimeDBの主流バージョンであるGreptimeCloudやGreptimeDB Enterpriseとの接続をサポートしています。

本ページでは、EMQXとGreptimeDB間のデータ連携について包括的に紹介し、データ連携の作成および検証方法を実践的に解説します。

動作概要

GreptimeDBデータ連携はEMQXに組み込まれた機能で、EMQXのリアルタイムデータキャプチャと転送能力と、GreptimeDBのデータ保存・分析能力を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからGreptimeDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。ワークフローは以下の通りです。

以下の図は、EMQXとGreptimeDB間の典型的なデータ連携アーキテクチャを示しています。

EMQX Integration GreptimeDB

  1. メッセージのパブリッシュと受信:産業用デバイスはMQTTプロトコルを通じてEMQXに正常に接続し、定期的にエネルギー消費データをパブリッシュします。このデータには生産ラインの識別子やエネルギー消費値が含まれます。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールとマッチングしてメッセージデータを処理します。これにはデータ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報付加などが含まれます。
  3. GreptimeDBへのデータ取り込み:ルールエンジンで定義されたルールにより、メッセージをGreptimeDBに書き込む操作がトリガーされます。GreptimeDB SinkはLine Protocolテンプレートを提供し、特定のメッセージフィールドをGreptimeDBの対応するテーブルやカラムに柔軟に書き込むデータ形式を定義できます。

エネルギー消費データがGreptimeDBに書き込まれた後は、SQL文やPrometheusクエリ言語を用いて柔軟に分析が可能です。例えば:

  • Grafanaなどの可視化ツールに接続してチャートを生成し、エネルギー消費データを表示する。
  • ERPなどの業務システムに接続し、生産分析や生産計画の調整に活用する。
  • ビジネスシステムに接続し、リアルタイムのエネルギー使用分析を行い、データ駆動型のエネルギー管理を実現する。

特長とメリット

GreptimeDBとのデータ連携は、以下の特長と利点をビジネスにもたらします。

  • 使いやすさ:EMQXとGreptimeDBはともに開発者に優しい設計です。EMQXは標準のMQTTプロトコルに加え、多様な認証・認可・クラスタリング機能を提供します。GreptimeDBは時系列テーブルやスキーマレス構造などユーザーフレンドリーな設計を持ちます。両者の連携により、ビジネス統合と開発を加速できます。
  • 効率的なデータ処理:EMQXは多数のIoTデバイス接続とメッセージスループットを効率的に処理可能です。GreptimeDBはデータ書き込み、保存、クエリに優れており、IoTシナリオのデータ処理要件をシステムに負荷をかけずに満たします。
  • メッセージ変換:メッセージはEMQXルール内で豊富な処理・変換を経てからGreptimeDBに書き込まれます。
  • 効率的な保存とスケーラビリティ:EMQXとGreptimeDBはともにクラスタースケール機能を持ち、ビジネスの成長に応じて柔軟に水平スケールが可能です。
  • 高度なクエリ機能:GreptimeDBはタイムスタンプデータの効率的なクエリ・分析のために最適化された関数、演算子、インデックス技術を提供し、IoT時系列データから精緻な洞察を引き出せます。

はじめる前に

このセクションでは、GreptimeDBデータ連携の作成を始める前に必要な準備、特にGreptimeDBサーバーのインストール方法について説明します。

前提条件

GreptimeDBサーバーのインストール

  1. Dockerを使ってGreptimeDBをインストールし、Dockerイメージを起動します。

    bash
    # GreptimeDBのDockerイメージを起動する
    docker run -p 4000-4004:4000-4004 \
    -p 4242:4242 -v "$(pwd)/greptimedb:/tmp/greptimedb" \
    --name greptime --rm \
    greptime/greptimedb standalone start \
    --http-addr 0.0.0.0:4000 \
    --rpc-addr 0.0.0.0:4001 \
    --mysql-addr 0.0.0.0:4002 \
    --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd
  2. user-providerパラメータはGreptimeDBの認証を設定します。ファイルによる設定も可能です。詳細はドキュメントを参照してください。

  3. GreptimeDBが起動したら、http://localhost:4000/dashboardにアクセスしてGreptimeDBダッシュボードを利用できます。ユーザー名とパスワードはそれぞれgreptime_usergreptime_pwdです。

コネクターの作成

このセクションでは、SinkをGreptimeDBサーバーに接続するためのコネクター作成方法を説明します。

以下の手順はEMQXとGreptimeDBをローカルマシンで動作させていることを前提としています。リモート環境の場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectors をクリックします。
  2. ページ右上の Create をクリックします。
  3. Create Connector ページで GreptimeDB を選択し、Next をクリックします。
  4. Configuration ステップで以下を設定します:
    • コネクター名を入力します。英数字の組み合わせで、例:my_greptimedb
    • Server Host127.0.0.1:4001 と入力します。GreptimeCloudに接続する場合はポートを443にして {url}:443 と入力してください。
    • Databasepublic と入力します。GreptimeCloudの場合はサービス名を入力します。
    • UsernamePasswordgreptime_usergreptime_pwd を入力します(GreptimeDBサーバーのインストールで設定したもの)。GreptimeCloudの場合はサービスのユーザー名とパスワードを入力してください。
  5. Create をクリックする前に、Test Connectivity をクリックしてコネクターがGreptimeDBサーバーに接続できるかテストできます。
  6. ページ下部の Create ボタンをクリックしてコネクター作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックしてGreptimeDB Sinkを使ったルール作成に進めます。詳細はGreptimeDB Sinkを使ったルール作成を参照してください。

GreptimeDB Sinkを使ったルール作成

このセクションでは、EMQXでルールを作成し、ソースMQTTトピック t/# からのメッセージを処理して、設定済みのSinkを通じてGreptimeDBに送信する方法を説明します。

  1. EMQXダッシュボードに入り、Integration -> Rules をクリックします。

  2. ページ右上の Create をクリックします。

  3. ルールIDに my_rule を入力し、SQL Editor にルールを設定します。ここではトピック t/# のMQTTメッセージをGreptimeDBに保存するため、以下のSQL文を使用します。

    注意:独自のSQL文を指定する場合は、Sinkが必要とするすべてのフィールドを SELECT 部分に含めてください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方は SQL ExamplesEnable Test をクリックしてSQLルールの学習とテストができます。

    • Add Action ボタンをクリックして、ルールに紐づくアクションを定義します。このアクションにより、EMQXはルールで処理したデータをGreptimeDBに送信します。
  4. Type of Action ドロップダウンから GreptimeDB を選択します。Action はデフォルトの Create Action のままにします。既にSinkを作成している場合はそれを選択可能ですが、ここでは新規Sinkを作成します。

  5. Sinkの名前を入力します。英数字の組み合わせで指定してください。

  6. Connector ドロップダウンから先ほど作成した my_greptimedb を選択します。新規コネクターを作成する場合はドロップダウン横のボタンをクリックします。設定パラメータはコネクターの作成を参照してください。

  7. Write Syntax を設定します。これはテキストベースのフォーマットで、データポイントのmeasurement、tags、fields、timestampを指定し、InfluxDB line protocolの構文に準拠したプレースホルダーをサポートします。GreptimeDBはInfluxDB互換のデータフォーマットをサポートしています。

    TIP

    • GreptimeDBに符号付き整数型の値を書き込む場合は、プレースホルダーの後に i を付けます。例:${payload.int}i
    • 符号なし整数型の値を書く場合は、プレースホルダーの後に u を付けます。例:${payload.int}u
  8. Time Precision を指定します。デフォルトは millisecond です。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のために、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  10. 詳細設定(任意)sync または async クエリモードの選択、キューやバッチの有効化を設定します。詳細はSinkの機能を参照してください。

  11. Create をクリックする前に、Test Connectivity をクリックしてSinkがGreptimeDBサーバーに接続できるかテストできます。

  12. Create ボタンをクリックしてSinkの設定を完了します。新しいSinkが Action Outputs に追加されます。

  13. Create Rule ページに戻り、設定内容を確認して Create をクリックしルールを生成します。

これでGreptimeDB Sinkを通じたデータ転送ルールの作成が完了しました。Integration -> Rules ページで新規作成したルールを確認できます。Actions(Sink) タブをクリックすると新しいGreptimeDB Sinkが表示されます。

また、Integration -> Flow Designer をクリックするとトポロジーが表示され、トピック t/# のメッセージがルール my_rule により解析されGreptimeDBに送信・保存されている様子を確認できます。

ルールのテスト

MQTTXを使ってトピック t/1 にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello GreptimeDB" }'

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージが1件ずつあるはずです。

GreptimeDBダッシュボードでSQLを使い、メッセージがGreptimeDBに書き込まれていることを確認できます。