# CassandraへのMQTTデータ取り込み

[Apache Cassandra](https://cassandra.apache.org/_/index.html) は、大規模データセットの処理と高スループットアプリケーションの構築に適した、オープンソースの分散型NoSQLデータベース管理システムです。EMQX PlatformとApache Cassandraの連携により、Cassandraデータベースへのメッセージやイベントの保存が可能となり、時系列データの保存、デバイス登録・管理、リアルタイムデータ分析などの機能を実現できます。

本ページでは、EMQX PlatformとCassandra間のデータ統合について、実践的な手順を交えて包括的に解説します。

:::tip
現在の実装はCassandra v3.xのみ対応しており、v4.xには未対応です。
:::

## 動作概要

Cassandraデータ統合はEMQX Platformの標準機能であり、EMQX Platformのデバイス接続およびメッセージ送受信機能とCassandraの強力なデータ保存機能を組み合わせています。組み込みの[ルールエンジン](./rules.md)コンポーネントにより、EMQX PlatformからCassandraへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXとCassandra間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform Integration Cassandra](./_assets/data_integration_cassandra.png)

MQTTデータのCassandraへの取り込みは以下の流れで行われます。

1. **メッセージのパブリッシュと受信**：接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQX Platformに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQX Platformがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージ到着後、ルールエンジンを通過し、EMQX Platformで定義されたルールに従って処理されます。ルールは事前定義された条件に基づき、Cassandraへルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
3. **Cassandraへのデータ取り込み**：ルールエンジンがCassandra保存対象のメッセージを特定すると、Cassandraへの転送アクションをトリガーします。処理済みデータはCassandraデータベースのコレクションにシームレスに書き込まれます。
4. **データの保存と活用**：データがCassandraに保存されることで、企業はそのクエリ機能を活用して多様なユースケースに対応可能です。例えば、接続車両分野では、車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などに利用できます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用されます。

## 特長と利点

Cassandraとのデータ統合は、効率的なデータ送信、保存、活用を実現するための多彩な特長と利点を提供します。

- **大規模時系列データ保存**：EMQX Platformは大量のデバイス接続とメッセージ送信を処理可能です。Cassandraの高いスケーラビリティと分散ストレージ機能を活用し、大規模データセット（時系列データを含む）の保存・管理が可能で、時間範囲に基づくクエリや集計操作をサポートします。
- **リアルタイムデータストリーミング**：EMQX Platformはリアルタイムデータストリーム処理に最適化されており、ソースシステムからCassandraへの効率的かつ信頼性の高いデータ送信を保証します。即時の洞察やアクションが必要なユースケースに適しています。
- **高可用性の保証**：EMQXとCassandraの両方がクラスタリング機能を提供します。組み合わせて使用することで、デバイス接続やデータを複数サーバーに分散可能です。ノード障害時には他の利用可能なノードへ自動的に切り替わり、高いスケーラビリティとフォールトトレランスを実現します。
- **柔軟なデータ変換**：EMQX Platformの強力なSQLベースのルールエンジンにより、Cassandraへの保存前にデータを前処理できます。フィルタリング、ルーティング、集計、強化など多様な変換機構をサポートし、ニーズに応じたデータ整形が可能です。
- **柔軟なデータモデル**：Cassandraはカラムベースのデータモデルを採用し、柔軟なスキーマ設計およびカラムの動的追加をサポートします。構造化されたデバイスイベントやメッセージデータの保存・管理に適しており、多様なMQTTメッセージデータを容易に格納できます。

## はじめる前に

このセクションでは、TimescaleDBデータブリッジ作成の準備として、Cassandraサーバーのインストールやキー スペース・テーブルの作成方法について説明します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Cassandraサーバーのインストール

以下のコマンドでDockerを使い、シンプルなCassandraサービスを起動します。

```bash
docker run --name cassa --rm -p 9042:9042 cassandra:3.11.14
```

### キースペースとテーブルの作成

データブリッジ作成前に、キースペースとテーブルを作成する必要があります。

1. `mqtt`という名前のキースペースを作成します。

```bash
docker exec -it cassa cqlsh "-e CREATE KEYSPACE mqtt WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
```

2. Cassandraに`temp_hum`テーブルを作成します。

```bash
docker exec -it cassa cqlsh "-e \
    CREATE TABLE mqtt.temp_hum( \
        msgid text, \
        temp text, \
        hum text, \
        arrived timestamp, \
        PRIMARY KEY(msgid));"
```

## Cassandraコネクターの作成

データ統合ルールを作成する前に、CassandraサーバーにアクセスするためのCassandraコネクターを作成する必要があります。

1. デプロイメントにアクセスし、左側ナビゲーションメニューから**データ統合**をクリックします。

2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**Cassandra**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、続いて**データ永続化**カテゴリの**Cassandra**を選択します。

3. 接続情報を入力します。

   - **Servers**：サーバーのIPアドレスとポート
   - **Keyspace**：`mqtt`
   - その他はデフォルトのままにします。
   - 暗号化接続を確立したい場合は、**TLSを有効にする**のトグルスイッチをオンにします。

4. 詳細設定（任意）

5. **テスト**ボタンをクリックし、Cassandraサービスにアクセス可能であれば成功メッセージが表示されます。

6. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込み対象のデータを指定し、処理済みデータをCassandraに転送するアクションをルールに追加します。

1. ルールエリアで**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールのマッチングSQL文を入力します。以下の例では、メッセージ到着時刻`arrived`、クライアントID、`temp_hum/emqx`トピックのペイロードから温度と湿度を読み取ります。

   ```sql
     SELECT
       id as msgid,
       payload.temp as temp,
       payload.hum as hum,
       timestamp as arrived
     FROM
       "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL例**をクリックし、**テストを有効にする**を使ってSQLルールの学習と検証が可能です。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. **CQLテンプレート**を設定し、`msgid`、`temp`、`hum`、`arrived`をCassandraに保存します。このテンプレートはCassandra Query Languageで実行され、サンプルコードは以下の通りです。

   ```sql
     INSERT INTO temp_hum(msgid, temp, hum, arrived)
     VALUES (
       ${msgid},
       ${temp},
       ${hum},
       ${arrived}
     )
   ```

6. 詳細設定（任意）

7. **確定**ボタンをクリックしてルール作成を完了します。

8. **新規ルール成功**のポップアップで**ルールに戻る**をクリックし、データ統合設定の一連の流れが完了します。

## ルールのテスト

[MQTTX](https://mqttx.app/)を使って温度・湿度データの送信をシミュレートすることを推奨しますが、他の任意のクライアントでも可能です。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. 以下のコマンドでメッセージがCassandraに保存されているか確認します。

   ```bash
    $ docker exec -it cassa cqlsh "-e SELECT * FROM mqtt.temp_hum;"

    msgid                            | arrived                         | hum  | temp
    ----------------------------------+---------------------------------+------+------
    00061488D7FBFE8F2C770000467D0011 | 2024-03-26 04:37:11.987000+0000 | 41.8 | 27.5

    (1 rows)
   ```

3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報が表示されます。
