Skip to content

RedisへのMQTTデータ取り込み

Redisは、オープンソースのインメモリデータストアであり、データベース、キャッシュ、ストリーミングエンジン、メッセージブローカーとして数百万の開発者に利用されています。EMQXはRedisとの連携をサポートしており、MQTTメッセージやクライアントイベントをRedisに保存できます。Redisとのデータ連携により、メッセージのキャッシュやクライアントイベントの統計にRedisを活用できます。

本ページでは、EMQXとRedis間のデータ連携の詳細な概要と、実際の連携作成および検証手順を解説します。

動作概要

Redisデータ連携はEMQXの標準機能であり、EMQXのリアルタイムデータ取得・転送能力とRedisの豊富なデータ構造および高性能なKey-Value読み書き性能を組み合わせています。組み込みのルールエンジンコンポーネントにより、複雑なコーディングなしでEMQXからRedisへのデータ取り込みを簡素化しています。

以下の図は、EMQXとRedis間の典型的なデータ連携アーキテクチャを示しています。

EMQX Integration Redis

MQTTデータのRedis取り込みは以下のように動作します:

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXに接続し、機械、センサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイムMQTTデータをEMQXにパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータ処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールによって処理されます。ルールは事前定義された条件に基づき、Redisにルーティングすべきメッセージを判別します。ペイロード変換が指定されている場合は、データ形式の変換や特定情報のフィルタリング、追加コンテキストの付加などの変換処理が適用されます。
  3. Redisへのデータ取り込み:ルールエンジンがデータを処理後、プリセットされたRedisコマンドを実行してデータのキャッシュやカウントなどの操作を行います。
  4. データの保存と活用:Redisに保存されたデータを読み取ることで、企業はRedisの豊富なデータ操作機能を活用し、様々なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得やGPS地理位置解析、リアルタイムデータ分析やソートなどの操作が可能で、リアルタイム追跡やルート推薦などの機能を支援します。

特長とメリット

Redisとのデータ連携は、効率的なデータ伝送、処理、活用を実現するための多彩な特長とメリットを提供します:

  • 高性能かつスケーラブル:EMQXの分散アーキテクチャとRedisのクラスターモードにより、データ量の増加に応じてアプリケーションをシームレスにスケールできます。大規模データセットでも一貫した性能と応答性を維持します。
  • リアルタイムデータストリーム:EMQXはリアルタイムデータストリーム処理に特化しており、デバイスからRedisへの効率的かつ信頼性の高いデータ伝送を保証します。Redisは高速なデータ操作を実行でき、リアルタイムデータキャッシュのニーズに応え、EMQXの理想的なデータ保存コンポーネントとなります。
  • リアルタイムデータ分析:Redisはリアルタイムのデバイス接続数、メッセージパブリッシュ数、特定の業務指標などのメトリクス計算に利用できます。EMQXはリアルタイムメッセージ伝送と処理を担い、データ分析のためのリアルタイムデータ入力を提供します。
  • 地理位置解析:Redisは地理空間データ構造とコマンドを備え、地理位置情報の保存と検索が可能です。EMQXの強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様なIoTアプリケーションに広く応用できます。

事前準備

このセクションでは、Redisデータ連携を作成する前に必要な準備とRedisサーバーのセットアップ方法を説明します。

前提条件

Redisサーバーのインストール

Dockerを使ってRedisをインストールし、起動します:

bash
# Redisコンテナを起動し、パスワードをpublicに設定
docker run --name redis -p 6379:6379 -d redis --requirepass "public"

# コンテナにアクセス
docker exec -it redis bash

# Redisサーバーにアクセスし、AUTHコマンドで認証
redis-cli
127.0.0.1:6379> AUTH public
OK

# インストールの確認
127.0.0.1:6379> set emqx "Hello World"
OK
127.0.0.1:6379> get emqx
"Hello World"

これでRedisのインストールが完了し、SETおよびGETコマンドで動作確認ができました。RedisのコマンドについてはRedis Commandsをご参照ください。

コネクターの作成

このセクションでは、Redis SinkをRedisサーバーに接続するためのコネクター作成方法を説明します。

以下の手順は、EMQXとRedisをローカルマシンで実行していることを前提としています。Redisが別の場所にデプロイされている場合は設定を適宜調整してください。

  1. ダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでRedisを選択し、Nextをクリックします。
  4. コネクター名を入力します。名前は英数字の組み合わせとしてください。例:my_redis
  5. ビジネスニーズに応じてRedis Modeを設定します。例:single
  6. 接続情報を入力します。
    • Server Host127.0.0.1:6379を入力
    • Usernameadminを入力
    • Passwordpublicを入力
    • Database ID0を入力
    • その他のオプションはビジネスニーズに応じて設定してください。
    • 暗号化接続を行う場合は、Enable TLSのトグルスイッチをオンにします。TLS接続の詳細はTLS for External Resource Accessを参照してください。
  7. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがRedisサーバーに接続できるかテストできます。
  8. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成を続けられます。詳細はCreate a Rule and Redis Sinkを参照してください。

Redis Sink付きルールの作成

このセクションでは、各クライアントの最新メッセージをキャッシュし、メッセージ破棄の統計を収集するルールの作成方法を説明します。

メッセージキャッシュと統計機能には、それぞれ別のRedis Sinkを作成する必要があります。作成するSinkの種類に応じて、以下のRedis Command Template設定手順に従ってください。

  1. EMQXダッシュボードで、Integration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにcache_to_redisを入力し、使用する機能に応じてSQL Editorにルールを設定します:

    • メッセージキャッシュ用ルールを作成する場合、以下の文を入力します。これはトピックt/#配下のMQTTメッセージをRedisに保存することを意味します。

      注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドをSELECT部分に含めてください。

      bash
      SELECT
        *
      FROM
        "t/#"
    • メッセージ破棄統計用ルールを作成する場合、以下の文を入力します。

      bash
      SELECT
        *
      FROM
        "$events/message_dropped", "$events/delivery_dropped"

      EMQXルールは2種類のメッセージ破棄イベントを定義しており、これらのイベントでルールをトリガーしRedisに記録できます:

      イベントトピックパラメーター
      転送中にメッセージが破棄される$events/message_dropped$events/message_dropped
      配信中にメッセージが破棄される$events/delivery_dropped$events/delivery_dropped

    TIP

    初心者の方は、SQL Examplesをクリックし、Enable TestをオンにしてSQLルールの学習とテストを行うことを推奨します。

    • Add Actionボタンをクリックして、ルールによりトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをRedisに送信します。
  4. Type of ActionのドロップダウンリストからRedisを選択します。ActionはデフォルトのCreate Actionのままにします。すでにSinkを作成済みの場合はそれを選択することも可能です。本デモでは新規Sinkを作成します。

  5. Sinkの名前を入力します。名前は英数字の組み合わせとしてください。

  6. Connectorのドロップダウンからmy_redisを選択します。隣のボタンから新規コネクターを作成することも可能です。設定パラメーターの詳細はCreate a Connectorを参照してください。

  7. 使用する機能に応じてRedis Command Templateを設定します:

    • メッセージキャッシュ用Sinkを作成する場合、RedisのHSETコマンドとハッシュデータ構造を使い、clientidをキーとしてusernamepayloadtimestampなどのフィールドを保存します。Redis内の他のキーと区別するため、emqx_messagesプレフィックスを付け、コロンで区切ります。

      bash
      # HSET key filed value [field value...]
      HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
    • メッセージ破棄統計用Sinkを作成する場合、以下のHINCRBYコマンドを使い、各トピックで破棄されたメッセージ数を集計します。

      bash
      # HINCRBY key field increment
      HINCRBY emqx_message_dropped_count ${topic} 1

      このコマンドが実行されるたびに、対応するカウンターが1ずつ増加します。

  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はFallback Actionsを参照してください。

  9. 詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はFeatures of Sinkを参照してください。

  10. Createをクリックする前に、Test ConnectivityをクリックしてSinkがRedisサーバーに接続できるかテストできます。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認します。Createボタンをクリックしてルールを生成します。

これでRedis Sink付きルールの作成が完了しました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいRedis Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーを確認でき、トピックt/#配下のメッセージがルールmy_ruleで解析された後にRedisへ送信・保存されていることがわかります。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、メッセージキャッシュイベントをトリガーします。もしトピックt/1にサブスクライバーがいなければ、メッセージは破棄され、メッセージ破棄ルールがトリガーされます。

bash
mqttx pub -i emqx_c -u emqx_u -t t/1 -m '{ "msg": "hello Redis" }'

2つのSinkの稼働状況を確認すると、新たに1件のMatchedと1件のSent Successfullyメッセージがあるはずです。

メッセージがキャッシュされているか確認します。

bash
127.0.0.1:6379> HGETALL emqx_messages:emqx_c
1) "username"
2) "emqx_u"
3) "payload"
4) "{ \"msg\": \"hello Redis\" }"
5) "timestamp"
6) "1675263885119"

テストを再度実行すると、timestampフィールドが更新されているはずです。

破棄されたメッセージが集計されているか確認します:

bash
127.0.0.1:6379> HGETALL emqx_message_dropped_count
1) "t/1"
2) "1"

テストを繰り返すと、t/1に対応するカウンターの数値も増加します。