オフラインメッセージ
このプラグインはMQTTメッセージを外部データベースにパーシステンスし、サブスクライバーが再接続後にメッセージを取得できるようにします。パブリッシュ時に切断されていた場合でもメッセージを取り出せます。
標準のMQTTセッションパーシステンスでは不十分なケース、例えばセッションを超えてメッセージを保持したい場合や、他のシステムでもメッセージ履歴を参照する必要がある場合に有用です。
対応バックエンド:
- MySQL:2つのテーブル(
mqtt_msg、mqtt_sub)を使用し、SQL文は設定可能です。 - Redis:サブスクリプション/メッセージはハッシュで管理し、トピックごとのインデックスはソート済みセットを使用します。シングル、センチネル、クラスターデプロイに対応しています。
同時に有効にできるバックエンドは1つのみで、mysql.enable または redis.enable のいずれかを有効にしてください。
プラグイン設定
プラグインには両バックエンドをカバーするデフォルトの config.hocon が付属しています。ダッシュボードのプラグイン詳細ページも同じスキーマを表示するため、ほとんどの運用者はファイルを直接編集せずUIから設定します。
共通のトップレベルフィールド:
topics:プラグインが追跡するトピックフィルターのリスト。空リストの場合、そのバックエンドは無効になります。pool_size:バックエンドへの接続プールサイズ。batch_size/batch_time:書き込みバッチのパラメータ(両方を1/0に設定するとバッチ処理を無効化)。ssl.*:バックエンド接続のTLS設定。
MySQL固有設定
server:host:port。username、password、database:認証情報。init_default_schema:trueの場合、プラグイン起動時にデフォルトのmqtt_msg/mqtt_subテーブルを作成します。スキーマを自分で管理する場合はfalseにしてください。insert_message_sql、delete_message_sql、select_message_sql、insert_subscription_sql、select_subscriptions_sql、delete_subscription_sql:MQTTメッセージフィールド(id、from、topic、qos、payload、flags.retain、timestamp)およびサブスクリプションフィールド(clientid、topic、qos)にバインドされる${var}プレースホルダーを含む、上書き可能なSQLテンプレート。
Redis固有設定
servers:カンマ区切りのhost:portリスト。センチネルモードの場合はredis_type = "sentinel"とし、sentinelにマスター名を設定。redis_type:single、sentinel、clusterのいずれか。username、password、database:認証情報および論理DB(クラスターモードではdatabaseは無視されます)。message_key_prefix(デフォルトmqtt:msg)、subscription_key_prefix(デフォルトmqtt:sub):キーのネームスペース。message_ttl:メッセージごとのTTL(秒)。TTLを超えたメッセージはトピックごとのソート済みセットインデックスからクリーンアップされます。
データベーススキーマ
MySQL
CREATE TABLE IF NOT EXISTS `mqtt_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`msgid` varchar(64) DEFAULT NULL,
`topic` varchar(180) NOT NULL,
`sender` varchar(64) DEFAULT NULL,
`qos` tinyint(1) NOT NULL DEFAULT '0',
`retain` tinyint(1) DEFAULT NULL,
`payload` blob,
`arrived` datetime NOT NULL,
PRIMARY KEY (`id`),
INDEX topic_index(`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;
CREATE TABLE IF NOT EXISTS `mqtt_sub` (
`clientid` varchar(64) NOT NULL,
`topic` varchar(180) NOT NULL,
`qos` tinyint(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`clientid`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;init_default_schema = true に設定するとプラグイン起動時にこれらのテーブルを作成します。そうでない場合はバックエンドを有効にする前に自分で作成してください。
Redis
Redisの構造はオンデマンドで作成されるため、スキーママイグレーションは不要です。
mqtt:sub:{clientid}:ハッシュ、{topic} -> {qos}。mqtt:msg:{msgid}:ハッシュ、フィールドはid、from、qos、topic、payload、ts、retain。msgidはbase62エンコードされています。mqtt:msg:{topic}:ソート済みセット、メンバーはbase62のメッセージID、スコアはタイムスタンプ。期限切れクリーンアップに使用。
Redis ACLを使用している場合は、接続ユーザーに対して mqtt:sub:* および mqtt:msg:* にマッチするキーに対し、HSET、HDEL、HGETALL、HMSET、DEL、EXPIRE、ZADD、ZRANGE、ZREMRANGEBYSCORE、ZREM の権限を付与してください。
インストール
以下のダウンロードセクションからご利用のEMQXバージョンに対応したtarballを取得し、ダッシュボード、REST API、またはCLIからインストールしてください。
REST APIを使う場合:
curl -u key:secret -X POST http://localhost:18083/api/v5/plugins/install \
-H "Content-Type: multipart/form-data" \
-F "plugin=@emqx_offline_messages-<version>.tar.gz"プラグインを起動します(ダッシュボードまたは emqx ctl plugins start emqx_offline_messages-<version>)。その後、ダッシュボードのプラグイン詳細ページを開いてバックエンドを設定してください。
動作確認
プラグインが追跡しているトピックに対して、クライアントがサブスクライブしていない状態でメッセージをいくつかパブリッシュします。
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline1'
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline2'
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline3'その後、新しいクライアントIDでサブスクライブすると、プラグインに保存されたメッセージが再生されます。
ダウンロード
各EMQXリリース向けのtarball:
| EMQXバージョン | プラグインバージョン | パッケージ |
|---|---|---|
| 5.10.4 | 2.0.0 | emqx_offline_messages-2.0.0.tar.gz |