Skip to content

InfluxDBへのMQTTデータ取り込み

InfluxDBは時系列データの保存と分析に特化したデータベースです。高いデータスループット性能と安定した動作により、IoT分野での活用に非常に適しています。EMQXは現在、InfluxDB Cloud、InfluxDB OSS、InfluxDB Enterpriseの主要バージョンとの接続をサポートしています。

本ページでは、EMQXとInfluxDB間のデータ連携について、作成方法や検証手順を含めて包括的に解説します。

動作の仕組み

InfluxDBデータ連携はEMQXに標準搭載された機能であり、EMQXのリアルタイムデータ取得・転送機能とInfluxDBのデータ保存・分析機能を組み合わせています。内蔵のルールエンジンコンポーネントにより、EMQXからInfluxDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQXはルールエンジンとSinkを通じてデバイスデータをInfluxDBに転送し保存・分析を行います。InfluxDBは分析結果をレポートやグラフとして生成し、InfluxDBの可視化ツールでユーザーに提供します。

以下の図は、エネルギー貯蔵シナリオにおけるEMQXとInfluxDBの典型的なデータ連携アーキテクチャを示しています。

MQTT to InfluxDB

EMQXとInfluxDBは、エネルギー消費データをリアルタイムで効率的に収集・分析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイス接続、メッセージ転送、データルーティングを担当するIoTプラットフォームとして機能し、InfluxDBがデータ保存・分析プラットフォームとして役割を担います。ワークフローは以下の通りです:

  1. メッセージのパブリッシュと受信:エネルギー貯蔵機器や産業用IoT機器はMQTTプロトコルを用いてEMQXに接続し、電力消費量、入出力電力などのデータを定期的にパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:内蔵のルールエンジンを使い、特定のトピックに基づいてメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールにマッチしてデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  3. InfluxDBへのデータ取り込み:ルールエンジンで定義されたルールにより、InfluxDBへの書き込み操作がトリガーされます。InfluxDB SinkはLine Protocolテンプレートを提供し、メッセージの特定フィールドをInfluxDBの対応するmeasurementやfieldに柔軟にマッピングできます。

エネルギー消費データがInfluxDBに書き込まれた後は、Line Protocolを活用して以下のような分析が可能です:

  • Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データを基にグラフを生成する。
  • 業務システムと連携してエネルギー貯蔵機器の状態監視やアラートを実施する。

特長とメリット

InfluxDBデータ連携は以下の特長と利点を提供します:

  • 効率的なデータ処理:EMQXは大量のIoTデバイス接続とメッセージスループットを処理可能であり、InfluxDBはデータ書き込み・保存・クエリに優れた性能を発揮し、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
  • メッセージ変換:EMQXのルールを通じてメッセージを多様に処理・変換してからInfluxDBに書き込めます。
  • スケーラビリティ:EMQXとInfluxDBはどちらもクラスター拡張に対応し、ビジネスの成長に応じて柔軟に水平拡張可能です。
  • 豊富なクエリ機能:InfluxDBは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと分析を実現し、IoTデータから価値ある洞察を正確に抽出します。
  • 効率的なストレージ:InfluxDBは高圧縮率のエンコード方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズでき、不要なデータによるストレージ占有を防止します。

はじめる前に

このセクションでは、InfluxDBデータ連携を作成する前に必要な準備、特にInfluxDBのインストールとセットアップについて説明します。

前提条件

InfluxDBのインストールとセットアップ

  1. DockerでInfluxDBをインストールし、Dockerイメージを起動します。
bash
# InfluxDBのDockerイメージを起動
docker run --name influxdb -p 8086:8086 influxdb:2.5.1
  1. InfluxDBが起動したら、ブラウザで http://localhost:8086 にアクセスし、UsernamePasswordOrganization NameBucket Nameを設定します。
  2. InfluxDBのUIで、Load Data -> API Token をクリックし、指示に従って全権限トークンを作成します。

コネクターの作成

このセクションでは、SinkをInfluxDBサーバーに接続するためのコネクター作成方法を説明します。

以下の手順はEMQXとInfluxDBをローカルマシンで実行している場合を想定しています。リモート環境の場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでInfluxDBを選択し、Nextをクリックします。
  4. Configurationステップで以下を設定します:
    • コネクター名を入力します。英数字の組み合わせで、例:my_influxdb
    • Version of InfluxDBを必要に応じて選択(デフォルトはV2
    • InfluxDBサーバー接続情報を入力:
      • Server Host127.0.0.1:8086を入力。InfluxDB Cloudを使う場合はポート443を指定し、{url}:443と入力してEnable TLSを有効にします。
      • InfluxDBのインストールとセットアップに従い、TokenOrganizationBucketを設定。InfluxDB v1を選択した場合は、DatabaseUsernamePasswordを設定してください。
    • TLSを有効にするかどうかを決定します。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
  5. Createをクリックする前に、Test ConnectivityでInfluxDBサーバーへの接続テストが可能です。
  6. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップでBack to Connector ListまたはCreate Ruleを選択可能です。続けてルールとSinkを作成し、InfluxDBに転送するデータを指定できます。詳細はInfluxDB Sink付きルールの作成を参照してください。

InfluxDB Sink付きルールの作成

このセクションでは、EMQXでMQTTトピック t/# のメッセージを処理し、設定済みのSinkを通じてInfluxDBに送信するルールの作成方法を説明します。

  1. EMQXダッシュボードにアクセスし、左メニューのIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルール作成ページで、ルールIDにmy_ruleを入力します。

  4. SQL Editorにルールを設定します。例えば、トピック t/# のMQTTメッセージをInfluxDBに保存したい場合、以下のSQL文を使用します。

    TIP

    独自のSQL文を指定する場合は、後で設定するSinkのデータ形式に含まれるすべての変数がSELECT句に含まれていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    注:初心者の方はSQL ExamplesEnable Testを使ってSQLルールの学習とテストが可能です。

    • Add Actionボタンをクリックし、ルールがトリガーするアクションを定義します。このアクションにより、EMQXはルールで処理したデータをInfluxDBに送信します。
  5. Type of ActionのドロップダウンからInfluxDBを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能ですが、ここでは新規Sinkを作成します。

  6. Sink名を入力します。英数字の組み合わせで指定してください。

  7. Connectorのドロップダウンから先に作成したmy_influxdbを選択します。新規作成も可能です。設定パラメータはコネクターの作成を参照してください。

  8. Time Precisionを指定します。デフォルトはmillisecondです。

  9. Data FormatJSONまたはLine Protocolから選択し、InfluxDBへのデータ解析・書き込み方法を指定します。

    • JSON形式の場合、MeasurementTimestampFieldsTagsの解析方法を定義します。すべてのキーは変数やプレースホルダーを利用可能で、InfluxDB line protocolに準拠して設定できます。FieldsはCSVファイルによる一括設定も可能です。詳細は一括設定を参照してください。
    • Line Protocol形式の場合、テキストベースでmeasurement、tag set、field set、timestampを指定し、InfluxDB line protocolの構文に従ったプレースホルダーを利用できます。

    TIP

    • InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後にiを付けます。例:${payload.int}i。詳細はInfluxDB 1.8で整数値を書き込むを参照してください。
    • 符号なし整数型の値を書き込む場合は、プレースホルダーの後にuを付けます。例:${payload.int}u。詳細は同上リンクを参照してください。
  10. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。プライマリSinkがメッセージ処理に失敗した場合にこれらがトリガーされます。詳細はフォールバックアクションを参照してください。

  11. 詳細設定(任意)詳細設定を参照してください。

  12. Createをクリックする前に、Test ConnectivityでSinkがInfluxDBサーバーに接続可能かテストできます。

  13. CreateをクリックしてSink作成を完了します。ルール作成ページのAction Outputsタブに新規Sinkが表示されます。

  14. ルール作成ページで設定内容を確認し、Createをクリックしてルールを生成します。

これでルールが作成され、Ruleページに新規ルールが表示されます。**Actions(Sink)**タブをクリックすると、新規InfluxDB Sinkが確認できます。

また、Integration -> Flow Designerでトポロジーを確認できます。トピック t/# のメッセージがルールmy_ruleで解析され、InfluxDBに送信・保存されていることがわかります。

一括設定

InfluxDBでは1つのデータエントリに数百のフィールドが含まれることが多く、データ形式の設定が複雑になりがちです。これに対応するため、EMQXはフィールドの一括設定機能を提供しています。

JSON形式でデータ形式を設定する際、CSVファイルからフィールドのキー・バリューを一括インポートできます。

  1. FieldsテーブルのBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。

  2. 指示に従い、一括設定テンプレートファイルをダウンロードし、テンプレートにフィールドのキー・バリューを記入します。デフォルトのテンプレート内容は以下の通りです:

    FieldValue備考(任意)
    temp${payload.temp}
    hum${payload.hum}
    precip${payload.precip}iフィールド値にiを付けてInfluxDBに整数として保存する指示
    • Field:フィールドキー。定数または${var}形式のプレースホルダーをサポート。
    • Value:フィールド値。定数またはプレースホルダーをサポートし、line protocolに従い型識別子を付加可能。
    • 備考:CSV内のメモ用で、EMQXへのインポートには含まれません。

    CSVファイルの行数は2048行を超えないようにしてください。

  3. 記入したテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックし、一括設定を完了します。

  4. インポート後、Fields設定テーブルでキー・バリューをさらに調整可能です。

ルールのテスト

MQTTクライアントMQTTXを使い、トピック t/1 にメッセージを送信してオンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello InfluxDB" }'

Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージが1件ずつあるはずです。

InfluxDBのUIのData Explorerウィンドウで、メッセージがInfluxDBに書き込まれていることを確認できます。

詳細設定

このセクションでは、InfluxDBコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードでコネクターやSinkを設定する際、Advanced Settingsに進み、以下のパラメータをニーズに合わせて調整できます。

項目説明推奨値
Start Timeoutコネクターが自動起動したリソース(例:InfluxDBのデータベースインスタンス)が正常状態になるまで待機する最大秒数です。リソースが準備完了するまで処理を進めないようにするための設定です。5
Buffer Pool SizeEMQXとInfluxDB間の送信型ブリッジでデータフローを管理するバッファワーカープロセス数を指定します。これらのプロセスはデータを一時的に保持し、送信前に処理します。IngressのみのSinkでは0に設定可能です。4
Request TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)です。TTLを超えたリクエストやInfluxDBからの応答・アックが遅延したリクエストは期限切れとみなされます。45
Health Check IntervalSinkがInfluxDB接続のヘルスチェックを自動的に行う間隔(秒)です。15
Max Buffer Queue Size各バッファワーカーがInfluxDB Sinkでバッファリング可能な最大バイト数です。データフローの効率化に関わるため、システム性能や転送要件に応じて調整してください。1
Max Batch SizeEMQXからInfluxDBに一度に送信するデータバッチの最大サイズです。サイズ調整によりデータ転送効率を最適化できます。1に設定すると個別に送信されます。100
Query Modeメッセージ送信の最適化のため、asynchronous(非同期)またはsynchronous(同期)を選択可能です。非同期モードではInfluxDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがInfluxDB到達前にメッセージを受け取る可能性があります。Async
Inflight Window送信済みだが応答・アックをまだ受け取っていない「インフライトクエリ」の最大数を制御します。Query Modeasyncの場合、同一MQTTクライアントのメッセージを厳密に順序処理したい場合は1に設定してください。100

参考情報

以下のリンクからさらに詳しく学べます:

ブログ

1時間で構築するEMQX + InfluxDB + Grafana IoTデータ可視化ソリューション

時系列IoTアプリケーション向けMQTTデータのInfluxDB統合

MQTTパフォーマンスベンチマークテスト:EMQX-InfluxDB統合