Skip to content

Amazon S3 Tables への MQTT データ取り込み

Amazon S3 Tables は、分析ワークロードに最適化された専用のストレージソリューションです。Apache Iceberg フォーマットで IoT センサーの読み取り値などの表形式データを、高性能かつスケーラブルかつ安全に保存できます。

EMQX は Amazon S3 Tables とのシームレスな連携をサポートし、MQTT メッセージを効率的に S3 テーブルバケットに保存できるようになりました。この連携により、柔軟かつスケーラブルな IoT データストレージが可能となり、Amazon Athena、Amazon Redshift、Amazon EMR などの AWS サービスを用いた高度な分析や処理を支援します。

本ページでは EMQX と Amazon S3 Tables 間のデータ統合について詳しく解説し、ルールおよび Sink の作成方法を実践的に案内します。

動作概要

EMQX は Amazon S3 Tables と連携し、MQTT データをリアルタイムかつ構造化された形で Amazon S3 に取り込み、長期保存および分析に活用できるようにします。この連携は EMQX のルールエンジンと S3 Tables Sink を活用し、MQTT メッセージを Apache Iceberg フォーマットのテーブルに変換して直接 S3 テーブルバケットにストリーミングします。

一般的な IoT シナリオでは以下のように動作します。

  • EMQX は MQTT ブローカーとして、デバイスの接続管理、メッセージのルーティング、データ処理を担当します。
  • Amazon S3 Tables は MQTT メッセージデータを表形式で耐久的かつクエリ可能なストレージとして保存します。
  • Amazon Athena は Iceberg テーブルの定義や保存データへの SQL クエリ実行に利用されます。

emqx-integration-s3-tables

ワークフローは以下の通りです。

  1. デバイスの EMQX への接続:IoT デバイスが MQTT 経由で EMQX に接続し、テレメトリデータをパブリッシュします。
  2. メッセージルーティングとルールマッチング:EMQX は組み込みのルールエンジンを使い、受信した MQTT メッセージを定義済みトピックにマッチさせ、特定のフィールドや値を抽出します。
  3. データ変換:EMQX のルールでメッセージペイロードをフィルタリング、変換、または強化し、ターゲットとなる Iceberg テーブルのスキーマに合わせます。
  4. Amazon S3 Tables への書き込み:ルールが S3 Tables Sink アクションをトリガーし、変換済みデータをバッチ処理して Iceberg 互換の書き込み API を用いて Amazon S3 Tables に送信します。データは Iceberg テーブルのパーティション下に Parquet ファイルとして永続化されます。
  5. クエリと分析:取り込んだデータは Amazon Athena でクエリ可能となり、他のデータセットと結合したり、Redshift Spectrum、Amazon EMR、Presto、Trino などのサードパーティ分析エンジンで分析可能です。

特長と利点

EMQX で Amazon S3 Tables データ統合を利用することで、以下の特長と利点を得られます。

  • リアルタイムストリーム処理:EMQX のルールエンジンにより、MQTT メッセージをリアルタイムに抽出・変換・条件付きルーティングし、S3 Tables に配信できます。
  • Iceberg ベースの S3 ストレージ:メッセージは Apache Iceberg テーブルに書き込まれ、従来のデータベースを不要にしつつ、SQL ライクなアクセスパターンを実現します。
  • 分析ツールとの簡単連携:S3 Tables にデータがあれば、Amazon Athena(SQL)、Amazon EMR、Redshift Spectrum、Presto、Trino、Snowflake などでクエリや分析が可能です。
  • 柔軟かつコスト効率の良いストレージ:Amazon S3 は高耐久かつ低コストのオブジェクトストレージを提供し、アーカイブ、コンプライアンス、デバイス生成データの時系列分析に最適です。

はじめる前に

このセクションでは、EMQX で Amazon S3 Tables Sink を作成するための準備について説明します。

前提条件

作業を進める前に、以下の内容を理解していることを確認してください。

EMQX の概念:

  • ルールエンジン:MQTT メッセージからデータを抽出・変換するロジックを定義する方法を理解してください。
  • データ統合:EMQX のコネクターおよび Sink の概念を理解してください。

AWS の概念:

AWS S3 Tables に不慣れな場合は、以下の主要用語を確認してください。

  • テーブルバケット:S3 Tables で Iceberg ベースのテーブルデータとメタデータを保存するための特殊な S3 バケット。
  • Amazon Athena:Amazon S3 に保存されたデータに対して直接 SQL クエリを実行できるサーバーレスクエリエンジン。DDL ステートメント(例:CREATE TABLE)を使ってスキーマや構造を定義可能。
  • カタログ:Athena のメタデータコンテナで、データベース(ネームスペース)やテーブルを管理。
  • データベース(ネームスペース):カタログ内のテーブルを論理的にグループ化したもの。
  • Iceberg テーブル:高性能でトランザクション対応のデータレイク向けテーブルフォーマット。スキーマ進化、パーティションプルーニング、タイムトラベルクエリをサポート。

S3 Tables バケットの準備

EMQX で Sink を作成する前に、AWS S3 Tables 側で MQTT データの保存先を準備する必要があります。以下を用意してください。

  • 実際のデータファイルを保存するテーブルバケット
  • 関連テーブルを論理的にまとめるネームスペース
  • 構造化された MQTT データを受け取る Iceberg ベースのテーブル
  1. AWS マネジメントコンソールにログインします。

  2. S3 サービスに移動し、左側のナビゲーションペインで Table buckets をクリックします。

  3. Create table bucket をクリックし、テーブルバケット名(例:mybucket)を入力して Create table bucket をクリックします。

  4. バケット作成後、そのバケットをクリックしてテーブル一覧画面に移動します。

  5. Create table with Athena をクリックすると、ネームスペースの入力を求めるポップアップが表示されます。

  6. Create a namespace を選択し、ネームスペース名を入力して作成を確定します。

  7. ネームスペース作成後、再度 Create table with Athena をクリックして続行します。

  8. Iceberg テーブルのスキーマを定義します。

    • Query table with Athena をクリックし、Query editor で以下を設定します。

      • Catalog セレクターからカタログ(例:バケット名が mybucket の場合は s3tablescatalog/mybucket)を選択。
      • Database セレクターから先ほど作成したネームスペースを選択。
    • 以下の DDL を実行してテーブルを作成し、テーブルタイプが ICEBERG に設定されていることを確認します。例:

      sql
      CREATE TABLE testtable (
        c_str string,
        c_long int )
      TBLPROPERTIES ('table_type' = 'ICEBERG');

      これは EMQX からの構造化 MQTT データを保存する Iceberg ベースのテーブルを定義します。

  9. テーブルの検証として、以下を実行しテーブルが作成されており空であることを確認します。

    sql
    select * from testtable

    TIP

    Athena で SQL を実行する際は、必ず正しいカタログとデータベース(ネームスペース)が選択されていることを確認してください。これにより、意図した S3 テーブルバケット内にテーブルが作成されます。

コネクターの作成

S3 Tables Sink を追加する前に、対応するコネクターを作成します。

  1. ダッシュボードの Integration -> Connector ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. コネクタータイプとして S3 Tables を選択し、次へ進みます。

  4. コネクター名を入力します。英数字の大文字・小文字の組み合わせで、ここでは my-s3-tables と入力します。

  5. 必要な接続情報を入力します。

    • S3Tables ARN:S3 テーブルバケットの Amazon リソースネーム(ARN)を入力します。AWS コンソールの Table buckets セクションで確認可能です。
    • Access Key IDSecret Access Key:S3 Tables と Athena へのアクセス権限を持つ IAM ユーザーまたはロールの AWS アクセス認証情報を入力します。
    • Enable TLS:S3 Tables への接続時は TLS がデフォルトで有効です。TLS 接続オプションの詳細は TLS for External Resource Access を参照してください。
    • Health Check Timeout:コネクターが S3 Tables との接続状態を自動的にヘルスチェックする際のタイムアウト時間を指定します。
  6. 残りの設定はデフォルト値を使用します。

  7. Create をクリックする前に、Test Connectivity をクリックして S3 Tables サービスへの接続確認が可能です。

  8. 最後に Create ボタンをクリックしてコネクター作成を完了します。

これでコネクター作成が完了しました。次にルールと Sink を作成し、S3 Tables へのデータ書き込みを指定します。

Amazon S3 Tables Sink を用いたルールの作成

このセクションでは、EMQX でソース MQTT トピック t/# からメッセージを処理し、処理結果を S3 Tables の mybucket バケットに書き込むルールの作成方法を示します。

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. ルール ID に my_rule を入力し、SQL エディターに以下のルール SQL を入力します。

    sql
    SELECT
      payload.str as c_str,
      payload.int as c_long
    FROM
        "t/#"

    TIP

    SQL に不慣れな場合は、SQL ExamplesEnable Debug をクリックしてルール SQL の学習や結果のテストが可能です。

    TIP

    出力フィールドが Iceberg テーブルのスキーマと一致していることを必ず確認してください。必須カラムが欠落または誤った名前の場合、データのテーブルへの追加に失敗する可能性があります。

  4. アクションを追加し、Action Type ドロップダウンから S3 Tables を選択します。アクションのドロップダウンはデフォルトの create action のままにするか、既存の S3 Tables アクションを選択してください。ここでは新しい Sink を作成してルールに追加します。

  5. Sink 名と任意で説明を入力します。

  6. Connector ドロップダウンから先ほど作成した my-s3-tables コネクターを選択します。新しいコネクターを素早く定義したい場合は、ドロップダウン横の Create ボタンをクリックしてください。設定パラメーターは コネクターの作成 を参照してください。

  7. Sink の設定を行います。

    • Namespace:テーブルが存在するネームスペース。複数セグメントの場合はドット区切りで指定(例:my.name.space)。
    • Table:データを追加する Iceberg テーブル名(例:testtable)。
    • Max Records:S3 へ書き込む前にバッチ処理する最大レコード数。到達すると即座にバッチをフラッシュしてアップロードします。
    • Time Interval:Max Records に達していなくても、指定した最大待機時間(ミリ秒)経過後にデータをフラッシュします。
  8. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細は フォールバックアクション を参照してください。

  9. 詳細設定 を展開し、必要に応じて高度な設定オプションを調整します(任意)。詳細は 詳細設定 を参照してください。

  10. 残りの設定はデフォルト値を使用し、Create ボタンをクリックして Sink 作成を完了します。作成成功後、ルール作成画面に戻り、新しい Sink がルールのアクションに追加されます。

  11. ルール作成画面で Create ボタンをクリックし、ルール作成全体を完了します。

これでルール作成が正常に完了しました。Rules ページで新規作成したルールを確認でき、Actions (Sink) タブで新しい S3 Tables Sink を確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーを視覚的に確認でき、トピック t/# のメッセージがルール my_rule によって解析され、S3 Tables に書き込まれる流れを把握できます。

ルールのテスト

このセクションでは、S3 Tables Sink を設定したルールのテスト方法を説明します。

  1. MQTTX を使い、トピック t/1 にメッセージをパブリッシュします。

    bash
    mqttx pub -i emqx_c -t t/1 -m '{ "str": "hello S3 Tables", "int": 123 }'

    このメッセージは payload.strpayload.int フィールドを含み、ルール SQL とテーブルスキーマに合致しています。

  2. Rules ページでルールのメトリクスおよび Sink の状態を監視します。新規の受信メッセージと送信メッセージがそれぞれ 1 件ずつ増えているはずです。

  3. Athena のクエリエディターを開き、正しい Catalog(例:s3tablescatalog/mybucket)と Database(ネームスペース)が選択されていることを確認します。

  4. 以下の SQL クエリを実行します。

    sql
    SELECT * FROM testtable

    以下のような行が表示されるはずです。

    c_strc_long
    hello S3 Tables123

詳細設定

このセクションでは、S3 Tables Sink の詳細設定オプションについて説明します。ダッシュボードの Sink 設定画面で Advanced Settings を展開し、用途に応じて以下のパラメーターを調整できます。

フィールド名説明デフォルト値
Buffer Pool SizeEMQX と S3 Tables 間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ターゲットサービスへの送信を最適化しスムーズなデータ転送を支えます。16
Request TTLバッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストがバッファ内にこの TTL を超えて滞留するか、送信後に S3 Tables からタイムリーな応答やアックが得られない場合、リクエストは期限切れとみなされます。
Health Check IntervalSink が S3 Tables との接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
Max Buffer Queue SizeS3 Tables Sink の各バッファワーカーがバッファリング可能な最大バイト数を指定します。ワーカーはデータを一時保存し、効率的なデータストリーム処理を実現します。システム性能やデータ転送要件に応じて調整してください。256
Query Modeメッセージ送信を最適化するため、synchronous(同期)または asynchronous(非同期)モードを選択可能です。非同期モードでは S3 Tables への書き込みが MQTT メッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージ到着前に受信する可能性があります。Asynchronous
In-flight Window「インフライトキューリクエスト」とは、送信済みで応答やアックをまだ受け取っていないリクエストを指します。この設定は Sink と S3 Tables 間の通信で同時に存在可能なインフライトリクエストの最大数を制御します。
Request Modeasynchronous の場合、同一 MQTT クライアントからのメッセージを厳密に順次処理したい場合は、この値を 1 に設定してください。
100