Skip to content

データ統合を使用したデバイスのオフラインメッセージの保存

温度と湿度のデータをシミュレートしてMQTTプロトコル経由でEMQX Cloudに報告し、その後EMQX Cloudのデータ統合を利用してオフラインメッセージをクラウドサービスリソース(サードパーティのデータベースやメッセージキュー)に保存します。本記事ではRedisを例として実装します。

WARNING

オフラインメッセージを保存するにはQoS > 0が必要です

開始する前に、以下の操作を完了してください:

  • EMQX Cloud上にデプロイメント(EMQXクラスター)が作成されていること。
  • Dedicated Flexプランユーザーの場合:ピアリング接続の作成を先に完了してください。以下に記載のIPはすべてリソースの内部ネットワークIPを指します。(Dedicated FlexプランでNATゲートウェイを利用している場合は、パブリックIPでリソースに接続可能です)
  • BYOCプランユーザーの場合:BYOCがデプロイされているVPCとリソースが存在するVPC間でピアリング接続を確立してください。以下に記載のIPはすべてリソースの内部IPを指します。パブリックIP経由でリソースにアクセスする必要がある場合は、BYOCがデプロイされているVPCのパブリッククラウドコンソールでNATゲートウェイを設定してください。

Redisの設定

  1. Redisのインストール

    bash
    docker run -itd --name redis -p 6379:6379 redis

データ統合の設定

デプロイメント詳細からEMQXダッシュボードにアクセスします。

  1. リソースの新規作成

    左メニューのRules → Resourcesをクリックし、「New Resource」をクリックしてドロップダウンからRedisのシングルノードモードのリソースタイプを選択します。先ほど作成したRedisの情報を入力し、「Test」をクリックします。エラーが出た場合は、データベースの設定が正しいか即座に確認してください。 リソース作成

  2. ルールのテスト

    左メニューのRules → Rulesをクリックし、「Create」をクリックして以下のSQL文を入力します。トピックがtemp_hum/emqxの場合にメッセージ情報を読み出します。

    ルールSQLのFROM文の説明:

    • temp_hum/emqx:パブリッシャーが"temp_hum/emqx"にメッセージをパブリッシュすると、オフラインメッセージのRedisへの保存がトリガーされます。
    • $events/session_subscribed:サブスクライバーがトピック"temp_hum/emqx"をサブスクライブすると、オフラインメッセージの取得がトリガーされます。
    • $events/message_acked:サブスクライバーがメッセージのACKを返信すると、受信済みのオフラインメッセージの削除がトリガーされます。
    sql
    SELECT
        *
    FROM
        "temp_hum/emqx",
        "$events/session_subscribed",
        "$events/message_acked"
    WHERE
        topic =~ 'temp_hum/emqx'

    SQLテスト

  3. 応答アクションの追加

    左下の「Add Action」をクリックし、ドロップダウンから → Offline Message → Save Offline Message to Redis を選択し、最初に作成したリソースを選択します。

    TIP

    Redisのキーの有効期限を計画する必要があります。オフラインメッセージは100件以内の保存を推奨します。

    アクション作成

  4. 「New Rule」をクリックしてルール一覧に戻ります。 ルール一覧

  5. ルールのモニタリングを確認します。 モニタリング

テスト

  1. MQTTXを使用して温度・湿度データの報告をシミュレートします。

    broker.emqx.ioを作成したデプロイメントの接続アドレスに置き換え、EMQXダッシュボードでクライアント認証情報を追加してください。

    TIP

    メッセージ送信にはQoS > 0が必要です

    MQTTX

  2. データのダンプ結果を確認します。

    bash
    $ docker exec -it redis bash
    $ redis-cli
    $ keys *

    redis

  3. MQTTXを使用してオフラインデータを消費します。

    MQTTXでトピックtemp_hum/emqxをサブスクライブしてオフラインデータを取得します。

    TIP

    サブスクライブするトピックのQoSは0より大きくする必要があります。そうしないとメッセージが繰り返し受信されます。

    mqttxmqttx

  4. Redisで消費済みデータを確認します。

    bash
    $ docker exec -it redis bash
    $ redis-cli
    $ keys *

    redis