Skip to content

MQTTデータをInfluxDBに取り込む

InfluxDBは時系列データの保存と解析に特化したデータベースです。高いデータスループット性能と安定した動作により、IoT分野での活用に非常に適しています。EMQX Platformは、InfluxDB Cloud、InfluxDB OSS、InfluxDB Enterpriseの主要なバージョンへの接続をサポートしています。

本ページでは、EMQX PlatformとInfluxDB間のデータ統合について、作成方法や動作確認を含めて詳しく解説します。

動作の仕組み

InfluxDBデータ統合は、EMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータ取得・転送機能とInfluxDBのデータ保存・解析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX PlatformからInfluxDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQX PlatformはルールエンジンとSinkを介してデバイスデータをInfluxDBに転送し保存・解析を行います。InfluxDBは解析結果としてレポートやグラフを生成し、InfluxDBの可視化ツールを通じてユーザーに提供します。

以下の図は、エネルギー貯蔵シナリオにおけるEMQXとInfluxDBの典型的なデータ統合アーキテクチャを示しています。

EMQX Platform InfluxDB Data Integration

EMQX PlatformとInfluxDBは、エネルギー消費データをリアルタイムに効率的に収集・解析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQX PlatformがIoTプラットフォームとしてデバイス接続、メッセージ送受信、データルーティングを担当し、InfluxDBがデータ保存と解析を担います。ワークフローは以下の通りです。

  1. メッセージのパブリッシュと受信:エネルギー貯蔵機器や産業用IoT機器はMQTTプロトコルでEMQX Platformに接続し、電力消費量や入出力電力などのデータを定期的にパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:組み込みのルールエンジンを使い、特定のトピックに基づくメッセージを処理します。メッセージはルールエンジンを通過し、対応するルールにマッチすると、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
  3. InfluxDBへのデータ取り込み:ルールエンジンで定義したルールがトリガーとなり、メッセージをInfluxDBに書き込む操作が実行されます。InfluxDB SinkはLine Protocolテンプレートを提供し、メッセージの特定フィールドをInfluxDBのメジャメントやフィールドに柔軟にマッピング可能です。

エネルギー消費データがInfluxDBに書き込まれた後は、Line Protocolを活用して以下のような解析が可能です。

  • Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データのグラフを生成。
  • 業務システムに接続し、エネルギー貯蔵機器の状態監視やアラート発報を実施。

特長とメリット

InfluxDBデータ統合は以下の特長と利点を提供します。

  • 効率的なデータ処理:EMQX Platformは大量のIoTデバイス接続とメッセージスループットを処理可能であり、InfluxDBは高速なデータ書き込み、保存、クエリ性能を持つため、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
  • メッセージ変換:EMQX Platformのルールを用いて、InfluxDBに書き込む前にメッセージを多様に処理・変換可能です。
  • スケーラビリティ:EMQX PlatformとInfluxDBの両方がクラスター拡張に対応し、ビジネスの成長に応じて柔軟に水平拡張できます。
  • 豊富なクエリ機能:InfluxDBは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと解析を実現し、IoTデータから価値あるインサイトを抽出します。
  • 効率的なストレージ:InfluxDBは高圧縮率のエンコード方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズ可能で、不要なデータによるストレージ占有を防止します。

はじめる前に

本節では、EMQX PlatformでInfluxDBデータ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

InfluxDBのインストールと設定

DockerによるInfluxDBのインストール

  1. Dockerを使ってInfluxDBをインストールし、コンテナを起動します。
bash
# InfluxDBのDockerイメージを起動
docker run --name influxdb -p 8086:8086 influxdb:2.5.1
  1. InfluxDBが起動したら、サーバーの8086ポートにブラウザでアクセスし、UsernamePasswordOrganization NameBucket Nameを設定します。

  2. InfluxDB UIでLoad Data -> API Tokenをクリックし、all-accessトークンの作成手順に従います。

InfluxDB Cloudでのサービス作成

  1. InfluxDB Cloudにログインします。

  2. InfluxDB UIでUsernamePasswordOrganization NameBucket Nameを設定します。

  3. InfluxDB UIでLoad Data -> API Tokenをクリックし、all-accessトークンの作成手順に従います。

InfluxDBコネクターの作成

データ統合ルールを作成する前に、まずInfluxDBサーバーにアクセスするためのInfluxDBコネクターを作成します。

  1. デプロイメントに移動し、左メニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、Data Forwardカテゴリの下にあるInfluxDBを選択します。すでにコネクターを作成済みの場合は、New Connectorを選択し、続けてData ForwardカテゴリのInfluxDBを選択します。

  3. Connector Name:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • 必要に応じてVersion of InfluxDBを選択(デフォルトはV2)。
    • Server Host:サーバーのIPアドレスとポート。InfluxDB Cloudを利用する場合はポート443を指定し、{url}:443の形式で入力し、Enable TLSを有効化してTLS接続を行います。
    • InfluxDBのインストールと設定に従い、TokenOrganizationBucketを設定します。InfluxDB v1を選択した場合は、DatabaseUsernamePasswordの設定も必要です。
  5. Testボタンをクリックし、InfluxDBサービスにアクセスできるか確認します。成功すると成功メッセージが表示されます。

  6. Newボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをInfluxDBに転送するアクションを追加するルールを作成します。

  1. ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。

  2. 使用する機能に基づいてSQL Editorでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度・湿度メッセージを送信した際にエンジンをトリガーするSQLの例を示します。

    sql
      SELECT
        timestamp,
        payload.location as location,
        payload.temp as temp,
        payload.hum as hum
      FROM "temp_hum/emqx"

    TIP

    初心者の方はSQL ExamplesEnable Testをクリックして、SQLルールの学習とテストが可能です。

  3. Nextをクリックしてアクションを追加します。

  4. Connectorドロップダウンから先ほど作成したコネクターを選択します。

  5. Time Precisionを指定します。デフォルトはmillisecondです。

  6. Data FormatJSONまたはLine Protocolから選択し、InfluxDBへのデータ解析・書き込み方法を指定します。

    • JSON形式の場合、MeasurementTimestampFieldsTagsの解析方法を定義します。すべてのキー値は変数やプレースホルダーを使用可能で、InfluxDB line protocolに準拠して設定できます。FieldsはCSVファイルをインポートして一括設定も可能です。詳細はバッチ設定を参照してください。

    • Line Protocol形式の場合、テキストベースのフォーマットで、メジャメント、タグセット、フィールドセット、タイムスタンプを指定し、プレースホルダーもサポートします。詳細はInfluxDB 2.3 Line ProtocolおよびInfluxDB 1.8 Line Protocolを参照してください。

      例:

      bash
       temp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}

      TIP

      • InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後にiを付けます。例:${payload.int}i。詳細はInfluxDB 1.8整数値書き込みを参照してください。
      • 符号なし整数型の値を書き込む場合は、uを付けます。例:${payload.int}u。詳細は同上を参照してください。
  7. 高度な設定(任意):高度な設定を参照してください。

  8. Confirmボタンをクリックしてルール作成を完了します。

  9. Successful new ruleポップアップでBack to Rulesをクリックし、データ統合設定の一連の作業が完了します。

バッチ設定

InfluxDBでは1つのデータエントリに数百のフィールドが含まれることが多く、データ形式の設定が複雑になる場合があります。これを解決するため、EMQX Platformはフィールドのバッチ設定機能を提供しています。

JSON形式でデータ形式を設定する際、CSVファイルからフィールドのキー・バリューを一括インポートできます。

  1. FieldsテーブルのImportボタンをクリックし、Import Batch Settingポップアップを開きます。

  2. 指示に従い、まずバッチ設定テンプレートファイルをダウンロードし、テンプレートにフィールドのキー・バリューを記入します。テンプレートのデフォルト内容は以下の通りです。

    FieldValue備考(任意)
    temp${payload.temp}
    hum${payload.hum}
    precip${payload.precip}iフィールド値にiを付けてInfluxDBに整数として保存する指示。
    • Field:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • Value:フィールド値。定数またはプレースホルダーをサポートし、line protocolに従い型識別子を付加可能。
    • 備考:CSVファイル内の注釈用で、EMQXへのインポート対象外。

    CSVファイルのバッチ設定データは2048行以内にしてください。

  3. 記入済みテンプレートファイルを保存し、Import Batch Settingポップアップにアップロード後、Importをクリックしてバッチ設定を完了します。

  4. インポート後、Fields設定テーブルでキー・バリューの調整も可能です。

ルールのテスト

温度・湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": 27.5,
        "hum": 41.8,
        "location": "Prague"
      }
  2. InfluxDB UIのData Explorerウィンドウで、メッセージがInfluxDBに書き込まれているか確認します。

  3. InfluxDB V1を使用している場合は、InfluxDBコンテナに入り、データを確認します。

    bash
      $ docker exec -it influxdb influx
      $ use db
    bash
      > select * from "temp_hum"
      name: temp_hum
      time                hum  location temp
      ----                ---  -------- ----
      1711093437420000000 41.8 Prague   27.5
      >
  4. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報および該当ルール下のすべてのアクション統計が表示されます。