# Upstash for Redis への MQTT データ取り込み

Upstash はクラウドベースのサーバーレスデータプラットフォームであり、開発者がインフラ管理の手間なく Redis データベースと Kafka をアプリケーションにシームレスに統合できる環境を提供します。サーバーレスアーキテクチャを採用しており、高性能なインメモリデータストアである Redis と Kafka の利点を、デプロイやスケーリング、メンテナンスの複雑さを気にすることなく享受できます。

本ページでは、Upstash データ統合の機能概要と実装に関する実践的なガイドを詳述します。Kafka コネクターの作成、ルールの定義、動作検証などの基本作業をカバーし、さらに MQTT プロトコルを用いてシミュレートした温湿度データを EMQX Platform に報告し、設定したデータ統合を通じて Upstash に保存する手順を示します。

## 動作概要

Upstash Redis データ統合は EMQX Platform に標準搭載された機能であり、MQTT ベースの IoT データと Kafka の強力なデータ処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを通じて、複雑なコーディングなしに両プラットフォーム間のデータフローと処理を簡素化します。

以下の図は、EMQX Platform と Redis 間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform Integration Redis](./_assets/data_integration_redis.png)

Upstash Redis への MQTT データ取り込みは以下の流れで行われます。

1. **メッセージのパブリッシュと受信**：産業用 IoT デバイスは MQTT プロトコルを通じて EMQX Platform デプロイメントに正常に接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイム MQTT データを EMQX Platform にパブリッシュします。EMQX Platform はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **メッセージデータの処理**：メッセージ到着時にルールエンジンを通過し、EMQX Platform で定義されたルールに基づいて処理されます。ルールは事前定義された条件により、どのメッセージを Redis にルーティングするかを判定します。ペイロード変換が指定されている場合は、データ形式の変換や特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
3. **Redis へのデータ取り込み**：ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために設定された Redis コマンドを実行するアクションがトリガーされます。
4. **データの保存と活用**：Redis に保存されたデータを読み取ることで、企業は豊富なデータ操作機能を活用し、多様なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得や GPS 地理位置情報の分析、リアルタイムデータ分析やソートなどの操作が可能となり、リアルタイム追跡やルート推奨などの機能を支えます。

## 特長とメリット

Redis とのデータ統合は、効率的なデータ送信、処理、活用を実現するための多彩な特長とメリットを備えています。

- **高性能かつスケーラブル**：EMQX の分散アーキテクチャと Redis のクラスター モードにより、データ量の増加に応じてアプリケーションをシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を保証します。
- **リアルタイムデータストリーム**：EMQX Platform はリアルタイムデータストリーム処理に特化しており、デバイスから Redis への効率的かつ信頼性の高いデータ伝送を実現します。Redis は高速なデータ操作を実行できるため、リアルタイムのデータキャッシュに最適なデータストレージコンポーネントとなります。
- **リアルタイムデータ分析**：Redis はデバイス接続数やメッセージパブリッシュ数、特定の業務指標などのリアルタイムメトリクスを計算可能です。一方、EMQX Platform はリアルタイムメッセージの送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
- **地理位置情報分析**：Redis は地理空間データ構造とコマンドを備え、地理位置情報の保存とクエリが可能です。EMQX Platform の強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様な IoT アプリケーションに幅広く応用できます。

## はじめる前に

本節では、EMQX Platform で Upstash for Redis データ統合を作成するための準備作業を紹介します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### Upstash for Redis データベースのセットアップ

Upstash を利用開始するには、https://upstash.com/ にアクセスし、アカウントを作成してください。

#### Redis データベースの作成

1. ログイン後、**Create Database** ボタンをクリックして Redis データベースを作成します。

2. 有効な名前を入力し、データベースをデプロイしたいリージョンを選択します。パフォーマンスを最適化するため、デプロイメントのリージョンに最も近いリージョンを選択することを推奨します。

3. **Create** をクリックすると、サーバーレスの Redis データベースが作成されます。

#### 詳細の確認

データベースコンソールに入り、次のステップに必要な情報を確認してください。

## コネクターの作成

1. デプロイメントに移動し、左ナビゲーションメニューから **Data Integration** をクリックします。

2. 初めてコネクターを作成する場合は、**Data Persistence** カテゴリーの中から **Upstash for Redis** を選択します。既にコネクターを作成済みの場合は、**New Connector** を選択し、続けて **Data Persistence** カテゴリーの中から **Upstash for Redis** を選択します。

3. **Connector Name**：システムが自動的にコネクター名を生成します。

4. 接続情報を入力します：

   - **Server Host**：Redis 詳細ページの **Endpoints** と **Port** の情報
   - **Password**：Redis 詳細ページの **Password** の情報
   - その他はデフォルトのままにします。

5. 詳細設定（任意）

6. **Test** ボタンをクリックします。Redis サービスにアクセス可能であれば成功のメッセージが表示されます。

7. **New** ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータを Upstash for Redis に転送するためのアクションをルールに追加します。

1. ルールエリアの **New Rule** をクリックするか、作成したコネクターの **Actions** 列にある新規ルールアイコンをクリックします。

2. **SQL Editor** にルールのマッチング SQL 文を入力します。以下のルールでは、メッセージが報告された時間 `arrived`、クライアント ID、`temp_hum/emqx` トピックのペイロードから温度と湿度を読み取ります。

   ```sql
   SELECT
     timestamp as up_timestamp,
     clientid as client_id,
     payload.temp as temp,
     payload.hum as hum
   FROM
     "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は、**SQL Examples** と **Enable Test** をクリックして SQL ルールの学習とテストを行うことができます。

   :::

3. **Next** をクリックしてアクションを追加します。

4. **Connector** ドロップダウンから先ほど作成したコネクターを選択します。

5. **Redis Command Template** を設定します。トピックから読み取った「up_timestamp」「client ID」「温度」「湿度」のデータを Redis に保存します。

   ```sql
   HMSET ${client_id} ${up_timestamp} ${temp}
   ```

6. **Confirm** ボタンをクリックしてルール作成を完了します。

7. **Successful new rule** ポップアップで **Back to Rules** をクリックし、データ統合設定の一連の作業を完了します。

## ルールのテスト

温湿度データの報告をシミュレートするために [MQTTX](https://mqttx.app/) の使用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック: `temp_hum/emqx`
   - クライアント ID: `test_client`
   - ペイロード:

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. Upstash コンソールでデータを確認します。Data Browser でクライアントエントリーを選択すると、メッセージを確認できます。

3. コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報とそのルールに属するすべてのアクションの統計情報が表示されます。
