Amazon S3へのMQTTデータ取り込み
Amazon S3 は、高い信頼性、安定性、セキュリティを備えたインターネットベースのストレージサービスであり、迅速なデプロイと使いやすさが特徴です。EMQXはMQTTメッセージを効率的にAmazon S3バケットに保存することができ、柔軟なIoTデータストレージ機能を実現します。
本ページでは、EMQXとAmazon S3のデータ連携について詳しく紹介し、ルールおよびSinkの作成方法を実践的に解説します。
TIP
EMQXはAmazon S3以外にも、S3プロトコルをサポートする以下のストレージサービスに対応しています:
- MinIO:MinIOは高性能な分散オブジェクトストレージシステムです。Amazon S3 API互換のオープンソースオブジェクトストレージサーバーで、プライベートクラウド構築に適しています。
- Google Cloud Storage:Google Cloud StorageはGoogle Cloudの統合オブジェクトストレージで、大量データの保存に対応し、Amazon S3互換のインターフェースを提供します。
用途やビジネスニーズに応じて適切なストレージサービスを選択してください。
動作概要
EMQXのAmazon S3データ連携はすぐに使える機能で、複雑なビジネス開発にも簡単に設定可能です。典型的なIoTアプリケーションでは、EMQXがデバイス接続およびメッセージ伝送を担うIoTプラットフォームとして機能し、Amazon S3がメッセージデータの保存プラットフォームとして役割を果たします。
EMQXはルールエンジンとSinkを利用してデバイスのイベントやデータをAmazon S3へ転送します。アプリケーションはAmazon S3からデータを読み込み、さらなるデータ活用を行います。具体的なワークフローは以下の通りです:
- デバイスのEMQX接続:IoTデバイスはMQTTプロトコルで接続成功時にオンラインイベントを発生させます。イベントにはデバイスID、送信元IPアドレスなどの情報が含まれます。
- デバイスメッセージのパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQXはメッセージを受信し、ルールエンジン内で照合します。
- ルールエンジンによるメッセージ処理:組み込みのルールエンジンはトピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールをマッチングし、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報の付加などを行います。
- Amazon S3への書き込み:ルールがトリガーされると、メッセージをS3に書き込むアクションが実行されます。Amazon S3 Sinkを使用して処理結果からデータを抽出しS3へ送信します。メッセージはテキストまたはバイナリ形式で保存可能で、複数行の構造化データを単一のCSVまたはJSON Linesファイルにまとめることもできます。これはメッセージ内容やSinkの設定に依存します。
イベントやメッセージデータがAmazon S3に書き込まれた後は、Amazon S3に接続してデータを読み込み、以下のような柔軟なアプリケーション開発が可能です:
- データアーカイブ:デバイスメッセージをAmazon S3のオブジェクトとして長期保存し、コンプライアンス要件やビジネスニーズに対応。
- データ分析:S3のデータをSnowflakeなどの分析サービスに取り込み、予知保全やデバイス効率評価などのデータ分析に活用。
特長と利点
EMQXのAmazon S3データ連携を利用することで、以下の特長と利点をビジネスにもたらせます:
- メッセージ変換:メッセージはEMQXルール内で多様な処理・変換が可能で、Amazon S3への書き込み前に適切な形に整形でき、後続の保存や利用が容易になります。
- 柔軟なデータ操作:S3 Sinkを使うことで、特定のデータフィールドだけをAmazon S3バケットに書き込むことができ、バケットやオブジェクトキーの動的設定にも対応し柔軟なデータ保存が可能です。
- 統合されたビジネスプロセス:S3 SinkによりデバイスデータをAmazon S3の豊富なエコシステムと組み合わせられ、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
- 低コストの長期保存:データベースと比較して、Amazon S3は高可用性・高信頼性かつコスト効率の良いオブジェクトストレージを提供し、長期保存に適しています。
これらの特長により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てられます。
はじめる前に
ここでは、EMQXでAmazon S3 Sinkを作成する前に必要な準備について説明します。
前提条件
S3バケットの準備
EMQXはAmazon S3およびその他のS3互換ストレージサービスをサポートしています。AWSクラウドサービスを利用するか、DockerでMinIOインスタンスを展開できます。
コネクターの作成
S3 Sinkを追加する前に、対応するコネクターを作成する必要があります。
- ダッシュボードの Integration -> Connector ページに移動します。
- 右上の Create ボタンをクリックします。
- コネクタータイプとして Amazon S3 を選択し、次へ進みます。
- コネクター名を入力します。英数字の組み合わせで、ここでは
my-s3
と入力します。 - 接続情報を入力します。
- Amazon S3バケットを使用する場合は以下を入力します:
- Host:リージョンによって異なり、
s3.{region}.amazonaws.com
の形式です。 - Port:
443
を入力します。 - Access Key ID と Secret Access Key:AWSで作成したアクセスキーを入力します。
- Host:リージョンによって異なり、
- MinIOを使用する場合は以下を入力します:
- Host:
127.0.0.1
を入力します。リモートでMinIOを実行している場合は実際のホストアドレスを入力してください。 - Port:
9000
を入力します。 - Access Key ID と Secret Access Key:MinIOで作成したアクセスキーを入力します。
- Host:
- Amazon S3バケットを使用する場合は以下を入力します:
- 残りの設定はデフォルト値のままにします。
- Createをクリックする前に、Test Connectivityを押してコネクターがS3サービスに接続できるかテストできます。
- 下部の Create ボタンをクリックしてコネクター作成を完了します。
これでコネクター作成が完了し、次にS3サービスへ書き込むデータを指定するルールとSinkの作成に進みます。
Amazon S3 Sinkを使ったルールの作成
ここでは、EMQXでソースMQTTトピックt/#
からのメッセージを処理し、処理結果をS3のiot-data
バケットに書き込むルールの作成手順を示します。
ダッシュボードの Integration -> Rules ページに移動します。
右上の Create ボタンをクリックします。
ルールIDに
my_rule
を入力し、SQLエディターに以下のルールSQLを入力します:sqlSELECT * FROM "t/#"
TIP
SQLに不慣れな場合は、SQL ExamplesやEnable DebugをクリックしてルールSQLの学習や結果のテストが可能です。
アクションを追加し、Action Typeドロップダウンから
Amazon S3
を選択します。アクションのドロップダウンはデフォルトのcreate action
のままにするか、既存のAmazon S3アクションを選択できます。ここでは新しいSinkを作成してルールに追加します。Sinkの名前と説明を入力します。
先ほど作成した
my-s3
コネクターをコネクタードロップダウンから選択します。ドロップダウン横の作成ボタンを押すとポップアップで新規コネクターを素早く作成できます。必要な設定パラメータはコネクターの作成を参照してください。Bucketに
iot-data
を入力します。このフィールドは${var}
形式のプレースホルダーもサポートしますが、対応する名前のバケットが事前にS3に作成されている必要があります。必要に応じてACLを選択し、アップロードされるオブジェクトのアクセス権限を指定します。
Upload Methodを選択します。2つの方法の違いは以下の通りです:
- Direct Upload:ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接S3にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
- Aggregated Upload:複数のルールトリガーの結果を1つのファイル(CSVなど)にまとめてS3にアップロードします。構造化データの保存に適し、ファイル数を減らし書き込み効率を向上させます。
選択した方法により設定項目が異なります。以下の通り設定してください:
Object Contentを設定します。デフォルトは全フィールドを含むJSONテキスト形式で、
${var}
形式のプレースホルダーをサポートします。ここでは${payload}
を入力し、メッセージ本文をオブジェクト内容として使用します。この場合、オブジェクトの保存形式はメッセージ本文の形式に依存し、圧縮ファイルや画像、その他バイナリ形式もサポートします。フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらのアクションがトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細は詳細設定を参照してください。
残りの設定はデフォルト値のままにし、CreateボタンをクリックしてSink作成を完了します。作成成功後、ルール作成画面に戻り、新しいSinkがルールアクションに追加されます。
ルール作成画面でCreateボタンをクリックし、ルール作成全体を完了します。
これでルールの作成が完了しました。Rulesページで新規作成したルールを確認でき、**Actions (Sink)**タブで新しいS3 Sinkも確認できます。
また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/#
のメッセージがルールmy_rule
で解析されS3に書き込まれる流れを視覚的に確認できます。
ルールのテスト
ここでは、Direct Upload方式で設定したルールのテスト方法を示します。
MQTTXを使ってトピックt/1
にメッセージをパブリッシュします:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello S3" }'
数件メッセージを送信した後、MinIOコンソールまたはAmazon S3コンソールにアクセスして結果を確認します。
詳細設定
ここではS3 Sinkの高度な設定オプションについて説明します。ダッシュボードでSinkを設定する際、Advanced Settingsを展開し、用途に応じて以下のパラメータを調整できます。
項目名 | 説明 | デフォルト値 |
---|---|---|
Buffer Pool Size | EMQXとS3間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保持・処理し、ターゲットサービスへ送信します。パフォーマンス最適化とスムーズなデータ送信に重要です。 | 16 |
Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストはバッファに入った時点からTTLがカウントされ、TTLを超えるか、S3からの応答やアックがタイムリーに得られない場合、リクエストは期限切れとみなされます。 | |
Health Check Interval | SinkがS3との接続状態を自動的にヘルスチェックする間隔(秒)を指定します。 | 15 |
Max Buffer Queue Size | S3 Sinkの各バッファワーカーがバッファリングできる最大バイト数を指定します。バッファワーカーはデータを一時的に保持し、効率的にデータストリームを処理します。システム性能やデータ送信要件に応じて調整してください。 | 256 |
Query Mode | 同期(synchronous )または非同期(asynchronous )のリクエストモードを選択し、メッセージ送信を最適化します。非同期モードではS3への書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージをS3到達前に受信する可能性があります。 | Asynchronous |
In-flight Window | 「インフライトキューリクエスト」とは、送信済みで応答やアックをまだ受け取っていないリクエストを指します。この設定はSinkとS3間の通信で同時に存在可能なインフライトリクエストの最大数を制御します。 Request Modeが asynchronous の場合、同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、この値を1 に設定してください。 | 100 |
Min Part Size | 集約完了後のパートアップロードの最小チャンクサイズです。アップロード対象データはこのサイズに達するまでメモリに蓄積されます。 | 5MB |
Max Part Size | パートアップロードの最大チャンクサイズです。S3 Sinkはこのサイズを超えるパートのアップロードを試みません。 | 5GB |