Skip to content

オフラインメッセージ

このプラグインはMQTTメッセージを外部データベースにパーシステンスし、サブスクライバーが再接続後にメッセージを取得できるようにします。パブリッシュ時に切断されていた場合でもメッセージを取り出せます。

標準のMQTTセッションパーシステンスでは不十分なケース、例えばセッションを超えてメッセージを保持したい場合や、他のシステムでもメッセージ履歴を参照する必要がある場合に有用です。

対応バックエンド:

  • MySQL:2つのテーブル(mqtt_msgmqtt_sub)を使用し、SQL文は設定可能です。
  • Redis:サブスクリプション/メッセージはハッシュで管理し、トピックごとのインデックスはソート済みセットを使用します。シングル、センチネル、クラスターデプロイに対応しています。

同時に有効にできるバックエンドは1つのみで、mysql.enable または redis.enable のいずれかを有効にしてください。

プラグイン設定

プラグインには両バックエンドをカバーするデフォルトの config.hocon が付属しています。ダッシュボードのプラグイン詳細ページも同じスキーマを表示するため、ほとんどの運用者はファイルを直接編集せずUIから設定します。

共通のトップレベルフィールド:

  • topics:プラグインが追跡するトピックフィルターのリスト。空リストの場合、そのバックエンドは無効になります。
  • pool_size:バックエンドへの接続プールサイズ。
  • batch_size / batch_time:書き込みバッチのパラメータ(両方を 1 / 0 に設定するとバッチ処理を無効化)。
  • ssl.*:バックエンド接続のTLS設定。

MySQL固有設定

  • serverhost:port
  • usernamepassworddatabase:認証情報。
  • init_default_schematrue の場合、プラグイン起動時にデフォルトの mqtt_msg / mqtt_sub テーブルを作成します。スキーマを自分で管理する場合は false にしてください。
  • insert_message_sqldelete_message_sqlselect_message_sqlinsert_subscription_sqlselect_subscriptions_sqldelete_subscription_sql:MQTTメッセージフィールド(idfromtopicqospayloadflags.retaintimestamp)およびサブスクリプションフィールド(clientidtopicqos)にバインドされる ${var} プレースホルダーを含む、上書き可能なSQLテンプレート。

Redis固有設定

  • servers:カンマ区切りの host:port リスト。センチネルモードの場合は redis_type = "sentinel" とし、sentinel にマスター名を設定。
  • redis_typesinglesentinelcluster のいずれか。
  • usernamepassworddatabase:認証情報および論理DB(クラスターモードでは database は無視されます)。
  • message_key_prefix(デフォルト mqtt:msg)、subscription_key_prefix(デフォルト mqtt:sub):キーのネームスペース。
  • message_ttl:メッセージごとのTTL(秒)。TTLを超えたメッセージはトピックごとのソート済みセットインデックスからクリーンアップされます。

データベーススキーマ

MySQL

sql
CREATE TABLE IF NOT EXISTS `mqtt_msg` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `msgid` varchar(64) DEFAULT NULL,
  `topic` varchar(180) NOT NULL,
  `sender` varchar(64) DEFAULT NULL,
  `qos` tinyint(1) NOT NULL DEFAULT '0',
  `retain` tinyint(1) DEFAULT NULL,
  `payload` blob,
  `arrived` datetime NOT NULL,
  PRIMARY KEY (`id`),
  INDEX topic_index(`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

CREATE TABLE IF NOT EXISTS `mqtt_sub` (
  `clientid` varchar(64) NOT NULL,
  `topic` varchar(180) NOT NULL,
  `qos` tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (`clientid`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

init_default_schema = true に設定するとプラグイン起動時にこれらのテーブルを作成します。そうでない場合はバックエンドを有効にする前に自分で作成してください。

Redis

Redisの構造はオンデマンドで作成されるため、スキーママイグレーションは不要です。

  • mqtt:sub:{clientid}:ハッシュ、{topic} -> {qos}
  • mqtt:msg:{msgid}:ハッシュ、フィールドは idfromqostopicpayloadtsretainmsgid はbase62エンコードされています。
  • mqtt:msg:{topic}:ソート済みセット、メンバーはbase62のメッセージID、スコアはタイムスタンプ。期限切れクリーンアップに使用。

Redis ACLを使用している場合は、接続ユーザーに対して mqtt:sub:* および mqtt:msg:* にマッチするキーに対し、HSETHDELHGETALLHMSETDELEXPIREZADDZRANGEZREMRANGEBYSCOREZREM の権限を付与してください。

インストール

以下のダウンロードセクションからご利用のEMQXバージョンに対応したtarballを取得し、ダッシュボード、REST API、またはCLIからインストールしてください。

REST APIを使う場合:

bash
curl -u key:secret -X POST http://localhost:18083/api/v5/plugins/install \
  -H "Content-Type: multipart/form-data" \
  -F "plugin=@emqx_offline_messages-<version>.tar.gz"

プラグインを起動します(ダッシュボードまたは emqx ctl plugins start emqx_offline_messages-<version>)。その後、ダッシュボードのプラグイン詳細ページを開いてバックエンドを設定してください。

動作確認

プラグインが追跡しているトピックに対して、クライアントがサブスクライブしていない状態でメッセージをいくつかパブリッシュします。

bash
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline1'
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline2'
mqttx pub -q 1 -t 't/2' -m 'hello-from-offline3'

その後、新しいクライアントIDでサブスクライブすると、プラグインに保存されたメッセージが再生されます。

ダウンロード

各EMQXリリース向けのtarball:

EMQXバージョンプラグインバージョンパッケージ
5.10.42.0.0emqx_offline_messages-2.0.0.tar.gz