Skip to content

データ統合を使用したデバイスのオフラインメッセージの保存

温度と湿度のデータをシミュレートし、MQTTプロトコルを通じてEMQX Cloudに報告します。その後、EMQX Cloudのデータ統合を使用してオフラインメッセージをクラウドサービスリソース(サードパーティのデータベースやメッセージキュー)に保存します。本記事では、例としてRedisを使用して実装します。

WARNING

オフラインメッセージを保存するにはQoS > 0が必要です

開始する前に、以下の操作を完了してください:

  • EMQX Cloud上にデプロイメント(EMQXクラスター)が作成されていること。
  • Dedicatedプランのユーザーの場合:ピアリング接続の作成を先に完了してください。以下に記載するすべてのIPはリソースの内部ネットワークIPを指します。(DedicatedプランでNATゲートウェイを使用している場合は、パブリックIPでリソースに接続可能です)
  • BYOCプランのユーザーの場合:BYOCがデプロイされているVPCとリソースがあるVPC間でピアリング接続を確立してください。以下のすべてのIPはリソースの内部IPを指します。パブリックIP経由でリソースにアクセスする必要がある場合は、BYOCがデプロイされているVPCのパブリッククラウドコンソールでNATゲートウェイを設定してください。

Redisの設定

  1. Redisのインストール

    bash
    docker run -itd --name redis -p 6379:6379 redis

データ統合の設定

デプロイメントの詳細画面に移動し、EMQXダッシュボードをクリックしてダッシュボードにアクセスします。

  1. リソースの新規作成

    左メニューの「ルール」→「リソース」をクリックし、「新規リソース」をクリックしてドロップダウンからRedisのシングルノードモードのリソースタイプを選択します。先ほど作成したRedisの情報を入力し、「テスト」をクリックします。エラーが出た場合は、データベースの設定が正しいか即座に確認してください。 リソース作成

  2. ルールのテスト

    左メニューの「ルール」→「ルール」をクリックし、「作成」をクリックして以下のルールを入力し、SQL文をマッチさせます。トピックが temp_hum/emqx の場合にメッセージ情報を読み出します。

    ルールSQLのFROM文の説明:

    • temp_hum/emqx: パブリッシャーが "temp_hum/emqx" にメッセージをパブリッシュすると、オフラインメッセージのRedisへの保存がトリガーされます。
    • $events/session_subscribed: サブスクライバーがトピック "temp_hum/emqx" をサブスクライブすると、オフラインメッセージの取得がトリガーされます。
    • $events/message_acked: サブスクライバーがメッセージのACKを返信すると、受信済みのオフラインメッセージの削除がトリガーされます。
    sql
    SELECT
        *
    FROM
        "temp_hum/emqx",
        "$events/session_subscribed",
        "$events/message_acked"
    WHERE
        topic =~ 'temp_hum/emqx'

    SQLテスト

  3. アクションの追加

    左下の「アクションを追加」をクリックし、ドロップダウンから「オフラインメッセージ」→「Redisにオフラインメッセージを保存」を選択し、最初に作成したリソースを選択します。

    TIP

    ここでRedisのキーの有効期限を計画する必要があります。オフラインメッセージは100件以下の保存を推奨します。

    アクション作成

  4. 「新規ルール」をクリックし、ルール一覧に戻ります。 ルール一覧

  5. ルールの監視を確認します。 監視画面

テスト

  1. MQTTXを使用して温度と湿度のデータ報告をシミュレートします。

    broker.emqx.ioを作成したデプロイメントの接続アドレスに置き換え、EMQXダッシュボードでクライアント認証情報を追加してください。

    TIP

    メッセージ送信にはQoS > 0が必要です

    MQTTX

  2. データダンプ結果の確認

    bash
    $ docker exec -it redis bash
    $ redis-cli
    $ keys *

    redis

  3. MQTTXを使用してオフラインデータを消費します。

    MQTTXでトピック temp_hum/emqx をサブスクライブしてオフラインデータを取得します。

    TIP

    サブスクライブするトピックのQoSは0より大きくする必要があります。そうしないとメッセージが繰り返し受信されます。

    mqttxmqttx

  4. Redisで消費済みデータを確認します。

    bash
    $ docker exec -it redis bash
    $ redis-cli
    $ keys *

    redis