# GCP Pub/Sub への MQTT データストリーム

Google Cloud Pub/Sub は、非常に高い信頼性とスケーラビリティを実現する非同期メッセージングサービスです。EMQX Platform は、MQTT データのリアルタイム抽出、処理、分析のために Google Cloud Pub/Sub とのシームレスな統合をサポートしています。Cloud Functions、App Engine、Cloud Run、Kubernetes Engine、Compute Engine などのさまざまな Google Cloud サービスへデータをプッシュできます。また、Google Cloud から MQTT へのデータ配信も可能で、ユーザーが GCP 上で迅速に IoT アプリケーションを構築できるよう支援します。

本ページでは、EMQX Platform と GCP Pub/Sub 間のデータ統合について、作成および検証の実践的な手順を交えて包括的に紹介します。

## 動作概要

GCP Pub/Sub データ統合は、EMQX Platform の標準機能として提供されており、MQTT データストリームを Google Cloud とシームレスに連携させ、IoT アプリケーション開発における豊富なサービスと機能を活用できるよう設計されています。

![data_integration_gcp_pubsub](./_assets/data_integration_gcp_pubsub.png)

EMQX Platform は、ルールエンジンと Sink を介して MQTT データを GCP Pub/Sub に転送します。GCP Pub/Sub のパブリッシャー役割の例を挙げると、全体の流れは以下の通りです。

1. **IoT デバイスがメッセージをパブリッシュ**：デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
2. **ルールエンジンがメッセージを処理**：組み込みのルールエンジンは、特定のトピックに基づいて MQTT メッセージを処理します。ルールエンジンは対応するルールとマッチし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報付加などを行います。
3. **GCP Pub/Sub へのブリッジング**：ルールはメッセージを GCP Pub/Sub に転送するアクションをトリガーし、データプロパティ、オーダーキー、MQTT トピックから GCP Pub/Sub トピックへのマッピングを簡単に設定できます。これにより、データ統合におけるより豊富なコンテキスト情報と順序保証が提供され、柔軟な IoT データ処理が可能になります。

MQTT メッセージデータが GCP Pub/Sub に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

- リアルタイムデータ処理と分析：Dataflow、BigQuery、Pub/Sub のストリーミング機能など、強力な Google Cloud のデータ処理・分析ツールを活用し、メッセージデータのリアルタイム処理と分析を行い、有用なインサイトや意思決定支援を得られます。
- イベント駆動型機能：Cloud Functions や Cloud Run などの Google Cloud イベント処理をトリガーし、動的かつ柔軟な関数トリガーと処理を実現します。
- データ保存と共有：Cloud Storage や Firestore などの Google Cloud ストレージサービスにメッセージデータを送信し、大量データの安全な保存と管理を行います。これにより、他の Google Cloud サービスと連携してデータの共有や分析が可能となり、多様なビジネスニーズに対応できます。

## 特長とメリット

GCP Pub/Sub とのデータ統合は、以下のような特長と利点を提供します。

- **堅牢なメッセージングサービス**：EMQX と GCP Pub/Sub は共に高可用性とスケーラビリティを備え、大規模なメッセージストリームの確実な受信、配信、処理を保証します。IoT データの順序付け、メッセージの品質保証、パーシステンスをサポートし、信頼性の高いメッセージ伝送と処理を実現します。
- **柔軟なルールエンジン**：組み込みのルールエンジンにより、特定の送信元メッセージやイベントをトピックマッチングに基づいて処理できます。データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの操作が可能です。これを GCP Pub/Sub と組み合わせることで、さらなる処理や分析が可能となります。
- **豊富なコンテキスト情報**：GCP Pub/Sub データ統合を通じて、メッセージにより豊かなコンテキスト情報を付加できます。クライアント属性の Pub/Sub 属性へのマッピング、ソートキーの設定などが可能で、後続のアプリケーション開発やデータ処理においてより精密な分析と処理を支援します。

まとめると、EMQX Platform と GCP Pub/Sub の統合により、高信頼性かつスケーラブルなメッセージ配信が可能となり、データ分析や統合のための豊富なツールとサービスを活用できます。これにより、堅牢な IoT アプリケーションの構築とイベント駆動型の柔軟なビジネスロジックの実装が可能です。

## はじめる前に

ここでは、GCP Pub/Sub データ統合を作成する前に必要な準備について説明します。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワークの設定

<!--@include: ./network-setting.md-->

### GCP でサービスアカウントキーを作成する

GCP Pub/Sub サービスを利用するために、サービスアカウントとサービスアカウントキーを作成する必要があります。

1. GCP アカウントで[サービスアカウント](https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount)を作成します。サービスアカウントには対象トピックに対する **Pub/Sub Editor** 権限を付与してください。

2. 作成したサービスアカウントのメールアドレスをクリックし、[キー] タブを開きます。[キーを追加] のドロップダウンリストから **新しいキーを作成** を選択し、そのアカウント用のサービスアカウントキーを JSON 形式で作成・ダウンロードします。

### GCP でトピックの作成と管理

EMQX で GCP Pub/Sub データ統合を設定する前に、トピックを作成し、GCP における基本的な管理操作に慣れておく必要があります。

1. Google Cloud コンソールで、**Pub/Sub -> トピック** ページに移動します。詳細な手順は [トピックの作成と管理](https://cloud.google.com/pubsub/docs/create-topic) を参照してください。

    ::: tip
    サービスアカウントには、そのトピックへのパブリッシュ権限が必要です。
    :::

2. **トピック ID** フィールドにトピックの ID を入力し、**トピックを作成** をクリックします。

3. **サブスクリプション** ページに移動し、リスト内の作成したトピックの **トピック ID** をクリックします。トピックに対するサブスクリプションを作成します。

   - **配信タイプ** で Pull を選択
   - **メッセージ保持期間** に 7 日を選択

   詳細は [GCP Pub/Sub サブスクリプション](https://cloud.google.com/pubsub/docs/subscriber) をご覧ください。

4. **サブスクリプション ID -> メッセージ -> Pull** をクリックすると、トピックに送信されたメッセージを確認できます。

## コネクターの作成

データ統合ルールを作成する前に、Google PubSub コネクターを作成してサーバーにアクセスできるようにします。

1. デプロイメントに移動し、左側のナビゲーションメニューから **データ統合** をクリックします。
2. 初めてコネクターを作成する場合は、**データ転送** カテゴリの下にある **Google PubSub** を選択します。既にコネクターを作成している場合は、**新規コネクター** を選択し、続けて **データ転送** カテゴリの下の **Google PubSub** を選択します。
3. **新規コネクター** ページで以下のオプションを設定します。
   - **コネクター名**：システムが自動的にコネクター名を生成します。
   - **GCP サービスアカウント認証情報**：GCP で作成・エクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
   - その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。
4. **テスト** ボタンをクリックします。Google PubSub サービスにアクセス可能であれば、成功メッセージが表示されます。
5. **新規作成** ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータを GCP PubSub に転送するアクションをルールに追加します。

1. ルールエリアで **新規ルール** をクリックするか、作成したコネクターの **アクション** 列にある新規ルールアイコンをクリックします。

2. **SQL エディター** にルールマッチング用の SQL 文を入力します。以下の例では、`temp_hum/emqx` トピックに送信されたメッセージから、報告時刻 `up_timestamp`、クライアント ID、メッセージ本文（ペイロード）を読み取り、温度と湿度を抽出しています。

   ```sql
    SELECT 
    timestamp as up_timestamp, 
    clientid as client_id, 
    payload.temp as temp,
    payload.hum as hum
    FROM
    "temp_hum/emqx"
   ```

   **テストを有効化** を使ってデータ入力をシミュレーションし、結果をテストできます。

3. **次へ** をクリックしてアクションを追加します。

4. **コネクター** ドロップダウンから先ほど作成したコネクターを選択します。

5. 以下の情報を設定します。

   - **アクション名**：システムが自動生成するか、任意に命名できます。

   - **GCP PubSub トピック**：GCP で作成したトピック ID（例：xxx）を入力します。
   
   - **ペイロードテンプレート**：空欄のままにするかテンプレートを定義します。
     - 空欄の場合、MQTT メッセージの可視入力（clientid、topic、payload など）を JSON 形式で全てエンコードします。
     - 定義したテンプレートを使用する場合、`${variable_name}` の形式のプレースホルダーは MQTT コンテキストの対応値で置換されます。例えば `${topic}` は MQTT メッセージのトピックが `my/topic` なら `my/topic` に置換されます。

    本例では、以下の GCP Pub/Sub トピックとメッセージテンプレートを利用します。

   ```sql
        # GCP Pub/Sub メッセージテンプレート
    {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
   ```

   - **属性テンプレートおよびオーダーキーのテンプレート**（任意）：同様に、送信メッセージの属性やオーダーキーのフォーマット用テンプレートを定義できます。
     - 属性では、キーと値の両方に `${variable_name}` 形式のプレースホルダーを使用可能で、MQTT コンテキストから値が抽出されます。キーのテンプレートが空文字列に解決された場合、そのキーは GCP PubSub 送信メッセージから省略されます。
     - オーダーキーでは `${variable_name}` 形式のプレースホルダーを使用可能で、解決結果が空文字列の場合は GCP PubSub 送信メッセージの orderingKey フィールドは設定されません。
   - 詳細設定（任意）：その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。

6. **確認** ボタンをクリックしてルール作成を完了します。

7. **新規ルール作成成功** ポップアップで **ルールに戻る** をクリックし、データ統合設定の一連の作業を完了します。

## ルールのテスト

温度と湿度データの報告をシミュレーションするために、[MQTTX](https://mqttx.app/) の使用を推奨しますが、他の任意のクライアントでも構いません。

1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック: `temp_hum/emqx`

   - ペイロード:

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. GCP Pub/Sub の **サブスクリプション** ページに移動し、**MESSAGES** タブをクリックすると、送信されたメッセージを確認できます。

3. コンソールで運用データを確認します。ルール一覧でルール ID をクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報が表示されます。
