Apache Doris に MQTT データを取り込む
注意
Apache Doris とのデータ統合は、EMQX バージョン 5.91 以降の Dedicated Flex エディションで利用可能です。
Apache Doris は、高い同時接続性、高性能、使いやすさで知られる最新の大規模並列処理(MPP)分析データベースシステムです。リアルタイム分析やデータウェアハウジングのシナリオに特に適しています。EMQX 5.10.0 以降では、MQTT データを Apache Doris と統合でき、効率的なストレージ、リアルタイム分析、強力なデータ可視化を実現します。
本ガイドでは、EMQX Cloud と Apache Doris 間のデータ統合の設定および検証方法について実践的に説明します。
注意
EMQX における Apache Doris データ統合は、Apache Doris バージョン 2.1.7 以降をサポートしています。
動作概要
Apache Doris データ統合は EMQX Cloud の標準機能であり、シンプルな設定で複雑なビジネス開発を可能にします。典型的な IoT アプリケーションでは、EMQX Cloud がデバイス接続とメッセージの送受信を担当し、Apache Doris はデータストレージプラットフォームとして、デバイス状態やメタデータ、メッセージデータの保存および分析を担当します。

EMQX Cloud はルールエンジンとアクションを通じてデバイスイベントやデータを Apache Doris に転送します。アプリケーションは Apache Doris のデータを読み取り、デバイス状態の把握、オンライン・オフライン記録の取得、デバイスデータの分析を行えます。具体的なワークフローは以下の通りです。
- IoT デバイスが EMQX Cloud に接続:MQTT プロトコルを介して IoT デバイスが正常に接続されると、オンラインイベントがトリガーされます。イベントにはデバイス ID、送信元 IP アドレスなどの属性情報が含まれます。
- メッセージのパブリッシュと受信:デバイスは特定のトピックにテレメトリやステータスデータをパブリッシュします。EMQX Cloud はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、特定のトピックに基づいてメッセージやイベントを処理します。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
- Apache Doris への書き込み:ルールによりメッセージの Apache Doris への書き込みがトリガーされます。SQL テンプレートを用いて、ルール処理結果からデータを抽出し、SQL を構築して Apache Doris に送信、メッセージの特定フィールドを対応するテーブル・カラムに書き込むか更新します。
イベントおよびメッセージデータが Apache Doris に書き込まれた後は、Apache Doris に接続してデータを読み取り、柔軟なアプリケーション開発が可能です。例えば:
- Grafana などの可視化ツールに接続し、データに基づくグラフ作成やデータ変化の表示。
- デバイス管理システムに接続し、デバイス一覧や状態の確認、異常検知、潜在的問題の早期解消。
特長と利点
Apache Doris とのデータ統合により、以下の特長と利点が得られます。
- 柔軟なイベント処理:EMQX のルールエンジンを通じて、Apache Doris はデバイスのライフサイクルイベントを処理でき、IoT アプリケーションに必要な各種管理・監視タスクの開発を大幅に支援します。イベントデータを分析することで、デバイスの故障や異常動作、トレンド変化を迅速に検知し、適切な対応が可能です。
- メッセージ変換:メッセージは EMQX のルールで多様な処理・変換を経て Apache Doris に書き込まれるため、保存や利用がより便利になります。
- リアルタイムデータ取り込み:Apache Doris は HTTP や JDBC インターフェースによるリアルタイムデータ取り込みをサポートします。EMQX と統合することで、MQTT データを低レイテンシで直接 Doris テーブルに書き込め、即時クエリや分析が必要なシナリオに最適です。
- ストリーミング同期:Apache Doris は Flink、Kafka、トランザクションデータベースなどのリアルタイムデータストリーム取り込みもサポートし、EMQX の MQTT データと他のストリーミングデータを統合した包括的なリアルタイム分析パイプライン構築に適しています。
- 標準 SQL とエコシステム互換性:Doris は MySQL 構文に完全対応し、標準 SQL をサポートするため、ユーザーは新たな言語を学ぶことなく強力な分析クエリを実行できます。BI ツールやクライアントアプリケーションとの連携も容易で、ダッシュボードやレポート、ワークフロー自動化に活用できます。
- ランタイムメトリクス:各アクションの実行時メトリクス(総メッセージ数、成功/失敗数、現在のレートなど)の閲覧をサポートします。
柔軟なイベント処理、多様なメッセージ変換、柔軟なデータ操作、リアルタイム監視・分析機能により、効率的で信頼性が高くスケーラブルな IoT アプリケーションを構築し、ビジネスの意思決定や最適化に貢献します。
はじめる前に
このセクションでは、EMQX Cloud コンソールで Apache Doris データ統合を作成する前に必要な準備について説明します。Apache Doris サーバーのインストールやデータテーブルの作成が含まれます。
前提条件
ネットワーク設定
データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。
Dedicated Flexデプロイメントの場合:
EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。
パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。
BYOC(Bring Your Own Cloud)デプロイメントの場合:
BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。
対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。
Apache Doris サーバーのインストール
公式ガイド に従い、Docker Compose を使ってローカルで Doris を起動してください。
データテーブルの作成
MySQL クライアントを使って Doris Frontend に接続し、コマンドを発行できます。詳細は公式ドキュメントを参照してください。
例:
mysql -uroot -P9030 -h127.0.0.1Apache Doris に以下のデータベースと2つのテーブルを作成する必要があります。
emqx_messagesテーブル:クライアント ID、トピック、ペイロード、作成日時を保存します。emqx_client_eventsテーブル:クライアント ID、イベントタイプ、作成日時を保存します。
create database mqtt;
use mqtt;
create table if not exists
emqx_messages(
clientid varchar,
topic string,
payload string,
created_at datetime
)
properties (replication_num = 1);
create table if not exists
emqx_client_events(
clientid varchar,
event varchar,
created_at datetime)
properties (replication_num = 1);コネクターの作成
このセクションでは、EMQX Cloud が Apache Doris サーバーにデータを送信できるようにコネクターを作成する方法を説明します。
- デプロイメントに移動し、左側のナビゲーションメニューから データ統合 をクリックします。
- 初めてコネクターを作成する場合は、データ永続化 カテゴリの中から Doris を選択します。すでにコネクターを作成している場合は、新規コネクター を選択し、続いて データ永続化 カテゴリの中から Doris を選択します。
- 新規コネクター ページで以下の情報を設定します。
- コネクター名:システムが自動的に名前を生成します。
- サーバーホスト:
127.0.0.1:9030または Apache Doris サーバーがリモートの場合は実際のホスト名を入力します。 - データベース名:
mqttを入力します。 - ユーザー名:
rootを入力します。 - パスワード:
publicを入力します。
- ビジネスニーズに応じて詳細設定(任意)を行います。
- 新規作成 をクリックする前に、テスト をクリックしてコネクターが Apache Doris サーバーに接続できるか確認できます。
- ページ下部の 新規作成 ボタンをクリックしてコネクター作成を完了します。
メッセージ保存用ルールの作成
このセクションでは、ソース MQTT トピック t/# からのメッセージを処理し、処理結果を設定済みアクション経由で Apache Doris の emqx_messages テーブルに保存するルールの作成方法を説明します。
ルールエリアの 新規ルール をクリックするか、作成したコネクターの アクション 列にある新規ルールアイコンをクリックします。
ルール ID に
my_ruleを入力し、SQL エディター に以下のステートメントを設定します。これはトピックt/#以下の MQTT メッセージを Apache Doris に保存することを意味します。注意:独自の SQL 構文を指定する場合は、アクションで必要なすべてのフィールドを
SELECT部分に含めていることを確認してください。sqlSELECT * FROM "t/#"TIP
初心者の方は、SQL Examples と Try It Out をクリックして SQL ルールを学習・テストしてください。
次へ をクリックしてアクションを追加します。
コネクター ドロップダウンから先ほど作成したコネクターを選択します。
利用機能に応じて SQL テンプレート を設定します。
注意:これは事前処理された SQL なので、フィールドは引用符で囲まず、ステートメントの末尾にセミコロンを付けないでください。
sqlINSERT INTO emqx_messages(clientid, topic, payload, created_at) VALUES( ${clientid}, ${topic}, ${payload}, FROM_UNIXTIME(${timestamp}/1000) )SQL テンプレート内でプレースホルダ変数が未定義の場合、SQL テンプレート 上部の Undefined Vars as Null スイッチでルールエンジンの動作を切り替えられます。
無効(デフォルト):ルールエンジンは文字列
undefinedをデータベースに挿入します。有効:変数が未定義の場合、ルールエンジンは
NULLをデータベースに挿入します。TIP
可能な限りこのオプションは有効にしてください。無効にするのは後方互換性を保つ場合のみです。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細は フォールバックアクション を参照してください。
確認 ボタンをクリックしてアクション設定を完了します。
新規ルール成功 ポップアップで ルールに戻る をクリックし、データ統合設定の一連の流れを完了します。
イベント記録用ルールの作成
このセクションでは、クライアントのオンライン/オフライン状態を記録し、イベントデータを設定済みアクション経由で Apache Doris の emqx_client_events テーブルに保存するルールの作成方法を説明します。
ルール作成手順はメッセージ保存用ルールの作成とほぼ同様ですが、SQL ルール構文と SQL テンプレートが異なります。
オンライン/オフライン状態記録用の SQL エディターには以下のステートメントを入力します。
SELECT
*
FROM
"$events/client/connected", "$events/client/disconnected"クライアイベントデータをテーブルに挿入する SQL テンプレートは以下の通りです。
INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
${clientid},
${event},
FROM_UNIXTIME(${timestamp}/1000)
)ルールのテスト
MQTTX を使ってトピック t/1 にメッセージを送信し、オンライン/オフラインイベントをトリガーします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello Apache Doris" }'2つのアクションの実行状況を確認してください。新規の受信メッセージと送信メッセージがそれぞれ1件ずつ、イベントレコードが2件あるはずです。
emqx_messages テーブルにデータが書き込まれているか確認します。
mysql> select * from emqx_messages;
+----------+-------+--------------------------+---------------------+
| clientid | topic | payload | created_at |
+----------+-------+--------------------------+---------------------+
| emqx_c | t/1 | { "msg": "hello Apache Doris" } | 2022-12-09 08:44:07 |
+----------+-------+--------------------------+---------------------+
1 row in set (0.01 sec)emqx_client_events テーブルにデータが書き込まれているか確認します。
mysql> select * from emqx_client_events;
+----------+---------------------+---------------------+
| clientid | event | created_at |
+----------+---------------------+---------------------+
| emqx_c | client.connected | 2022-12-09 08:44:07 |
| emqx_c | client.disconnected | 2022-12-09 08:44:07 |
+----------+---------------------+---------------------+
2 rows in set (0.00 sec)