Flowデザイナー
Flowデザイナーは、従来のビジュアルツールであるFlowsの機能を拡張し、データ処理ワークフロー(Flows)の作成および編集機能を追加した強力なビジュアルツールです。この拡張により、データ処理および統合の設定が簡素化され、効率化されます。EMQX v5.8.0以降では、作成したデータ処理ワークフローのテストも可能です。
Data IntegrationとFlowデザイナーで作成されたルールは相互運用可能です。つまり、Flowデザイナーでルールを作成すると、そのSQLや関連設定をData Integrationで確認でき、またData IntegrationのSQLエディターでルールを作成すると、そのデータフロープロセスの設定をFlowデザイナーで確認できます。

主な機能
EMQXダッシュボードのFlowsページにアクセスするには、左メニューのIntegrations -> Flow Designerをクリックします。既にルールやデータ統合を作成している場合は、複数のノードで構成された有向非巡回グラフが表示されます。各ノードは、トピックやイベント、Sourceからのデータ読み取り、ルールによるデータ変換、アクションやSinksを使ったデータ転送などのデータ処理ステップを表します。
Flowsページには、Rules、Webhook、Flowデザイナーで作成されたすべてのデータ処理ワークフローが表示されます。Flowsを通じて、デバイスやクライアントからルール処理を経て外部データシステムへ、またはその逆に外部データシステムからルール処理を経てデバイスへとデータがどのように流れるかを可視化できます。ページを更新すると、ルールやデータ統合の最新の変更が反映されます。
Create Flowボタンをクリックすると、Flow作成ページに入り、ビジュアル設定が可能です。各ステップに必要なノードをドラッグ&ドロップで選択し、それらを接続してワークフローを実装できます。

Source
データ入力は、メッセージ、イベント、または外部データシステムから流れるメッセージをサポートします。Flowには少なくとも1つのSourceノードが必要で、複数のデータ入力ノードを同時にサポート可能です。現在サポートされているSourceは以下の通りです。
- Messages: クライアントがパブリッシュするメッセージのトピックおよびトピックワイルドカードで指定。
- Event: EMQX内のすべてのクライアントイベントをサポート。詳細はClient Eventsを参照。
- 外部データシステム:
Processing
データ処理およびフィルタリングには、関数ノードとフィルタノードを使用します。このステップは任意であり、Flowは最大で1つの関数ノードと1つのフィルタノードをサポートします。
- データ処理: ルールエンジンのすべてのSQL組み込み関数をサポート。
- フィルタ: Sourceからのデータフィールドに対する比較フィルタリングをサポート。サポートされる比較演算子は
>, <, <=, >=, <>, !=, =, =~です。
ビジュアルフォーム編集に加え、Processingノードは式モードに切り替えてRule SQL構文で編集することも可能です。フィルタノードは関数ノードの後にのみ接続できるため、データはまず処理され、その後フィルタリングされます。
Sink
データソースおよび処理結果を特定のノードや外部データシステムに出力します。Flowには少なくとも1つのSinkノードが必要で、サポートされるSinkノードは以下の通りです。
- Republish: ローカルの指定されたMQTTトピックにメッセージをパブリッシュ。
- Console Output: デバッグ用にメッセージをログに出力。
- 外部データシステム: MySQLやKafkaなど40種類以上のデータシステムをサポート。詳細はData Integrationを参照。
Flowの編集とテスト
Flow作成時、システムは自動的にIDを生成します。IDの横にある編集アイコンをクリックすると、Flowの名前や説明を変更できます。
Flow内のノードを削除するには、ノードにカーソルを合わせて右上の削除アイコンをクリックします。ノードをクリックすると編集モードに入り、設定内容を変更して保存できます。全体のFlowはSaveをクリックして保存します。Start Testボタンをクリックすると、シミュレーションデータの入力や実際のクライアントを使ったテストが可能で、正しく実行されるか検証できます。
利点
Flowデザイナーは機能豊富で使いやすいツールであり、ユーザーがより効率的にデータ処理と統合を行い、ビジネスのイノベーションを促進し、データ管理の可視化と制御を向上させることを支援します。主な特徴と利点は以下の通りです。
- 直感的なビジュアルインターフェース: ユーザーはドラッグ&ドロップの簡単な操作でデータ処理ワークフローを作成・調整・カスタマイズでき、プログラミング経験がなくても複雑なデータ統合ロジックを扱えます。
- 高速なリアルタイム処理: Flowデザイナーにより、メッセージやイベントのリアルタイム処理ワークフローを数分で構築可能。これにより、ビジネスは新たなデータやイベントに迅速に対応でき、リアルタイムのビジネスニーズを支援します。
- 幅広い統合機能: 40種類以上のデータシステムとシームレスに統合し、柔軟なデータ接続と交換を実現します。
- 統合管理と監視: ユーザーは統一ビューでデータ統合プロセス全体を明確に管理でき、各処理ノードの状態とパフォーマンスを把握可能。これによりリアルタイムの監視とトラッキングが可能となり、高い信頼性とデータの完全性を確保します。
- EMQXのデータ処理能力: EMQXのルールSQLおよびSink/Source機能を活用し、堅牢なデータ処理とパフォーマンスの利点を継承。UIとSQLエディターを切り替えながら、SQL編集の柔軟性とより簡単かつ高速なユーザー体験を両立し、EMQXルールSQL構文の深い知識なしにビジネスのイノベーションとデータ駆動型意思決定を促進します。
クイックスタート
このセクションでは、サンプルユースケースを通じてFlowデザイナーでのFlowの迅速な作成とテスト方法を説明します。
このデモでは、高温アラートを処理するデータ処理ワークフローの作成方法を紹介します。ワークフローはMQTTトピック経由で温度・湿度センサーからデータを受信し、データのフィルタリングと変換ルールを設定し、温度が40℃を超えた場合にアラートメッセージを新しいトピックalertにパブリッシュします。また、テストを通じてルールの有効性とデータ処理結果の検証方法も示します。
シナリオ説明
デバイスに温度・湿度センサーが搭載されており、5秒ごとにMQTTトピックsensor/temperatureにデータを送信すると仮定します。EMQXルールエンジンはこのデータを処理し、以下のステップを含みます。
- データフィルタリング: 温度が40℃を超えるデータのみ処理。
- データ変換:
- デバイスIDを抽出。
- 温度情報を抽出。
- ペイロード内のタイムスタンプを組み込み関数で読みやすい日付形式に変換。
- メッセージ再パブリッシュ: 処理済みデータをアラートメッセージに整形し、新しいトピック
alertにパブリッシュ。
再パブリッシュされるサンプルデータ:
{
"device_id": "device123",
"temperature": 22.5,
"humidity": 60
}Flowの作成
FlowsページでCreate Flowボタンをクリックします。
SourceセクションからMessagesノードをキャンバスにドラッグし、メッセージソースのトピック(例:
sensor/temperature)を設定してSaveをクリックします。これはクライアントがパブリッシュするメッセージのソースを指定するステップです。
ProcessingセクションからData Processingノードをキャンバスにドラッグし、メッセージから以下のフィールドを抽出するデータ処理ルールを設定します。
payload.device_id:device_idとしてエイリアス。payload.temperature:temperatureとしてエイリアス。timestamp:format_date関数を使い、メッセージのタイムスタンプを読みやすい日時形式に変換し、dateとしてエイリアス。Time Unit:millisecondを選択。Time Offset:+08:00を入力。Data Format:%Y-%m-%d %H:%M:%S.%6N%zを入力。詳細は日時変換関数を参照。Timestamp:timestampを入力。
設定完了後、Saveをクリックします。

ProcessingからFilterノードを選択し、データフィルタリングルールを設定します。フィルタ項目を追加し、
payload.temperatureを入力、演算子に>=を選択、値に40を入力してSaveをクリックします。
SinkからRepublishノードを選択し、メッセージ転送先のトピックを
alertに設定します。処理・変換済みデータを以下のペイロード形式でアラートメッセージに整形します。bash${device_id} device reported a high temperature of ${temperature}°C at ${date}.Saveをクリックします。

新たに作成されたFlowがページに表示されます。右上のSaveをクリックしてFlowを保存します。

Flowとフォームルールは相互運用可能です。以前に作成したルールのSQLや関連設定はルールページでも確認できます。

Flowのテスト
Flowデザイナーで任意のFlowノードをクリックし、編集パネルを開きます。パネル下部のEdit Flowボタンをクリックします。
Saveボタンの隣にあるStart Testをクリックすると、下部にポップアップが表示されます。
ポップアップ内のInput Simulated Dataをクリックしてシミュレーションデータを入力するか、実際のクライアントからメッセージをパブリッシュして結果を確認できます。このデモではMQTTXを使って実データをパブリッシュします。

MQTTX Webを開き、New Connectionをクリックしてパブリッシャーとしてクライアント接続を作成します。以下の項目を設定します。
- Name:
device1と入力。 - Host: EMQXサーバーの接続アドレスを入力。
- Port:
8084を入力。 - UsernameおよびPassword: Access Control -> Authenticationページで設定した認証情報を入力。
他の設定はデフォルトのままにしてConnectをクリック。
- Name:
新規サブスクリプションを作成し、トピックを
alertに設定。温度が40℃未満のメッセージをパブリッシュします。条件を満たさないため、ルールSQLは実行されません。

温度が40℃以上のメッセージをパブリッシュします。
alertトピックでアラートメッセージを受信できます。
テストページに戻り、テスト成功の結果を確認します。

テストが失敗した場合は、エラーメッセージが表示されます。
