MQTTデータをInfluxDBに取り込む
InfluxDBは時系列データの保存と解析に特化したデータベースです。高いデータスループット性能と安定した動作により、IoT分野での活用に非常に適しています。EMQX Platformは、InfluxDB Cloud、InfluxDB OSS、InfluxDB Enterpriseの主要なバージョンへの接続をサポートしています。
本ページでは、EMQX PlatformとInfluxDB間のデータ統合について、作成方法や動作確認を含めて詳しく解説します。
動作の仕組み
InfluxDBデータ統合は、EMQX Platformに標準搭載された機能であり、EMQX Platformのリアルタイムデータ取得・転送機能とInfluxDBのデータ保存・解析機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX PlatformからInfluxDBへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。EMQX PlatformはルールエンジンとSinkを介してデバイスデータをInfluxDBに転送し保存・解析を行います。InfluxDBは解析結果としてレポートやグラフを生成し、InfluxDBの可視化ツールを通じてユーザーに提供します。
以下の図は、エネルギー貯蔵シナリオにおけるEMQXとInfluxDBの典型的なデータ統合アーキテクチャを示しています。
EMQX PlatformとInfluxDBは、エネルギー消費データをリアルタイムに効率的に収集・解析するための拡張可能なIoTプラットフォームを提供します。このアーキテクチャでは、EMQX PlatformがIoTプラットフォームとしてデバイス接続、メッセージ送受信、データルーティングを担当し、InfluxDBがデータ保存と解析を担います。ワークフローは以下の通りです。
- メッセージのパブリッシュと受信:エネルギー貯蔵機器や産業用IoT機器はMQTTプロトコルでEMQX Platformに接続し、電力消費量や入出力電力などのデータを定期的にパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:組み込みのルールエンジンを使い、特定のトピックに基づくメッセージを処理します。メッセージはルールエンジンを通過し、対応するルールにマッチすると、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
- InfluxDBへのデータ取り込み:ルールエンジンで定義したルールがトリガーとなり、メッセージをInfluxDBに書き込む操作が実行されます。InfluxDB SinkはLine Protocolテンプレートを提供し、メッセージの特定フィールドをInfluxDBのメジャメントやフィールドに柔軟にマッピング可能です。
エネルギー消費データがInfluxDBに書き込まれた後は、Line Protocolを活用して以下のような解析が可能です。
- Grafanaなどの可視化ツールに接続し、エネルギー貯蔵データのグラフを生成。
- 業務システムに接続し、エネルギー貯蔵機器の状態監視やアラート発報を実施。
特長とメリット
InfluxDBデータ統合は以下の特長と利点を提供します。
- 効率的なデータ処理:EMQX Platformは大量のIoTデバイス接続とメッセージスループットを処理可能であり、InfluxDBは高速なデータ書き込み、保存、クエリ性能を持つため、IoTシナリオのデータ処理要件をシステムに過負荷をかけずに満たします。
- メッセージ変換:EMQX Platformのルールを用いて、InfluxDBに書き込む前にメッセージを多様に処理・変換可能です。
- スケーラビリティ:EMQX PlatformとInfluxDBの両方がクラスター拡張に対応し、ビジネスの成長に応じて柔軟に水平拡張できます。
- 豊富なクエリ機能:InfluxDBは最適化された関数、演算子、インデックス技術を備え、時系列データの効率的なクエリと解析を実現し、IoTデータから価値あるインサイトを抽出します。
- 効率的なストレージ:InfluxDBは高圧縮率のエンコード方式を採用し、ストレージコストを大幅に削減します。また、データタイプごとに保存期間をカスタマイズ可能で、不要なデータによるストレージ占有を防止します。
はじめる前に
本節では、EMQX PlatformでInfluxDBデータ統合を作成するための準備作業を紹介します。
前提条件
- InfluxDB line protocolの知識。
- ルールの理解。
- データ統合の理解。
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
InfluxDBのインストールと設定
DockerによるInfluxDBのインストール
- Dockerを使ってInfluxDBをインストールし、コンテナを起動します。
# InfluxDBのDockerイメージを起動
docker run --name influxdb -p 8086:8086 influxdb:2.5.1
InfluxDBが起動したら、サーバーの8086ポートにブラウザでアクセスし、Username、Password、Organization Name、Bucket Nameを設定します。
InfluxDB UIでLoad Data -> API Tokenをクリックし、all-accessトークンの作成手順に従います。
InfluxDB Cloudでのサービス作成
InfluxDB Cloudにログインします。
InfluxDB UIでUsername、Password、Organization Name、Bucket Nameを設定します。
InfluxDB UIでLoad Data -> API Tokenをクリックし、all-accessトークンの作成手順に従います。
InfluxDBコネクターの作成
データ統合ルールを作成する前に、まずInfluxDBサーバーにアクセスするためのInfluxDBコネクターを作成します。
デプロイメントに移動し、左メニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、Data Forwardカテゴリの下にあるInfluxDBを選択します。すでにコネクターを作成済みの場合は、New Connectorを選択し、続けてData ForwardカテゴリのInfluxDBを選択します。
Connector Name:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- 必要に応じてVersion of InfluxDBを選択(デフォルトは
V2
)。 - Server Host:サーバーのIPアドレスとポート。InfluxDB Cloudを利用する場合はポート443を指定し、
{url}:443
の形式で入力し、Enable TLSを有効化してTLS接続を行います。 - InfluxDBのインストールと設定に従い、Token、Organization、Bucketを設定します。InfluxDB v1を選択した場合は、Database、Username、Passwordの設定も必要です。
- 必要に応じてVersion of InfluxDBを選択(デフォルトは
Testボタンをクリックし、InfluxDBサービスにアクセスできるか確認します。成功すると成功メッセージが表示されます。
Newボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理済みデータをInfluxDBに転送するアクションを追加するルールを作成します。
ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。
使用する機能に基づいてSQL Editorでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温度・湿度メッセージを送信した際にエンジンをトリガーするSQLの例を示します。sqlSELECT timestamp, payload.location as location, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
初心者の方はSQL ExamplesとEnable Testをクリックして、SQLルールの学習とテストが可能です。
Nextをクリックしてアクションを追加します。
Connectorドロップダウンから先ほど作成したコネクターを選択します。
Time Precisionを指定します。デフォルトは
millisecond
です。Data Formatを
JSON
またはLine Protocol
から選択し、InfluxDBへのデータ解析・書き込み方法を指定します。JSON形式の場合、Measurement、Timestamp、Fields、Tagsの解析方法を定義します。すべてのキー値は変数やプレースホルダーを使用可能で、InfluxDB line protocolに準拠して設定できます。FieldsはCSVファイルをインポートして一括設定も可能です。詳細はバッチ設定を参照してください。
Line Protocol形式の場合、テキストベースのフォーマットで、メジャメント、タグセット、フィールドセット、タイムスタンプを指定し、プレースホルダーもサポートします。詳細はInfluxDB 2.3 Line ProtocolおよびInfluxDB 1.8 Line Protocolを参照してください。
例:
bashtemp_hum,location=${location} temp=${temp},hum=${hum} ${timestamp}
TIP
- InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後に
i
を付けます。例:${payload.int}i
。詳細はInfluxDB 1.8整数値書き込みを参照してください。 - 符号なし整数型の値を書き込む場合は、
u
を付けます。例:${payload.int}u
。詳細は同上を参照してください。
- InfluxDB 1.xまたは2.xに符号付き整数型の値を書き込む場合は、プレースホルダーの後に
高度な設定(任意):高度な設定を参照してください。
Confirmボタンをクリックしてルール作成を完了します。
Successful new ruleポップアップでBack to Rulesをクリックし、データ統合設定の一連の作業が完了します。
バッチ設定
InfluxDBでは1つのデータエントリに数百のフィールドが含まれることが多く、データ形式の設定が複雑になる場合があります。これを解決するため、EMQX Platformはフィールドのバッチ設定機能を提供しています。
JSON形式でデータ形式を設定する際、CSVファイルからフィールドのキー・バリューを一括インポートできます。
FieldsテーブルのImportボタンをクリックし、Import Batch Settingポップアップを開きます。
指示に従い、まずバッチ設定テンプレートファイルをダウンロードし、テンプレートにフィールドのキー・バリューを記入します。テンプレートのデフォルト内容は以下の通りです。
Field Value 備考(任意) temp ${payload.temp} hum ${payload.hum} precip ${payload.precip}i フィールド値に i
を付けてInfluxDBに整数として保存する指示。- Field:フィールドキー。定数または
${var}
形式のプレースホルダーをサポート。 - Value:フィールド値。定数またはプレースホルダーをサポートし、line protocolに従い型識別子を付加可能。
- 備考:CSVファイル内の注釈用で、EMQXへのインポート対象外。
CSVファイルのバッチ設定データは2048行以内にしてください。
- Field:フィールドキー。定数または
記入済みテンプレートファイルを保存し、Import Batch Settingポップアップにアップロード後、Importをクリックしてバッチ設定を完了します。
インポート後、Fields設定テーブルでキー・バリューの調整も可能です。
ルールのテスト
温度・湿度データの送信シミュレーションにはMQTTXの利用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": 27.5, "hum": 41.8, "location": "Prague" }
InfluxDB UIのData Explorerウィンドウで、メッセージがInfluxDBに書き込まれているか確認します。
InfluxDB V1を使用している場合は、InfluxDBコンテナに入り、データを確認します。
bash$ docker exec -it influxdb influx $ use db
bash> select * from "temp_hum" name: temp_hum time hum location temp ---- --- -------- ---- 1711093437420000000 41.8 Prague 27.5 >
コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報および該当ルール下のすべてのアクション統計が表示されます。