Skip to content

HStreamDBへのMQTTデータストリーミング

HStreamDBは、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQX PlatformとHStreamDBの連携により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現するとともに、標準SQLやマテリアライズドビューを用いたデータストリームのリアルタイム処理、監視、分析が可能になります。

本ページでは、EMQX PlatformとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。

動作概要

HStreamDBデータ統合は、EMQX Platformのデバイス接続およびメッセージ伝送機能と、HStreamDBの堅牢なデータ保存・処理機能を組み合わせたEMQX Platformの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングおよび処理プロセスが簡素化されています。

以下の図は、EMQXとHStreamDB間の典型的なデータ統合アーキテクチャを示しています。

EMQX Platform Integration HStreamDB

EMQX Platformはルールエンジンと設定されたSinkを通じてMQTTデータをHStreamDBに転送し、全体の流れは以下の通りです。

  1. メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルで正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンを用いて、特定のトピックからのMQTTメッセージをトピックマッチングに基づき処理します。ルールエンジンは対応するルールをマッチングし、データ形式変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などの処理を行います。
  3. HStreamDBへのデータストリーミング:ルールがトリガーされると、メッセージをHStreamDBへ転送するアクションが実行されます。HStreamDBのストリーム名、パーティションキー、レコードを簡単に設定でき、後続のデータ処理や分析を容易にします。

MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

  • 特定のMQTTメッセージ受信時にHStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
  • HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常検知や特定イベントパターンの検出を行い、アラート通知や対応アクションを実行。
  • 複数のMQTTトピックからのデータを統合し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。

特長とメリット

HStreamDBとのデータ統合により、以下の特長と利点が得られます。

  • 信頼性の高いIoTデータメッセージ配信:EMQX PlatformはMQTTメッセージをバッチで確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を可能にします。
  • MQTTメッセージの変換:ルールエンジンを活用し、EMQX PlatformはMQTTメッセージの抽出、フィルタリング、付加情報の追加、変換を行い、HStreamDBに送信します。
  • 大規模データストリームの保存:HStreamDBは数百万のデータストリームを分散型でフォールトトレラントなログストレージクラスターに信頼性高く保存可能です。必要に応じてリアルタイムのデータストリーム更新を再生またはプッシュできます。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータ収集、伝送、保存を実現します。
  • クラスターとスケーラビリティ:クラウドネイティブアーキテクチャで構築されたEMQXとHStreamDBはオンラインスケールやクラスターの動的な拡張・縮小をサポートし、増大するビジネスニーズに柔軟に対応可能です。
  • 柔軟な処理能力:HStreamDBでは馴染みのあるSQLを使って複数のデータストリームをフィルタリング、変換、集計、結合できます。標準SQLやマテリアライズドビューを用いたリアルタイム処理、監視、分析をサポートし、リアルタイムのデータインサイトを提供します。
  • 高スループットシナリオでの処理能力:HStreamDBデータ統合は同期・非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。

はじめる前に

本節では、HStreamDBデータ統合の作成前に必要な準備として、HStreamDBサービスの起動方法やストリームの作成方法を説明します。

以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を解説しています。Dockerがインストールされていること、可能であればDocker Compose v2を使用していることを確認してください。その他のHStreamDBおよびHStreamDB Platformのインストール方法は、Quickstart with Docker-ComposeおよびGetting Started with HStream Platformを参照してください。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

HStreamDB TCPサービスの起動とストリーム作成

本節では、ローカルのDocker環境で単一ノードのHStreamDB TCPサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

注意

HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。

  1. 以下の内容でdocker-compose-tcp.yamlファイルを作成してください。< server ip >はサーバーのIPアドレスに置き換えてください。

    docker-compose-tcp.yaml
    yaml
    version: '3.9'
    
    services:
      hserver:
        image: hstreamdb/hstream:v0.17.0
        container_name: quickstart-tcp-hserver
        depends_on:
          - zookeeper
          - hstore
        ports:
          - '127.0.0.1:6570:6570'
        expose:
          - 6570
        networks:
          - quickstart-tcp
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6570 \
            --internal-port 6571 \
            --server-id 100 \
            --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --store-log-level warning \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tcp
    
      hstore:
        image: hstreamdb/hstream:v0.17.0
        container_name: quickstart-tcp-hstore
        networks:
          - quickstart-tcp
        volumes:
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -ex
            # N.B. "enable-dscp-reflection=false" is required for linux kernel which
            # doesn't support dscp reflection, e.g. centos7.
            /usr/local/bin/ld-dev-cluster --root /data/store \
            --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
            --user-admin-port 6440 \
            --param enable-dscp-reflection=false \
            --no-interactive
    
      zookeeper:
        image: zookeeper:3.8.1
        container_name: quickstart-tcp-zk
        expose:
          - 2181
        networks:
          - quickstart-tcp
        volumes:
          - data_zk_data:/data
          - data_zk_datalog:/datalog
    
    networks:
      quickstart-tcp:
        name: quickstart-tcp
    
    volumes:
      data_store:
        name: quickstart_tcp_data_store
      data_zk_data:
        name: quickstart_tcp_data_zk_data
      data_zk_datalog:
        name: quickstart_tcp_data_zk_datalog
  2. 以下のシェルコマンドを実行し、HStreamDB TCPサービスを起動します。

    bash
    docker compose -f docker-compose-tcp.yaml up --build
  3. HStreamDBコンテナに入って、temp_humという名前のストリームを作成します。

    TIP

    HStreamDBの対話型SQL CLIを使ってストリームを作成することも可能です。hstream --helphstreamコマンドの使用方法を確認してください。

    HStreamDBコンテナに入ってストリームを作成するコマンド例
    bash
     $ docker container exec -it quickstart-tcp-hserver bash
     # ストリーム `temp_hum` を作成
     root@ed6a64e65ac0:/# hstream stream create temp_hum
     +-------------+---------+----------------+-------------+
     | Stream Name | Replica | Retention Time | Shard Count |
     +-------------+---------+----------------+-------------+
     | temp_hum    | 1       | 604800 seconds | 1           |
     +-------------+---------+----------------+-------------+
     # ストリーム一覧を表示
     root@ed6a64e65ac0:/# hstream stream list
     +-------------+---------+----------------+-------------+
     | Stream Name | Replica | Retention Time | Shard Count |
     +-------------+---------+----------------+-------------+
     | temp_hum    | 1       | 604800 seconds | 1           |
     +-------------+---------+----------------+-------------+

HStreamDB TLSサービスの起動とストリーム作成

本節では、ローカルDocker環境で二ノードのHStreamDB TLSサービスを起動し、HStreamDBにストリームを作成する方法を説明します。

注意

HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。

Dockerネットワーク環境と証明書ファイルについて

  • 現バージョンのHStreamでは、コンテナ間通信に影響を与えるため、http_proxyhttps_proxyall_proxyなどの環境変数をコンテナに設定しないでください。詳細はDocker Network Proxyを参照してください。
  • ルート証明書および自己署名証明書はsmallstep/step-caコンテナを使って自動生成されます。
  • 他の証明書要件がある場合は、証明書ファイルをHStreamDBコンテナにマウントするか、step-caの設定を参照してください。
    • step-caのデフォルト設定で生成される証明書は有効期限が1日です。有効期限を変更したい場合はcaディレクトリ内の証明書を削除し、step-ca設定オプションに従って有効期限を変更してください。
  1. 証明書を保存するためにtls-deploy/caディレクトリを作成します。

    bash
    mkdir tls-deploy/ca
    bash
    $ cd tls-deploy
    # "ca"ディレクトリに書き込み権限を付与
    $ sudo chmod 777 ca
  2. tls-deploy配下に以下の内容でdocker-compose-tls.yamlファイルを作成してください。< server ip >はサーバーのIPアドレスに置き換えてください。

    docker-compose-tls.yaml
    yaml
    version: '3.9'
    
    services:
      step-ca:
        image: smallstep/step-ca:0.23.0
        container_name: quickstart-tls-step-ca
        networks:
          - quickstart-tls
        volumes:
          - ${step_ca}:/home/step
        environment:
          - DOCKER_STEPCA_INIT_NAME=HStream
          - DOCKER_STEPCA_INIT_DNS_NAMES=step-ca
    
      generate-hstream-cert:
        image: smallstep/step-ca:0.23.0
        container_name: quickstart-tls-generate-hstream-cert
        depends_on:
          step-ca:
            condition: service_healthy
        networks:
          - quickstart-tls
        volumes:
          - ${step_ca}:/home/step
        command:
          - bash
          - '-c'
          - |
            sleep 1
            if [ -f hstream.crt ]; then exit 0; fi
            step ca certificate "hstream" hstream.crt hstream.key \
            --provisioner-password-file secrets/password --ca-url https://step-ca:9000 \
            --root certs/root_ca.crt \
            --san localhost \
            --san 127.0.0.1 \
            --san < server ip >
    
      hserver0:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-0
        depends_on:
          - generate-hstream-cert
          - zookeeper
          - hstore
        ports:
          - '6570:6570'
          - '6571:6571'
          - '26570:26570'
        expose:
          - 6571
          - 26570
        networks:
          - quickstart-tls
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
          - ${step_ca}:/data/server
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
            timeout=60; \
            until ( \
              [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for tls files ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6570 \
            --internal-port 6571 \
            --server-id 100 \
            --seed-nodes "hserver0:6571,hserver1:6573" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tls \
            --tls-cert-path /data/server/hstream.crt \
            --tls-key-path /data/server/hstream.key \
            --enable-tls \
            --advertised-listeners l1:hstream://< server ip >:26570 \
            --listeners-security-protocol-map l1:plaintext
    
            # NOTE:
            # advertised-listeners ip addr should same as container addr for tls listener
    
      hserver1:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-1
        depends_on:
          - generate-hstream-cert
          - zookeeper
          - hstore
        ports:
          - '6572:6572'
          - '6573:6573'
          - '26572:26572'
        expose:
          - 6573
          - 26572
        networks:
          - quickstart-tls
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - /tmp:/tmp
          - data_store:/data/store
          - ${step_ca}:/data/server
        command:
          - bash
          - '-c'
          - |
            set -e
            /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \
            timeout=60; \
            until ( \
              [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for tls files ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hstream-server \
            --bind-address 0.0.0.0 --port 6572 \
            --internal-port 6573 \
            --server-id 101 \
            --seed-nodes "hserver0:6571,hserver1:6573" \
            --advertised-address < server ip > \
            --metastore-uri zk://zookeeper:2181 \
            --store-config /data/store/logdevice.conf \
            --store-admin-host hstore --store-admin-port 6440 \
            --io-tasks-path /tmp/io/tasks \
            --io-tasks-network quickstart-tls \
            --tls-cert-path /data/server/hstream.crt \
            --tls-key-path /data/server/hstream.key \
            --enable-tls \
            --advertised-listeners l1:hstream://< server ip >:26572 \
            --listeners-security-protocol-map l1:plaintext
    
            # NOTE:
            # advertised-listeners ip addr should same as container addr for tls listener
    
      hserver-init:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hserver-init
        depends_on:
          - hserver0
          - hserver1
        networks:
          - quickstart-tls
        command:
          - bash
          - '-c'
          - |
            timeout=60
            until ( \
                /usr/local/bin/hadmin server --host hserver0 --port 26570 status && \
                /usr/local/bin/hadmin server --host hserver1 --port 26572 status \
            ) >/dev/null 2>&1; do
                >&2 echo 'Waiting for servers ...'
                sleep 1
                timeout=$$((timeout - 1))
                [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1;
            done; \
            /usr/local/bin/hadmin server --host hserver0 --port 26570 init
    
      hstore:
        image: hstreamdb/hstream:v0.19.3
        container_name: quickstart-tls-hstore
        networks:
          - quickstart-tls
        volumes:
          - data_store:/data/store
        command:
          - bash
          - '-c'
          - |
            set -ex
            /usr/local/bin/ld-dev-cluster --root /data/store \
            --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
            --user-admin-port 6440 \
            --no-interactive
    
      zookeeper:
        image: zookeeper:3.8.1
        container_name: quickstart-tls-zk
        expose:
          - 2181
        networks:
          - quickstart-tls
        volumes:
          - data_zk_data:/data
          - data_zk_datalog:/datalog
    
    networks:
      quickstart-tls:
        name: quickstart-tls
    
    volumes:
      data_store:
        name: quickstart_tls_data_store
      data_zk_data:
        name: quickstart_tls_data_zk_data
      data_zk_datalog:
        name: quickstart_tls_data_zk_datalog

ディレクトリ構成は以下のようになります。

bash
$ tree tls-deploy
tls-deploy
├── ca
└── docker-compose-tls.yaml

2 directories, 1 file
  1. tls-deployディレクトリに移動し、以下のシェルコマンドでHStreamDB TLSサービスを起動します。

    bash
    # 同一フォルダで起動
    env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up --build
    
    # バックグラウンド起動
    env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up -d --build
  2. HStreamDBコンテナに入り、temp_humという名前のストリームを作成します。

    TLS接続コマンドオプション

    HStreamDB TCPサービスと同様に、コマンドラインに--tls-ca [CA_PATH]オプションを追加するだけです。quickstart-tls-hserver-1ノードでコマンドを実行する場合は、docker-composeファイルで指定したポートと一致させるために--port 6572オプションも指定してください。

    HStreamDBコンテナに入ってストリームを作成するコマンド例
    bash
    $ docker container exec -it quickstart-tls-hserver-0 bash
    # ストリーム `temp_hum` を作成
    root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream create temp_hum
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+
    # ストリーム一覧を表示
    root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream list
    +-------------+---------+----------------+-------------+
    | Stream Name | Replica | Retention Time | Shard Count |
    +-------------+---------+----------------+-------------+
    | temp_hum    | 1       | 604800 seconds | 1           |
    +-------------+---------+----------------+-------------+

HStreamDBコネクターの作成

データ統合ルールを作成する前に、まずHStreamDBサーバーにアクセスするためのHStreamDBコネクターを作成する必要があります。

  1. デプロイメント画面に移動し、左ナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からHStreamDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリの中からHStreamDBを選択します。

  3. 接続情報を入力します。

    • サーバーURL:サーバーのIPアドレスとポート
    • gRPCタイムアウト:gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間(秒)。デフォルトは30秒です。
    • 必要に応じてTLS接続を有効にするためのトグルスイッチをクリックします。TLSを有効にした場合はTLS検証を無効にしてください。tls-deploy/caディレクトリで生成された証明書とキーをアップロードします。
      • ca/hstream.crtTLS証明書にアップロード
      • ca/hstream.keyTLSキーにアップロード
      • ca/certs/root_ca.crtCA証明書にアップロード
  4. 詳細設定(任意)

  5. テストボタンをクリックし、HStreamDBサービスにアクセス可能であれば成功メッセージが表示されます。

  6. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理したデータをHStreamDBに転送するアクションをルールに追加するためのルールを作成します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、temp_hum/emqxトピックからメッセージの報告時刻up_timestamp、クライアントID、ペイロードを読み取り、温度と湿度を取得しています。

    sql
    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"

    TIP

    初心者の方はSQL例をクリックし、テストを有効化してSQLルールの学習とテストが可能です。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. メッセージを特定のトピックに転送するためのHStreamレコードテンプレートを以下のテンプレートで設定します。

    bash
    {
      "up_timestamp": ${up_timestamp},
      "client_id": ${client_id},
      "temp": ${temp},
      "hum": ${hum}
    }
  6. 詳細設定(任意)

  7. 確定ボタンをクリックしてルール作成を完了します。

  8. 新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温度と湿度のデータ報告をシミュレートするには、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。

  1. MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. メッセージがKafkaに転送されているか確認します。

    bash
     # Stream `temp_hum`の読み取りを中断するにはControl-Cを押します
     root@7f963b999883:/# hstream stream read-stream temp_hum
     timestamp: "1711442849073", id: 2241614080977213-21474836481-0, key: "", record: {"up_timestamp": 1711442848921, "client_id": mqttx_3f5a2868, "temp": 27.5, "hum": 41.8}
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を確認できます。