LLMベースのMQTTデータ処理
注意
LLMベースのMQTTデータ処理は、EMQXバージョン5.91以降の専用エディションで利用可能です。
Flowデザイナーは、OpenAI GPTやAnthropic Claudeなどの大規模言語モデル(LLM)との統合をサポートしています。この機能により、ログの要約、センサー情報の分類、MQTTメッセージの拡充、リアルタイムインサイトの生成など、自然言語プロンプトを使ったインテリジェントなメッセージフローを構築できます。
機能概要
FlowデザイナーのLLMベース処理ノードは、外部のLLM APIに接続してメッセージ内容を処理するAI搭載コンポーネントです。これらのノードを使うことで、MQTTデータをgpt-4o
やclaude-3-sonnet
などのモデルに送信し、応答を受け取り、フローの下流に渡すことができます。
注意
LLMの呼び出しとデータ処理には時間がかかります。モデルの応答速度によっては、処理全体で数秒から10秒以上かかる場合があります。そのため、LLM処理ノードは高いメッセージスループット(TPS)が求められるシナリオには適していません。
主要な概念
- LLMプロバイダー:AIサービス(OpenAI / Anthropic)の名前付き設定
- Completion Profile:LLMモデルのパラメーター(モデルID、システムプロンプト、トークン制限など)をまとめた再利用可能な設定
- AI Completion Node:入力をLLMに送信し、その結果をユーザー定義のエイリアスとして保存するフローコンポーネント
ai_completion
:テキストやバイナリデータをLLMに送信し応答を返すRule SQL関数
動作の仕組み
FlowデザイナーでMQTTメッセージを受信すると、AI Completion Nodeは内部的に組み込みのSQL関数ai_completion/2,3
を呼び出して、設定されたLLMにデータを送信します。
メッセージはMessagesノード(例:トピックのサブスクライブ)を経由してフローに入ります。
Data Processingノード(任意)で
device_id
、payload
、timestamp
などのフィールドを抽出または変換できます。OpenAIまたはAnthropicノードは、背後で
ai_completion
関数を使用して以下を行います:- プロバイダー情報、モデル名、システムメッセージなどを含む選択されたCompletion Profileを参照
- 選択された入力(例:
payload
)をLLMに送信 - LLM APIからの応答(例:要約や分類結果)を受信
応答はOutput Result Aliasに保存され、以下のような任意の下流ノードで利用可能になります:
Republish(別のトピックにパブリッシュする)
Database(PostgreSQL、MongoDBなどに結果を挿入)
Bridge(リモートブローカーやクラウドサービスに転送)
対応LLMプロバイダー
EMQX 5.10.0では以下のプロバイダーをサポートしています:
- OpenAI:GPT-3.5、GPT-4、GPT-4oなど
- Anthropic:Claude 3モデル
LLMベース処理ノードの設定
FlowデザイナーでLLMを利用するには、OpenAIノードまたはAnthropicノードのいずれかを選択して専用の処理ノードを設定する必要があります。各ノードでは、MQTTメッセージのどのフィールドをLLMに送るか、システムプロンプトでモデルの振る舞いをどう制御するか、AI生成結果をどこに保存するかを定義できます。設定後、これらのノードは背後でai_completion
関数をシームレスに呼び出し、選択したLLMでデータを処理します。
OpenAIノードの設定
OpenAIノードを使用するには:
ProcessingパネルからOpenAIノードをドラッグします。
ソースまたは前処理ノードに接続します。
以下の項目を設定します:
Input:入力フィールドのタイプまたはソースを選択または入力します。選択肢は
event
、id
、clientid
、username
、payload
などです。System Message:AIモデルに期待する出力を生成させるためのプロンプトメッセージを入力します。例:「入力JSONデータの数値キーの値を合計し、その結果のみを出力してください」。
Model:LLMプロバイダーのモデルを選択します。例:
gpt-4o
、gpt-3.5-turbo
。API Key:OpenAIのAPIキーを入力します。
Base URL:カスタムエンドポイントを任意で入力します。空欄の場合はOpenAIのデフォルトエンドポイントが使用されます。
Output Result Alias:LLMの出力を格納する変数名です。アクションや後続処理で参照するために使用します。例:
summary
。TIP
エイリアスに英数字とアンダースコア以外の文字が含まれる場合、数字で始まる場合、またはSQLキーワードの場合は、エイリアスをダブルクォーテーションで囲んでください。
保存をクリックして設定を適用します。
Anthropicノードの設定
Anthropicノードを使用するには:
ProcessingパネルからAnthropicノードをドラッグします。
メッセージ入力ノードまたはData Processingノードに接続します。
以下の項目を入力します:
Input:入力フィールドのタイプまたはソースを選択または入力します。選択肢は
event
、id
、clientid
、username
、payload
などです。System Message:AIモデルに期待する出力を生成させるためのプロンプトメッセージを入力します。例:「入力JSONデータの数値キーの値を合計し、その結果のみを出力してください」。
Model:LLMプロバイダーのモデルを選択します。例:
claude-3-sonnet-20240620
。Max Tokens:応答の長さを制御します(デフォルト:
100
)。Anthropic Version:Anthropicのバージョンを選択します(デフォルト:
2023-06-01
)。API Key:AnthropicのAPIキーを入力します。
Base URL:カスタムエンドポイントを任意で入力します。空欄の場合はAnthropicのデフォルトエンドポイントが使用されます。
Output Result Alias:LLMの出力を格納する変数名です。アクションや後続処理で参照するために使用します。例:
summary
。TIP
エイリアスに英数字とアンダースコア以外の文字が含まれる場合、数字で始まる場合、またはSQLキーワードの場合は、エイリアスをダブルクォーテーションで囲んでください。
保存をクリックして設定を適用します。
クイックスタート
以下の2つの例では、FlowデザイナーでLLMベース処理ノードを使ったフローの迅速な構築とテスト方法を示しています:
- OpenAIノードを使ったフローの作成:GPTモデルを使ってMQTTメッセージを要約または変換します。
- Anthropicノードを使ったフローの作成:Claudeモデルを使ってMQTTメッセージ内の数値を処理します。