Skip to content

Flowデザイナー

注意

FlowデザイナーはEMQX Serverlessエディションでは利用できません。

Flowデザイナーは、データ処理ワークフロー(Flow)を作成・編集するための強力なビジュアルツールです。データ処理と統合の設定を簡素化し効率化します。作成したデータ処理ワークフローのテストも可能です。

Data IntegrationとFlowデザイナーで作成したルールは相互運用可能です。つまり、Flowデザイナーでルールを作成するとData IntegrationでSQLや関連設定を確認でき、逆にData IntegrationのSQLエディターでルールを作成するとFlowデザイナーでそのデータフロー処理設定を確認できます。

flow_designer_overview

主な機能

EMQXプラットフォームコンソールでFlowデザイナーにアクセスするには、デプロイメントを開き、左メニューから Data Integration -> Flow Designer をクリックします。既にルールやデータ統合を作成している場合は、複数のノードで構成された有向非巡回グラフが表示されます。各ノードは、トピックやイベント、Sourceからのデータ読み取り、ルールによるデータ変換、Sinkを使ったデータ転送などの処理ステップを表します。

このページには、ルールとFlowデザイナーで作成されたすべてのデータ処理ワークフローが表示されます。デバイスやクライアントからルール処理を経て外部データシステムへ、またはその逆に外部データシステムからルール処理を経てデバイスへとデータがどのように流れるかを可視化できます。ページを更新すると、ルールやデータ統合の最新の変更が反映されます。

New Flow ボタンをクリックするとFlow作成画面に入り、ビジュアル設定が可能です。必要なノードをドラッグ&ドロップで選択し、それらを接続してワークフローを実装します。

drag_node

Source

Sourceはメッセージ、イベント、または外部データシステムから流れるメッセージをサポートします。Flowは少なくとも1つのSourceノードを含む必要があり、複数のデータ入力ノードを同時にサポート可能です。現在サポートされているSourceノードは以下の通りです:

  • Messages: クライアントがパブリッシュするメッセージのトピックおよびトピックワイルドカードで指定。
  • Event: EMQX内のすべてのクライアントイベントをサポート。詳細はClient Eventsを参照。
  • 外部データシステム:

Processing

Processingでは、関数ノードやフィルターノードを使ってデータ処理やフィルタリングが可能です。このステップは任意で、Flowは最大で1つの関数ノードと1つのフィルターノードをサポートします:

  • データ処理: ルールエンジンのすべての組み込みSQL関数をサポート。
  • フィルター: Sourceからのデータフィールドに対する比較フィルタリングをサポート。サポートされる比較演算子は >, <, <=, >=, <>, !=, =, =~ です。

ビジュアルフォーム編集に加え、ProcessingノードはRule SQL構文で編集可能な式モードへの切り替えもサポートします。フィルターノードは関数ノードの後にのみ接続可能で、つまりデータはまず処理されてからフィルタリングされます。

Sink

Sinkはデータソースや処理結果を特定のノードや外部データシステムに出力します。Flowは少なくとも1つのSinkノードを含む必要があります。サポートされているSinkノードは以下の通りです:

  • Republish: ローカルで指定したMQTTトピックにメッセージをパブリッシュ。
  • 外部データシステム: MySQLやKafkaなど40種類以上のデータシステムをサポート。詳細はData Integrationを参照。

Flowの編集とテスト

Flow作成時、システムはランダムにIDを生成します。ID横の編集アイコンをクリックするとFlowの説明を変更できます。

Flow内のノードを削除するには、ノードにカーソルを合わせて右上の削除アイコンをクリックします。ノードをクリックすると編集モードに入り、設定内容を変更して保存できます。全体のFlowはSaveをクリックして保存します。Start Testボタンをクリックすると、シミュレートデータの入力や実際のクライアントを使ったFlowの動作検証が可能です。

利点

Flowデザイナーは機能豊富で使いやすいツールであり、ユーザーがより効率的にデータ処理と統合を行い、ビジネスのイノベーションを促進し、データ管理の可視性と制御性を向上させます。主な特徴と利点は以下の通りです:

  • 直感的なビジュアルインターフェース: ユーザーはドラッグ&ドロップの簡単な操作でデータ処理ワークフローを作成・調整・カスタマイズでき、プログラミング経験がなくても複雑なデータ統合ロジックを扱えます。
  • 高速なリアルタイム処理: Flowデザイナーにより、メッセージやイベントのリアルタイム処理ワークフローを数分で構築可能。これにより、ビジネスは新たなデータやイベントに迅速に対応でき、リアルタイムのビジネスニーズを支えます。
  • 豊富な統合機能: 40種類以上のデータシステムとシームレスに統合し、柔軟なデータ接続と交換を実現します。
  • 統合管理と監視: ユーザーは統一されたビューでデータ統合プロセス全体を管理でき、各処理ノードの状態やパフォーマンスを把握可能。リアルタイムでデータフローを監視・追跡し、高い信頼性とデータの完全性を確保します。
  • EMQXのデータ処理能力: EMQXのルールSQLやSink/Source機能を活用し、その堅牢なデータ処理性能を継承。UIとSQLエディターを切り替えながら利用でき、SQL編集の柔軟性とより簡単で高速なユーザー体験を両立。EMQXルールSQLの詳細知識がなくてもビジネスのイノベーションやデータ駆動型の意思決定を支援します。

クイックスタート

このセクションでは、サンプルユースケースを通じてFlowデザイナーでのFlowの迅速な作成とテスト方法を説明します。

このデモでは、高温アラートを処理するデータ処理ワークフローを作成します。温度・湿度センサーからMQTTトピック経由でデータを受信し、データのフィルタリングと変換ルールを設定。温度が40℃を超えた場合にアラートメッセージを新しいトピック alert にパブリッシュします。また、ルールの有効性とデータ処理結果をテストで検証する方法も示します。

シナリオ説明

デバイスに温度・湿度センサーがあり、5秒ごとにMQTTトピック sensor/temperature にデータを送信すると仮定します。EMQXプラットフォームのルールエンジンは以下のステップでこのデータを処理します:

  1. データフィルタリング: 温度が40℃を超えるデータのみ処理。
  2. データ変換:
    • デバイスIDを抽出。
    • 温度情報を抽出。
    • ペイロード内のタイムスタンプを組み込み関数で読みやすい日付形式に変換。
  3. メッセージ再パブリッシュ: 処理済みデータをアラートメッセージとして整形し、新しいトピック alert にパブリッシュ。

再パブリッシュされるサンプルデータ:

json
{
  "device_id": "device123",
  "temperature": 22.5,
  "humidity": 60
}

Flowの作成

  1. Flowデザイナー画面で New Flow ボタンをクリック。

  2. Source セクションから Message ノードをキャンバスにドラッグし、メッセージソースのトピック(例:sensor/temperature)を設定。Save をクリック。これはクライアントがパブリッシュするメッセージのソースを指定します。

    message_node

  3. Processing セクションから Data Processing ノードをドラッグし、メッセージから以下のフィールドを抽出するデータ処理ルールを設定:

    • payload.device_id: エイリアスを device_id に設定。
    • payload.temperature: エイリアスを temperature に設定。
    • timestamp: format_date 関数を使い、メッセージのタイムスタンプを読みやすい日時形式に変換。エイリアスを date に設定。
      • Time Unit: millisecond を選択。
      • Time Offset: +08:00 を入力。
      • Data Format: %Y-%m-%d %H:%M:%S.%6N%z を入力。詳細は日時変換関数を参照。
      • Timestamp: timestamp を入力。

    設定後、Save をクリック。

    data_processing_node

  4. Processing から Filter ノードをドラッグし、データフィルタリングルールを設定。フィルター項目を追加し、payload.temperature を入力、演算子に >= を選択、値に 40 を入力して Save

    filter_rule

  5. Sink から Republish ノードを選択し、メッセージ転送先のトピックを alert に設定。処理・変換済みデータを以下のペイロード形式でアラートメッセージに整形:

    bash
    ${device_id} device reported a high temperature of ${temperature}°C at ${date}.

    Save をクリック。

    republish_node

  6. 作成したFlowがページに表示されます。右上の Save をクリックしてFlowを保存。

    flow_created

    Flowとフォームルールは相互運用可能です。Data Integrationページの Rules セクションにも作成したルールが表示され、ルールIDをクリックするとSQLや関連設定を確認できます。

    rule_in_sql_editor

Flowのテスト

  1. FlowデザイナーでFlow内の任意のノードをクリックし、編集パネルを開きます。パネル下部の Edit Flow ボタンをクリック。

  2. Save ボタン横の Start Test をクリックすると下部にポップアップが表示されます。

    ポップアップで Input Simulated Data をクリックしシミュレートデータを入力するか、実際のクライアントからメッセージをパブリッシュして結果を確認できます。このデモではMQTTXを使って実データをパブリッシュします。

    start_test

  3. MQTTX Webを開き、New Connection をクリックしてパブリッシャー用クライアント接続を作成。以下を設定:

    • Name: device1 と入力。
    • Host: デプロイメントの Overview ページの MQTT Connection Information セクションにあるアドレスを入力。
    • Port: 8084 を入力。
    • UsernamePassword: デプロイメントの Access Control -> Authentication ページで設定した認証情報を入力。

    他の設定はデフォルトのままにして Connect をクリック。

  4. 新規サブスクライブを作成し、トピックを alert に設定。

  5. 温度が40℃未満のメッセージをパブリッシュすると、条件を満たさずルールSQLは実行されません。

    message_publish_1

  6. 温度が40℃以上のメッセージをパブリッシュすると、alert トピックでアラートメッセージを受信できます。

    message_publish_2

  7. テスト画面に戻ると成功したテスト結果が表示されます。

    test_success

    テストが失敗した場合は、エラーメッセージが表示されます。

    test_fail