Flowデザイナー
注意
FlowデザイナーはEMQXサーバレスエディションでは利用できません。
Flowデザイナーは、データ処理ワークフロー(Flow)を作成および編集するための強力なビジュアルツールです。データ処理と統合の設定を簡素化し効率化します。また、作成したデータ処理ワークフローのテストも可能です。
データ統合とFlowデザイナーで作成したルールは相互運用可能です。つまり、Flowデザイナーでルールを作成し、そのSQLや関連設定をデータ統合で確認したり、データ統合のSQLエディターでルールを作成し、そのデータフロー処理設定をFlowデザイナーで確認したりできます。

主な機能
EMQX CloudコンソールでFlowデザイナーにアクセスするには、デプロイメントに移動し、左メニューの Data Integration -> Flow Designer をクリックします。既にルールやデータ統合を作成している場合は、複数のノードで構成された有向非巡回グラフが表示されます。各ノードは、トピックやイベント、Sourceからのデータ読み取り、ルールによるデータ変換、Sinkを使ったデータ転送などのデータ処理ステップを表しています。
このページには、ルールおよびFlowデザイナーで作成されたすべてのデータ処理ワークフローが表示されます。デバイスやクライアントからルール処理を経て外部データシステムへ、またはその逆に外部データシステムからルール処理を経てデバイスへとデータがどのように流れるかを可視化できます。ページを更新すると、ルールやデータ統合の最新の変更が反映されます。
New Flow ボタンをクリックすると、Flow作成ページに入り、ビジュアル設定が可能です。必要な各ステップのノードをドラッグ&ドロップで選択し、接続してワークフローを実装できます。

Source
Sourceは、メッセージ、イベント、または外部データシステムから流れるメッセージをサポートします。Flowには少なくとも1つのSourceノードが必要で、複数のデータ入力ノードを同時にサポートできます。現在サポートされているSourceノードは以下の通りです:
- Messages: クライアントがパブリッシュしたメッセージのトピックおよびトピックワイルドカードで指定。
- Event: EMQX内のすべてのクライアントイベントをサポート。詳細はクライアントイベントを参照。
- 外部データシステム:
Processing
Processingでは、関数ノードとフィルターノードを使ってデータ処理およびフィルタリングが可能です。このステップは任意で、Flowは最大で1つの関数ノードと1つのフィルターノードをサポートします:
- データ処理: ルールエンジンのすべての組み込みSQL関数をサポート。
- フィルター: Sourceから来るデータフィールドに対する比較フィルタリングをサポート。サポートされる比較演算子は
>, <, <=, >=, <>, !=, =, =~です。
ビジュアルフォーム編集に加え、ProcessingノードはルールSQL構文で編集可能な式モードへの切り替えもサポートします。フィルターノードは関数ノードの後にのみ接続可能で、データはまず処理されてからフィルターされます。
Sink
Sinkはデータソースおよび処理結果を特定のノードや外部データシステムに出力します。Flowには少なくとも1つのSinkノードが必要です。サポートされているSinkノードは以下の通りです:
- Republish: ローカルで指定したMQTTトピックにメッセージをパブリッシュ。
- 外部データシステム: MySQLやKafkaなど40種類以上のデータシステムをサポート。詳細はデータ統合を参照。
Flowの編集とテスト
Flow作成時にシステムがランダムにIDを生成します。ID横の編集アイコンをクリックするとFlowの説明を変更できます。
Flow内のノードを削除するには、ノードにカーソルを合わせて右上の削除アイコンをクリックします。ノードをクリックすると編集モードに入り、設定内容を変更して保存できます。全体のFlowは画面右上の Save ボタンで保存します。Start Test ボタンをクリックすると、シミュレーションデータを入力したり、実際のクライアントでFlowの動作を検証できます。
利点
Flowデザイナーは機能豊富で使いやすいツールであり、ユーザーがより効率的にデータ処理と統合を行い、ビジネスのイノベーションを促進し、データ管理の可視性と制御性を向上させます。主な特徴と利点は以下の通りです:
- 直感的なビジュアルインターフェース: ユーザーはドラッグ&ドロップの簡単な操作でデータ処理ワークフローを作成・調整・カスタマイズでき、プログラミング経験がなくても複雑なデータ統合ロジックを扱えます。
- 高速なリアルタイム処理: Flowデザイナーはメッセージやイベントのリアルタイム処理ワークフローを数分で構築可能。これによりビジネスは新たなデータやイベントに迅速に対応でき、リアルタイムのビジネスニーズを支援します。
- 豊富な統合機能: 40種類以上のデータシステムとシームレスに統合し、柔軟なデータ接続と交換を実現します。
- 統合管理と監視: ユーザーは統一されたビューでデータ統合全体を明確に管理でき、各処理ノードの状態やパフォーマンスを把握可能。これによりデータフローのリアルタイム監視と追跡ができ、高い信頼性とデータの完全性を確保します。
- EMQXのデータ処理能力: ルールSQLおよびSink/Source機能を活用し、EMQXの堅牢なデータ処理性能を継承。UIとSQLエディターの切り替えが可能で、SQL編集の柔軟性とシンプルかつ高速なユーザー体験を両立し、EMQXルールSQL構文の深い知識がなくてもビジネスのイノベーションとデータ駆動型の意思決定を促進します。
クイックスタート
このセクションでは、サンプルユースケースを通じてFlowデザイナーでのFlowの素早い作成とテスト方法を示します。
このデモでは、高温アラートを処理するデータ処理ワークフローの作成方法を紹介します。ワークフローは温度・湿度センサーからMQTTトピック経由でデータを受信し、データのフィルタリングと変換ルールを設定し、温度が40℃を超えた場合にアラートメッセージを新しいトピック alert にパブリッシュします。また、ルールの有効性とデータ処理結果をテストで検証する方法も示します。
シナリオ説明
デバイスに温度・湿度センサーがあり、5秒ごとにMQTTトピック sensor/temperature にデータを送信するとします。EMQX Cloudのルールエンジンは以下のステップでこのデータを処理します:
- データフィルタリング: 温度が40℃を超えるデータのみ処理。
- データ変換:
- デバイスIDを抽出。
- 温度情報を抽出。
- ペイロード内のタイムスタンプを組み込み関数で読みやすい日時形式に変換。
- メッセージ再パブリッシュ: 処理済みデータをアラートメッセージに整形し、新しいトピック
alertにパブリッシュ。
再パブリッシュされるサンプルデータ:
{
"device_id": "device123",
"temperature": 22.5,
"humidity": 60
}Flowの作成
Flowデザイナー画面で New Flow ボタンをクリック。
Source セクションから Message ノードをキャンバスにドラッグし、メッセージのソーストピック(例:
sensor/temperature)を設定して Save をクリック。これはクライアントがパブリッシュするメッセージのソースを指定するステップです。
Processing セクションから Data Processing ノードをドラッグし、メッセージから以下のフィールドを抽出するデータ処理ルールを設定:
payload.device_id: エイリアスをdevice_idに設定。payload.temperature: エイリアスをtemperatureに設定。timestamp:format_date関数を使い、メッセージのタイムスタンプを読みやすい日時形式に変換。エイリアスをdateに設定。Time Unit:millisecondを選択。Time Offset:+08:00を入力。Data Format:%Y-%m-%d %H:%M:%S.%6N%zを入力。詳細は日時変換関数を参照。Timestamp:timestampを入力。
設定完了後、Save をクリック。

Processing から Filter ノードをドラッグし、データフィルタリングルールを設定。フィルター項目を追加し、
payload.temperatureを入力、演算子>=を選択、値に40を入力して Save をクリック。
Sink から Republish ノードを選択し、メッセージ転送先のトピックを
alertに設定。処理・変換済みデータを以下のペイロードでアラートメッセージに整形:bash${device_id} device reported a high temperature of ${temperature}°C at ${date}.Save をクリック。

作成したFlowがページに表示されます。画面右上の Save をクリックしてFlowを保存。

Flowとフォームルールは相互運用可能です。作成したルールはデータ統合ページの Rules セクションにも表示され、ルールIDをクリックするとSQLや関連設定を確認できます。

Flowのテスト
FlowデザイナーでFlow内の任意のノードをクリックし、編集パネルを開きます。パネル下部の Edit Flow ボタンをクリック。
Save ボタン横の Start Test をクリックすると下部にポップアップが表示されます。
ポップアップ内の Input Simulated Data をクリックしてシミュレーションデータを入力するか、実際のクライアントからメッセージをパブリッシュして結果を確認できます。このデモではMQTTXを使って実データをパブリッシュします。

MQTTX Webを開き、New Connection をクリックしてパブリッシャークライアント接続を作成。以下の項目を設定:
- Name:
device1と入力。 - Host: デプロイメントの Overview ページの MQTT Connection Information セクションにあるアドレスを入力。
- Port:
8084を入力。 - Username と Password: デプロイメントの Access Control -> Authentication ページで設定した認証情報を入力。
他の設定はデフォルトのままにして Connect をクリック。
- Name:
新しいサブスクリプションを作成し、トピックを
alertに設定。温度が40℃未満のメッセージをパブリッシュすると、条件を満たさずルールSQLは実行されません。

温度が40℃以上のメッセージをパブリッシュすると、
alertトピックでアラートメッセージを受信できます。
テストページに戻り、成功したテスト結果を確認します。

テストが失敗した場合は、エラーメッセージが表示されます。
