HStreamDBへのMQTTデータストリーミング
HStreamDBは、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQX CloudとHStreamDBの統合により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現するとともに、標準SQLやマテリアライズドビューを用いたリアルタイムのデータストリーム処理、監視、分析が可能になります。
本ページでは、EMQX CloudとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を提供します。
動作の仕組み
HStreamDBデータ統合は、EMQX Cloudのデバイス接続およびメッセージ伝送機能と、HStreamDBの堅牢なデータ保存および処理機能を組み合わせたEMQX Cloudの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理のプロセスが簡素化されています。
以下の図は、EMQXとHStreamDB間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloudはルールエンジンと設定済みのSinkを介してMQTTデータをHStreamDBに転送し、全体の処理は以下の通りです:
- メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルを通じて正常に接続し、特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Cloudはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンを用いて、特定のソースからのMQTTメッセージをトピックマッチングに基づき処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などのメッセージ処理を行います。
- HStreamDBへのデータストリーミング:ルールがトリガーされると、メッセージをHStreamDBに転送するアクションが実行されます。ここでデータはHStreamDBのストリーム名、パーティションキー、レコードに簡単に設定でき、後続のデータ処理や分析を容易にします。
MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:
- 特定のMQTTメッセージを受信した際に、HStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
- HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知や対応アクションを実行。
- 複数のMQTTトピックからのデータを一元化し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。
特長と利点
HStreamDBとのデータ統合は、以下の特長と利点をビジネスにもたらします:
- 信頼性の高いIoTデータメッセージ配信:EMQX CloudはMQTTメッセージをバッチで確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を可能にします。
- MQTTメッセージの変換:ルールエンジンを利用して、EMQX CloudはMQTTメッセージのフィルタリングや変換を行えます。メッセージはHStreamDBに送信される前にデータ抽出、フィルタリング、付加情報の追加、変換が可能です。
- 大規模データストリーム保存:HStreamDBは特別に設計された分散型のフォールトトレラントなログストレージクラスターで、数百万のデータストリームを信頼性高く保存できます。必要に応じてリアルタイムのデータストリーム更新をアプリケーションにリプレイまたはプッシュ可能です。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータの収集、伝送、保存を実現します。
- クラスターとスケーラビリティ:クラウドネイティブアーキテクチャで構築されたEMQXとHStreamDBはオンラインスケーリングとクラスターの動的な拡張・縮小をサポートし、増大するビジネスニーズに柔軟に対応可能です。
- 柔軟な処理能力:HStreamDBでは、馴染みのあるSQLを用いて複数のデータストリームのフィルタリング、変換、集計、結合が行えます。標準SQLやマテリアライズドビューを使ったリアルタイム処理、監視、分析もサポートし、リアルタイムのデータインサイトを提供します。
- 高スループットシナリオでの処理能力:HStreamDBデータ統合は同期・非同期の両書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整可能です。
はじめる前に
このセクションでは、HStreamDBデータ統合の作成を開始する前に必要な準備、具体的にはHStreamDBサービスの起動方法とストリームの作成方法について説明します。
以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を説明します。Dockerがインストールされており、可能であればDocker Compose v2を利用してください。その他のHStreamDBおよびHStreamDBプラットフォームのインストール方法については、Quickstart with Docker-ComposeおよびGetting Started with HStream Platformを参照してください。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
HStreamDB TCPサービスの起動とストリーム作成
このセクションでは、ローカルのDocker環境で単一ノードのHStreamDB TCPサービスを起動し、HStreamDBにストリームを作成する方法を説明します。
注意
HStreamDBリソースが接続状態になった後、ストリームの削除や再作成などの操作を行う場合は、HStreamDBへの再接続が必要となり、つまりHStreamDBリソースの再起動が必要です。
以下の内容で
docker-compose-tcp.yamlファイルを作成します。< server ip >はサーバーのIPアドレスに置き換えてください。docker-compose-tcp.yamlyamlversion: '3.9' services: hserver: image: hstreamdb/hstream:v0.17.0 container_name: quickstart-tcp-hserver depends_on: - zookeeper - hstore ports: - '127.0.0.1:6570:6570' expose: - 6570 networks: - quickstart-tcp volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6570 \ --internal-port 6571 \ --server-id 100 \ --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --store-log-level warning \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tcp hstore: image: hstreamdb/hstream:v0.17.0 container_name: quickstart-tcp-hstore networks: - quickstart-tcp volumes: - data_store:/data/store command: - bash - '-c' - | set -ex # N.B. "enable-dscp-reflection=false" is required for linux kernel which # doesn't support dscp reflection, e.g. centos7. /usr/local/bin/ld-dev-cluster --root /data/store \ --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ --user-admin-port 6440 \ --param enable-dscp-reflection=false \ --no-interactive zookeeper: image: zookeeper:3.8.1 container_name: quickstart-tcp-zk expose: - 2181 networks: - quickstart-tcp volumes: - data_zk_data:/data - data_zk_datalog:/datalog networks: quickstart-tcp: name: quickstart-tcp volumes: data_store: name: quickstart_tcp_data_store data_zk_data: name: quickstart_tcp_data_zk_data data_zk_datalog: name: quickstart_tcp_data_zk_datalog以下のシェルコマンドを実行してHStreamDB TCPサービスを起動します。
bashdocker compose -f docker-compose-tcp.yaml up --buildHStreamコンテナに入って、
temp_humという名前のストリームを作成します。TIP
HStreamDBのインタラクティブSQL CLIを使ってストリームを作成することも可能です。
hstream --helpでhstreamコマンドの使い方を確認してください。HStreamDBコンテナに入ってストリームを作成するコマンド
bash$ docker container exec -it quickstart-tcp-hserver bash # Stream `temp_hum`を作成 root@ed6a64e65ac0:/# hstream stream create temp_hum +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+ # すべてのストリームを一覧表示 root@ed6a64e65ac0:/# hstream stream list +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+
HStreamDB TLSサービスの起動とストリーム作成
このセクションでは、ローカルのDocker環境でデュアルノードのHStreamDB TLSサービスを起動し、HStreamDBにストリームを作成する方法を説明します。
注意
HStreamDBリソースが接続状態になった後、ストリームの削除や再作成などの操作を行う場合は、HStreamDBへの再接続が必要となり、つまりHStreamDBリソースの再起動が必要です。
Dockerネットワーク環境と証明書ファイルについて
- 現バージョンのHStreamでは、コンテナ間通信に影響を与える可能性があるため、
http_proxy、https_proxy、all_proxyなどの環境変数をコンテナに設定しないでください。詳細はDocker Network Proxyを参照してください。 - ルート証明書および自己署名証明書はsmallstep/step-caコンテナを使って自動生成されます。
- 他の証明書要件がある場合は、証明書ファイルをHStreamDBコンテナにマウントするか、Configuring step-caを参照してください。
- step-caでデフォルト設定により生成された証明書は有効期限が1日です。有効期限を変更したい場合は
caディレクトリ内の証明書を削除し、step-ca-configuration-optionsに従って設定を変更してください。
- step-caでデフォルト設定により生成された証明書は有効期限が1日です。有効期限を変更したい場合は
証明書を保存するために
tls-deploy/caディレクトリを作成します。bashmkdir tls-deploy/cabash$ cd tls-deploy # "ca"ディレクトリに書き込み権限を付与 $ sudo chmod 777 catls-deploy配下に以下の内容でdocker-compose-tcp.yamlファイルを作成します。< server ip >はサーバーのIPアドレスに置き換えてください。docker-compose-tls.yamlyamlversion: '3.9' services: step-ca: image: smallstep/step-ca:0.23.0 container_name: quickstart-tls-step-ca networks: - quickstart-tls volumes: - ${step_ca}:/home/step environment: - DOCKER_STEPCA_INIT_NAME=HStream - DOCKER_STEPCA_INIT_DNS_NAMES=step-ca generate-hstream-cert: image: smallstep/step-ca:0.23.0 container_name: quickstart-tls-generate-hstream-cert depends_on: step-ca: condition: service_healthy networks: - quickstart-tls volumes: - ${step_ca}:/home/step command: - bash - '-c' - | sleep 1 if [ -f hstream.crt ]; then exit 0; fi step ca certificate "hstream" hstream.crt hstream.key \ --provisioner-password-file secrets/password --ca-url https://step-ca:9000 \ --root certs/root_ca.crt \ --san localhost \ --san 127.0.0.1 \ --san < server ip > hserver0: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-0 depends_on: - generate-hstream-cert - zookeeper - hstore ports: - '6570:6570' - '6571:6571' - '26570:26570' expose: - 6571 - 26570 networks: - quickstart-tls volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store - ${step_ca}:/data/server command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \ timeout=60; \ until ( \ [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for tls files ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6570 \ --internal-port 6571 \ --server-id 100 \ --seed-nodes "hserver0:6571,hserver1:6573" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tls \ --tls-cert-path /data/server/hstream.crt \ --tls-key-path /data/server/hstream.key \ --enable-tls \ --advertised-listeners l1:hstream://< server ip >:26570 \ --listeners-security-protocol-map l1:plaintext # NOTE: # advertised-listeners ip addr should same as container addr for tls listener hserver1: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-1 depends_on: - generate-hstream-cert - zookeeper - hstore ports: - '6572:6572' - '6573:6573' - '26572:26572' expose: - 6573 - 26572 networks: - quickstart-tls volumes: - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp - data_store:/data/store - ${step_ca}:/data/server command: - bash - '-c' - | set -e /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600; \ timeout=60; \ until ( \ [ -f /data/server/hstream.crt ] && [ -f /data/server/hstream.key ] \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for tls files ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hstream-server \ --bind-address 0.0.0.0 --port 6572 \ --internal-port 6573 \ --server-id 101 \ --seed-nodes "hserver0:6571,hserver1:6573" \ --advertised-address < server ip > \ --metastore-uri zk://zookeeper:2181 \ --store-config /data/store/logdevice.conf \ --store-admin-host hstore --store-admin-port 6440 \ --io-tasks-path /tmp/io/tasks \ --io-tasks-network quickstart-tls \ --tls-cert-path /data/server/hstream.crt \ --tls-key-path /data/server/hstream.key \ --enable-tls \ --advertised-listeners l1:hstream://< server ip >:26572 \ --listeners-security-protocol-map l1:plaintext # NOTE: # advertised-listeners ip addr should same as container addr for tls listener hserver-init: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hserver-init depends_on: - hserver0 - hserver1 networks: - quickstart-tls command: - bash - '-c' - | timeout=60 until ( \ /usr/local/bin/hadmin server --host hserver0 --port 26570 status && \ /usr/local/bin/hadmin server --host hserver1 --port 26572 status \ ) >/dev/null 2>&1; do >&2 echo 'Waiting for servers ...' sleep 1 timeout=$$((timeout - 1)) [ $$timeout -le 0 ] && echo 'Timeout!' && exit 1; done; \ /usr/local/bin/hadmin server --host hserver0 --port 26570 init hstore: image: hstreamdb/hstream:v0.19.3 container_name: quickstart-tls-hstore networks: - quickstart-tls volumes: - data_store:/data/store command: - bash - '-c' - | set -ex /usr/local/bin/ld-dev-cluster --root /data/store \ --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ --user-admin-port 6440 \ --no-interactive zookeeper: image: zookeeper:3.8.1 container_name: quickstart-tls-zk expose: - 2181 networks: - quickstart-tls volumes: - data_zk_data:/data - data_zk_datalog:/datalog networks: quickstart-tls: name: quickstart-tls volumes: data_store: name: quickstart_tls_data_store data_zk_data: name: quickstart_tls_data_zk_data data_zk_datalog: name: quickstart_tls_data_zk_datalog
ディレクトリ構成は以下のようになります:
$ tree tls-deploy
tls-deploy
├── ca
└── docker-compose-tls.yaml
2 directories, 1 filetls-deployディレクトリに入り、以下のシェルコマンドを実行してHStreamDB TLSサービスを起動します。bash# 同じフォルダで起動 env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up --build # バックグラウンド起動 env step_ca=$PWD/ca docker compose -f docker-compose-tls.yaml up -d --buildHStreamDBコンテナに入り、
temp_humという名前のストリームを作成します。TLS接続コマンドオプションについて
TCPサービスと同様に、コマンドラインに
--tls-ca [CA_PATH]オプションを追加するだけで接続可能です。quickstart-tls-hserver-1ノードでコマンドを実行する場合は、docker-composeファイルで指定したポートと一致させるために--port 6572オプションも指定してください。HStreamDBコンテナに入ってストリームを作成するコマンド
bash$ docker container exec -it quickstart-tls-hserver-0 bash # Stream `temp_hum`を作成 root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream create temp_hum +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+ # すべてのストリームを一覧表示 root@9aa62aef0910:/# hstream --tls-ca /data/server/certs/root_ca.crt stream list +-------------+---------+----------------+-------------+ | Stream Name | Replica | Retention Time | Shard Count | +-------------+---------+----------------+-------------+ | temp_hum | 1 | 604800 seconds | 1 | +-------------+---------+----------------+-------------+
HStreamDBコネクターの作成
データ統合ルールを作成する前に、まずHStreamDBサーバーにアクセスするためのHStreamDBコネクターを作成する必要があります。
ご自身のデプロイメントにアクセスし、左側ナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの下にあるHStreamDBを選択します。すでにコネクターを作成済みの場合は、新規コネクターを選択し、続いてデータ永続化カテゴリの下のHStreamDBを選択します。
接続情報を入力します:
- サーバーURL:サーバーのIPアドレスとポート。
- gRPCタイムアウト:gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間を秒単位で指定します。デフォルトは
30秒です。 - 必要に応じてTLS接続を有効にするためのトグルスイッチをクリックします。TLSを有効にした場合はTLS検証を無効にしてください。
tls-deploy/caディレクトリで生成された証明書とキーをアップロードします:ca/hstream.crtをTLS証明書にアップロード。ca/hstream.keyをTLSキーにアップロード。ca/certs/root_ca.crtをCA証明書にアップロード。
詳細設定(任意)。
テストボタンをクリックします。HStreamDBサービスにアクセス可能であれば成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込み対象のデータを指定し、処理済みデータをHStreamDBに転送するためのアクションをルールに追加する必要があります。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
SQLエディターにルールのマッチングSQL文を入力します。以下のルールでは、
temp_hum/emqxトピックからメッセージの報告時刻up_timestamp、クライアントID、ペイロードを読み取り、温度と湿度を取得しています。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"TIP
初心者の方は、SQL例をクリックしてTry It OutでSQLルールを学習・テストしてください。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
メッセージを特定トピックに転送するためのHStreamレコードテンプレートを以下のテンプレートで設定します。
bash{ "up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum} }詳細設定(任意)。
確定ボタンをクリックしてルールの作成を完了します。
新規ルール成功のポップアップでルールに戻るをクリックし、データ統合の設定チェーンを完了します。
ルールのテスト
温度・湿度データの報告をシミュレートするためにMQTTXの使用を推奨しますが、他の任意のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqxペイロード:
json{ "temp": "27.5", "hum": "41.8" }
メッセージがKafkaに転送されているか確認します。
bash# Stream `temp_hum`の読み取り後、Control-Cで停止 root@7f963b999883:/# hstream stream read-stream temp_hum timestamp: "1711442849073", id: 2241614080977213-21474836481-0, key: "", record: {"up_timestamp": 1711442848921, "client_id": mqttx_3f5a2868, "temp": 27.5, "hum": 41.8}コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルール下のすべてのアクションの統計情報が表示されます。