# HStreamDBへのMQTTデータストリーミング

[HStreamDB](https://hstream.io/)は、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQX PlatformとHStreamDBの連携により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現するとともに、標準SQLやマテリアライズドビューを用いたデータストリームのリアルタイム処理、監視、分析が可能になります。

本ページでは、EMQX PlatformとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。

## 動作概要

HStreamDBデータ統合は、EMQX Platformのデバイス接続およびメッセージ伝送機能と、HStreamDBの堅牢なデータ保存・処理機能を組み合わせたEMQX Platformの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングおよび処理プロセスが簡素化されています。

以下の図は、EMQXとHStreamDB間の典型的なデータ統合アーキテクチャを示しています。

![EMQX Platform Integration HStreamDB](./_assets/data_integration_hstreamdb.png)

EMQX Platformはルールエンジンと設定されたSinkを通じてMQTTデータをHStreamDBに転送し、全体の流れは以下の通りです。

1. **メッセージのパブリッシュと受信**：IoTデバイスはMQTTプロトコルで正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
2. **ルールエンジンによるメッセージ処理**：組み込みのルールエンジンを用いて、特定のトピックからのMQTTメッセージをトピックマッチングに基づき処理します。ルールエンジンは対応するルールをマッチングし、データ形式変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などの処理を行います。
3. **HStreamDBへのデータストリーミング**：ルールがトリガーされると、メッセージをHStreamDBへ転送するアクションが実行されます。HStreamDBのストリーム名、パーティションキー、レコードを簡単に設定でき、後続のデータ処理や分析を容易にします。

MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

- 特定のMQTTメッセージ受信時にHStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
- HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常検知や特定イベントパターンの検出を行い、アラート通知や対応アクションを実行。
- 複数のMQTTトピックからのデータを統合し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。

## 特長とメリット

HStreamDBとのデータ統合により、以下の特長と利点が得られます。

- **信頼性の高いIoTデータメッセージ配信**：EMQX PlatformはMQTTメッセージをバッチで確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を可能にします。
- **MQTTメッセージの変換**：ルールエンジンを活用し、EMQX PlatformはMQTTメッセージの抽出、フィルタリング、付加情報の追加、変換を行い、HStreamDBに送信します。
- **大規模データストリームの保存**：HStreamDBは数百万のデータストリームを分散型でフォールトトレラントなログストレージクラスターに信頼性高く保存可能です。必要に応じてリアルタイムのデータストリーム更新を再生またはプッシュできます。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータ収集、伝送、保存を実現します。
- **クラスターとスケーラビリティ**：クラウドネイティブアーキテクチャで構築されたEMQXとHStreamDBはオンラインスケールやクラスターの動的な拡張・縮小をサポートし、増大するビジネスニーズに柔軟に対応可能です。
- **柔軟な処理能力**：HStreamDBでは馴染みのあるSQLを使って複数のデータストリームをフィルタリング、変換、集計、結合できます。標準SQLやマテリアライズドビューを用いたリアルタイム処理、監視、分析をサポートし、リアルタイムのデータインサイトを提供します。
- **高スループットシナリオでの処理能力**：HStreamDBデータ統合は同期・非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。

## はじめる前に

本節では、HStreamDBデータ統合の作成前に必要な準備として、HStreamDBサービスの起動方法やストリームの作成方法を説明します。

以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を解説しています。Dockerがインストールされていること、可能であればDocker Compose v2を使用していることを確認してください。その他のHStreamDBおよびHStreamDB Platformのインストール方法は、[Quickstart with Docker-Compose](https://docs.hstream.io/start/quickstart-with-docker.html)および[Getting Started with HStream Platform](https://docs.hstream.io/start/try-out-hstream-platform.html)を参照してください。

### 前提条件

- [ルール](./rules.md)の理解
- [データ統合](./introduction.md)の理解

### ネットワーク設定

<!--@include: ./network-setting.md-->

### HStreamDB TCPサービスの起動とストリーム作成

本節では、ローカルのDocker環境で単一ノードのHStreamDB TCPサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

::: tip 注意

HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。

:::

1. 以下の内容で`docker-compose-tcp.yaml`ファイルを作成してください。`< server ip >`はサーバーのIPアドレスに置き換えてください。

   <details>
   <summary><code>docker-compose-tcp.yaml</code></summary>

   ```yaml
   version: '3.9'

   services:
     hserver:
       image: hstreamdb/hstream:v0.17.0
       container_name: quickstart-tcp-hserver
       depends_on:
         - zookeeper
         - hstore
       ports:
         - '127.0.0.1:6570:6570'
       expose:
         - 6570
       networks:
         - quickstart-tcp
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock
         - /tmp:/tmp
         - data_store:/data/store
       command:
         - bash
         - '-c'
         - |
           set -e
           /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
           /usr/local/bin/hstream-server \
           --bind-address 0.0.0.0 --port 6570 \
           --internal-port 6571 \
           --server-id 100 \
           --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
           --advertised-address < server ip > \
           --metastore-uri zk://zookeeper:2181 \
           --store-config /data/store/logdevice.conf \
           --store-admin-host hstore --store-admin-port 6440 \
           --store-log-level warning \
           --io-tasks-path /tmp/io/tasks \
           --io-tasks-network quickstart-tcp

     hstore:
       image: hstreamdb/hstream:v0.17.0
       container_name: quickstart-tcp-hstore
       networks:
         - quickstart-tcp
       volumes:
         - data_store:/data/store
       command:
         - bash
         - '-c'
         - |
           set -ex
           # N.B. "enable-dscp-reflection=false" is required for linux kernel which
           # doesn't support dscp reflection, e.g. centos7.
           /usr/local/bin/ld-dev-cluster --root /data/store \
           --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
           --user-admin-port 6440 \
           --param enable-dscp-reflection=false \
           --no-interactive

     zookeeper:
       image: zookeeper:3.8.1
       container_name: quickstart-tcp-zk
       expose:
         - 2181
       networks:
         - quickstart-tcp
       volumes:
         - data_zk_data:/data
         - data_zk_datalog:/datalog

   networks:
     quickstart-tcp:
       name: quickstart-tcp

   volumes:
     data_store:
       name: quickstart_tcp_data_store
     data_zk_data:
       name: quickstart_tcp_data_zk_data
     data_zk_datalog:
       name: quickstart_tcp_data_zk_datalog
   ```

   </details>

2. 以下のシェルコマンドを実行し、HStreamDB TCPサービスを起動します。

   ```bash
   docker compose -f docker-compose-tcp.yaml up --build
   ```

3. HStreamDBコンテナに入って、`temp_hum`という名前のストリームを作成します。

   ::: tip

   HStreamDBの対話型SQL CLIを使ってストリームを作成することも可能です。`hstream --help`で`hstream`コマンドの使用方法を確認してください。

   :::

   <details>
   <summary><b>HStreamDBコンテナに入ってストリームを作成するコマンド例</b></summary>

   ```bash
    $ docker container exec -it quickstart-tcp-hserver bash
    # ストリーム `temp_hum` を作成
    root@ed6a64e65ac0:/# hstream stream create temp_hum
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+
    # ストリーム一覧を表示
    root@ed6a64e65ac0:/# hstream stream list
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+
   ```

   </details>

### HStreamDB TLSサービスの起動とストリーム作成

本節では、ローカルDocker環境で二ノードのHStreamDB TLSサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

::: tip 注意

HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。

:::

::: tip Dockerネットワーク環境と証明書ファイルについて

- 現バージョンのHStreamでは、コンテナ間通信に影響を与えるため、`http_proxy`、`https_proxy`、`all_proxy`などの環境変数をコンテナに設定しないでください。詳細は[_Docker Network Proxy_](https://docs.docker.com/network/proxy/)を参照してください。
- ルート証明書および自己署名証明書は[_smallstep/step-ca_](https://hub.docker.com/r/smallstep/step-ca)コンテナを使って自動生成されます。
- 他の証明書要件がある場合は、証明書ファイルをHStreamDBコンテナにマウントするか、[_step-caの設定_](https://smallstep.com/docs/step-ca/configuration/index.html)を参照してください。
  - step-caのデフォルト設定で生成される証明書は有効期限が1日です。有効期限を変更したい場合は`ca`ディレクトリ内の証明書を削除し、[_step-ca設定オプション_](https://smallstep.com/docs/step-ca/configuration/#configuration-options)に従って有効期限を変更してください。

:::

1. 証明書を保存するために`tls-deploy/ca`ディレクトリを作成します。

   ```bash
   mkdir tls-deploy/ca
   ```

   ```bash
   $ cd tls-deploy
   # "ca"ディレクトリに書き込み権限を付与
   $ sudo chmod 777 ca
   ```

2. `tls-deploy`配下に以下の内容で`docker-compose-tls.yaml`ファイルを作成してください。`< server ip >`はサーバーのIPアドレスに置き換えてください。

   <details>
   <summary><code>docker-compose-tls.yaml</code></summary>

   ```yaml
   version: '3.9'
   
   services:
     step-ca:
       image: smallstep/step-ca:0.23.0
       container_name: quickstart-tls-step-ca
       networks:
         - quickstart-tls
       volumes:
         - ${step_ca}:/home/step
       environment:
         - DOCKER_STEPCA_INIT_NAME=HStream
         - DOCKER_STEPCA_INIT_DNS_NAMES=step-ca
   
     generate-hstream-cert:
       image: smallstep/step-ca:0.23.0
       container_name: quickstart-tls-generate-hstream-cert
       depends_on:
         step-ca:
           condition: service_healthy
       networks:
         - quickstart-tls
       volumes:
         - ${step_ca}:/home/step
       command:
         - bash
         - '-c'
         - |
           sleep 1
           if [ -f hstream.crt ]; then exit 0; fi
           step ca certificate "hstream" hstream.crt hstream.key \
           --provisioner-password-file secrets/password --ca-url https://step-ca:9000 \
           --root certs/root_ca.crt \
           --san localhost \
           --san 127.0.0.1 \
           --san < server ip >
   
     hserver0:
       image: hstreamdb/hstream:v0.19.3
       container_name: quickstart-tls-hserver-0
       depends_on:
         - generate-hstream-cert
         - zookeeper
         - hstore
       ports:
         - '6570:6570'
         - '6571:6571'
         - '26570:26570'
       expose:
         - 6571
         - 26570
       networks:
         - quickstart-tls
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock
         - /tmp:/tmp
         - data_store:/data/store
         - ${step_ca}:/data/server
       command:
         - bash
         - '-c'
         - |
           set -e
           /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
           timeout=60; \
           until ( \
             [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
           ) >/dev/null 2>&1; do
               >&2 echo 'Waiting for tls files ...'
               sleep 1
               timeout=$$((timeout - 1))
               [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
           done; \
           /usr/local/bin/hstream-server \
           --bind-address 0.0.0.0 --port 6570 \
           --internal-port 6571 \
           --server-id 100 \
           --seed-nodes "hserver0:6571,hserver1:6573" \
           --advertised-address < server ip > \
           --metastore-uri zk://zookeeper:2181 \
           --store-config /data/store/logdevice.conf \
           --store-admin-host hstore --store-admin-port 6440 \
           --io-tasks-path /tmp/io/tasks \
           --io-tasks-network quickstart-tls \
           --tls-cert-path /data/server/hstream.crt \
           --tls-key-path /data/server/hstream.key \
           --enable-tls \
           --advertised-listeners l1:hstream://< server ip >:26570 \
           --listeners-security-protocol-map l1:plaintext
   
           # NOTE:
           # advertised-listeners ip addr should same as container addr for tls listener
   
     hserver1:
       image: hstreamdb/hstream:v0.19.3
       container_name: quickstart-tls-hserver-1
       depends_on:
         - generate-hstream-cert
         - zookeeper
         - hstore
       ports:
         - '6572:6572'
         - '6573:6573'
         - '26572:26572'
       expose:
         - 6573
         - 26572
       networks:
         - quickstart-tls
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock
         - /tmp:/tmp
         - data_store:/data/store
         - ${step_ca}:/data/server
       command:
         - bash
         - '-c'
         - |
           set -e
           /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
           timeout=60; \
           until ( \
             [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
           ) >/dev/null 2>&1; do
               >&2 echo 'Waiting for tls files ...'
               sleep 1
               timeout=$$((timeout - 1))
               [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
           done; \
           /usr/local/bin/hstream-server \
           --bind-address 0.0.0.0 --port 6572 \
           --internal-port 6573 \
           --server-id 101 \
           --seed-nodes "hserver0:6571,hserver1:6573" \
           --advertised-address < server ip > \
           --metastore-uri zk://zookeeper:2181 \
           --store-config /data/store/logdevice.conf \
           --store-admin-host hstore --store-admin-port 6440 \
           --io-tasks-path /tmp/io/tasks \
           --io-tasks-network quickstart-tls \
           --tls-cert-path /data/server/hstream.crt \
           --tls-key-path /data/server/hstream.key \
           --enable-tls \
           --advertised-listeners l1:hstream://< server ip >:26572 \
           --listeners-security-protocol-map l1:plaintext
   
           # NOTE:
           # advertised-listeners ip addr should same as container addr for tls listener
   
     hserver-init:
       image: hstreamdb/hstream:v0.19.3
       container_name: quickstart-tls-hserver-init
       depends_on:
         - hserver0
         - hserver1
       networks:
         - quickstart-tls
       command:
         - bash
         - '-c'
         - |
           timeout=60
           until ( \
               /usr/local/bin/hadmin server --host hserver0 --port 26570 status && \
               /usr/local/bin/hadmin server --host hserver1 --port 26572 status \
           ) >/dev/null 2>&1; do
               >&2 echo 'Waiting for servers ...'
               sleep 1
               timeout=$$((timeout - 1))
               [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
           done; \
           /usr/local/bin/hadmin server --host hserver0 --port 26570 init
   
     hstore:
       image: hstreamdb/hstream:v0.19.3
       container_name: quickstart-tls-hstore
       networks:
         - quickstart-tls
       volumes:
         - data_store:/data/store
       command:
         - bash
         - '-c'
         - |
           set -ex
           /usr/local/bin/ld-dev-cluster --root /data/store \
           --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
           --user-admin-port 6440 \
           --no-interactive
   
     zookeeper:
       image: zookeeper:3.8.1
       container_name: quickstart-tls-zk
       expose:
         - 2181
       networks:
         - quickstart-tls
       volumes:
         - data_zk_data:/data
         - data_zk_datalog:/datalog
   
   networks:
     quickstart-tls:
       name: quickstart-tls
   
   volumes:
     data_store:
       name: quickstart_tls_data_store
     data_zk_data:
       name: quickstart_tls_data_zk_data
     data_zk_datalog:
       name: quickstart_tls_data_zk_datalog
   ```

   </details>

ディレクトリ構成は以下のようになります。

```bash
$ tree tls-deploy
tls-deploy
├── ca
└── docker-compose-tls.yaml

2 directories, 1 file
```

3. `tls-deploy`ディレクトリに移動し、以下のシェルコマンドでHStreamDB TLSサービスを起動します。

   ```bash
   # 同一フォルダで起動
   env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up --build

   # バックグラウンド起動
   env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up -d --build
   ```

4. HStreamDBコンテナに入り、`temp_hum`という名前のストリームを作成します。

   ::: tip TLS接続コマンドオプション

   HStreamDB TCPサービスと同様に、コマンドラインに`--tls-ca [CA_PATH]`オプションを追加するだけです。`quickstart-tls-hserver-1`ノードでコマンドを実行する場合は、docker-composeファイルで指定したポートと一致させるために`--port 6572`オプションも指定してください。

   :::

   <details>
   <summary><b>HStreamDBコンテナに入ってストリームを作成するコマンド例</b></summary>

   ```bash
   $ docker container exec -it quickstart-tls-hserver-0 bash
   # ストリーム `temp_hum` を作成
   root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream create temp_hum
   +-------------+---------+----------------+-------------+
   | Stream Name | Replica | Retention Time | Shard Count |
   +-------------+---------+----------------+-------------+
   | temp_hum    | 1       | 604800 seconds | 1           |
   +-------------+---------+----------------+-------------+
   # ストリーム一覧を表示
   root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream list
   +-------------+---------+----------------+-------------+
   | Stream Name | Replica | Retention Time | Shard Count |
   +-------------+---------+----------------+-------------+
   | temp_hum    | 1       | 604800 seconds | 1           |
   +-------------+---------+----------------+-------------+
   ```

   </details>

## HStreamDBコネクターの作成

データ統合ルールを作成する前に、まずHStreamDBサーバーにアクセスするためのHStreamDBコネクターを作成する必要があります。

1. デプロイメント画面に移動し、左ナビゲーションメニューから**データ統合**をクリックします。

2. 初めてコネクターを作成する場合は、**データ永続化**カテゴリの中から**HStreamDB**を選択します。すでにコネクターを作成済みの場合は、**新規コネクター**を選択し、続けて**データ永続化**カテゴリの中から**HStreamDB**を選択します。

3. 接続情報を入力します。

   - **サーバーURL**：サーバーのIPアドレスとポート
   - **gRPCタイムアウト**：gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間（秒）。デフォルトは`30`秒です。
   - 必要に応じてTLS接続を有効にするためのトグルスイッチをクリックします。TLSを有効にした場合は**TLS検証**を無効にしてください。`tls-deploy/ca`ディレクトリで生成された証明書とキーをアップロードします。
     - `ca/hstream.crt`を**TLS証明書**にアップロード
     - `ca/hstream.key`を**TLSキー**にアップロード
     - `ca/certs/root_ca.crt`を**CA証明書**にアップロード

4. 詳細設定（任意）

5. **テスト**ボタンをクリックし、HStreamDBサービスにアクセス可能であれば成功メッセージが表示されます。

6. **新規作成**ボタンをクリックして作成を完了します。

## ルールの作成

次に、書き込むデータを指定し、処理したデータをHStreamDBに転送するアクションをルールに追加するためのルールを作成します。

1. ルールエリアで**新規ルール**をクリックするか、作成したコネクターの**アクション**列にある新規ルールアイコンをクリックします。

2. **SQLエディター**にルールのマッチングSQL文を入力します。以下のルールでは、`temp_hum/emqx`トピックからメッセージの報告時刻`up_timestamp`、クライアントID、ペイロードを読み取り、温度と湿度を取得しています。

   ```sql
   SELECT
     timestamp as up_timestamp,
     clientid as client_id,
     payload.temp as temp,
     payload.hum as hum
   FROM
     "temp_hum/emqx"
   ```

   ::: tip

   初心者の方は**SQL例**をクリックし、**テストを有効化**してSQLルールの学習とテストが可能です。

   :::

3. **次へ**をクリックしてアクションを追加します。

4. **コネクター**のドロップダウンから先ほど作成したコネクターを選択します。

5. メッセージを特定のトピックに転送するための**HStreamレコードテンプレート**を以下のテンプレートで設定します。

   ```bash
   {
     "up_timestamp": ${up_timestamp},
     "client_id": ${client_id},
     "temp": ${temp},
     "hum": ${hum}
   }
   ```

6. 詳細設定（任意）

7. **確定**ボタンをクリックしてルール作成を完了します。

8. **新規ルール作成成功**のポップアップで**ルールに戻る**をクリックし、データ統合の設定チェーンを完了します。

## ルールのテスト

温度と湿度のデータ報告をシミュレートするには、[MQTTX](https://mqttx.app/)の使用を推奨しますが、他のクライアントでも問題ありません。

1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

   - トピック：`temp_hum/emqx`

   - ペイロード：

     ```json
     {
       "temp": "27.5",
       "hum": "41.8"
     }
     ```

2. メッセージがKafkaに転送されているか確認します。

   ```bash
    # Stream `temp_hum`の読み取りを中断するにはControl-Cを押します
    root@7f963b999883:/# hstream stream read-stream temp_hum
    timestamp: "1711442849073", id: 2241614080977213-21474836481-0, key: "", record: {"up_timestamp": 1711442848921, "client_id": mqttx_3f5a2868, "temp": 27.5, "hum": 41.8}
   ```

3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を確認できます。
