ElasticsearchへのMQTTデータ取り込み
Elasticsearchは、分散型の検索およびデータ分析エンジンであり、多様なデータタイプに対して全文検索、構造化検索、分析機能を提供します。EMQX PlatformはElasticsearchと連携することで、MQTTデータをElasticsearchにシームレスに取り込み、保存することが可能です。この連携により、Elasticsearchの強力なスケーラビリティと分析機能を活用し、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存および分析ソリューションを提供します。
本ページでは、EMQX PlatformとElasticsearch間のデータ統合について詳細に説明し、ルールおよびアクションの作成方法を実践的に解説します。
動作概要
Elasticsearchとのデータ統合はEMQX Platformの標準機能であり、EMQX Platformのデバイスアクセスおよびメッセージ転送機能とElasticsearchのデータ保存・分析機能を組み合わせています。シンプルな設定によりMQTTデータのシームレスな統合が実現できます。
EMQX PlatformとElasticsearchは、リアルタイムのデバイスデータを効率的に収集・分析するスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQX Platformがデバイスアクセス、メッセージ転送、データルーティングを担うIoTプラットフォームとして機能し、Elasticsearchはデータ保存および分析プラットフォームとして、データの保存、検索、分析を担当します。
EMQX Platformはルールエンジンとアクションを通じてデバイスデータをElasticsearchに転送し、Elasticsearchは強力な検索・分析機能を活用してレポートやグラフなどのデータ分析結果を生成し、Kibanaの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです:
- デバイスからのメッセージパブリッシュと受信:IoTデバイスはMQTTプロトコルで接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Platformはこれを受信し、ルールエンジンで比較処理を行います。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンを用いて、特定のトピックに基づくMQTTメッセージを処理します。ルールエンジンは対応するルールにマッチしたメッセージを処理し、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などを行います。
- Elasticsearchへの書き込み:ルールエンジンで定義されたルールにより、メッセージをElasticsearchに書き込む操作がトリガーされます。Elasticsearchアクションは柔軟な操作方法とドキュメントテンプレートを提供し、メッセージの特定フィールドを対応するインデックスに書き込みます。
デバイスデータがElasticsearchに書き込まれた後は、Elasticsearchの検索・分析機能を柔軟に活用して以下のような処理が可能です:
- ログ監視:IoTデバイスは大量のログデータを生成し、これをElasticsearchに送信して保存・分析できます。Kibanaなどの可視化ツールと連携し、デバイスの状態、稼働記録、エラーメッセージなどのリアルタイム情報をグラフ化し、開発者や運用者が潜在的な問題を迅速に特定・解決できます。
- 地理情報(マップ):IoTデバイスは位置情報データを生成することが多く、これをElasticsearchに保存可能です。KibanaのMaps機能を使って、デバイスの位置情報を地図上に可視化し、追跡や分析を行えます。
- エンドポイントセキュリティ:IoTデバイスのセキュリティログデータをElasticsearchに送信し、Elastic Securityと連携してセキュリティレポートを生成。デバイスのセキュリティ状況をリアルタイムで監視し、潜在的な脅威を検知・対応できます。
特長と利点
Elasticsearchデータ統合は、以下の特長と利点をビジネスにもたらします:
- 効率的なデータインデックスと検索:ElasticsearchはEMQX Platformからの大規模リアルタイムメッセージデータを容易に処理可能です。強力な全文検索およびインデックス機能により、IoTメッセージデータの高速かつ効率的な検索・クエリが実現します。
- データの可視化:Elastic Stackの一部であるKibanaとの連携により、IoTデータの強力な可視化が可能で、データの理解と分析を支援します。
- 柔軟なデータ操作:EMQX PlatformのElasticsearch連携は、インデックス名、ドキュメントID、ドキュメントテンプレートの動的設定をサポートし、ドキュメントの作成、更新、削除が可能で、多様なIoTデータ統合シナリオに適応します。
- スケーラビリティ:ElasticsearchおよびEMQX Platformはクラスターをサポートし、ノードの追加により処理能力を容易に拡張可能で、ビジネスの継続的な拡大を支えます。
はじめる前に
このセクションでは、EMQX PlatformでElasticsearchデータ統合を作成する前の準備作業として、Elasticsearchのインストールおよびインデックス作成について説明します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Elasticsearchのデプロイとインデックス作成
EMQX PlatformはプライベートにデプロイしたElasticsearchまたはクラウド上のElasticと連携可能です。DockerまたはElastic Cloudを用いてElasticsearchインスタンスをデプロイできます。
DockerでElasticsearchをデプロイ
Docker環境がない場合は、Dockerをインストールしてください。
X-Packセキュリティ認証を有効にしたElasticsearchコンテナを起動します。デフォルトのユーザー名は
elastic
、パスワードはpublic
に設定します。bashdocker run -d --name elasticsearch \ -p 9200:9200 \ -p 9300:9300 \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=true" \ -e "ELASTIC_PASSWORD=public" \ docker.elastic.co/elasticsearch/elasticsearch:7.10.1
デバイスがパブリッシュするメッセージを保存するための
device_data
インデックスを作成します。Elasticsearchのユーザー名とパスワードは適宜置き換えてください。bashcurl -u elastic:public -X PUT "localhost:9200/device_data?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "properties": { "ts": { "type": "date" }, "clientid": { "type": "keyword" }, "payload": { "type": "object", "dynamic": true } } } }'
Elastic Cloudでのデプロイ
Elastic Cloudの利用方法については、公式ガイドをご参照ください。
- Elastic Cloudは14日間の無料トライアルを提供しており、登録後に独自のデプロイメントを作成できます。登録後、Elastic Cloudコンソールが表示されます。
- デプロイメントを開始するには、Create deploymentをクリックします。
- Elasticsearchのエンドポイント情報および認証情報を控えておきます。
- デバイスがパブリッシュするメッセージを保存するための
device_data
インデックスを作成します。Elasticsearchのユーザー名とパスワードは適宜置き換えてください。
curl -u elastic:xxxx -X PUT "{Elasticsearch endpoint}/device_data?pretty" -H 'Content-Type: application/json' -d'
{
"mappings": {
"properties": {
"ts": { "type": "date" },
"clientid": { "type": "keyword" },
"payload": {
"type": "object",
"dynamic": true
}
}
}
}'
コネクターの作成
データ統合ルールを作成する前に、ElasticsearchサーバーにアクセスするためのElasticsearchコネクターを作成する必要があります。
デプロイメントにアクセスし、左側のナビゲーションメニューからデータ統合をクリックします。初めてコネクターを作成する場合は、Data Forwardカテゴリの下にあるElasticsearchを選択します。すでにコネクターを作成済みの場合は、New Connectorを選択し、続いてData ForwardカテゴリのElasticsearchを選択します。
コネクター名はシステムが自動生成します。
接続情報を入力します:
- Server:ElasticsearchサービスのRESTインターフェースURLを
http://{host}:9200
など適切に入力します。 - Username:Elasticsearchサービスのユーザー名(例:
elastic
)を指定します。 - Password:Elasticsearchサービスのパスワードを入力します。
- Enable TLS:暗号化接続を確立する場合はトグルスイッチをオンにします。
- 必要に応じて高度な設定を行います(任意)。
- Server:ElasticsearchサービスのRESTインターフェースURLを
Testボタンをクリックし、Elasticsearchにアクセス可能であればconnector availableのメッセージが表示されます。
Newボタンをクリックして作成を完了します。
これで、このコネクターを基にデータブリッジルールを作成できます。
ルールの作成
このセクションでは、EMQX Platformコンソールを使ってElasticsearchルールを作成し、ルールにアクションを追加する方法を示します。
ルールエリアでNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。
利用したい機能に基づき、SQL Editorでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温湿度メッセージを送信した際にエンジンをトリガーするSQL例を示します。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload FROM "temp_hum/emqx"
TIP
初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことを推奨します。
Nextをクリックしてアクションを追加します。
Connectorドロップダウンから先ほど作成したコネクターを選択します。
EMQX PlatformからElasticsearchサービスへメッセージをパブリッシュするための情報を設定します:
Action:
Create
、Update
、Delete
のいずれかのアクションを選択(任意)。Index Name:アクションを実行するインデックスまたはインデックスエイリアス名。
${var}
形式のプレースホルダーをサポート。Document ID:
Create
アクションでは任意、その他のアクションでは必須。インデックス内のドキュメントの一意識別子。${var}
形式のプレースホルダーをサポート。指定しない場合はElasticsearchが自動生成。Routing:ドキュメントを格納するインデックスのシャードを指定。空欄の場合はElasticsearchが決定。
Document Template:カスタムドキュメントテンプレート。JSONオブジェクトに変換可能で、
${var}
形式のプレースホルダーをサポート。例:{ "field": "${payload.field}"}
や${payload}
。Max Retries:書き込み失敗時の最大リトライ回数。デフォルトは3回。
Overwrite Document(
Create
アクション特有):既存ドキュメントがある場合に上書きするかどうか。No
の場合は書き込み失敗。
例では、インデックス名を
device_data
に設定し、クライアントIDとタイムスタンプの組み合わせ${clientid}_${ts}
をドキュメントIDとして使用しています。ドキュメントにはクライアントID、現在のタイムスタンプ、メッセージ本文全体を格納します。ドキュメントテンプレートは以下の通りです:json{ "clientid": "${clientid}", "ts": ${ts}, "payload": ${payload} }
Advanced Settingsを展開し、同期/非同期モード、キューやキャッシュなどのパラメータを必要に応じて設定します(任意)。
Confirmボタンをクリックしてルール作成を完了します。
Successful new ruleのポップアップでBack to Rulesをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
温湿度データの送信をシミュレートするために、MQTTXの利用を推奨しますが、他のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
_search
APIを使ってインデックス内のドキュメント内容を確認し、device_data
インデックスにデータが書き込まれているかを確認します。bashcurl -u elastic:public -X GET "localhost:9200/device_data/_search?pretty"
正しいレスポンス例は以下の通りです:
json{ "took": 484, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "device_data", "_type": "_doc", "_id": "mqttx_a2acfd19_1711359139238", "_score": 1.0, "_source": { "clientid": "mqttx_a2acfd19", "ts": 1711359139238, "payload": { "temp": "27.5", "hum": "41.8" } } } ] } }