HStreamDBへのMQTTデータストリーミング
HStreamDBは、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQX PlatformとHStreamDBの連携により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現するとともに、標準SQLやマテリアライズドビューを用いたデータストリームのリアルタイム処理、監視、分析が可能になります。
本ページでは、EMQX PlatformとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作概要
HStreamDBデータ統合は、EMQX Platformのデバイス接続およびメッセージ伝送機能と、HStreamDBの堅牢なデータ保存・処理機能を組み合わせたEMQX Platformの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングおよび処理プロセスが簡素化されています。
以下の図は、EMQXとHStreamDB間の典型的なデータ統合アーキテクチャを示しています。
EMQX Platformはルールエンジンと設定されたSinkを通じてMQTTデータをHStreamDBに転送し、全体の流れは以下の通りです。
- メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルで正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Platformはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンを用いて、特定のトピックからのMQTTメッセージをトピックマッチングに基づき処理します。ルールエンジンは対応するルールをマッチングし、データ形式変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などの処理を行います。
- HStreamDBへのデータストリーミング:ルールがトリガーされると、メッセージをHStreamDBへ転送するアクションが実行されます。HStreamDBのストリーム名、パーティションキー、レコードを簡単に設定でき、後続のデータ処理や分析を容易にします。
MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。
- 特定のMQTTメッセージ受信時にHStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
- HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常検知や特定イベントパターンの検出を行い、アラート通知や対応アクションを実行。
- 複数のMQTTトピックからのデータを統合し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。
特長とメリット
HStreamDBとのデータ統合により、以下の特長と利点が得られます。
- 信頼性の高いIoTデータメッセージ配信:EMQX PlatformはMQTTメッセージをバッチで確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を可能にします。
- MQTTメッセージの変換:ルールエンジンを活用し、EMQX PlatformはMQTTメッセージの抽出、フィルタリング、付加情報の追加、変換を行い、HStreamDBに送信します。
- 大規模データストリームの保存:HStreamDBは数百万のデータストリームを分散型でフォールトトレラントなログストレージクラスターに信頼性高く保存可能です。必要に応じてリアルタイムのデータストリーム更新を再生またはプッシュできます。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータ収集、伝送、保存を実現します。
- クラスターとスケーラビリティ:クラウドネイティブアーキテクチャで構築されたEMQXとHStreamDBはオンラインスケールやクラスターの動的な拡張・縮小をサポートし、増大するビジネスニーズに柔軟に対応可能です。
- 柔軟な処理能力:HStreamDBでは馴染みのあるSQLを使って複数のデータストリームをフィルタリング、変換、集計、結合できます。標準SQLやマテリアライズドビューを用いたリアルタイム処理、監視、分析をサポートし、リアルタイムのデータインサイトを提供します。
- 高スループットシナリオでの処理能力:HStreamDBデータ統合は同期・非同期の書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。
はじめる前に
本節では、HStreamDBデータ統合の作成前に必要な準備として、HStreamDBサービスの起動方法やストリームの作成方法を説明します。
以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を解説しています。Dockerがインストールされていること、可能であればDocker Compose v2を使用していることを確認してください。その他のHStreamDBおよびHStreamDB Platformのインストール方法は、Quickstart with Docker-ComposeおよびGetting Started with HStream Platformを参照してください。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
HStreamDB TCPサービスの起動とストリーム作成
本節では、ローカルのDocker環境で単一ノードのHStreamDB TCPサービスを起動し、HStreamDBにストリームを作成する方法を説明します。
注意
HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。
以下の内容で
docker-compose-tcp.yaml
ファイルを作成してください。< server ip >
はサーバーのIPアドレスに置き換えてください。docker-compose-tcp.yaml
yamlversion: '3.9' services: hserver: image: hstreamdb/hstream:v0.17.0 container_name: quickstart-tcp-hserver depends_on: - zookeeper - hstore ports: - '127.0.0.1:6570:6570' expose: - 6570 networks: - quickstart-tcp volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6570 \ --internal-port 6571 \ --server-id 100 \ --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --store-log-level warning \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tcp hstore: image: hstreamdb/hstream:v0.17.0 container_name: quickstart-tcp-hstore networks: - quickstart-tcp volumes: - data_store:/data/store command: - bash - '-c' - | set -ex # N.B. "enable-dscp-reflection=false" is required for linux kernel which # doesn't support dscp reflection, e.g. centos7. /usr/local/bin/ld-dev-cluster --root /data/store \ --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ --user-admin-port 6440 \ --param enable-dscp-reflection=false \ --no-interactive zookeeper: image: zookeeper:3.8.1 container_name: quickstart-tcp-zk expose: - 2181 networks: - quickstart-tcp volumes: - data_zk_data:/data - data_zk_datalog:/datalog networks: quickstart-tcp: name: quickstart-tcp volumes: data_store: name: quickstart_tcp_data_store data_zk_data: name: quickstart_tcp_data_zk_data data_zk_datalog: name: quickstart_tcp_data_zk_datalog
以下のシェルコマンドを実行し、HStreamDB TCPサービスを起動します。
bashdocker compose -f docker-compose-tcp.yaml up --build
HStreamDBコンテナに入って、
temp_hum
という名前のストリームを作成します。TIP
HStreamDBの対話型SQL CLIを使ってストリームを作成することも可能です。
hstream --help
でhstream
コマンドの使用方法を確認してください。HStreamDBコンテナに入ってストリームを作成するコマンド例
bash$ docker container exec -it quickstart-tcp-hserver bash # ストリーム `temp_hum` を作成 root@ed6a64e65ac0:/# hstream stream create temp_hum +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+ # ストリーム一覧を表示 root@ed6a64e65ac0:/# hstream stream list +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+
HStreamDB TLSサービスの起動とストリーム作成
本節では、ローカルDocker環境で二ノードのHStreamDB TLSサービスを起動し、HStreamDBにストリームを作成する方法を説明します。
注意
HStreamDBリソースが接続状態の場合、HStreamDB内のストリームに対して削除や再作成などの操作を行うと、HStreamDBへの再接続が必要になります。つまり、HStreamDBリソースの再起動が必要です。
Dockerネットワーク環境と証明書ファイルについて
- 現バージョンのHStreamでは、コンテナ間通信に影響を与えるため、
http_proxy
、https_proxy
、all_proxy
などの環境変数をコンテナに設定しないでください。詳細はDocker Network Proxyを参照してください。 - ルート証明書および自己署名証明書はsmallstep/step-caコンテナを使って自動生成されます。
- 他の証明書要件がある場合は、証明書ファイルをHStreamDBコンテナにマウントするか、step-caの設定を参照してください。
- step-caのデフォルト設定で生成される証明書は有効期限が1日です。有効期限を変更したい場合は
ca
ディレクトリ内の証明書を削除し、step-ca設定オプションに従って有効期限を変更してください。
- step-caのデフォルト設定で生成される証明書は有効期限が1日です。有効期限を変更したい場合は
証明書を保存するために
tls-deploy/ca
ディレクトリを作成します。bashmkdir tls-deploy/ca
bash$ cd tls-deploy # "ca"ディレクトリに書き込み権限を付与 $ sudo chmod 777 ca
tls-deploy
配下に以下の内容でdocker-compose-tls.yaml
ファイルを作成してください。< server ip >
はサーバーのIPアドレスに置き換えてください。docker-compose-tls.yaml
yamlversion: '3.9' services: step-ca: image: smallstep/step-ca:0.23.0 container_name: quickstart-tls-step-ca networks: - quickstart-tls volumes: - ${step_ca}:/home/step environment: - DOCKER_STEPCA_INIT_NAME=HStream - DOCKER_STEPCA_INIT_DNS_NAMES=step-ca generate-hstream-cert: image: smallstep/step-ca:0.23.0 container_name: quickstart-tls-generate-hstream-cert depends_on: step-ca: condition: service_healthy networks: - quickstart-tls volumes: - ${step_ca}:/home/step command: - bash - '-c' - | sleep 1 if [ -f hstream.crt ]; then exit 0; fi step ca certificate "hstream" hstream.crt hstream.key \ --provisioner-password-file secrets/password --ca-url https://step-ca:9000 \ --root certs/root_ca.crt \ --san localhost \ --san 127.0.0.1 \ --san < server ip > hserver0: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-0 depends_on: - generate-hstream-cert - zookeeper - hstore ports: - '6570:6570' - '6571:6571' - '26570:26570' expose: - 6571 - 26570 networks: - quickstart-tls volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store - ${step_ca}:/data/server command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \ timeout=60; \ until ( \ [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for tls files ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6570 \ --internal-port 6571 \ --server-id 100 \ --seed-nodes "hserver0:6571,hserver1:6573" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tls \ --tls-cert-path /data/server/hstream.crt \ --tls-key-path /data/server/hstream.key \ --enable-tls \ --advertised-listeners l1:hstream://< server ip >:26570 \ --listeners-security-protocol-map l1:plaintext # NOTE: # advertised-listeners ip addr should same as container addr for tls listener hserver1: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-1 depends_on: - generate-hstream-cert - zookeeper - hstore ports: - '6572:6572' - '6573:6573' - '26572:26572' expose: - 6573 - 26572 networks: - quickstart-tls volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store - ${step_ca}:/data/server command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \ timeout=60; \ until ( \ [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for tls files ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6572 \ --internal-port 6573 \ --server-id 101 \ --seed-nodes "hserver0:6571,hserver1:6573" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tls \ --tls-cert-path /data/server/hstream.crt \ --tls-key-path /data/server/hstream.key \ --enable-tls \ --advertised-listeners l1:hstream://< server ip >:26572 \ --listeners-security-protocol-map l1:plaintext # NOTE: # advertised-listeners ip addr should same as container addr for tls listener hserver-init: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-init depends_on: - hserver0 - hserver1 networks: - quickstart-tls command: - bash - '-c' - | timeout=60 until ( \ /usr/local/bin/hadmin server --host hserver0 --port 26570 status && \ /usr/local/bin/hadmin server --host hserver1 --port 26572 status \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for servers ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hadmin server --host hserver0 --port 26570 init hstore: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hstore networks: - quickstart-tls volumes: - data_store:/data/store command: - bash - '-c' - | set -ex /usr/local/bin/ld-dev-cluster --root /data/store \ --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ --user-admin-port 6440 \ --no-interactive zookeeper: image: zookeeper:3.8.1 container_name: quickstart-tls-zk expose: - 2181 networks: - quickstart-tls volumes: - data_zk_data:/data - data_zk_datalog:/datalog networks: quickstart-tls: name: quickstart-tls volumes: data_store: name: quickstart_tls_data_store data_zk_data: name: quickstart_tls_data_zk_data data_zk_datalog: name: quickstart_tls_data_zk_datalog
ディレクトリ構成は以下のようになります。
$ tree tls-deploy
tls-deploy
├── ca
└── docker-compose-tls.yaml
2 directories, 1 file
tls-deploy
ディレクトリに移動し、以下のシェルコマンドでHStreamDB TLSサービスを起動します。bash# 同一フォルダで起動 env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up --build # バックグラウンド起動 env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up -d --build
HStreamDBコンテナに入り、
temp_hum
という名前のストリームを作成します。TLS接続コマンドオプション
HStreamDB TCPサービスと同様に、コマンドラインに
--tls-ca [CA_PATH]
オプションを追加するだけです。quickstart-tls-hserver-1
ノードでコマンドを実行する場合は、docker-composeファイルで指定したポートと一致させるために--port 6572
オプションも指定してください。HStreamDBコンテナに入ってストリームを作成するコマンド例
bash$ docker container exec -it quickstart-tls-hserver-0 bash # ストリーム `temp_hum` を作成 root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream create temp_hum +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+ # ストリーム一覧を表示 root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream list +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+
HStreamDBコネクターの作成
データ統合ルールを作成する前に、まずHStreamDBサーバーにアクセスするためのHStreamDBコネクターを作成する必要があります。
デプロイメント画面に移動し、左ナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの中からHStreamDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリの中からHStreamDBを選択します。
接続情報を入力します。
- サーバーURL:サーバーのIPアドレスとポート
- gRPCタイムアウト:gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間(秒)。デフォルトは
30
秒です。 - 必要に応じてTLS接続を有効にするためのトグルスイッチをクリックします。TLSを有効にした場合はTLS検証を無効にしてください。
tls-deploy/ca
ディレクトリで生成された証明書とキーをアップロードします。ca/hstream.crt
をTLS証明書にアップロードca/hstream.key
をTLSキーにアップロードca/certs/root_ca.crt
をCA証明書にアップロード
詳細設定(任意)
テストボタンをクリックし、HStreamDBサービスにアクセス可能であれば成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定し、処理したデータをHStreamDBに転送するアクションをルールに追加するためのルールを作成します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、
temp_hum/emqx
トピックからメッセージの報告時刻up_timestamp
、クライアントID、ペイロードを読み取り、温度と湿度を取得しています。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
初心者の方はSQL例をクリックし、テストを有効化してSQLルールの学習とテストが可能です。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
メッセージを特定のトピックに転送するためのHStreamレコードテンプレートを以下のテンプレートで設定します。
bash{ "up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum} }
詳細設定(任意)
確定ボタンをクリックしてルール作成を完了します。
新規ルール作成成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度と湿度のデータ報告をシミュレートするには、MQTTXの使用を推奨しますが、他のクライアントでも問題ありません。
MQTTXでデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがKafkaに転送されているか確認します。
bash# Stream `temp_hum`の読み取りを中断するにはControl-Cを押します root@7f963b999883:/# hstream stream read-stream temp_hum timestamp: "1711442849073", id: 2241614080977213-21474836481-0, key: "", record: {"up_timestamp": 1711442848921, "client_id": mqttx_3f5a2868, "temp": 27.5, "hum": 41.8}
コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を確認できます。