Skip to content

Amazon S3へのMQTTデータ取り込み

Amazon S3は、高い信頼性、安定性、およびセキュリティを備えたインターネットベースのストレージサービスであり、迅速なデプロイと使いやすさが特徴です。EMQX Platformは、MQTTメッセージを効率的にAmazon S3バケットに保存することができ、柔軟なIoTデータストレージ機能を実現します。

本ページでは、EMQX PlatformとAmazon S3間のデータ統合について詳しく紹介し、ルールおよびSinkの作成方法について実践的なガイドを提供します。

動作概要

EMQX PlatformにおけるAmazon S3データ統合は、複雑なビジネス開発に対応可能なすぐに使える機能として提供されています。典型的なIoTアプリケーションでは、EMQX Platformがデバイス接続およびメッセージ送受信を担うIoTプラットフォームとして機能し、Amazon S3がメッセージデータの保存を担当するデータストレージプラットフォームとして利用されます。

EMQX Platform-integration-s3

EMQX PlatformはルールとSinkを利用してデバイスのイベントやデータをAmazon S3へ転送します。アプリケーションはAmazon S3からデータを読み出し、さらなるデータ活用を行います。具体的なワークフローは以下の通りです:

  1. デバイスのEMQX Platformへの接続:IoTデバイスはMQTTプロトコルで正常に接続されるとオンラインイベントをトリガーします。イベントにはデバイスID、送信元IPアドレスなどのプロパティ情報が含まれます。
  2. デバイスからのメッセージパブリッシュと受信:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュします。EMQX Platformはメッセージを受信し、ルールエンジン内で比較処理を行います。
  3. ルールエンジンによるメッセージ処理:組み込みのルールエンジンは、トピックマッチングに基づき特定のソースからのメッセージやイベントを処理します。対応するルールをマッチングし、データフォーマット変換、特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを行います。
  4. Amazon S3への書き込み:ルールはメッセージをS3に書き込むアクションをトリガーします。Amazon S3 Sinkを使用して処理結果からデータを抽出し、S3へ送信します。メッセージはテキストまたはバイナリ形式で保存可能であり、メッセージ内容やSink設定に応じて複数行の構造化データを1つのCSVファイルにまとめることもできます。

イベントやメッセージデータがAmazon S3に書き込まれた後は、Amazon S3に接続してデータを読み出し、以下のような柔軟なアプリケーション開発が可能です:

  • データアーカイブ:デバイスメッセージをAmazon S3のオブジェクトとして長期保存し、コンプライアンス要件やビジネスニーズに対応。
  • データ分析:S3から分析サービス(例:Snowflake)にデータを取り込み、予知保全やデバイス効率評価などのデータ分析を実施。

特徴と利点

EMQX PlatformのAmazon S3データ統合を利用すると、以下の特徴と利点をビジネスにもたらします:

  • メッセージ変換:メッセージはEMQX Platformのルール内で高度な処理・変換が可能であり、Amazon S3への保存や後続利用を容易にします。
  • 柔軟なデータ操作:S3 Sinkにより、特定のデータフィールドをAmazon S3バケットに簡単に書き込め、バケット名やオブジェクトキーの動的設定もサポートします。
  • 統合されたビジネスプロセス:S3 Sinkを通じてデバイスデータをAmazon S3の豊富なエコシステムと連携でき、データ分析やアーカイブなど多様なビジネスシナリオを実現します。
  • 低コストの長期保存:データベースと比較して、Amazon S3は高可用性かつ信頼性の高いコスト効率の良いオブジェクトストレージサービスであり、長期保存に適しています。

これらの特徴により、効率的で信頼性が高くスケーラブルなIoTアプリケーションを構築し、ビジネスの意思決定や最適化に役立てることができます。

はじめる前に

このセクションでは、EMQX PlatformでAmazon S3 Sinkを作成する前の準備について説明します。

前提条件

ネットワーク設定

EMQXがAmazon S3にパブリックネットワーク経由でアクセスするため、デプロイ環境でNAT Gatewayを有効にする必要があります。トップメニューバーのVASをクリックし、NAT Gatewayカードを選択するか、デプロイ概要ページの下部タブバーでEnable NAT Gateway serviceを選択してください。

S3バケットの準備

EMQX PlatformはAmazon S3およびその他のS3互換ストレージサービスをサポートしています。ここではAWSクラウドサービスを使ってS3バケットを作成します。

  1. AWS S3コンソールCreate bucketボタンをクリックし、バケット名やリージョンなどの必要情報を入力してS3バケットを作成します。詳細な操作はAWSドキュメントを参照してください。

  2. バケットの権限設定。バケット作成後、バケットを選択しPermissionsタブをクリックします。ニーズに応じてバケットをパブリック読み書き、プライベート、その他の権限に設定可能です。以下のJSONを参考に設定してください:

    json
    {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Sid": "Stmt1ListBucket",
         "Effect": "Allow",
         "Action": ["s3:ListBucket"],
         "Resource": ["arn:aws:s3:::emqx-cloud-s3-connector-test"]
       },
       {
         "Sid": "Stmt2GetAndPutObject",
         "Effect": "Allow",
         "Action": ["s3:GetObject", "s3:PutObject"],
         "Resource": ["arn:aws:s3:::emqx-cloud-s3-connector-test/*"]
       },
       {
         "Effect": "Allow",
         "Action": "s3:ListAllMyBuckets",
         "Resource": "*"
       }
     ]
    }
  3. アクセスキーの取得。AWSコンソールでIAMサービスを検索・選択し、S3用の新規ユーザーを作成してAccess KeyとSecret Keyを取得します。

Amazon S3バケットの作成と設定が完了したら、EMQX PlatformでAmazon S3 Sinkを作成する準備が整います。

コネクターの作成

データ統合ルールを作成する前に、対応するコネクターを作成する必要があります。

  1. 初めてコネクターを作成する場合は、Data Persistenceカテゴリの中からAmazon S3を選択します。既にコネクターを作成済みの場合は、New Connectorを選択し、続けてData Persistenceカテゴリの中からAmazon S3を選択してください。
  2. Connector Name:システムが自動的にコネクター名を生成します。
  3. 接続情報を入力します:
    • Host:リージョンにより異なり、s3.{region}.amazonaws.comの形式です。
    • Port443を入力します。
    • Access Key IDおよびSecret Access Key:AWSで作成したアクセスキーを入力します。
    • Enable TLSをクリックして有効化し、TLS Verifyはオフにします。
  4. Testボタンをクリックします。Amazon S3サービスにアクセス可能であれば成功メッセージが表示されます。
  5. Newボタンをクリックして作成を完了します。

これでコネクターの作成が完了し、続いてS3サービスに書き込むデータを指定するルールとSinkの作成に進みます。

ルールの作成

このセクションでは、EMQX PlatformでソースMQTTトピックt/#からのメッセージを処理し、処理結果を設定済みのSinkを通じてS3のemqx-cloud-s3-connector-testバケットに書き込むルールの作成方法を示します。

  1. ルールエリアのNew Ruleをクリックするか、作成したコネクターのActions列にある新規ルールアイコンをクリックします。

  2. SQL editorにルールのマッチングSQL文を入力します。

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    SQLに不慣れな場合は、SQL ExamplesEnable DebugをクリックしてルールSQLの学習やテストが可能です。

  3. NextをクリックしてAmazon S3 Sinkを含むアクションを追加します。

  4. Connectorドロップダウンボックスから先ほど作成したコネクターを選択します。

  5. Bucketemqx-cloud-s3-connector-testを入力します。このフィールドは${var}形式のプレースホルダーもサポートしますが、対応するバケット名が事前にS3に作成されている必要があります。

  6. 必要に応じてACLを選択し、アップロードされるオブジェクトのアクセス権限を指定します。

  7. Upload Methodを選択します。2つの方法の違いは以下の通りです:

    • Direct Upload:ルールがトリガーされるたびに、設定済みのオブジェクトキーと内容に従ってデータを直接S3にアップロードします。バイナリや大きなテキストデータの保存に適していますが、多数のファイルが生成される可能性があります。
    • Aggregated Upload:複数のルールトリガー結果を1つのファイル(例:CSVファイル)にまとめてS3にアップロードします。構造化データの保存に適しており、ファイル数を減らし書き込み効率を向上させます。

    各方法で設定項目が異なります。選択した方法に応じて設定してください:

  8. Advanced Settingsを展開し、必要に応じて詳細設定を行います(任意)。詳細は詳細設定を参照してください。

  9. Confirmボタンをクリックしてルール作成を完了します。

  10. Successful new ruleポップアップでBack to Rulesをクリックし、データ統合の設定チェーンが完了します。

ルールのテスト

このセクションでは、Direct Upload方式で設定したルールのテスト方法を示します。

MQTTXを使ってトピックt/1にメッセージをパブリッシュします:

bash
mqttx pub -i EMQX Platform_c -t t/1 -m '{ "msg": "hello S3" }'

数件メッセージを送信した後、MinIOコンソールまたはAmazon S3コンソールで結果を確認します。

AWSマネジメントコンソールにログインし、Amazon S3コンソールを開きます:https://console.aws.amazon.com/s3/

バケット一覧からemqx-cloud-s3-connector-testバケットを選択してバケット内に入り、オブジェクト一覧で直前にパブリッシュしたメッセージがmsgオブジェクトとして正常に書き込まれていることを確認します。オブジェクトのチェックボックスを選択し、Downloadを選んでローカルにダウンロードし内容を確認できます。

詳細設定

このセクションでは、S3 Sinkの詳細設定オプションについて説明します。ダッシュボードでSinkを設定する際に、Advanced Settingsを展開して以下のパラメータをニーズに応じて調整可能です。

項目名説明デフォルト値
Buffer Pool SizeEMQX PlatformとS3間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへ送信する前にパフォーマンス最適化とスムーズなデータ伝送を担います。16
Request TTL「Request TTL」(Time To Live)は、リクエストがバッファに入ってから有効とみなされる最大時間(秒)を指定します。このタイマーはリクエストがバッファに入った時点からカウント開始されます。TTLを超えてバッファに滞留するか、送信後にS3からの応答やアックがタイムリーに返ってこない場合、リクエストは期限切れと判断されます。
Health Check IntervalSinkがS3との接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
Max Buffer Queue SizeS3 Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的にS3へ送信するための中継役を果たします。システム性能やデータ伝送要件に応じて調整してください。256
Query Modeメッセージ送信を最適化するため、synchronousまたはasynchronousのリクエストモードを選択可能です。非同期モードではS3への書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受信してからS3に到達するまでのタイムラグが発生する可能性があります。Asynchronous
In-flight Window「In-flight queue requests」は、送信済みだがまだ応答やアックを受け取っていないリクエストを指します。この設定はSinkがS3と通信中に同時に存在可能なin-flightリクエストの最大数を制御します。
Request Modeasynchronousの場合、このパラメータは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、この値を1に設定してください。
100