Skip to content

HStreamDBへのMQTTデータストリーミング

HStreamDBは、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQX CloudとHStreamDBの統合により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現するとともに、標準SQLやマテリアライズドビューを用いたリアルタイムのデータストリーム処理、監視、分析が可能になります。

本ページでは、EMQX CloudとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を提供します。

動作の仕組み

HStreamDBデータ統合は、EMQX Cloudのデバイス接続およびメッセージ伝送機能と、HStreamDBの堅牢なデータ保存および処理機能を組み合わせたEMQX Cloudの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理のプロセスが簡素化されています。

以下の図は、EMQXとHStreamDB間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud Integration HStreamDB

EMQX Cloudはルールエンジンと設定済みのSinkを介してMQTTデータをHStreamDBに転送し、全体の処理は以下の通りです:

  1. メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを通じて正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンを用いて、特定のソースからのMQTTメッセージをトピックマッチングに基づき処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
  3. HStreamDBへのデータストリーミング:ルールがトリガーされると、メッセージをHStreamDBに転送するアクションが実行されます。ここでデータはHStreamDBのストリーム名、パーティションキー、レコードに簡単に設定でき、後続のデータ処理や分析を容易にします。

MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:

  • 特定のMQTTメッセージを受信した際に、HStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
  • HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知や対応アクションを実行。
  • 複数のMQTTトピックからのデータを一元化し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。

特長と利点

HStreamDBとのデータ統合は、以下の特長と利点をビジネスにもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQX CloudはMQTTメッセージをバッチで確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を可能にします。
  • MQTTメッセージの変換:ルールエンジンを利用して、EMQX CloudはMQTTメッセージのフィルタリングや変換を行えます。メッセージはHStreamDBに送信される前にデータ抽出、フィルタリング、付加情報の追加、変換が可能です。
  • 大規模データストリーム保存:HStreamDBは特別に設計された分散型のフォールトトレラントなログストレージクラスターで、数百万のデータストリームを信頼性高く保存できます。必要に応じてリアルタイムのデータストリーム更新をアプリケーションにリプレイまたはプッシュ可能です。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータの収集、伝送、保存を実現します。
  • クラスターとスケーラビリティ:クラウドネイティブアーキテクチャで構築されたEMQXとHStreamDBはオンラインスケーリングとクラスターの動的な拡張・縮小をサポートし、増大するビジネスニーズに柔軟に対応可能です。
  • 柔軟な処理能力:HStreamDBでは、馴染みのあるSQLを用いて複数のデータストリームのフィルタリング、変換、集計、結合が行えます。標準SQLやマテリアライズドビューを使ったリアルタイム処理、監視、分析もサポートし、リアルタイムのデータインサイトを提供します。
  • 高スループットシナリオでの処理能力:HStreamDBデータ統合は同期・非同期の両書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。

はじめる前に

このセクションでは、HStreamDBデータ統合の作成を開始する前に必要な準備、具体的にはHStreamDBサービスの起動方法とストリームの作成方法について説明します。

以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を説明します。Dockerがインストールされており、可能であればDocker Compose v2を利用してください。その他のHStreamDBおよびHStreamDBプラットフォームのインストール方法については、Quickstart with Docker-ComposeおよびGetting Started with HStream Platformを参照してください。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

HStreamDB TCPサービスの起動とストリーム作成

このセクションでは、ローカルのDocker環境で単一ノードのHStreamDB TCPサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

注意

HStreamDBリソースが接続状態になった後、ストリームの削除や再作成などの操作を行う場合は、HStreamDBへの再接続が必要となり、つまりHStreamDBリソースの再起動が必要です。

  1. 以下の内容でdocker-compose-tcp.yamlファイルを作成します。< server ip >はサーバーのIPアドレスに置き換えてください。

    docker-compose-tcp.yaml
    yaml
    version: '3.9'
    
    services:
      hserver:
        image: hstreamdb/hstream:v0.17.0
        container_name: quickstart-tcp-hserver
        depends_on:
          - zookeeper
          - hstore
        ports:
          - '127.0.0.1:6570:6570'
        expose:
          - 6570
        networks:
          - quickstart-tcp
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6570 \
            --internal-port 6571 \
            --server-id 100 \
            --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --store-log-level warning \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tcp
    
      hstore:
        image: hstreamdb/hstream:v0.17.0
        container_name: quickstart-tcp-hstore
        networks:
          - quickstart-tcp
        volumes:
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -ex
            # N.B. "enable-dscp-reflection=false" is required for linux kernel which
            # doesn't support dscp reflection, e.g. centos7.
            /usr/local/bin/ld-dev-cluster --root /data/store \
            --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
            --user-admin-port 6440 \
            --param enable-dscp-reflection=false \
            --no-interactive
    
      zookeeper:
        image: zookeeper:3.8.1
        container_name: quickstart-tcp-zk
        expose:
          - 2181
        networks:
          - quickstart-tcp
        volumes:
          - data_zk_data:/data
          - data_zk_datalog:/datalog
    
    networks:
      quickstart-tcp:
        name: quickstart-tcp
    
    volumes:
      data_store:
        name: quickstart_tcp_data_store
      data_zk_data:
        name: quickstart_tcp_data_zk_data
      data_zk_datalog:
        name: quickstart_tcp_data_zk_datalog
  2. 以下のシェルコマンドを実行してHStreamDB TCPサービスを起動します。

    bash
    docker compose -f docker-compose-tcp.yaml up --build
  3. HStreamコンテナに入って、temp_humという名前のストリームを作成します。

    TIP

    HStreamDBのインタラクティブSQL CLIを使ってストリームを作成することも可能です。hstream --helphstreamコマンドの使い方を確認してください。

    HStreamDBコンテナに入ってストリームを作成するコマンド
    bash
     $ docker container exec -it quickstart-tcp-hserver bash
     # Stream `temp_hum`を作成
     root@ed6a64e65ac0:/# hstream stream create temp_hum
     +-------------+---------+----------------+-------------+
     | Stream Name | Replica | Retention Time | Shard Count |
     +-------------+---------+----------------+-------------+
     | temp_hum    | 1       | 604800 seconds | 1           |
     +-------------+---------+----------------+-------------+
     # すべてのストリームを一覧表示
     root@ed6a64e65ac0:/# hstream stream list
     +-------------+---------+----------------+-------------+
     | Stream Name | Replica | Retention Time | Shard Count |
     +-------------+---------+----------------+-------------+
     | temp_hum    | 1       | 604800 seconds | 1           |
     +-------------+---------+----------------+-------------+

HStreamDB TLSサービスの起動とストリーム作成

このセクションでは、ローカルのDocker環境でデュアルノードのHStreamDB TLSサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

注意

HStreamDBリソースが接続状態になった後、ストリームの削除や再作成などの操作を行う場合は、HStreamDBへの再接続が必要となり、つまりHStreamDBリソースの再起動が必要です。

Dockerネットワーク環境と証明書ファイルについて

  • 現バージョンのHStreamでは、コンテナ間通信に影響を与える可能性があるため、http_proxyhttps_proxyall_proxyなどの環境変数をコンテナに設定しないでください。詳細はDocker Network Proxyを参照してください。
  • ルート証明書および自己署名証明書はsmallstep/step-caコンテナを使って自動生成されます。
  • 他の証明書要件がある場合は、証明書ファイルをHStreamDBコンテナにマウントするか、Configuring step-caを参照してください。
    • step-caでデフォルト設定により生成された証明書は有効期限が1日です。有効期限を変更したい場合はcaディレクトリ内の証明書を削除し、step-ca-configuration-optionsに従って設定を変更してください。
  1. 証明書を保存するためにtls-deploy/caディレクトリを作成します。

    bash
    mkdir tls-deploy/ca
    bash
    $ cd tls-deploy
    # "ca"ディレクトリに書き込み権限を付与
    $ sudo chmod 777 ca
  2. tls-deploy配下に以下の内容でdocker-compose-tcp.yamlファイルを作成します。< server ip >はサーバーのIPアドレスに置き換えてください。

    docker-compose-tls.yaml
    yaml
    version: '3.9'
    
    services:
      step-ca:
        image: smallstep/step-ca:0.23.0
        container_name: quickstart-tls-step-ca
        networks:
          - quickstart-tls
        volumes:
          - ${step_ca}:/home/step
        environment:
          - DOCKER_STEPCA_INIT_NAME=HStream
          - DOCKER_STEPCA_INIT_DNS_NAMES=step-ca
    
      generate-hstream-cert:
        image: smallstep/step-ca:0.23.0
        container_name: quickstart-tls-generate-hstream-cert
        depends_on:
          step-ca:
            condition: service_healthy
        networks:
          - quickstart-tls
        volumes:
          - ${step_ca}:/home/step
        command:
          - bash
          - '-c'
          - |
            sleep 1
            if [ -f hstream.crt ]; then exit 0; fi
            step ca certificate "hstream" hstream.crt hstream.key \
            --provisioner-password-file secrets/password --ca-url https://step-ca:9000 \
            --root certs/root_ca.crt \
            --san localhost \
            --san 127.0.0.1 \
            --san < server ip >
    
      hserver0:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-0
        depends_on:
          - generate-hstream-cert
          - zookeeper
          - hstore
        ports:
          - '6570:6570'
          - '6571:6571'
          - '26570:26570'
        expose:
          - 6571
          - 26570
        networks:
          - quickstart-tls
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
          - ${step_ca}:/data/server
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
            timeout=60; \
            until ( \
              [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for tls files ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6570 \
            --internal-port 6571 \
            --server-id 100 \
            --seed-nodes "hserver0:6571,hserver1:6573" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tls \
            --tls-cert-path /data/server/hstream.crt \
            --tls-key-path /data/server/hstream.key \
            --enable-tls \
            --advertised-listeners l1:hstream://< server ip >:26570 \
            --listeners-security-protocol-map l1:plaintext
    
            # NOTE:
            # advertised-listeners ip addr should same as container addr for tls listener
    
      hserver1:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-1
        depends_on:
          - generate-hstream-cert
          - zookeeper
          - hstore
        ports:
          - '6572:6572'
          - '6573:6573'
          - '26572:26572'
        expose:
          - 6573
          - 26572
        networks:
          - quickstart-tls
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
          - ${step_ca}:/data/server
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
            timeout=60; \
            until ( \
              [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for tls files ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6572 \
            --internal-port 6573 \
            --server-id 101 \
            --seed-nodes "hserver0:6571,hserver1:6573" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tls \
            --tls-cert-path /data/server/hstream.crt \
            --tls-key-path /data/server/hstream.key \
            --enable-tls \
            --advertised-listeners l1:hstream://< server ip >:26572 \
            --listeners-security-protocol-map l1:plaintext
    
            # NOTE:
            # advertised-listeners ip addr should same as container addr for tls listener
    
      hserver-init:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-init
        depends_on:
          - hserver0
          - hserver1
        networks:
          - quickstart-tls
        command:
          - bash
          - '-c'
          - |
            timeout=60
            until ( \
                /usr/local/bin/hadmin server --host hserver0 --port 26570 status && \
                /usr/local/bin/hadmin server --host hserver1 --port 26572 status \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for servers ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hadmin server --host hserver0 --port 26570 init
    
      hstore:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hstore
        networks:
          - quickstart-tls
        volumes:
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -ex
            /usr/local/bin/ld-dev-cluster --root /data/store \
            --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
            --user-admin-port 6440 \
            --no-interactive
    
      zookeeper:
        image: zookeeper:3.8.1
        container_name: quickstart-tls-zk
        expose:
          - 2181
        networks:
          - quickstart-tls
        volumes:
          - data_zk_data:/data
          - data_zk_datalog:/datalog
    
    networks:
      quickstart-tls:
        name: quickstart-tls
    
    volumes:
      data_store:
        name: quickstart_tls_data_store
      data_zk_data:
        name: quickstart_tls_data_zk_data
      data_zk_datalog:
        name: quickstart_tls_data_zk_datalog

ディレクトリ構成は以下のようになります:

bash
$ tree tls-deploy
tls-deploy
├── ca
└── docker-compose-tls.yaml

2 directories, 1 file
  1. tls-deployディレクトリに入り、以下のシェルコマンドを実行してHStreamDB TLSサービスを起動します。

    bash
    # 同じフォルダで起動
    env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up --build
    
    # バックグラウンド起動
    env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up -d --build
  2. HStreamDBコンテナに入り、temp_humという名前のストリームを作成します。

    TLS接続コマンドオプションについて

    TCPサービスと同様に、コマンドラインに--tls-ca [CA_PATH]オプションを追加するだけで接続可能です。quickstart-tls-hserver-1ノードでコマンドを実行する場合は、docker-composeファイルで指定したポートと一致させるために--port 6572オプションも指定してください。

    HStreamDBコンテナに入ってストリームを作成するコマンド
    bash
    $ docker container exec -it quickstart-tls-hserver-0 bash
    # Stream `temp_hum`を作成
    root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream create temp_hum
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+
    # すべてのストリームを一覧表示
    root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream list
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+

HStreamDBコネクターの作成

データ統合ルールを作成する前に、まずHStreamDBサーバーにアクセスするためのHStreamDBコネクターを作成する必要があります。

  1. ご自身のデプロイメントにアクセスし、左側ナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるHStreamDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリの下のHStreamDBを選択します。

  3. 接続情報を入力します:

    • サーバーURL:サーバーのIPアドレスとポート。
    • gRPCタイムアウト:gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間を秒単位で指定します。デフォルトは30秒です。
    • 必要に応じてTLS接続を有効にするためのトグルスイッチをクリックします。TLSを有効にした場合はTLS検証を無効にしてください。tls-deploy/caディレクトリで生成された証明書とキーをアップロードします:
      • ca/hstream.crtTLS証明書にアップロード。
      • ca/hstream.keyTLSキーにアップロード。
      • ca/certs/root_ca.crtCA証明書にアップロード。
  4. 詳細設定(任意)。

  5. テストボタンをクリックします。HStreamDBサービスにアクセス可能であれば成功メッセージが表示されます。

  6. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込み対象のデータを指定し、処理済みデータをHStreamDBに転送するためのアクションをルールに追加する必要があります。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、temp_hum/emqxトピックからメッセージの報告時刻up_timestamp、クライアントID、ペイロードを読み取り、温度と湿度を取得しています。

    sql
    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックしてTry It OutでSQLルールを学習・テストしてください。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. メッセージを特定トピックに転送するためのHStreamレコードテンプレートを以下のテンプレートで設定します。

    bash
    {
      "up_timestamp": ${up_timestamp},
      "client_id": ${client_id},
      "temp": ${temp},
      "hum": ${hum}
    }
  6. 詳細設定(任意)。

  7. 確定ボタンをクリックしてルールの作成を完了します。

  8. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温度・湿度データの報告をシミュレートするためにMQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがKafkaに転送されているか確認します。

    bash
     # Stream `temp_hum`の読み取り後、Control-Cで停止
     root@7f963b999883:/# hstream stream read-stream temp_hum
     timestamp: "1711442849073", id: 2241614080977213-21474836481-0, key: "", record: {"up_timestamp": 1711442848921, "client_id": mqttx_3f5a2868, "temp": 27.5, "hum": 41.8}
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルール下のすべてのアクションの統計情報が表示されます。