Subscription Filterの使い方
このページでは、EMQXでSubscription Filter機能を有効化し、実際に動作を確認する手順を説明します。MQTTX CLIを使ってパブリッシャーと複数のサブスクライバーをシミュレートし、フィルター式が各サブスクライバーに届くメッセージをどのように制御するかを観察します。
前提条件
開始する前に、以下を準備してください。
- EMQX 6.2以上が稼働していること
- MQTTX CLI がインストールされていること
ステップ1: Subscription Filterを有効化する
Subscription Filterはデフォルトで無効化されています。無効時は、?文字はトピック文字列の通常の一部として扱われ、既存のサブスクリプションとの完全な互換性が保たれます。
ダッシュボードから有効化
- Management -> MQTT Settings -> Generalタブに移動します。
- Subscription Message Filterフィールドを見つけて、enableに設定します。
- Save Changesをクリックします。
変更はブローカーの再起動なしに即時反映されます。
設定ファイルから有効化
emqx.confに以下を追加します。
mqtt.subscription_message_filter = enable変更を反映するにはEMQXを再起動するか、デプロイ環境が対応していれば設定のリロードを行ってください。
REST APIから有効化
curl -s -u key:secret -X PUT \
-H "Content-Type: application/json" \
http://localhost:18083/api/v5/configs/mqtt \
-d '{"subscription_message_filter": "enable"}'有効化後、クライアントはサブスクリプションにフィルター式を付加できます。構文の詳細や例は、Subscription Filter概要のFilter Syntaxを参照してください。
ステップ2: サブスクライバーを起動する
この例では、センサーがsensor/1/temperatureに温度データをパブリッシュし、各メッセージにはlocationというUser Propertyが含まれています。3つのサブスクライバーが同じトピックを異なるフィルター式でサブスクライブします。
| サブスクライバー | サブスクリプション | 受信するメッセージ条件 |
|---|---|---|
sub-roomA | sensor/+/temperature?location=roomA | location=roomA |
sub-roomB | sensor/+/temperature?location=roomB | location=roomB |
sub-all | sensor/+/temperature | 全メッセージ(フィルターなし) |
3つのターミナルを開き、それぞれのサブスクライバーを起動します。
ターミナル1: roomAサブスクライバー
mqttx sub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id sub-roomA \
-t "sensor/+/temperature?location=roomA"ターミナル2: roomBサブスクライバー
mqttx sub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id sub-roomB \
-t "sensor/+/temperature?location=roomB"ターミナル3: フィルターなしサブスクライバー
mqttx sub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id sub-all \
-t "sensor/+/temperature"TIP
--mqtt-version 5フラグは必須です。Subscription FilterはMQTT 5.0の機能に依存しています。
ステップ3: Room A向けメッセージをパブリッシュする
4つ目のターミナルで、User Propertiesにlocation=roomAを含むメッセージをパブリッシュします。
mqttx pub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id publisher \
-t "sensor/1/temperature" \
-m '{"value": 23.5}' \
--user-properties "location: roomA"期待される結果:
| サブスクライバー | メッセージ受信の有無 |
|---|---|
sub-roomA | 受信(location=roomAが一致) |
sub-roomB | 非受信(location値が一致しない) |
sub-all | 受信(フィルターなし) |
ステップ4: Room B向けメッセージをパブリッシュする
mqttx pub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id publisher \
-t "sensor/1/temperature" \
-m '{"value": 19.1}' \
--user-properties "location: roomB"期待される結果:
| サブスクライバー | メッセージ受信の有無 |
|---|---|
sub-roomA | 非受信(location値が一致しない) |
sub-roomB | 受信(location=roomBが一致) |
sub-all | 受信(フィルターなし) |
ステップ5: 複数条件(ANDロジック)のテスト
Subscription Filterは&で複数条件を結合できます。locationとunitの両方が一致する必要があるサブスクライバーを起動します。
mqttx sub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id sub-roomA-celsius \
-t "sensor/+/temperature?location=roomA&unit=celsius"両条件を満たすメッセージをパブリッシュします。
mqttx pub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id publisher \
-t "sensor/1/temperature" \
-m '{"value": 22.0}' \
--user-properties "location: roomA" \
--user-properties "unit: celsius"sub-roomA-celsiusサブスクライバーはメッセージを受信します。次にunitが不一致のメッセージをパブリッシュします。
mqttx pub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id publisher \
-t "sensor/1/temperature" \
-m '{"value": 71.6}' \
--user-properties "location: roomA" \
--user-properties "unit: fahrenheit"sub-roomA-celsiusサブスクライバーはこのメッセージを受信しません。location=roomAは一致していますが、unit条件が満たされていないためです。
ステップ6: User Propertiesなしのメッセージをパブリッシュする
User Propertiesなしのメッセージをパブリッシュします。
mqttx pub -h localhost -p 1883 \
--mqtt-version 5 \
--client-id publisher \
-t "sensor/1/temperature" \
-m '{"value": 20.0}'期待される結果:
| サブスクライバー | メッセージ受信の有無 |
|---|---|
sub-roomA | 非受信(locationキーが存在しない) |
sub-roomB | 非受信(locationキーが存在しない) |
sub-all | 受信(フィルターなし) |
これは、必須のUser Propertyキーが欠落している場合、フィルター式付きサブスクライバーにはメッセージがフィルタリングされることを示しています。
まとめ
| シナリオ | 動作 |
|---|---|
| メッセージのUser Propertiesがフィルター式に一致する | 配信される |
| メッセージのUser Propertiesが部分的に一致(AND条件未達成) | 配信されない |
| 必須のUser Propertyキーが存在しない | 配信されない |
| サブスクリプションにフィルター式がない | トピックにマッチする全メッセージが配信される |
次のステップ
- Subscription Filter概要: 設計、概念、ユースケースを詳しく理解する
- ワイルドカードサブスクリプション: ワイルドカードトピックフィルターとSubscription Filterを組み合わせて柔軟なルーティングを実現する