Skip to content

Upstash for Redis への MQTT データ取り込み

Upstash はクラウドベースのサーバーレスデータプラットフォームであり、開発者がインフラ管理の手間なく Redis データベースと Kafka をアプリケーションにシームレスに統合できる環境を提供します。サーバーレスアーキテクチャを採用しており、高性能なインメモリデータストアである Redis と Kafka の利点を、デプロイやスケーリング、メンテナンスの複雑さを気にすることなく享受できます。

本ページでは、Upstash データ統合の機能概要と実装に関する実践的なガイドを詳述します。Kafka コネクターの作成、ルールの定義、動作検証などの基本作業をカバーし、さらに MQTT プロトコルを用いてシミュレートした温湿度データを EMQX Platform に報告し、設定したデータ統合を通じて Upstash に保存する手順を示します。

動作概要

Upstash Redis データ統合は EMQX Platform に標準搭載された機能であり、MQTT ベースの IoT データと Kafka の強力なデータ処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを通じて、複雑なコーディングなしに両プラットフォーム間のデータフローと処理を簡素化します。

以下の図は、EMQX Platform と Redis 間の典型的なデータ統合アーキテクチャを示しています。

EMQX Platform Integration Redis

Upstash Redis への MQTT データ取り込みは以下の流れで行われます。

  1. メッセージのパブリッシュと受信:産業用 IoT デバイスは MQTT プロトコルを通じて EMQX Platform デプロイメントに正常に接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイム MQTT データを EMQX Platform にパブリッシュします。EMQX Platform はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージ到着時にルールエンジンを通過し、EMQX Platform で定義されたルールに基づいて処理されます。ルールは事前定義された条件により、どのメッセージを Redis にルーティングするかを判定します。ペイロード変換が指定されている場合は、データ形式の変換や特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. Redis へのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために設定された Redis コマンドを実行するアクションがトリガーされます。
  4. データの保存と活用:Redis に保存されたデータを読み取ることで、企業は豊富なデータ操作機能を活用し、多様なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得や GPS 地理位置情報の分析、リアルタイムデータ分析やソートなどの操作が可能となり、リアルタイム追跡やルート推奨などの機能を支えます。

特長とメリット

Redis とのデータ統合は、効率的なデータ送信、処理、活用を実現するための多彩な特長とメリットを備えています。

  • 高性能かつスケーラブル:EMQX の分散アーキテクチャと Redis のクラスター モードにより、データ量の増加に応じてアプリケーションをシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を保証します。
  • リアルタイムデータストリーム:EMQX Platform はリアルタイムデータストリーム処理に特化しており、デバイスから Redis への効率的かつ信頼性の高いデータ伝送を実現します。Redis は高速なデータ操作を実行できるため、リアルタイムのデータキャッシュに最適なデータストレージコンポーネントとなります。
  • リアルタイムデータ分析:Redis はデバイス接続数やメッセージパブリッシュ数、特定の業務指標などのリアルタイムメトリクスを計算可能です。一方、EMQX Platform はリアルタイムメッセージの送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
  • 地理位置情報分析:Redis は地理空間データ構造とコマンドを備え、地理位置情報の保存とクエリが可能です。EMQX Platform の強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様な IoT アプリケーションに幅広く応用できます。

はじめる前に

本節では、EMQX Platform で Upstash for Redis データ統合を作成するための準備作業を紹介します。

前提条件

ネットワーク設定

EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。

  • 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。

Upstash for Redis データベースのセットアップ

Upstash を利用開始するには、https://upstash.com/ にアクセスし、アカウントを作成してください。

Redis データベースの作成

  1. ログイン後、Create Database ボタンをクリックして Redis データベースを作成します。

  2. 有効な名前を入力し、データベースをデプロイしたいリージョンを選択します。パフォーマンスを最適化するため、デプロイメントのリージョンに最も近いリージョンを選択することを推奨します。

  3. Create をクリックすると、サーバーレスの Redis データベースが作成されます。

詳細の確認

データベースコンソールに入り、次のステップに必要な情報を確認してください。

コネクターの作成

  1. デプロイメントに移動し、左ナビゲーションメニューから Data Integration をクリックします。

  2. 初めてコネクターを作成する場合は、Data Persistence カテゴリーの中から Upstash for Redis を選択します。既にコネクターを作成済みの場合は、New Connector を選択し、続けて Data Persistence カテゴリーの中から Upstash for Redis を選択します。

  3. Connector Name:システムが自動的にコネクター名を生成します。

  4. 接続情報を入力します:

    • Server Host:Redis 詳細ページの EndpointsPort の情報
    • Password:Redis 詳細ページの Password の情報
    • その他はデフォルトのままにします。
  5. 詳細設定(任意)

  6. Test ボタンをクリックします。Redis サービスにアクセス可能であれば成功のメッセージが表示されます。

  7. New ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定するルールを作成し、処理済みデータを Upstash for Redis に転送するためのアクションをルールに追加します。

  1. ルールエリアの New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。

  2. SQL Editor にルールのマッチング SQL 文を入力します。以下のルールでは、メッセージが報告された時間 arrived、クライアント ID、temp_hum/emqx トピックのペイロードから温度と湿度を読み取ります。

    sql
    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"

    TIP

    初心者の方は、SQL ExamplesEnable Test をクリックして SQL ルールの学習とテストを行うことができます。

  3. Next をクリックしてアクションを追加します。

  4. Connector ドロップダウンから先ほど作成したコネクターを選択します。

  5. Redis Command Template を設定します。トピックから読み取った「up_timestamp」「client ID」「温度」「湿度」のデータを Redis に保存します。

    sql
    HMSET ${client_id} ${up_timestamp} ${temp}
  6. Confirm ボタンをクリックしてルール作成を完了します。

  7. Successful new rule ポップアップで Back to Rules をクリックし、データ統合設定の一連の作業を完了します。

ルールのテスト

温湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック: temp_hum/emqx

    • クライアント ID: test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. Upstash コンソールでデータを確認します。Data Browser でクライアントエントリーを選択すると、メッセージを確認できます。

  3. コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報とそのルールに属するすべてのアクションの統計情報が表示されます。