InfluxDBへのMQTTデータ取り込み
InfluxDBは時系列データの保存と分析に特化したデータベースです。高いデータスループット性能と安定した動作により、IoT分野での活用に非常に適しています。EMQXは現在、InfluxDB Cloud、InfluxDB OSS、InfluxDB Enterpriseの主要バージョンとの接続をサポートしています。
本ページでは、EMQXとInfluxDB間のデータ連携について、作成方法や検証手順を含めて包括的に解説します。
動作の仕組み
InfluxDBデータ連携はEMQXに標準搭載された機能であり、EMQXのリアルタイムデータ取得・転送機能とInfluxDBのデータ保存・分析機能を組み合わせています。内蔵のルールエンジンコンポーネントにより、EMQXからInfluxDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQXはルールエンジンとSinkを通じてデバイスデータをInfluxDBに転送し保存・分析を行います。InfluxDBは分析結果をレポートやグラフとして生成し、InfluxDBの可視化ツールでユーザーに提供します。
以下の図は、エネルギー貯蔵シナリオにおけるEMQXとInfluxDBの典型的なデータ連携アーキテクチャを示しています。
EMQXとInfluxDBは、エネルギー消費データをリアルタイムで効率的に収集・分析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQXがデバイス接続、メッセージ転送、データルーティングを担当するIoTプラットフォームとして機能し、InfluxDBがデータ保存・分析プラットフォームとして役割を担います。ワークフローは以下の通りです:
- メッセージのパブリッシュと受信:エネルギー貯蔵機器や産業用IoT機器はMQTTプロトコルを用いてEMQXに接続し、電力消費量、入出力電力などのデータを定期的にパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:内蔵のルールエンジンを使い、特定のトピックに基づいてメッセージを処理します。メッセージが到着するとルールエンジンを通過し、対応するルールにマッチしてデータ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
- InfluxDBへのデータ取り込み:ルールエンジンで定義されたルールにより、InfluxDBへの書き込み操作がトリガーされます。InfluxDB SinkはLine Protocolテンプレートを提供し、メッセージの特定フィールドをInfluxDBの対応するmeasurementやfieldに柔軟にマッピングできます。
エネルギー消費データがInfluxDBに書き込まれた後は、Line Protocolを活用して以下のような分析が可能です:
- Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データを基にグラフを生成する。
- 業務システムと連携してエネルギー貯蔵機器の状態監視やアラートを実施する。
特長とメリット
InfluxDBデータ連携は以下の特長と利点を提供します:
- 効率的なデータ処理:EMQXは大量のIoTデバイス接続とメッセージスループットを処理可能であり、InfluxDBはデータ書き込み・保存・クエリに優れた性能を発揮し、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
- メッセージ変換:EMQXのルールを通じてメッセージを多様に処理・変換してからInfluxDBに書き込めます。
- スケーラビリティ:EMQXとInfluxDBはどちらもクラスター拡張に対応し、ビジネスの成長に応じて柔軟に水平拡張可能です。
- 豊富なクエリ機能:InfluxDBは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと分析を実現し、IoTデータから価値ある洞察を正確に抽出します。
- 効率的なストレージ:InfluxDBは高圧縮率のエンコード方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズでき、不要なデータによるストレージ占有を防止します。
はじめる前に
このセクションでは、InfluxDBデータ連携を作成する前に必要な準備、特にInfluxDBのインストールとセットアップについて説明します。
前提条件
- EMQXがInfluxDBにデータを書き込む際に従うInfluxDB Line Protocolの知識
- EMQXデータ連携のルールの知識
- データ連携の知識
InfluxDBのインストールとセットアップ
- DockerでInfluxDBをインストールし、Dockerイメージを起動します。
# InfluxDBのDockerイメージを起動
docker run --name influxdb -p 8086:8086 influxdb:2.5.1
- InfluxDBが起動したら、ブラウザで http://localhost:8086 にアクセスし、Username、Password、Organization Name、Bucket Nameを設定します。
- InfluxDBのUIで、Load Data -> API Token をクリックし、指示に従って全権限トークンを作成します。
コネクターの作成
このセクションでは、SinkをInfluxDBサーバーに接続するためのコネクター作成方法を説明します。
以下の手順はEMQXとInfluxDBをローカルマシンで実行している場合を想定しています。リモート環境の場合は設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create ConnectorページでInfluxDBを選択し、Nextをクリックします。
- Configurationステップで以下を設定します:
- コネクター名を入力します。英数字の組み合わせで、例:
my_influxdb
- Version of InfluxDBを必要に応じて選択(デフォルトは
V2
) - InfluxDBサーバー接続情報を入力:
- Server Hostに
127.0.0.1:8086
を入力。InfluxDB Cloudを使う場合はポート443を指定し、{url}:443
と入力してEnable TLSを有効にします。 - InfluxDBのインストールとセットアップに従い、Token、Organization、Bucketを設定。InfluxDB v1を選択した場合は、Database、Username、Passwordを設定してください。
- Server Hostに
- TLSを有効にするかどうかを決定します。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
- コネクター名を入力します。英数字の組み合わせで、例:
- Createをクリックする前に、Test ConnectivityでInfluxDBサーバーへの接続テストが可能です。
- ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップでBack to Connector ListまたはCreate Ruleを選択可能です。続けてルールとSinkを作成し、InfluxDBに転送するデータを指定できます。詳細はInfluxDB Sink付きルールの作成を参照してください。
InfluxDB Sink付きルールの作成
このセクションでは、EMQXでMQTTトピック t/#
のメッセージを処理し、設定済みのSinkを通じてInfluxDBに送信するルールの作成方法を説明します。
EMQXダッシュボードにアクセスし、左メニューのIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルール作成ページで、ルールIDに
my_rule
を入力します。SQL Editorにルールを設定します。例えば、トピック
t/#
のMQTTメッセージをInfluxDBに保存したい場合、以下のSQL文を使用します。TIP
独自のSQL文を指定する場合は、後で設定するSinkのデータ形式に含まれるすべての変数が
SELECT
句に含まれていることを確認してください。sqlSELECT * FROM "t/#"
注:初心者の方はSQL ExamplesとEnable Testを使ってSQLルールの学習とテストが可能です。
- Add Actionボタンをクリックし、ルールがトリガーするアクションを定義します。このアクションにより、EMQXはルールで処理したデータをInfluxDBに送信します。
Type of Actionのドロップダウンから
InfluxDB
を選択します。ActionはデフォルトのCreate Action
のままにします。既に作成済みのSinkがあれば選択可能ですが、ここでは新規Sinkを作成します。Sink名を入力します。英数字の組み合わせで指定してください。
Connectorのドロップダウンから先に作成した
my_influxdb
を選択します。新規作成も可能です。設定パラメータはコネクターの作成を参照してください。Time Precisionを指定します。デフォルトは
millisecond
です。Data Formatを
JSON
またはLine Protocol
から選択し、InfluxDBへのデータ解析・書き込み方法を指定します。- JSON形式の場合、Measurement、Timestamp、Fields、Tagsの解析方法を定義します。すべてのキーは変数やプレースホルダーを利用可能で、InfluxDB line protocolに準拠して設定できます。FieldsはCSVファイルによる一括設定も可能です。詳細は一括設定を参照してください。
- Line Protocol形式の場合、テキストベースでmeasurement、tag set、field set、timestampを指定し、InfluxDB line protocolの構文に従ったプレースホルダーを利用できます。
TIP
- InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後に
i
を付けます。例:${payload.int}i
。詳細はInfluxDB 1.8で整数値を書き込むを参照してください。 - 符号なし整数型の値を書き込む場合は、プレースホルダーの後に
u
を付けます。例:${payload.int}u
。詳細は同上リンクを参照してください。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。プライマリSinkがメッセージ処理に失敗した場合にこれらがトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定(任意):詳細設定を参照してください。
Createをクリックする前に、Test ConnectivityでSinkがInfluxDBサーバーに接続可能かテストできます。
CreateをクリックしてSink作成を完了します。ルール作成ページのAction Outputsタブに新規Sinkが表示されます。
ルール作成ページで設定内容を確認し、Createをクリックしてルールを生成します。
これでルールが作成され、Ruleページに新規ルールが表示されます。**Actions(Sink)**タブをクリックすると、新規InfluxDB Sinkが確認できます。
また、Integration -> Flow Designerでトポロジーを確認できます。トピック t/#
のメッセージがルールmy_rule
で解析され、InfluxDBに送信・保存されていることがわかります。
一括設定
InfluxDBでは1つのデータエントリに数百のフィールドが含まれることが多く、データ形式の設定が複雑になりがちです。これに対応するため、EMQXはフィールドの一括設定機能を提供しています。
JSON形式でデータ形式を設定する際、CSVファイルからフィールドのキー・バリューを一括インポートできます。
FieldsテーブルのBatch Settingボタンをクリックし、Import Batch Settingポップアップを開きます。
指示に従い、一括設定テンプレートファイルをダウンロードし、テンプレートにフィールドのキー・バリューを記入します。デフォルトのテンプレート内容は以下の通りです:
Field Value 備考(任意) temp ${payload.temp} hum ${payload.hum} precip ${payload.precip}i フィールド値に i
を付けてInfluxDBに整数として保存する指示- Field:フィールドキー。定数または
${var}
形式のプレースホルダーをサポート。 - Value:フィールド値。定数またはプレースホルダーをサポートし、line protocolに従い型識別子を付加可能。
- 備考:CSV内のメモ用で、EMQXへのインポートには含まれません。
CSVファイルの行数は2048行を超えないようにしてください。
- Field:フィールドキー。定数または
記入したテンプレートファイルを保存し、Import Batch SettingポップアップにアップロードしてImportをクリックし、一括設定を完了します。
インポート後、Fields設定テーブルでキー・バリューをさらに調整可能です。
ルールのテスト
MQTTクライアントMQTTXを使い、トピック t/1
にメッセージを送信してオンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello InfluxDB" }'
Sinkの稼働状況を確認すると、新規の受信メッセージと送信メッセージが1件ずつあるはずです。
InfluxDBのUIのData Explorerウィンドウで、メッセージがInfluxDBに書き込まれていることを確認できます。
詳細設定
このセクションでは、InfluxDBコネクターおよびSinkの詳細設定オプションについて説明します。ダッシュボードでコネクターやSinkを設定する際、Advanced Settingsに進み、以下のパラメータをニーズに合わせて調整できます。
項目 | 説明 | 推奨値 |
---|---|---|
Start Timeout | コネクターが自動起動したリソース(例:InfluxDBのデータベースインスタンス)が正常状態になるまで待機する最大秒数です。リソースが準備完了するまで処理を進めないようにするための設定です。 | 5 |
Buffer Pool Size | EMQXとInfluxDB間の送信型ブリッジでデータフローを管理するバッファワーカープロセス数を指定します。これらのプロセスはデータを一時的に保持し、送信前に処理します。IngressのみのSinkでは0 に設定可能です。 | 4 |
Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)です。TTLを超えたリクエストやInfluxDBからの応答・アックが遅延したリクエストは期限切れとみなされます。 | 45 |
Health Check Interval | SinkがInfluxDB接続のヘルスチェックを自動的に行う間隔(秒)です。 | 15 |
Max Buffer Queue Size | 各バッファワーカーがInfluxDB Sinkでバッファリング可能な最大バイト数です。データフローの効率化に関わるため、システム性能や転送要件に応じて調整してください。 | 1 |
Max Batch Size | EMQXからInfluxDBに一度に送信するデータバッチの最大サイズです。サイズ調整によりデータ転送効率を最適化できます。1 に設定すると個別に送信されます。 | 100 |
Query Mode | メッセージ送信の最適化のため、asynchronous (非同期)またはsynchronous (同期)を選択可能です。非同期モードではInfluxDBへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがInfluxDB到達前にメッセージを受け取る可能性があります。 | Async |
Inflight Window | 送信済みだが応答・アックをまだ受け取っていない「インフライトクエリ」の最大数を制御します。Query Modeがasync の場合、同一MQTTクライアントのメッセージを厳密に順序処理したい場合は1 に設定してください。 | 100 |
参考情報
以下のリンクからさらに詳しく学べます:
ブログ:
1時間で構築するEMQX + InfluxDB + Grafana IoTデータ可視化ソリューション