Skip to content

Apache Doris に MQTT データを取り込む

注意

Apache Doris とのデータ統合は、EMQX バージョン 5.91 以降の専用版で利用可能です。

Apache Doris は、高い同時接続性、高性能、使いやすさで知られる最新の大規模並列処理(MPP)分析データベースシステムです。リアルタイム分析やデータウェアハウジングのシナリオに特に適しています。EMQX 5.10.0 以降では、MQTT データを Apache Doris と統合でき、効率的な保存、リアルタイム分析、強力なデータ可視化を実現します。

本ガイドでは、EMQX プラットフォームと Apache Doris 間のデータ統合の設定および検証方法について実践的に説明します。

注意

EMQX における Apache Doris データ統合は、Apache Doris バージョン 2.1.7 以降をサポートしています。

動作概要

Apache Doris データ統合は EMQX プラットフォームの標準機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的な IoT アプリケーションでは、EMQX プラットフォームがデバイス接続とメッセージの転送を担当し、Apache Doris はデバイスの状態やメタデータ、メッセージデータの保存および分析を担当します。

doris-integration

EMQX プラットフォームはルールエンジンとアクションを通じてデバイスイベントやデータを Apache Doris に転送します。アプリケーションは Apache Doris のデータを読み取り、デバイスの状態を把握したり、オンライン・オフラインの記録を取得したり、デバイスデータを分析したりできます。具体的なワークフローは以下の通りです。

  • IoT デバイスが EMQX プラットフォームに接続: IoT デバイスが MQTT プロトコルを通じて正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイス ID、送信元 IP アドレスなどの情報が含まれます。
  • メッセージのパブリッシュと受信: デバイスは特定のトピックにテレメトリや状態データをパブリッシュします。EMQX プラットフォームがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  • ルールエンジンによるメッセージ処理: 組み込みのルールエンジンにより、特定のソースからのメッセージやイベントをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  • Apache Doris への書き込み: ルールによりメッセージを Apache Doris に書き込む処理がトリガーされます。SQL テンプレートを利用して、ルール処理結果からデータを抽出し SQL を構築、Apache Doris に送信して実行することで、メッセージの特定フィールドを対応するテーブルやカラムに書き込んだり更新したりします。

イベントおよびメッセージデータが Apache Doris に書き込まれた後は、Apache Doris に接続してデータを読み取り、以下のような柔軟なアプリケーション開発が可能です。

  • Grafana などの可視化ツールに接続し、データに基づくチャートを生成してデータの変化を表示。
  • デバイス管理システムに接続し、デバイス一覧や状態を確認、異常なデバイス動作を検知して潜在的な問題をタイムリーに解消。

特長とメリット

Apache Doris とのデータ統合により、以下の特長と利点をビジネスにもたらします。

  • 柔軟なイベント処理: EMQX ルールエンジンを通じて、Apache Doris はデバイスのライフサイクルイベントを処理可能であり、IoT アプリケーション実装に必要な各種管理・監視タスクの開発を大幅に容易にします。イベントデータを分析することで、デバイス故障や異常動作、トレンド変化を迅速に検知し、適切な対策を講じられます。
  • メッセージ変換: メッセージは EMQX ルールを通じて多様な処理・変換が可能であり、Apache Doris への保存や利用をより便利にします。
  • リアルタイムデータ取り込み: Apache Doris は HTTP および JDBC インターフェースによるリアルタイムデータ取り込みをサポートします。EMQX と統合することで、MQTT データを低レイテンシで直接 Doris テーブルに書き込め、即時のクエリや分析が必要なシナリオに最適です。
  • ストリーミング同期: Apache Doris は Flink、Kafka、トランザクションデータベースなどのリアルタイムストリームの取り込みもサポートします。これにより、EMQX の MQTT データと他のストリーミングデータソースを統合した統一パイプラインを構築し、包括的なリアルタイム分析が可能です。
  • 標準 SQL とエコシステム互換性: Doris は MySQL 構文に完全互換で標準 SQL をサポートし、新たな言語を学ぶことなく強力な分析クエリを実行できます。BI ツールやクライアントアプリケーションと容易に統合でき、ダッシュボード、レポート、自動化ワークフローに活用できます。
  • 実行時メトリクス: 各アクションの実行時メトリクス(総メッセージ数、成功/失敗数、現在のレートなど)を閲覧可能です。

柔軟なイベント処理、多彩なメッセージ変換、柔軟なデータ操作、リアルタイム監視・分析機能により、効率的で信頼性が高くスケーラブルな IoT アプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。

はじめる前に

このセクションでは、EMQX プラットフォームコンソールで Apache Doris データ統合を作成する前に必要な準備(Apache Doris サーバーのインストールやデータテーブルの作成)について説明します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Apache Doris サーバーのインストール

公式ガイドに従い、Docker Compose を使ってローカルで Doris を起動してください。

データテーブルの作成

MySQL クライアントを使って Doris Frontend に接続し、コマンドを発行します。詳細は公式ドキュメントを参照してください。

例:

sh
mysql -uroot -P9030 -h127.0.0.1

Apache Doris に以下のデータベースと2つのテーブルを作成してください。

  • emqx_messages テーブルは、クライアント ID、トピック、ペイロード、作成日時を保存します。
  • emqx_client_events テーブルは、クライアント ID、イベントタイプ、作成日時を保存します。
sql
create database mqtt;
use mqtt;

create table if not exists
  emqx_messages(
    clientid varchar,
    topic string,
    payload string,
    created_at datetime
  )
  properties (replication_num = 1);

create table if not exists
  emqx_client_events(
    clientid varchar,
    event varchar,
    created_at datetime)
  properties (replication_num = 1);

コネクターの作成

このセクションでは、EMQX プラットフォームが Apache Doris サーバーにデータを送信できるようにコネクターを作成する方法を説明します。

  1. デプロイメントに移動し、左側メニューから データ統合 をクリックします。
  2. 初めてコネクターを作成する場合は、データ永続化 カテゴリの下にある Doris を選択します。すでにコネクターを作成済みの場合は、新規コネクター を選択し、続いて データ永続化 カテゴリの下の Doris を選択します。
  3. 新規コネクター ページで以下の情報を設定します。
    • コネクター名: システムが自動生成します。
    • サーバーホスト: 127.0.0.1:9030 または Apache Doris サーバーがリモートの場合は実際のホスト名を入力します。
    • データベース名: mqtt を入力します。
    • ユーザー名: root を入力します。
    • パスワード: public を入力します。
  4. ビジネスニーズに応じて詳細設定(任意)を行います。
  5. 新規 ボタンをクリックする前に、テスト をクリックしてコネクターが Apache Doris サーバーに接続できるか確認できます。
  6. ページ下部の 新規 ボタンをクリックしてコネクターの作成を完了します。

メッセージ保存用ルールの作成

このセクションでは、ソース MQTT トピック t/# からのメッセージを処理し、設定済みアクションを通じて Apache Doris のデータテーブル emqx_messages に保存するルールの作成方法を説明します。

  1. ルールエリアの 新規ルール をクリックするか、作成したコネクターの アクション 列にある新規ルールアイコンをクリックします。

  2. ルール ID に my_rule を入力し、SQL エディター に以下のステートメントを設定します。これはトピック t/# 以下の MQTT メッセージを Apache Doris に保存することを意味します。

    注意: 独自の SQL 構文を指定する場合は、アクションで必要なすべてのフィールドを SELECT 部分に含めてください。

    sql
    SELECT
      *
    FROM
      "t/#"

    TIP

    初心者の方は SQL Examples をクリックし、Enable Test を有効にして SQL ルールを学習・テストできます。

  3. 次へ をクリックしてアクションを追加します。

  4. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  5. 利用する機能に応じて SQL テンプレート を設定します。

    注意: これは事前処理された SQL なので、フィールドは引用符で囲まず、文末にセミコロンを付けないでください。

    sql
    INSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES(
      ${clientid},
      ${topic},
      ${payload},
      FROM_UNIXTIME(${timestamp}/1000)
    )

    SQL テンプレート内でプレースホルダ変数が未定義の場合、SQL テンプレート 上部の Undefined Vars as Null スイッチでルールエンジンの動作を切り替えられます。

    • 無効(デフォルト): ルールエンジンは文字列 undefined をデータベースに挿入します。

    • 有効: 変数が未定義の場合、ルールエンジンは NULL を挿入します。

      TIP

      可能な限りこのオプションは常に有効にしてください。無効にするのは後方互換性を確保する場合のみです。

  6. フォールバックアクション(任意): メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細は フォールバックアクション を参照してください。

  7. 確認 ボタンをクリックしてアクション設定を完了します。

  8. 新規ルール作成成功 ポップアップで ルールに戻る をクリックし、データ統合設定の一連の流れを完了します。

イベント記録用ルールの作成

このセクションでは、クライアントのオンライン/オフライン状態を記録し、設定済みアクションを通じて Apache Doris テーブル emqx_client_events にイベントデータを保存するルールの作成方法を説明します。

ルール作成手順はメッセージ保存用ルールの作成とほぼ同様ですが、SQL ルール構文と SQL テンプレートが異なります。

オンライン/オフライン状態記録用のルールは、SQL エディター に以下のステートメントを入力します。

sql
SELECT
  *
FROM
  "$events/client/connected", "$events/client/disconnected"

クライアントイベントデータをデータテーブルに挿入するための SQL テンプレートは以下の通りです。

sql
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
  ${clientid},
  ${event},
  FROM_UNIXTIME(${timestamp}/1000)
)

ルールのテスト

MQTTX を使ってトピック t/1 にメッセージを送信し、オンライン/オフラインイベントをトリガーします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Apache Doris" }'

2つのアクションの稼働状況を確認してください。新しい受信メッセージと送信メッセージがそれぞれ1件ずつ、イベントレコードが2件あるはずです。

emqx_messages データテーブルにデータが書き込まれているか確認します。

bash
mysql> select * from emqx_messages;
+----------+-------+--------------------------+---------------------+
| clientid | topic | payload                  | created_at          |
+----------+-------+--------------------------+---------------------+
| emqx_c   | t/1   | { "msg": "hello Apache Doris" } | 2022-12-09 08:44:07 |
+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)

emqx_client_events テーブルにデータが書き込まれているか確認します。

bash
mysql> select * from emqx_client_events;
+----------+---------------------+---------------------+
| clientid | event               | created_at          |
+----------+---------------------+---------------------+
| emqx_c   | client.connected    | 2022-12-09 08:44:07 |
| emqx_c   | client.disconnected | 2022-12-09 08:44:07 |
+----------+---------------------+---------------------+
2 rows in set (0.00 sec)