Skip to content

HStreamDBへのMQTTデータストリーミング

HStreamDBは、リアルタイムのメッセージ、イベント、その他のデータストリームを効率的に取り込み、保存、処理、配信できるオープンソースのストリーミングデータプラットフォームです。EMQXとHStreamDBの統合により、MQTTメッセージやクライアントイベントをHStreamDBに保存でき、大規模なIoTデータの収集、伝送、保存を実現し、標準SQLやマテリアライズドビューを用いたデータストリームのリアルタイム処理、監視、分析が可能になります。

本ページでは、EMQXとHStreamDB間のデータ統合について包括的に紹介し、データ統合の作成と検証に関する実践的な手順を提供します。

TIP

HStreamDBデータ統合はEMQX 5.2.0以降でのみサポートされています。

動作の仕組み

HStreamDBデータ統合は、EMQXのデバイス接続およびメッセージ伝送機能とHStreamDBの堅牢なデータ保存・処理機能を組み合わせたEMQXの標準機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理が簡素化されています。

以下の図は、EMQXとHStreamDB間のデータ統合の典型的なアーキテクチャを示しています:

EMQX Integration HStreamDB

EMQXはルールエンジンと設定されたSinkを通じてMQTTデータをHStreamDBに転送し、処理の流れは以下の通りです:

  1. メッセージのパブリッシュと受信:IoTデバイスはMQTTプロトコルで正常に接続し、特定のトピックにテレメトリや状態データをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. ルールエンジンによるメッセージ処理:組み込みのルールエンジンを使い、特定のソースからのMQTTメッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールをマッチさせ、データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  3. HStreamDBへのデータストリーミング:ルールがトリガーされると、メッセージをHStreamDBに転送するアクションが実行されます。データはHStreamDBのストリーム名、パーティションキー、レコードに簡単に設定でき、その後のデータ処理や分析を容易にします。

MQTTメッセージデータがApache HStreamDBに書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です:

  • 特定のMQTTメッセージ受信時にHStreamDBのルールエンジンコンポーネントを使って対応するアクションやイベントをトリガーし、システム間やアプリケーション間のイベント駆動機能を実現。
  • HStreamDB内でMQTTデータストリームをリアルタイムに分析し、異常や特定のイベントパターンを検出してアラート通知や対応アクションを実行。
  • 複数のMQTTトピックからのデータを統合し、HStreamDBの計算機能を活用してリアルタイム集計、計算、分析を行い、より包括的なデータインサイトを獲得。

特徴と利点

HStreamDBとのデータ統合は、ビジネスに以下の特徴と利点をもたらします:

  • 信頼性の高いIoTデータメッセージ配信:EMQXはMQTTメッセージをバッチ処理で確実にHStreamDBに送信でき、IoTデバイスとHStreamDBおよびアプリケーションシステムの統合を実現します。
  • MQTTメッセージの変換:ルールエンジンを用いて、EMQXはMQTTメッセージの抽出、フィルタリング、付加、変換を行い、HStreamDBに送信します。
  • 大規模データストリーム保存:HStreamDBは分散型でフォールトトレラントなログストレージクラスターを備え、数百万のデータストリームを信頼性高く保存します。必要に応じてリアルタイムのデータストリーム更新をアプリケーションに再生またはプッシュ可能です。EMQXのメッセージモデルと完全に統合し、大規模なIoTデータ収集、伝送、保存を実現します。
  • クラスターとスケーラビリティ:クラウドネイティブアーキテクチャを採用し、EMQXとHStreamDBはオンラインスケーリングやクラスターの動的な拡張・縮小をサポートし、増大するビジネス需要に柔軟に対応します。
  • 柔軟な処理能力:HStreamDBではおなじみのSQLを使って複数のデータストリームのフィルタリング、変換、集約、結合が可能です。標準SQLやマテリアライズドビューを使ったリアルタイム処理、監視、分析をサポートし、リアルタイムのデータインサイトを提供します。
  • 高スループットシナリオでの処理能力:HStreamDBデータ統合は同期・非同期の両書き込みモードをサポートし、シナリオに応じてレイテンシとスループットのバランスを柔軟に調整できます。

はじめる前に

このセクションでは、HStreamDBデータ統合の作成を始める前に必要な準備、HStreamDBサービスの起動方法とストリームの作成方法について説明します。

以下のサブセクションでは、Linux/MacOS環境でDockerイメージを使ってHStreamDBをインストールし接続する方法を説明します。Dockerがインストールされており、可能であればDocker Compose v2を使用してください。その他のHStreamDBおよびHStreamDBプラットフォームのインストール方法は、Quickstart with Docker-ComposeおよびGetting Started with HStream Platformを参照してください。

前提条件

HStreamDBサービスの起動とストリームの作成

コネクターの作成

このセクションでは、SinkをHStreamDBサーバーに接続するためのコネクターの作成方法を示します。

以下の手順は、EMQXとHStreamDBをローカルマシンで実行していることを前提としています。リモートで実行している場合は設定を適宜調整してください。

  1. EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでHStreamDBを選択し、Nextをクリックします。
  4. Configurationステップで以下の情報を設定します(*印のある項目は必須です):
    • Connector name:コネクター名を英数字の組み合わせで入力します。例:my_hstreamdb
    • HStreamDB Server URLhstream://127.0.0.1:6570または実際のHStreamDBのアドレスとポートを入力します。
      • スキームはhttphttpshstreamhstreamsをサポートします。
      • TLS接続の場合はスキームをhstreamsまたはhttpsにします。例:hstreams://127.0.0.1:6570
    • HStreamDB Stream Name:事前に作成したストリーム名を入力します。
      • クライアントメッセージ保存用はmqtt_message
      • イベント記録用はmqtt_connect
    • HStreamDB Partition Key:HStreamDB内のパーティションやノードのどこにデータを保存するかを決定するためのパーティションキーを指定します。例として${topic}を指定すると、同じトピックのメッセージがHStreamDB内で順序を保って書き込まれます。未指定の場合はデフォルトキーが使用され、データはデフォルトのシャードにマッピングされます。
    • HStreamDB gRPC Timeout:gRPCリクエスト時にHStreamDBサーバーからの応答を待つ最大時間(秒)を指定します。デフォルトは30秒です。
    • Enable TLS:必要に応じてTLS接続を有効にするためにトグルスイッチをクリックします。TLS有効時はTLS Verifyを無効にします。tls-deploy/caディレクトリで生成した証明書とキーをアップロードします:
      • ca/hstream.crtTLS Certにアップロード
      • ca/hstream.keyTLS Keyにアップロード
      • ca/certs/root_ca.crtCA Certにアップロード
  5. 詳細設定(任意):詳細はFeatures of Sinkを参照してください。
  6. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがHStreamDBサーバーに接続できるかテストできます。
  7. ページ下部のCreateボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてSink付きのルール作成に進み、HStreamDBに転送するデータやクライアントイベントの記録を指定します。詳細はCreate a Rule with HStreamDB Sink for Message StorageおよびCreate a Rule with HStreamDB Sink for Events Recordingを参照してください。

HStreamDB Sinkを使ったメッセージ保存ルールの作成

このセクションでは、ダッシュボード上でソースMQTTトピックt/#からのメッセージを処理し、処理済みデータを設定したSink経由でHStreamDBストリームmqtt_messageに書き込むルールの作成方法を示します。

  1. EMQXダッシュボードにアクセスし、Integration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力し、SQL Editorで以下のステートメントを設定します。これはトピックt/#以下のMQTTメッセージをHStreamDBに保存することを意味します。

    注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドをSELECT句に含めていることを確認してください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方はSQL Examplesをクリックし、Enable Testを有効にしてSQLルールを学習・テストできます。

    • Add Actionボタンをクリックし、ルールがトリガーするアクションを定義します。このアクションにより、EMQXはルールで処理したデータをHStreamDBに送信します。
  4. Type of ActionドロップダウンリストからHStreamDBを選択します。ActionドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのSinkがあれば選択可能ですが、このデモでは新しいSinkを作成します。

  5. Sinkの名前を入力します。名前は英数字の組み合わせにしてください。

  6. Connectorドロップダウンから先ほど作成したmy_hstreamdbを選択します。新しいコネクターを作成する場合はドロップダウン横のボタンをクリックしてください。設定パラメータはCreate a Connectorを参照してください。

  7. メッセージを特定のトピックに転送するためのHStream Record Templateを以下のテンプレートで設定します:

    json
    {"id": ${id}, "topic": "${topic}", "qos": ${qos}, "payload": "${payload}"}
  8. Fallback Actions(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はFallback Actionsを参照してください。

  9. 詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はFeatures of Sinkを参照してください。

  10. Createをクリックする前に、Test ConnectivityをクリックしてSinkがHStreamDBサーバーに接続できるかテストします。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認してCreateをクリックしルールを生成します。

これでHStreamDB Sinkを通じたデータ転送とオンライン/オフライン状態の記録ルールが正常に作成されました。Integration -> Rulesページで新規作成ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいHStreamDB Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#のメッセージがルールmy_ruleで解析されHStreamDBに送信・保存されている様子が確認できます。

HStreamDB Sinkを使ったイベント記録ルールの作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定したSink経由でHStreamDBストリームmqtt_connectに書き込むルールの作成方法を示します。

ルール作成手順はメッセージ保存用のStream Sinkでルールを作成とほぼ同様ですが、SQLルール構文とStream Recordテンプレートが異なります。

オンライン/オフライン状態記録用のSQLルール構文は以下の通りです:

sql
SELECT
  *
FROM
  "$events/client_connected", "$events/client_disconnected"

SinkのStream Record Templateは以下の通りです:

sql
{"clientid": "${clientid}", "event_type": "${event}", "event_time": ${timestamp}}

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello HStreamDB" }'

2つのSinkの稼働状況を確認します。

  • メッセージ保存用Sinkでは、新規の受信メッセージと送信メッセージが1件ずつあるはずです。ストリームmqtt_messageにデータが書き込まれているか確認します:
bash
# ストリーム`mqtt_message`の読み取り後、Control-Cで停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_message
timestamp: "1693903488278", id: 1947758763121538-8589934593-0, key: "", record: {"id": 00060498A3B3C4F8F4400100127E0002, "topic": "t/1", "qos": 0, "payload": { "msg": "Hello HStreamDB" }}
^CRead Done.
  • オンライン/オフライン状態記録用Sinkでは、新たに2件のイベント(クライアント接続・切断)が記録されているはずです。ストリームmqtt_connectに状態記録が書き込まれているか確認します:
bash
# ストリーム`mqtt_connect`の読み取り後、Control-Cで停止
root@9c7ce2f51860:/# hstream stream read-stream mqtt_connect
timestamp: "1693903488274", id: 1947758827604597-8589934593-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.connected", "event_time": 1693903488266}
timestamp: "1693903488294", id: 1947758827604597-8589934594-0, key: "", record: {"clientid": "emqx_c", "event_type": "client.disconnected", "event_time": 1693903488271}
^CRead Done.