データ統合を使用したデバイスのオフラインメッセージの保存
温度と湿度のデータをシミュレートしてMQTTプロトコル経由でEMQX Cloudに報告し、その後EMQX Cloudのデータ統合を利用してオフラインメッセージをクラウドサービスリソース(サードパーティのデータベースやメッセージキュー)に保存します。本記事ではRedisを例として実装します。
WARNING
オフラインメッセージを保存するにはQoS > 0が必要です
開始する前に、以下の操作を完了してください:
- EMQX Cloud上にデプロイメント(EMQXクラスター)が作成されていること。
- Dedicated Flexプランユーザーの場合:ピアリング接続の作成を先に完了してください。以下に記載のIPはすべてリソースの内部ネットワークIPを指します。(Dedicated FlexプランでNATゲートウェイを利用している場合は、パブリックIPでリソースに接続可能です)
- BYOCプランユーザーの場合:BYOCがデプロイされているVPCとリソースが存在するVPC間でピアリング接続を確立してください。以下に記載のIPはすべてリソースの内部IPを指します。パブリックIP経由でリソースにアクセスする必要がある場合は、BYOCがデプロイされているVPCのパブリッククラウドコンソールでNATゲートウェイを設定してください。
Redisの設定
Redisのインストール
bashdocker run -itd --name redis -p 6379:6379 redis
データ統合の設定
デプロイメント詳細からEMQXダッシュボードにアクセスします。
リソースの新規作成
左メニューのRules → Resourcesをクリックし、「New Resource」をクリックしてドロップダウンからRedisのシングルノードモードのリソースタイプを選択します。先ほど作成したRedisの情報を入力し、「Test」をクリックします。エラーが出た場合は、データベースの設定が正しいか即座に確認してください。

ルールのテスト
左メニューのRules → Rulesをクリックし、「Create」をクリックして以下のSQL文を入力します。トピックが
temp_hum/emqxの場合にメッセージ情報を読み出します。ルールSQLのFROM文の説明:
temp_hum/emqx:パブリッシャーが"temp_hum/emqx"にメッセージをパブリッシュすると、オフラインメッセージのRedisへの保存がトリガーされます。$events/session_subscribed:サブスクライバーがトピック"temp_hum/emqx"をサブスクライブすると、オフラインメッセージの取得がトリガーされます。$events/message_acked:サブスクライバーがメッセージのACKを返信すると、受信済みのオフラインメッセージの削除がトリガーされます。
sqlSELECT * FROM "temp_hum/emqx", "$events/session_subscribed", "$events/message_acked" WHERE topic =~ 'temp_hum/emqx'
応答アクションの追加
左下の「Add Action」をクリックし、ドロップダウンから → Offline Message → Save Offline Message to Redis を選択し、最初に作成したリソースを選択します。
TIP
Redisのキーの有効期限を計画する必要があります。オフラインメッセージは100件以内の保存を推奨します。

「New Rule」をクリックしてルール一覧に戻ります。

ルールのモニタリングを確認します。

テスト
MQTTXを使用して温度・湿度データの報告をシミュレートします。
broker.emqx.ioを作成したデプロイメントの接続アドレスに置き換え、EMQXダッシュボードでクライアント認証情報を追加してください。
TIP
メッセージ送信にはQoS > 0が必要です

データのダンプ結果を確認します。
bash$ docker exec -it redis bash $ redis-cli $ keys *
MQTTXを使用してオフラインデータを消費します。
MQTTXでトピック
temp_hum/emqxをサブスクライブしてオフラインデータを取得します。TIP
サブスクライブするトピックのQoSは0より大きくする必要があります。そうしないとメッセージが繰り返し受信されます。


Redisで消費済みデータを確認します。
bash$ docker exec -it redis bash $ redis-cli $ keys *