# ElasticsearchへのMQTTデータ取り込み

[Elasticsearch](https://www.elastic.co/elasticsearch/)は、分散型の検索およびデータ分析エンジンであり、多様なデータタイプに対して全文検索、構造化検索、分析機能を提供します。EMQX PlatformはElasticsearchと連携することで、MQTTデータをElasticsearchにシームレスに取り込み、保存することが可能です。この連携により、Elasticsearchの強力なスケーラビリティと分析機能を活用し、IoTアプリケーション向けに効率的かつスケーラブルなデータ保存および分析ソリューションを提供します。

本ページでは、EMQX PlatformとElasticsearch間のデータ統合について詳細に説明し、ルールおよびアクションの作成方法を実践的に解説します。

## 動作概要

Elasticsearchとのデータ統合はEMQX Platformの標準機能であり、EMQX Platformのデバイスアクセスおよびメッセージ転送機能とElasticsearchのデータ保存・分析機能を組み合わせています。シンプルな設定によりMQTTデータのシームレスな統合が実現できます。

![EMQX Platform-Elasticsearch連携](./_assets/data_integration_elasticsearch.jpg)

EMQX PlatformとElasticsearchは、リアルタイムのデバイスデータを効率的に収集・分析するスケーラブルなIoTプラットフォームを提供します。このアーキテクチャでは、EMQX Platformがデバイスアクセス、メッセージ転送、データルーティングを担うIoTプラットフォームとして機能し、Elasticsearchはデータ保存および分析プラットフォームとして、データの保存、検索、分析を担当します。

EMQX Platformはルールエンジンとアクションを通じてデバイスデータをElasticsearchに転送し、Elasticsearchは強力な検索・分析機能を活用してレポートやグラフなどのデータ分析結果を生成し、Kibanaの可視化ツールを通じてユーザーに表示します。ワークフローは以下の通りです：

1. **デバイスからのメッセージパブリッシュと受信**：IoTデバイスはMQTTプロトコルで接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Platformはこれを受信し、ルールエンジンで比較処理を行います。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンを用いて、特定のトピックに基づくMQTTメッセージを処理します。ルールエンジンは対応するルールにマッチしたメッセージを処理し、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などを行います。
3. **Elasticsearchへの書き込み**：ルールエンジンで定義されたルールにより、メッセージをElasticsearchに書き込む操作がトリガーされます。Elasticsearchアクションは柔軟な操作方法とドキュメントテンプレートを提供し、メッセージの特定フィールドを対応するインデックスに書き込みます。

デバイスデータがElasticsearchに書き込まれた後は、Elasticsearchの検索・分析機能を柔軟に活用して以下のような処理が可能です：

1. **ログ監視**：IoTデバイスは大量のログデータを生成し、これをElasticsearchに送信して保存・分析できます。Kibanaなどの可視化ツールと連携し、デバイスの状態、稼働記録、エラーメッセージなどのリアルタイム情報をグラフ化し、開発者や運用者が潜在的な問題を迅速に特定・解決できます。
2. **地理情報（マップ）**：IoTデバイスは位置情報データを生成することが多く、これをElasticsearchに保存可能です。KibanaのMaps機能を使って、デバイスの位置情報を地図上に可視化し、追跡や分析を行えます。
3. **エンドポイントセキュリティ**：IoTデバイスのセキュリティログデータをElasticsearchに送信し、Elastic Securityと連携してセキュリティレポートを生成。デバイスのセキュリティ状況をリアルタイムで監視し、潜在的な脅威を検知・対応できます。

## 特長と利点

Elasticsearchデータ統合は、以下の特長と利点をビジネスにもたらします：

- **効率的なデータインデックスと検索**：ElasticsearchはEMQX Platformからの大規模リアルタイムメッセージデータを容易に処理可能です。強力な全文検索およびインデックス機能により、IoTメッセージデータの高速かつ効率的な検索・クエリが実現します。
- **データの可視化**：Elastic Stackの一部であるKibanaとの連携により、IoTデータの強力な可視化が可能で、データの理解と分析を支援します。
- **柔軟なデータ操作**：EMQX PlatformのElasticsearch連携は、インデックス名、ドキュメントID、ドキュメントテンプレートの動的設定をサポートし、ドキュメントの作成、更新、削除が可能で、多様なIoTデータ統合シナリオに適応します。
- **スケーラビリティ**：ElasticsearchおよびEMQX Platformはクラスターをサポートし、ノードの追加により処理能力を容易に拡張可能で、ビジネスの継続的な拡大を支えます。

## はじめる前に

このセクションでは、EMQX PlatformでElasticsearchデータ統合を作成する前の準備作業として、Elasticsearchのインストールおよびインデックス作成について説明します。

### 前提条件

- [データ統合](./introduction.md)の知識
- データ統合の[ルール](./rules.md)の知識

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Elasticsearchのデプロイとインデックス作成

EMQX PlatformはプライベートにデプロイしたElasticsearchまたはクラウド上のElasticと連携可能です。DockerまたはElastic Cloudを用いてElasticsearchインスタンスをデプロイできます。

#### DockerでElasticsearchをデプロイ

1. Docker環境がない場合は、[Dockerをインストール](https://docs.docker.com/install/)してください。

2. X-Packセキュリティ認証を有効にしたElasticsearchコンテナを起動します。デフォルトのユーザー名は`elastic`、パスワードは`public`に設定します。

   ```bash
   docker run -d --name elasticsearch \
       -p 9200:9200 \
       -p 9300:9300 \
       -e "discovery.type=single-node" \
       -e "xpack.security.enabled=true" \
       -e "ELASTIC_PASSWORD=public" \
       docker.elastic.co/elasticsearch/elasticsearch:7.10.1
   ```

3. デバイスがパブリッシュするメッセージを保存するための`device_data`インデックスを作成します。Elasticsearchのユーザー名とパスワードは適宜置き換えてください。

   ```bash
   curl -u elastic:public -X PUT "localhost:9200/device_data?pretty" -H 'Content-Type: application/json' -d'
   {
     "mappings": {
       "properties": {
         "ts": { "type": "date" },
         "clientid": { "type": "keyword" },
         "payload": {
           "type": "object",
           "dynamic": true
         }
       }
     }
   }'
   ```

#### Elastic Cloudでのデプロイ

Elastic Cloudの利用方法については、[公式ガイド](https://www.elastic.co/guide/en/starting-with-the-elasticsearch-platform-and-its-solutions/8.13/getting-started-guides.html)をご参照ください。

1. Elastic Cloudは[14日間の無料トライアル](https://cloud.elastic.co/registration)を提供しており、登録後に独自のデプロイメントを作成できます。登録後、Elastic Cloudコンソールが表示されます。
2. デプロイメントを開始するには、**Create deployment**をクリックします。
3. Elasticsearchのエンドポイント情報および認証情報を控えておきます。
4. デバイスがパブリッシュするメッセージを保存するための`device_data`インデックスを作成します。Elasticsearchのユーザー名とパスワードは適宜置き換えてください。

```bash
  curl -u elastic:xxxx -X PUT "{Elasticsearch endpoint}/device_data?pretty" -H 'Content-Type: application/json' -d'
  {
    "mappings": {
      "properties": {
        "ts": { "type": "date" },
        "clientid": { "type": "keyword" },
        "payload": {
          "type": "object",
          "dynamic": true
        }
      }
    }
  }'
```

## コネクターの作成

データ統合ルールを作成する前に、ElasticsearchサーバーにアクセスするためのElasticsearchコネクターを作成する必要があります。

1. デプロイメントにアクセスし、左側のナビゲーションメニューから**データ統合**をクリックします。初めてコネクターを作成する場合は、**Data Forward**カテゴリの下にある**Elasticsearch**を選択します。すでにコネクターを作成済みの場合は、**New Connector**を選択し、続いて**Data Forward**カテゴリの**Elasticsearch**を選択します。

2. **コネクター名**はシステムが自動生成します。

3. 接続情報を入力します：

    - **Server**：ElasticsearchサービスのRESTインターフェースURLを`http://{host}:9200`など適切に入力します。
    - **Username**：Elasticsearchサービスのユーザー名（例：`elastic`）を指定します。
    - **Password**：Elasticsearchサービスのパスワードを入力します。
    - **Enable TLS**：暗号化接続を確立する場合はトグルスイッチをオンにします。
    - 必要に応じて高度な設定を行います（任意）。

4. **Test**ボタンをクリックし、Elasticsearchにアクセス可能であれば**connector available**のメッセージが表示されます。

5. **New**ボタンをクリックして作成を完了します。

これで、このコネクターを基にデータブリッジルールを作成できます。

## ルールの作成

このセクションでは、EMQX Platformコンソールを使ってElasticsearchルールを作成し、ルールにアクションを追加する方法を示します。

1. ルールエリアで**New Rule**をクリックするか、作成したコネクターの**Actions**列にある新規ルールアイコンをクリックします。

2. 利用したい機能に基づき、**SQL Editor**でルールを設定します。ここでは、クライアントが`temp_hum/emqx`トピックに温湿度メッセージを送信した際にエンジンをトリガーするSQL例を示します。

   ```sql
    SELECT

     timestamp as up_timestamp,
     clientid as client_id,
     payload

    FROM
      "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL Examples**をクリックし、**Enable Test**を有効にしてSQLルールの学習とテストを行うことを推奨します。

   :::

3. **Next**をクリックしてアクションを追加します。

4. **Connector**ドロップダウンから先ほど作成したコネクターを選択します。

5. EMQX PlatformからElasticsearchサービスへメッセージをパブリッシュするための情報を設定します：

   - **Action**：`Create`、`Update`、`Delete`のいずれかのアクションを選択（任意）。

   - **Index Name**：アクションを実行するインデックスまたはインデックスエイリアス名。`${var}`形式のプレースホルダーをサポート。

   - **Document ID**：`Create`アクションでは任意、その他のアクションでは必須。インデックス内のドキュメントの一意識別子。`${var}`形式のプレースホルダーをサポート。指定しない場合はElasticsearchが自動生成。

   - **Routing**：ドキュメントを格納するインデックスのシャードを指定。空欄の場合はElasticsearchが決定。

   - **Document Template**：カスタムドキュメントテンプレート。JSONオブジェクトに変換可能で、`${var}`形式のプレースホルダーをサポート。例：`{ "field": "${payload.field}"}`や`${payload}`。

   - **Max Retries**：書き込み失敗時の最大リトライ回数。デフォルトは3回。

   - **Overwrite Document**（`Create`アクション特有）：既存ドキュメントがある場合に上書きするかどうか。`No`の場合は書き込み失敗。

   例では、インデックス名を`device_data`に設定し、クライアントIDとタイムスタンプの組み合わせ`${clientid}_${ts}`をドキュメントIDとして使用しています。ドキュメントにはクライアントID、現在のタイムスタンプ、メッセージ本文全体を格納します。ドキュメントテンプレートは以下の通りです：

   ```json
   {
     "clientid": "${clientid}",
     "ts": ${ts},
     "payload": ${payload}
   }
   ```

6. **Advanced Settings**を展開し、同期／非同期モード、キューやキャッシュなどのパラメータを必要に応じて設定します（任意）。

7. **Confirm**ボタンをクリックしてルール作成を完了します。

8. **Successful new rule**のポップアップで**Back to Rules**をクリックし、データ統合設定の一連の流れを完了します。

## ルールのテスト

温湿度データの送信をシミュレートするために、[MQTTX](https://mqttx.app/)の利用を推奨しますが、他のクライアントでも構いません。

1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - クライアントID：`test_client`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. `_search` APIを使ってインデックス内のドキュメント内容を確認し、`device_data`インデックスにデータが書き込まれているかを確認します。

   ```bash
   curl -u elastic:public -X GET "localhost:9200/device_data/_search?pretty"
   ```

   正しいレスポンス例は以下の通りです：

   ```json
   {
     "took": 484,
     "timed_out": false,
     "_shards": {
       "total": 1,
       "successful": 1,
       "skipped": 0,
       "failed": 0
     },
     "hits": {
       "total": {
         "value": 1,
         "relation": "eq"
       },
       "max_score": 1.0,
       "hits": [
         {
           "_index": "device_data",
           "_type": "_doc",
           "_id": "mqttx_a2acfd19_1711359139238",
           "_score": 1.0,
           "_source": {
             "clientid": "mqttx_a2acfd19",
             "ts": 1711359139238,
             "payload": {
               "temp": "27.5",
               "hum": "41.8"
             }
           }
         }
       ]
     }
   }
   ```
