# SnowflakeへのMQTTデータ取り込み（ストリーミングモード）

::: tip 注意

Snowflakeストリーミングデータ統合は、EMQXバージョン6.1.2以降の専用版または専用Flex版で利用可能です。

:::

[Snowflake](https://www.snowflake.com/)は、データウェアハウジング、分析、セキュアなデータ共有のためのクラウドデータプラットフォームです。EMQX Cloudは、Snowflakeストリーミングシンクを通じてMQTTメッセージをSnowflakeに書き込むことができます。このシンクはSnowflakeストリーミングコネクターを使用してSnowpipeストリーミングAPIを呼び出すため、MQTTデータを低レイテンシでSnowflakeテーブルに取り込むことが可能です。

本ページでは、EMQX CloudでSnowflakeストリーミングデータ統合を作成する方法を説明します。例として、MQTTトピック `t/#` からのメッセージを、パイプ `testdatabase.public.emqxstreaming` を介してSnowflakeテーブル `testdatabase.public.emqx` に書き込みます。

## 動作概要

Snowflakeストリーミング統合は、EMQXブローカーのルールエンジンを使ってMQTTメッセージを選択・変換し、そのルールの出力をSnowflakeストリーミングシンク経由でSnowflakeに送信します。

データフローは以下の通りです：

```text
MQTTクライアント -> EMQX Cloud -> ルール -> Snowflakeストリーミングシンク -> パイプ -> テーブル
```

1. MQTTクライアントが `t/1`、`t/device001`、`t/test` などのトピックにメッセージをパブリッシュします。
2. ルールが `t/#` のメッセージにマッチし、Snowflakeに書き込むフィールドを選択します。
3. Snowflakeストリーミングシンクが選択されたフィールドをSnowflakeのパイプに送信します。
4. Snowflakeがストリームされたレコードをターゲットテーブルにロードします。
5. Snowflakeでテーブルをクエリし、取り込まれたMQTTデータを確認できます。

## はじめる前に

### 前提条件

以下を理解していることを確認してください：

- [データ統合](./introduction.md)
- [ルール](./rules.md)
- Snowflakeのデータベース、スキーマ、テーブル、パイプ、ユーザー、ロール、およびキーペア認証

### ネットワークアクセスの設定

SnowflakeストリーミングコネクターはHTTPS経由でSnowflakeに接続します。Snowflakeアカウントの公開方法に応じてネットワークを設定してください：

- プライベートSnowflake URLを使用する場合は、EMQX CloudとSnowflake間でVPCピアリングなどのプライベートネットワーク接続を作成してください。
- パブリックSnowflake URLを使用する場合は、デプロイメントがパブリックネットワークにアクセスできることを確認してください。必要に応じて[NATゲートウェイ](../vas/nat-gateway.md)を有効にしてください。

### Snowflakeオブジェクトの準備

Snowflakeでターゲットのデータベース、スキーマ、テーブル、ストリーミングパイプを作成します。以下のSQLは本例で使用するオブジェクトを作成します：

```sql
CREATE DATABASE IF NOT EXISTS testdatabase;

CREATE SCHEMA IF NOT EXISTS testdatabase.public;

CREATE TABLE IF NOT EXISTS testdatabase.public.emqx (
  clientid STRING,
  topic STRING,
  payload STRING,
  publish_received_at TIMESTAMP_LTZ
);

CREATE PIPE IF NOT EXISTS testdatabase.public.emqxstreaming AS
COPY INTO testdatabase.public.emqx (
  clientid,
  topic,
  payload,
  publish_received_at
)
FROM (
  SELECT
    $1:clientid::STRING,
    $1:topic::STRING,
    $1:payload::STRING,
    $1:publish_received_at::TIMESTAMP_LTZ
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
```

### Snowflakeユーザーの作成と権限付与

コネクターはキーペア認証でSnowflakeに認証します。パイプ用のユーザーを作成し、ロールを割り当て、パイプの操作とテーブルへの書き込みに必要な権限を付与してください。

1. RSAキーペアを生成します。秘密鍵はEMQX Cloudコネクター用に保持し、公開鍵はSnowflakeに登録します。

   ```bash
   openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.private.pem -nocrypt
   openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
   ```

   - EMQX Cloudコネクターは秘密鍵でJWTに署名し、安全かつ検証可能なIDトークンとして使用します。
   - Snowflakeは公開鍵でトークンの署名を検証します。

   詳細は[キーペア認証とキーペアローテーション](https://docs.snowflake.com/en/user-guide/key-pair-auth)を参照してください。

2. パイプユーザーが使用するSnowflakeロールを作成します。

   ```sql
   CREATE ROLE IF NOT EXISTS snowpipe;
   ```

3. 公開鍵を割り当てたSnowflakeユーザーを作成します。`<PUBLIC_KEY_CONTENT>`は公開鍵の内容で、`-----BEGIN PUBLIC KEY-----`と`-----END PUBLIC KEY-----`の行は除いてください。

   ```sql
   CREATE USER IF NOT EXISTS snowpipeuser
     RSA_PUBLIC_KEY = '<PUBLIC_KEY_CONTENT>';
   ```

4. ロールに権限を付与し、ユーザーにロールを割り当て、デフォルトロールに設定します。

   ```sql
   GRANT USAGE ON DATABASE testdatabase TO ROLE snowpipe;
   GRANT USAGE ON SCHEMA testdatabase.public TO ROLE snowpipe;
   GRANT INSERT, SELECT ON TABLE testdatabase.public.emqx TO ROLE snowpipe;
   GRANT OPERATE, MONITOR ON PIPE testdatabase.public.emqxstreaming TO ROLE snowpipe;
   GRANT ROLE snowpipe TO USER snowpipeuser;
   ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe;
   ```

EMQX Cloudで設定する値とオブジェクト名が一致していることを確認してください：

| Snowflakeオブジェクト | 値               |
| -------------------- | ---------------- |
| データベース         | `testdatabase`   |
| スキーマ             | `public`         |
| テーブル             | `emqx`           |
| パイプ               | `emqxstreaming`  |

## Snowflakeストリーミングコネクターの作成

ルールを作成する前に、EMQX CloudとSnowflakeアカウントを接続するSnowflakeストリーミングコネクターを作成します。

1. EMQX Cloudコンソールで、対象のデプロイメントにアクセスします。
2. 左側のナビゲーションメニューから**データ統合**をクリックします。
3. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**Snowflake Streaming**を選択します。既にコネクターがある場合は**新規コネクター**をクリックし、**Snowflake Streaming**を選択します。
4. **新規コネクター**ページで以下の項目を設定します：

   - **コネクター名**：自動生成された名前を使用します。
   - **サーバーホスト**：SnowflakeのエンドポイントURLを入力します。通常は `<Your Snowflake Organization ID>-<Your Snowflake Account Name>.snowflakecomputing.com` の形式です。`<Your Snowflake Organization ID>-<Your Snowflake Account Name>` はSnowflakeインスタンス固有のサブドメインに置き換えてください。
   - **アカウント**：Snowflakeの組織IDとアカウント名をハイフン（`-`）で区切って入力します。これはSnowflakeプラットフォームにアクセスするURLの一部で、Snowflakeコンソールで確認できます。
   - **パイプユーザー**：パイプを操作するSnowflakeユーザー名を入力します。例：`snowpipeuser`。ロールには少なくとも`OPERATE`と`MONITOR`権限が必要です。
   - **秘密鍵**：キーペア認証に使用するPEM形式のRSA秘密鍵を貼り付けます。
   - **秘密鍵パスワード**：秘密鍵が暗号化されている場合はパスワードを入力します。OpenSSLの`-nocrypt`オプションで生成した場合は空欄のままにしてください。
   - **プロキシ**：デフォルト値のままで問題ありません。SnowflakeにHTTPプロキシ経由でアクセスする必要がある場合のみ設定してください。
   - **TLSを有効化**：必ず有効にしてください。SnowflakeストリーミングはHTTPSを使用します。
   - **TLS検証**、**ミドルボックス互換モード**、**SNI**、**TLS証明書**、**TLSキー**：ネットワークや証明書ポリシーで必要な場合のみ設定してください。

5. **テスト**をクリックし、接続テストが成功したら**保存**をクリックします。

これでルールにSnowflakeストリーミングシンクを追加するときに、このコネクターを選択できます。

## ルールの作成

Snowflakeに書き込むMQTTメッセージのフィールドを選択するルールを作成します。

1. EMQX Cloudコンソールで**データ統合**にアクセスします。
2. 以下のいずれかの方法でルールを作成します：

   - **コネクター**セクションで、Snowflakeストリーミングコネクターの**アクション**列にある**新規ルール**アイコンをクリック。
   - **ルール**セクションで**+ 新規ルール**をクリック。

3. **SQLエディター**に以下のSQLを入力します：

   ```sql
   SELECT
     clientid,
     unix_ts_to_rfc3339(publish_received_at, 'millisecond') AS publish_received_at,
     topic,
     payload
   FROM
     "t/#"
   ```

   このルールはトピックが `t/#` にマッチするメッセージを監視します。テスト用に `t/1`、`t/device001`、`t/test` などのトピックにメッセージをパブリッシュしてください。

   ::: tip

   Snowflake統合では、選択するフィールド名と値はターゲットのSnowflakeパイプおよびテーブルが期待するカラムと一致させてください。不必要なフィールドは選択しないようにしてください。

   :::

4. ルール作成後、ページ下部の**次へ**をクリックして**新規アクション**に進みます。

## Snowflakeストリーミングシンクの追加

**新規アクション**ページで、ルールの出力をSnowflakeに書き込むSnowflakeストリーミングシンクを設定します。

1. アクションを設定します：

   - **コネクター**：作成したSnowflakeストリーミングコネクターを選択。
   - **アクションタイプ**：`Snowflake Streaming` が選択されています。
   - **アクション名**：自動生成された名前を使用するか任意の名前を入力。
   - **データベース名**：`testdatabase` を入力。
   - **スキーマ**：`public` を入力。
   - **パイプ**：`emqxstreaming` を入力。

2. **詳細設定**は接続やバッファリングの調整が必要な場合を除き、デフォルトのままにします。
3. **確定**をクリックしてルールとアクションを作成します。

## ルールのテスト

MQTTXなどのMQTTクライアントを使って、`t/#` にマッチするトピックにテストメッセージをパブリッシュします。

1. EMQX Cloudに以下のメッセージをパブリッシュします：

   - トピック：`t/1`
   - ペイロード：

     ```json
     {"msg":"hello snowflake"}
     ```

   MQTTX CLIの例：

   ```bash
   mqttx pub -i emqx_c -t t/1 -m '{"msg":"hello snowflake"}'
   ```

2. Snowflakeでターゲットテーブルをクエリします：

   ```sql
   SELECT
     clientid,
     topic,
     payload,
     publish_received_at
   FROM testdatabase.public.emqx
   ORDER BY publish_received_at DESC
   LIMIT 10;
   ```

クエリでテストメッセージが返されれば、統合は正常に動作しています：

```text
MQTT -> ルール -> Snowflakeストリーミングシンク -> パイプ -> テーブル
```
