Skip to content

Upstash for Redis への MQTT データ取り込み

Upstash はクラウドベースのサーバレスデータプラットフォームであり、開発者がインフラ管理の手間なく Redis データベースや Kafka をアプリケーションにシームレスに統合できる環境を提供します。サーバレスアーキテクチャを採用しているため、高性能なインメモリデータストアである Redis と Kafka の利点を、デプロイメントやスケーリング、メンテナンスの複雑さを気にせずに享受できます。

本ページでは、Upstash データ統合の機能概要と実装に関する実践的なガイドを詳述します。Kafka コネクターの作成、ルールの定義、動作検証などの基本的な作業をカバーし、さらに MQTT プロトコルを用いてシミュレートした温湿度データを EMQX Cloud に報告し、設定したデータ統合を通じて Upstash に保存する流れを示します。

動作の仕組み

Upstash Redis データ統合は EMQX Cloud に標準搭載された機能であり、MQTT ベースの IoT データと Kafka の強力なデータ処理機能をつなぐ役割を果たします。組み込みのルールエンジンコンポーネントを通じて、複雑なコーディングなしに両プラットフォーム間のデータフローと処理を簡素化します。

以下の図は、EMQX Cloud と Redis 間のデータ統合の典型的なアーキテクチャを示しています。

EMQX Cloud Integration Redis

Upstash Redis への MQTT データ取り込みは以下の流れで行われます:

  1. メッセージのパブリッシュと受信:産業用 IoT デバイスは MQTT プロトコルを通じて EMQX Cloud デプロイメントに正常に接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイム MQTT データを EMQX Cloud にパブリッシュします。EMQX Cloud はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloud に定義されたルールによって処理されます。ルールは事前に定義された条件に基づき、どのメッセージを Redis にルーティングするかを決定します。ペイロード変換を指定するルールがある場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. Redis へのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために、あらかじめ設定した Redis コマンドの実行がトリガーされます。
  4. データの保存と活用:Redis に保存されたデータを読み取ることで、企業は豊富なデータ操作機能を活用し、多様なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得や GPS 地理情報解析を基にしたリアルタイムデータ分析やソートが可能となり、リアルタイム追跡やルート推奨などの機能を支えます。

特長とメリット

Redis とのデータ統合は、効率的なデータ送信、処理、活用を実現するために以下のような特長とメリットを備えています:

  • 高性能かつスケーラブル:EMQX の分散アーキテクチャと Redis のクラスターモードにより、データ量の増加に応じてアプリケーションをシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を保証します。
  • リアルタイムデータストリーム:EMQX Cloud はリアルタイムデータストリーム処理に特化しており、デバイスから Redis への効率的かつ信頼性の高いデータ送信を実現します。Redis は高速なデータ操作が可能で、リアルタイムデータキャッシュのニーズに応え、EMQX Cloud の理想的なデータストレージコンポーネントとなります。
  • リアルタイムデータ分析:Redis はデバイス接続数やメッセージパブリッシュ数、特定のビジネス指標などのリアルタイムメトリクス計算に利用可能です。一方、EMQX Cloud はリアルタイムメッセージ送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
  • 地理位置情報分析:Redis は地理空間データ構造とコマンドを備え、地理位置情報の保存とクエリをサポートします。EMQX Cloud の強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様な IoT アプリケーションに広く適用可能です。

はじめる前に

本セクションでは、EMQX Cloud における Upstash for Redis データ統合の作成に必要な準備作業を紹介します。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

Upstash for Redis データベースのセットアップ

Upstash を利用開始するには、https://upstash.com/ にアクセスし、アカウントを作成してください。

Redis データベースの作成

  1. ログイン後、Create Database ボタンをクリックして Redis データベースを作成します。

  2. 有効な名前を入力し、データベースをデプロイしたいリージョンを選択します。パフォーマンス最適化のため、デプロイメントのリージョンに最も近いリージョンを選ぶことを推奨します。

  3. Create をクリックすると、サーバレスの Redis データベースが作成されます。

詳細の確認

データベースコンソールに入り、次のステップで必要となる情報を確認してください。

コネクターの作成

  1. ご自身のデプロイメントにアクセスし、左側ナビゲーションメニューから Data Integration をクリックします。

  2. 初めてコネクターを作成する場合は、Data Persistence カテゴリの中から Upstash for Redis を選択します。すでにコネクターを作成済みの場合は、New Connector を選択し、続いて Data Persistence カテゴリの中から Upstash for Redis を選択してください。

  3. Connector Name はシステムにより自動生成されます。

  4. 接続情報を入力します:

    • Server Host:Redis 詳細ページの EndpointsPort の情報
    • Password:Redis 詳細ページの Password の情報
    • その他はデフォルトのままにします。
  5. 高度な設定(任意)を行います。

  6. Test ボタンをクリックし、Redis サービスにアクセス可能であれば成功メッセージが表示されます。

  7. New ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込み対象のデータを指定するルールを作成し、処理済みデータを Upstash for Redis に転送するためのアクションをルールに追加します。

  1. ルールエリアの New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。

  2. SQL Editor にルールのマッチング SQL 文を入力します。以下のルール例では、メッセージ到着時刻 arrived、クライアント ID、temp_hum/emqx トピックのペイロードから温度と湿度を読み取ります。

    sql
    SELECT
      timestamp as up_timestamp,
      clientid as client_id,
      payload.temp as temp,
      payload.hum as hum
    FROM
      "temp_hum/emqx"

    TIP

    初心者の方は SQL Examples をクリックし、Try It Out で SQL ルールを学習・テストしてください。

  3. Next をクリックしてアクションを追加します。

  4. Connector ドロップダウンから先ほど作成したコネクターを選択します。

  5. Redis Command Template を設定します。トピックから読み取った「up_timestamp」「client ID」「温度」「湿度」のデータを Redis に保存します:

    sql
    HMSET ${client_id} ${up_timestamp} ${temp}
  6. Confirm ボタンをクリックし、ルール作成を完了します。

  7. Successful new rule ポップアップで Back to Rules をクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • クライアント ID:test_client

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. Upstash コンソールでデータを確認します。Data Browser でクライアントエントリを選択し、メッセージをチェックできます。

  3. コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報およびそのルール下のすべてのアクションの統計が表示されます。