Skip to content

MQTTデータをRedisに取り込む

Redisは、オープンソースのインメモリデータストアであり、データベース、キャッシュ、ストリーミングエンジン、メッセージブローカーとして数百万の開発者に利用されています。EMQXはRedisとの統合をサポートしており、MQTTメッセージやクライアントイベントをRedisに保存できます。Redisデータ統合により、メッセージのキャッシュやクライアントイベントの統計にRedisを活用できます。

本ページでは、EMQXプラットフォームとRedis間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。

動作概要

Redisデータ統合はEMQXプラットフォームの標準機能であり、EMQXプラットフォームのリアルタイムデータキャプチャおよび送信機能と、Redisの豊富なデータ構造および高性能なキー・バリューの読み書き性能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからRedisへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。

以下の図は、EMQXプラットフォームとRedis間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Platform Integration Redis

MQTTデータをRedisに取り込む流れは次の通りです。

  1. メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXプラットフォームに接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイムMQTTデータをパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージはルールエンジンを通過し、EMQXプラットフォームで定義されたルールに基づいて処理されます。ルールは事前に定義された条件により、どのメッセージをRedisにルーティングするかを決定します。ペイロードの変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
  3. Redisへのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために、あらかじめ設定されたRedisコマンドを実行するアクションがトリガーされます。
  4. データの保存と活用:Redisに保存されたデータを読み取ることで、企業はRedisの豊富なデータ操作機能を活用し、さまざまなユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得やGPS位置情報の分析、リアルタイムデータ分析やソートなどが可能となり、リアルタイム追跡やルート推奨などの機能を支えます。

特長とメリット

Redisとのデータ統合は、効率的なデータ伝送、処理、活用を実現するための多彩な特長とメリットを備えています。

  • 高いパフォーマンスとスケーラビリティ:EMQXの分散アーキテクチャとRedisのクラスター モードにより、データ量の増加に応じてアプリケーションをシームレスにスケールできます。大規模データでも一貫したパフォーマンスと応答性を確保します。
  • リアルタイムデータストリーム:EMQXプラットフォームはリアルタイムデータストリーム処理に特化しており、デバイスからRedisへの効率的かつ信頼性の高いデータ伝送を実現します。Redisは高速なデータ操作を実行できるため、リアルタイムのデータキャッシュに最適なデータストレージコンポーネントです。
  • リアルタイムデータ分析:Redisはデバイス接続数やメッセージパブリッシュ数、特定の業務指標などのリアルタイムメトリクスを計算可能です。EMQXプラットフォームはリアルタイムメッセージの送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
  • 地理位置情報分析:Redisは地理空間データ構造とコマンドを提供し、地理位置情報の保存と検索が可能です。EMQXプラットフォームの強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様なIoTアプリケーションに広く応用できます。

はじめる前に

このセクションでは、Redisデータ統合を作成する前に必要な準備、特にRedisサーバーのセットアップ方法について説明します。

前提条件

ネットワークの設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Redisサーバーのインストール

Dockerを使ったRedisのインストール

Dockerを使ってRedisをインストールし、起動します。

bash
# Redisコンテナを起動
docker run --name redis -p 6379:6379 -d redis

# コンテナにアクセス
docker exec -it redis bash

# Redisサーバーにアクセス
redis-cli

# インストールの検証
127.0.0.1:6379> set emqx_cloud "Hello World"
OK
127.0.0.1:6379> get emqx_cloud
"Hello World"

これでRedisのインストールが完了し、SETおよびGETコマンドで動作を確認できました。その他のRedisコマンドについてはRedis Commandsをご参照ください。

Redis Cloudを使ったRedisサービスの作成

  1. Redis Cloudコンソールにログインし、サブスクリプションを作成します。本デモではFixed Planを選択できます。
  2. データベースを作成します。
  3. データベースの設定ページで、接続に必要なアドレス、ユーザー名、パスワードなどの情報を確認します。
  4. 接続ボタンをクリックし、Redis CLIオプションを選択してコマンドをコピーし、コマンドラインで実行してサービスに接続し検証します。

詳細はRedis Cloud Documentationをご参照ください。

コネクターの作成

データ統合ルールを作成する前に、RedisサーバーにアクセスするためのRedisコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からRedisを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリのRedisを選択します。

  3. コネクター名はシステムが自動生成します。

  4. ビジネス要件に応じてRedisモードを設定します。例:single

  5. 接続情報を入力します。

    • サーバーホスト:サーバーのIPアドレスとポート番号
    • データベースID0を入力
    • ユーザー名とパスワード:Redis Cloudで作成したサービスの場合は、データベースの設定ページからユーザー名とパスワードをコピーして入力します。
    • その他のオプションはビジネス要件に応じて設定します。
    • 暗号化接続を確立したい場合は、TLSを有効にするトグルスイッチをオンにします。
  6. テストボタンをクリックし、Redisサービスにアクセス可能であれば成功メッセージが表示されます。

  7. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータをRedisに転送するアクションをルールに追加します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. 利用したい機能に基づいてSQLエディターでルールを設定します。ここでは、クライアントがtemp_hum/emqxトピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQLを記述します。

    sql
     SELECT
       timestamp div 1000 as up_timestamp,
       clientid as client_id,
       payload as temp_hum
     FROM
       "temp_hum/emqx"

    TIP

    初心者の方はSQL例をクリックし、テストを有効にするでSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. Redisコマンドテンプレートを設定します。トピックから読み取った「up_timestamp」「client_id」「温度」「湿度」のデータをRedisに保存します。

    bash
     HMSET ${client_id} ${up_timestamp} ${temp_hum}
  6. 確定ボタンをクリックしてルール作成を完了します。

  7. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。

ルールのテスト

温度と湿度のデータ報告をシミュレートするためにMQTTXの利用を推奨しますが、他のクライアントでも構いません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアントID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. 保存された結果を確認します。

    • Docker経由でRedisをインストールした場合は、まずコンテナにアクセスし、redis-cliを実行してRedisサービスに接続します。
    • Redis Cloudで作成したサービスの場合は、Redis CLI接続オプションを選択し、コマンドをコピーしてコマンドラインで実行し、Redisサービスに接続します。
    bash
    127.0.0.1:6379> HGETALL test_client
    1) "1710921138"
    2) "{\n  \"temp\": 27.5,\n  \"hum\": 41.8\n}"
  3. コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。