Skip to content

RedisへのMQTTデータ取り込み

Redisは、オープンソースのインメモリデータストアであり、データベース、キャッシュ、ストリーミングエンジン、メッセージブローカーとして数百万の開発者に利用されています。EMQXはRedisとの統合をサポートしており、MQTTメッセージやクライアントイベントをRedisに保存できます。Redisとのデータ統合により、メッセージのキャッシュやクライアントイベントの統計にRedisを活用できます。

本ページでは、EMQXとRedis間のデータ統合について詳細に解説し、実際の作成および検証手順を示します。

動作概要

Redisデータ統合はEMQXの標準機能であり、EMQXのリアルタイムデータキャプチャと転送能力を、Redisの豊富なデータ構造と高性能なキー・バリュー読み書き性能と組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからRedisへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとRedis間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Integration Redis

RedisへのMQTTデータ取り込みは以下のように動作します。

  1. メッセージのパブリッシュおよび受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXに正常に接続し、機械、センサー、製造ラインの稼働状態、計測値、またはトリガーイベントに基づくリアルタイムMQTTデータをEMQXにパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータ処理:メッセージが到着するとルールエンジンを通過し、EMQXで定義されたルールにより処理されます。ルールは事前定義された条件に基づき、どのメッセージをRedisにルーティングするかを決定します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. Redisへのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作のために事前設定されたRedisコマンドを実行するアクションがトリガーされます。
  4. データの保存と活用:Redisに保存されたデータを読み取ることで、企業はRedisの豊富なデータ操作機能を活用し、多様なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得やGPS地理位置情報の分析、リアルタイムデータ分析やソートなどの操作が可能となり、リアルタイム追跡やルート推奨などの機能を支援します。

特長と利点

Redisとのデータ統合は、効率的なデータ伝送、処理、活用を実現するための多くの特長と利点を提供します。

  • 高性能かつスケーラブル:EMQXの分散アーキテクチャとRedisのクラスター構成により、データ量の増加に応じてアプリケーションをシームレスにスケールできます。大規模データセットでも一貫した性能と応答性を確保します。
  • リアルタイムデータストリーム:EMQXはリアルタイムデータストリームの処理に特化しており、デバイスからRedisへの効率的かつ信頼性の高いデータ伝送を実現します。Redisは高速なデータ操作を実行できるため、リアルタイムデータキャッシュに最適なデータストレージコンポーネントです。
  • リアルタイムデータ分析:Redisはデバイス接続数やメッセージパブリッシュ数、特定のビジネス指標などのリアルタイムメトリクスを計算可能です。一方、EMQXはリアルタイムメッセージの伝送と処理を担い、データ分析のためのリアルタイムデータ入力を提供します。
  • 地理位置情報分析:Redisは地理空間データ構造とコマンドを提供し、地理位置情報の保存とクエリが可能です。EMQXの強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様なIoTアプリケーションに広く応用できます。

はじめる前に

このセクションでは、Redisデータ統合の作成を開始する前に必要な準備事項とRedisサーバーのセットアップ方法を説明します。

前提条件

Redisサーバーのインストール

Dockerを使ってRedisをインストールし、起動します。

bash
# Redisコンテナを起動し、パスワードをpublicに設定
docker run --name redis -p 6379:6379 -d redis --requirepass "public"

# コンテナにアクセス
docker exec -it redis bash

# Redisサーバーにアクセスし、AUTHコマンドで認証
redis-cli
127.0.0.1:6379> AUTH public
OK

# インストールの検証
127.0.0.1:6379> set emqx "Hello World"
OK
127.0.0.1:6379> get emqx
"Hello World"

これでRedisのインストールが完了し、SETおよびGETコマンドで検証できました。その他のRedisコマンドについてはRedis Commandsを参照してください。

コネクターの作成

このセクションでは、Redis SinkをRedisサーバーに接続するためのコネクター作成方法を示します。

以下の手順は、EMQXとRedisをローカルマシンで実行していることを前提としています。Redisが別の場所にデプロイされている場合は設定を適宜調整してください。

  1. ダッシュボードに入り、Integration -> Connectorsをクリックします。
  2. ページ右上のCreateをクリックします。
  3. Create ConnectorページでRedisを選択し、Nextをクリックします。
  4. コネクター名を入力します。名前は英数字の組み合わせとしてください(例:my_redis)。
  5. ビジネスニーズに応じてRedis Modeを設定します(例:single)。
  6. 接続情報を入力します。
    • Server Host127.0.0.1:6379を入力します。
    • Usernameadminを入力します。
    • Passwordpublicを入力します。
    • Database ID0を入力します。
    • その他のオプションはビジネスニーズに応じて設定してください。
    • 暗号化接続を確立したい場合は、Enable TLSのトグルスイッチをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
  7. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがRedisサーバーに接続できるかテストできます。
  8. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成に進めます。詳細はルールとRedis Sinkの作成を参照してください。

Redis Sinkを用いたルールの作成

このセクションでは、各クライアントの最新メッセージをキャッシュし、メッセージ破棄の統計を収集するルールの作成方法を示します。

メッセージキャッシュと統計機能のために、2つの別々のRedis Sinkを作成する必要があります。作成するSinkの種類に応じて、以下のRedisコマンドテンプレートの設定手順に従ってください。

  1. EMQXダッシュボードで、Integration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにcache_to_redisを入力し、利用する機能に応じてSQL Editorにルールを設定します。

    • メッセージキャッシュ用ルールを作成する場合、以下の文を入力します。これはトピックt/#配下のMQTTメッセージをRedisに保存することを意味します。

      注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドをSELECT部分に含めてください。

      bash
      SELECT
        *
      FROM
        "t/#"
    • メッセージ破棄統計用ルールを作成する場合、以下の文を入力します。

      bash
      SELECT
        *
      FROM
        "$events/message_dropped", "$events/delivery_dropped"

      EMQXルールは2種類のメッセージ破棄イベントを定義しており、これらをトリガーとしてRedisに記録できます。

      イベントトピックパラメーター
      転送中にメッセージが破棄される$events/message_dropped$events/message_dropped
      配信中にメッセージが破棄される$events/delivery_dropped$events/delivery_dropped

    TIP

    初心者の方は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことをおすすめします。

    • Add Actionボタンをクリックして、ルールによってトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをRedisに送信します。
  4. Type of ActionのドロップダウンリストからRedisを選択します。ActionはデフォルトのCreate Actionのままにします。既に作成済みのSinkを選択することもできますが、本デモでは新規Sinkを作成します。

  5. Sinkの名前を入力します。名前は英数字の組み合わせとしてください。

  6. Connectorのドロップダウンボックスからmy_redisを選択します。ドロップダウン横のボタンから新規コネクターを作成することも可能です。設定パラメーターの詳細はコネクターの作成を参照してください。

  7. 利用する機能に応じてRedis Command Templateを設定します。

    • メッセージキャッシュ用Sinkを作成する場合、RedisのHSETコマンドとハッシュデータ構造を使用してメッセージを保存します。データ形式はclientidをキーとし、usernamepayloadtimestampなどのフィールドを格納します。Redis内の他のキーと区別するため、emqx_messagesプレフィックスを付け、:で区切ります。

      bash
      # HSET key filed value [field value...]
      HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
    • メッセージ破棄統計用Sinkを作成する場合、以下のHINCRBYコマンドを使用して、各トピックごとの破棄メッセージ数を集計します。

      bash
      # HINCRBY key field increment
      HINCRBY emqx_message_dropped_count ${topic} 1

      このコマンドが実行されるたびに、対応するカウンターが1ずつ増加します。

  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はフォールバックアクションを参照してください。

  9. 詳細設定(任意):必要に応じてsyncまたはasyncクエリモードを選択します。詳細はSinkの特長を参照してください。

  10. Createをクリックする前に、Test ConnectivityをクリックしてSinkがRedisサーバーに接続できるかテストできます。

  11. CreateボタンをクリックしてSink設定を完了します。新しいSinkがAction Outputsに追加されます。

  12. Create Ruleページに戻り、設定内容を確認します。Createボタンをクリックしてルールを生成します。

これでRedis Sink用のルールが正常に作成されました。Integration -> Rulesページで新規ルールを確認できます。**Actions(Sink)**タブをクリックすると新しいRedis Sinkが表示されます。

また、Integration -> Flow Designerをクリックしてトポロジーを確認すると、トピックt/#配下のメッセージがルールmy_ruleで解析されRedisに送信・保存されていることがわかります。

ルールのテスト

MQTTXを使ってトピックt/1にメッセージを送信し、メッセージキャッシュイベントをトリガーします。もしトピックt/1にサブスクライバーがいなければ、メッセージは破棄され、メッセージ破棄ルールがトリガーされます。

bash
mqttx pub -i emqx_c -u emqx_u -t t/1 -m '{ "msg": "hello Redis" }'

2つのSinkの稼働状況を確認すると、1件の新規マッチと1件の正常送信があるはずです。

メッセージがキャッシュされているか確認します。

bash
127.0.0.1:6379> HGETALL emqx_messages:emqx_c
1) "username"
2) "emqx_u"
3) "payload"
4) "{ \"msg\": \"hello Redis\" }"
5) "timestamp"
6) "1675263885119"

テストを再実行すると、timestampフィールドが更新されているはずです。

破棄されたメッセージが集計されているか確認します。

bash
127.0.0.1:6379> HGETALL emqx_message_dropped_count
1) "t/1"
2) "1"

テストを繰り返すと、t/1に対応するカウンターの数値も増加します。