Upstash for Redis への MQTT データ取り込み
Upstash はクラウドベースのサーバーレスデータプラットフォームであり、開発者がインフラ管理の手間なく Redis データベースと Kafka をアプリケーションにシームレスに統合できる環境を提供します。サーバーレスアーキテクチャを採用しており、高性能なインメモリデータストアである Redis と Kafka の利点を、デプロイやスケーリング、メンテナンスの複雑さを気にすることなく享受できます。
本ページでは、Upstash データ統合の機能概要と実装に関する実践的なガイドを詳述します。Kafka コネクターの作成、ルールの定義、動作検証などの基本作業をカバーし、さらに MQTT プロトコルを用いてシミュレートした温湿度データを EMQX Platform に報告し、設定したデータ統合を通じて Upstash に保存する手順を示します。
動作概要
Upstash Redis データ統合は EMQX Platform に標準搭載された機能であり、MQTT ベースの IoT データと Kafka の強力なデータ処理機能を橋渡しします。組み込みのルールエンジンコンポーネントを通じて、複雑なコーディングなしに両プラットフォーム間のデータフローと処理を簡素化します。
以下の図は、EMQX Platform と Redis 間の典型的なデータ統合アーキテクチャを示しています。
Upstash Redis への MQTT データ取り込みは以下の流れで行われます。
- メッセージのパブリッシュと受信:産業用 IoT デバイスは MQTT プロトコルを通じて EMQX Platform デプロイメントに正常に接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイム MQTT データを EMQX Platform にパブリッシュします。EMQX Platform はこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージ到着時にルールエンジンを通過し、EMQX Platform で定義されたルールに基づいて処理されます。ルールは事前定義された条件により、どのメッセージを Redis にルーティングするかを判定します。ペイロード変換が指定されている場合は、データ形式の変換や特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
- Redis へのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために設定された Redis コマンドを実行するアクションがトリガーされます。
- データの保存と活用:Redis に保存されたデータを読み取ることで、企業は豊富なデータ操作機能を活用し、多様なユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得や GPS 地理位置情報の分析、リアルタイムデータ分析やソートなどの操作が可能となり、リアルタイム追跡やルート推奨などの機能を支えます。
特長とメリット
Redis とのデータ統合は、効率的なデータ送信、処理、活用を実現するための多彩な特長とメリットを備えています。
- 高性能かつスケーラブル:EMQX の分散アーキテクチャと Redis のクラスター モードにより、データ量の増加に応じてアプリケーションをシームレスにスケール可能です。大規模データセットでも一貫したパフォーマンスと応答性を保証します。
- リアルタイムデータストリーム:EMQX Platform はリアルタイムデータストリーム処理に特化しており、デバイスから Redis への効率的かつ信頼性の高いデータ伝送を実現します。Redis は高速なデータ操作を実行できるため、リアルタイムのデータキャッシュに最適なデータストレージコンポーネントとなります。
- リアルタイムデータ分析:Redis はデバイス接続数やメッセージパブリッシュ数、特定の業務指標などのリアルタイムメトリクスを計算可能です。一方、EMQX Platform はリアルタイムメッセージの送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
- 地理位置情報分析:Redis は地理空間データ構造とコマンドを備え、地理位置情報の保存とクエリが可能です。EMQX Platform の強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様な IoT アプリケーションに幅広く応用できます。
はじめる前に
本節では、EMQX Platform で Upstash for Redis データ統合を作成するための準備作業を紹介します。
前提条件
ネットワーク設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Upstash for Redis データベースのセットアップ
Upstash を利用開始するには、https://upstash.com/ にアクセスし、アカウントを作成してください。
Redis データベースの作成
ログイン後、Create Database ボタンをクリックして Redis データベースを作成します。
有効な名前を入力し、データベースをデプロイしたいリージョンを選択します。パフォーマンスを最適化するため、デプロイメントのリージョンに最も近いリージョンを選択することを推奨します。
Create をクリックすると、サーバーレスの Redis データベースが作成されます。
詳細の確認
データベースコンソールに入り、次のステップに必要な情報を確認してください。
コネクターの作成
デプロイメントに移動し、左ナビゲーションメニューから Data Integration をクリックします。
初めてコネクターを作成する場合は、Data Persistence カテゴリーの中から Upstash for Redis を選択します。既にコネクターを作成済みの場合は、New Connector を選択し、続けて Data Persistence カテゴリーの中から Upstash for Redis を選択します。
Connector Name:システムが自動的にコネクター名を生成します。
接続情報を入力します:
- Server Host:Redis 詳細ページの Endpoints と Port の情報
- Password:Redis 詳細ページの Password の情報
- その他はデフォルトのままにします。
詳細設定(任意)
Test ボタンをクリックします。Redis サービスにアクセス可能であれば成功のメッセージが表示されます。
New ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータを Upstash for Redis に転送するためのアクションをルールに追加します。
ルールエリアの New Rule をクリックするか、作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。
SQL Editor にルールのマッチング SQL 文を入力します。以下のルールでは、メッセージが報告された時間
arrived
、クライアント ID、temp_hum/emqx
トピックのペイロードから温度と湿度を読み取ります。sqlSELECT timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum FROM "temp_hum/emqx"
TIP
初心者の方は、SQL Examples と Enable Test をクリックして SQL ルールの学習とテストを行うことができます。
Next をクリックしてアクションを追加します。
Connector ドロップダウンから先ほど作成したコネクターを選択します。
Redis Command Template を設定します。トピックから読み取った「up_timestamp」「client ID」「温度」「湿度」のデータを Redis に保存します。
sqlHMSET ${client_id} ${up_timestamp} ${temp}
Confirm ボタンをクリックしてルール作成を完了します。
Successful new rule ポップアップで Back to Rules をクリックし、データ統合設定の一連の作業を完了します。
ルールのテスト
温湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他の任意のクライアントでも構いません。
MQTTX を使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアント ID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
Upstash コンソールでデータを確認します。Data Browser でクライアントエントリーを選択すると、メッセージを確認できます。
コンソールで運用データを確認します。ルール一覧のルール ID をクリックすると、ルールの統計情報とそのルールに属するすべてのアクションの統計情報が表示されます。