MQTTデータをRedisに取り込む
Redisは、オープンソースのインメモリデータストアであり、データベース、キャッシュ、ストリーミングエンジン、メッセージブローカーとして数百万の開発者に利用されています。EMQXはRedisとの統合をサポートしており、MQTTメッセージやクライアントイベントをRedisに保存できます。Redisデータ統合により、メッセージのキャッシュやクライアントイベントの統計にRedisを活用できます。
本ページでは、EMQXプラットフォームとRedis間のデータ統合について詳細に解説し、データ統合の作成および検証手順を実践的に説明します。
動作概要
Redisデータ統合はEMQXプラットフォームの標準機能であり、EMQXプラットフォームのリアルタイムデータキャプチャおよび送信機能と、Redisの豊富なデータ構造および高性能なキー・バリューの読み書き性能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXプラットフォームからRedisへのデータ取り込みを簡素化し、複雑なコーディングを不要にします。
以下の図は、EMQXプラットフォームとRedis間のデータ統合の典型的なアーキテクチャを示しています。
MQTTデータをRedisに取り込む流れは次の通りです。
- メッセージのパブリッシュと受信:産業用IoTデバイスはMQTTプロトコルを通じてEMQXプラットフォームに接続し、機械やセンサー、製品ラインの稼働状態や計測値、トリガーイベントに基づくリアルタイムMQTTデータをパブリッシュします。EMQXプラットフォームはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージはルールエンジンを通過し、EMQXプラットフォームで定義されたルールに基づいて処理されます。ルールは事前に定義された条件により、どのメッセージをRedisにルーティングするかを決定します。ペイロードの変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの拡充などが適用されます。
- Redisへのデータ取り込み:ルールエンジンがデータを処理した後、キャッシュやカウントなどの操作を行うために、あらかじめ設定されたRedisコマンドを実行するアクションがトリガーされます。
- データの保存と活用:Redisに保存されたデータを読み取ることで、企業はRedisの豊富なデータ操作機能を活用し、さまざまなユースケースを実現できます。例えば物流分野では、デバイスの最新状態取得やGPS位置情報の分析、リアルタイムデータ分析やソートなどが可能となり、リアルタイム追跡やルート推奨などの機能を支えます。
特長とメリット
Redisとのデータ統合は、効率的なデータ伝送、処理、活用を実現するための多彩な特長とメリットを備えています。
- 高いパフォーマンスとスケーラビリティ:EMQXの分散アーキテクチャとRedisのクラスター モードにより、データ量の増加に応じてアプリケーションをシームレスにスケールできます。大規模データでも一貫したパフォーマンスと応答性を確保します。
- リアルタイムデータストリーム:EMQXプラットフォームはリアルタイムデータストリーム処理に特化しており、デバイスからRedisへの効率的かつ信頼性の高いデータ伝送を実現します。Redisは高速なデータ操作を実行できるため、リアルタイムのデータキャッシュに最適なデータストレージコンポーネントです。
- リアルタイムデータ分析:Redisはデバイス接続数やメッセージパブリッシュ数、特定の業務指標などのリアルタイムメトリクスを計算可能です。EMQXプラットフォームはリアルタイムメッセージの送受信と処理を担い、データ分析のためのリアルタイム入力を提供します。
- 地理位置情報分析:Redisは地理空間データ構造とコマンドを提供し、地理位置情報の保存と検索が可能です。EMQXプラットフォームの強力なデバイス接続機能と組み合わせることで、物流、コネクテッドカー、スマートシティなど多様なIoTアプリケーションに広く応用できます。
はじめる前に
このセクションでは、Redisデータ統合を作成する前に必要な準備、特にRedisサーバーのセットアップ方法について説明します。
前提条件
ネットワークの設定
EMQX Platform上でデプロイメント(EMQXクラスター)を作成し、ネットワークを構成する必要があります。
- 専用デプロイメントユーザーの場合:まず、VPCピアリング接続を作成してください。ピアリング接続が確立された後、内部ネットワークのIPを介してプラットフォームコンソールにログインし、対象のコネクターにアクセスできます。あるいは、NATゲートウェイを設定して、パブリックIP経由で対象のコネクターにアクセスすることも可能です。
- BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOCがデプロイされているVPCと対象コネクターが存在するVPC間でピアリング接続を確立してください。ピアリング接続作成後、内部ネットワークのIPを介して対象コネクターにアクセスできます。パブリックIPアドレス経由でリソースにアクセスする必要がある場合は、パブリッククラウドコンソールでBYOCがデプロイされているVPCに対してNATゲートウェイを構成してください。
Redisサーバーのインストール
Dockerを使ったRedisのインストール
Dockerを使ってRedisをインストールし、起動します。
# Redisコンテナを起動
docker run --name redis -p 6379:6379 -d redis
# コンテナにアクセス
docker exec -it redis bash
# Redisサーバーにアクセス
redis-cli
# インストールの検証
127.0.0.1:6379> set emqx_cloud "Hello World"
OK
127.0.0.1:6379> get emqx_cloud
"Hello World"
これでRedisのインストールが完了し、SET
およびGET
コマンドで動作を確認できました。その他のRedisコマンドについてはRedis Commandsをご参照ください。
Redis Cloudを使ったRedisサービスの作成
- Redis Cloudコンソールにログインし、サブスクリプションを作成します。本デモではFixed Planを選択できます。
- データベースを作成します。
- データベースの設定ページで、接続に必要なアドレス、ユーザー名、パスワードなどの情報を確認します。
- 接続ボタンをクリックし、Redis CLIオプションを選択してコマンドをコピーし、コマンドラインで実行してサービスに接続し検証します。
詳細はRedis Cloud Documentationをご参照ください。
コネクターの作成
データ統合ルールを作成する前に、RedisサーバーにアクセスするためのRedisコネクターを作成する必要があります。
デプロイメントに移動し、左側ナビゲーションメニューからデータ統合をクリックします。
初めてコネクターを作成する場合は、データ永続化カテゴリの中からRedisを選択します。既にコネクターを作成済みの場合は、新規コネクターを選択し、続けてデータ永続化カテゴリのRedisを選択します。
コネクター名はシステムが自動生成します。
ビジネス要件に応じてRedisモードを設定します。例:
single
接続情報を入力します。
- サーバーホスト:サーバーのIPアドレスとポート番号
- データベースID:
0
を入力 - ユーザー名とパスワード:Redis Cloudで作成したサービスの場合は、データベースの設定ページからユーザー名とパスワードをコピーして入力します。
- その他のオプションはビジネス要件に応じて設定します。
- 暗号化接続を確立したい場合は、TLSを有効にするトグルスイッチをオンにします。
テストボタンをクリックし、Redisサービスにアクセス可能であれば成功メッセージが表示されます。
新規作成ボタンをクリックして作成を完了します。
ルールの作成
次に、書き込むデータを指定するルールを作成し、処理済みデータをRedisに転送するアクションをルールに追加します。
ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。
利用したい機能に基づいてSQLエディターでルールを設定します。ここでは、クライアントが
temp_hum/emqx
トピックに温度と湿度のメッセージを送信した際にエンジンをトリガーするSQLを記述します。sqlSELECT timestamp div 1000 as up_timestamp, clientid as client_id, payload as temp_hum FROM "temp_hum/emqx"
TIP
初心者の方はSQL例をクリックし、テストを有効にするでSQLルールを学習・テストできます。
次へをクリックしてアクションを追加します。
コネクターのドロップダウンから先ほど作成したコネクターを選択します。
Redisコマンドテンプレートを設定します。トピックから読み取った「up_timestamp」「client_id」「温度」「湿度」のデータをRedisに保存します。
bashHMSET ${client_id} ${up_timestamp} ${temp_hum}
確定ボタンをクリックしてルール作成を完了します。
新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の一連の流れを完了します。
ルールのテスト
温度と湿度のデータ報告をシミュレートするためにMQTTXの利用を推奨しますが、他のクライアントでも構いません。
MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。
トピック:
temp_hum/emqx
クライアントID:
test_client
ペイロード:
json{ "temp": "27.5", "hum": "41.8" }
保存された結果を確認します。
- Docker経由でRedisをインストールした場合は、まずコンテナにアクセスし、
redis-cli
を実行してRedisサービスに接続します。 - Redis Cloudで作成したサービスの場合は、Redis CLI接続オプションを選択し、コマンドをコピーしてコマンドラインで実行し、Redisサービスに接続します。
bash127.0.0.1:6379> HGETALL test_client 1) "1710921138" 2) "{\n \"temp\": 27.5,\n \"hum\": 41.8\n}"
- Docker経由でRedisをインストールした場合は、まずコンテナにアクセスし、
コンソールで運用データを確認します。ルール一覧のルールIDをクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計情報が表示されます。