Skip to content

Amazon Kinesis への MQTT データストリーム送信

AWS Kinesis は、AWS 上で提供されるフルマネージドのリアルタイムストリーミングデータ処理サービスであり、ストリーミングデータの収集、処理、分析を容易に行えます。あらゆる規模のストリーミングデータを経済的かつ効率的にリアルタイム処理でき、高い柔軟性を持ち、数十万のソースからの大量のストリーミングデータを低レイテンシで処理可能です。

EMQX Cloud は Amazon Kinesis Data Streams とのシームレスな連携をサポートし、大規模な IoT デバイスを接続してリアルタイムのメッセージ収集と送信を実現します。このデータ統合を通じて、Amazon Kinesis Data Streams に接続し、リアルタイムデータ分析や複雑なストリーム処理を行えます。

本ページでは、Kafka データ統合の機能的特徴を詳しく紹介し、実際の作成手順を案内します。内容は、Amazon Kinesis コネクターの作成、ルールの作成、ルールのテストを含み、MQTT プロトコル経由でシミュレートした温度・湿度データを EMQX Cloud に報告し、設定したデータ統合を通じて Amazon Kinesis にデータを保存する方法を示します。

動作の仕組み

Amazon Kinesis データ統合は、EMQX の標準機能として設計されており、ユーザーが MQTT データストリームを Amazon Kinesis とシームレスに連携し、IoT アプリケーション開発における豊富なサービスと機能を活用できるよう支援します。

EMQX Cloud と Amazon Kinesis の連携

EMQX はルールエンジンと Sink を介して MQTT データを Amazon Kinesis に転送します。全体の流れは以下の通りです。

  1. IoT デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
  2. ルールエンジンがメッセージを処理:組み込みのルールエンジンは、特定のトピックにマッチする MQTT メッセージを処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  3. Amazon Kinesis へのブリッジ:ルールによってトリガーされるアクションでメッセージを Amazon Kinesis に転送します。パーティションキー、書き込み先のデータストリーム、メッセージ形式をカスタム設定でき、柔軟なデータ統合を実現します。

MQTT メッセージデータが Amazon Kinesis に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

  • リアルタイムデータ処理・分析:Amazon Kinesis の強力なデータ処理・分析ツールとストリーミング機能を活用し、メッセージデータのリアルタイム処理・分析を行い、価値あるインサイトや意思決定支援を得られます。
  • イベント駆動機能:Amazon のイベント処理をトリガーし、動的かつ柔軟な関数トリガーや処理を実現します。
  • データ保存・共有:メッセージデータを Amazon Kinesis のストレージサービスに送信し、大量データを安全に保存・管理します。これにより他の Amazon サービスと共有・分析し、多様なビジネスニーズに対応可能です。

特徴と利点

EMQX Cloud と AWS Kinesis Data Streams 間のデータ統合は、以下の機能と利点をビジネスにもたらします。

  • 信頼性の高いデータ送信と順序保証:EMQX と AWS Kinesis Data Streams は共に信頼性の高いデータ送信機構を提供します。EMQX は MQTT プロトコルを通じてメッセージの確実な送信を保証し、AWS Kinesis Data Streams はパーティションとシーケンス番号でメッセージの順序を保証します。これにより、デバイスから送信されたメッセージが正確に目的地に届き、正しい順序で処理されます。
  • リアルタイムデータ処理:デバイスからの高頻度データは、EMQX のルール SQL によるリアルタイムの一次処理を経て、MQTT メッセージのフィルタリング、抽出、付加、変換が容易に行えます。AWS Kinesis Data Streams に送信後は、AWS Lambda や AWS 管理の Apache Flink と連携してさらなるリアルタイム分析が可能です。
  • 弾力的なスケーラビリティ対応:EMQX は数百万の IoT デバイス接続を容易に実現し、弾力的なスケーラビリティを提供します。一方、AWS Kinesis Data Streams はオンデマンドの自動リソース割り当てと拡張を行います。両者を組み合わせたアプリケーションは接続数やデータ量の増加に合わせてスケールし、ビジネスの成長に継続的に対応します。
  • パーシステンス(永続化)されたデータ保存:AWS Kinesis Data Streams はパーシステンス機能を備え、毎秒数百万件のデバイスデータストリームを確実に保存します。必要に応じて過去データの取得が可能で、オフライン分析や処理を支援します。

AWS Kinesis Data Streams を利用したストリーミングデータパイプラインの構築は、EMQX と AWS プラットフォームの統合の難易度を大幅に軽減し、ユーザーにより豊かで柔軟なデータ処理ソリューションを提供します。これにより、EMQX ユーザーは AWS 上で機能的に充実した高性能なデータ駆動型アプリケーションを構築できます。

はじめる前に

このセクションでは、EMQX Cloud で AWS Kinesis データ統合を作成する前に必要なセットアップ手順を説明します。ネットワークの設定、必要な AWS リソースの作成、任意でローカルのテスト環境の構築を行います。

前提条件

作業を進める前に、EMQX データ統合で使用される以下の概念に慣れていることを確認してください。

ネットワークの設定

EMQX が AWS Kinesis と通信できるようにネットワークアクセスを設定する必要があります。ここでは以下の2つのサポートされる方法を説明します。

  • NAT ゲートウェイ:デプロイメントに NAT ゲートウェイを設定し、パブリック IP を使って AWS サービスにアクセスします。
    設定方法は NAT Gateway を参照してください。
  • AWS PrivateLink:EMQX と AWS Kinesis 間に PrivateLink 接続を確立します。
    以下の手順に従い、VPC エンドポイントを作成し、PrivateLink 接続を設定してください。

Kinesis へのアクセス用インターフェース VPC エンドポイントの作成

この節では、AWS でインターフェース VPC エンドポイントを作成する手順を説明します。このエンドポイントにより、パブリックインターネットを経由せずに VPC と AWS Kinesis 間でプライベートかつ安全な接続が可能になります。

  1. Amazon VPC コンソール を開きます。
  2. ナビゲーションペインで Endpoints を選択します。
  3. Create endpoint をクリックします。
  4. TypeAWS services を選択します。
  5. Service nameKinesis サービスを選択します。
  6. VPCSubnet(s) を設定し、EMQX デプロイメントからのアクセスを許可するよう Security Group を設定します。
  7. Create Endpoint をクリックし、ターゲットグループ設定時に使用するため IP アドレスを控えます。

詳細は AWS 公式ドキュメント をご参照ください。

VPC エンドポイント作成

EMQX と AWS Kinesis 間で安全かつプライベートな通信を可能にするため、特にパブリックインターネットアクセスがない環境では PrivateLink 接続の確立が必要です。以下の手順で必要な AWS リソースの設定と EMQX Cloud での PrivateLink サポート有効化を行います。

  1. AWS プラットフォームでの準備手順の完了 のステップ1~3を完了します。

    • ステップ3「ロードバランシング用ターゲットグループの作成」では以下を設定します。

  2. AWS プラットフォームでの準備手順の完了 のステップ4に進み、ネットワークロードバランサー(NLB)を作成・設定します。

    • アベイラビリティゾーンが EMQX デプロイメントと一致していることを確認してください。

    • リスナーProtocol:PortTCP:443 に設定。

      ロードバランサー設定

  3. エンドポイントサービスの作成 を実施します。

    • 利用可能なロードバランサーで前ステップで作成した NLB を選択。
  4. EMQX Cloud で PrivateLink を有効化 を行います。

    • 生成された PrivateLink アドレスを保存してください。Kinesis コネクター設定時に必要となります。

      PrivateLink ステータス

Amazon Kinesis Data Streams でストリームを作成

このステップでは、AWS マネジメントコンソールを使って Kinesis Data Stream を作成します(詳細はこのチュートリアルを参照)。作成したデータストリームは EMQX Cloud から送信されるメッセージの送信先となります。

  1. AWS マネジメントコンソールにサインインし、Kinesis コンソールを開きます。
  2. ナビゲーションバーでリージョンセレクターを展開し、リージョンを選択します。
  3. Create data stream を選択します。
  4. 「Create Kinesis stream」ページでデータストリーム名を入力し、On-demand または Provisioned のキャパシティモードを選択します。

Amazon Kinesis Data Streams のローカルエミュレーション(任意)

開発やテスト目的で、LocalStack を使いローカル環境で Kinesis サービスをエミュレートできます。これによりクラウドに接続せずに AWS 互換サービスをローカルで実行可能です。

  1. Docker イメージを使ってインストール・起動します。

    bash
    # LocalStack Docker イメージをローカルで起動
    docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1
    
    # コンテナにアクセス
    docker exec -it localstack bash
  2. シャード数1で my_stream という名前のストリームを作成します。

    bash
    awslocal kinesis create-stream --stream-name "my_stream" --shard-count 1

Amazon Kinesis コネクターの作成

データ統合ルールを作成する前に、EMQX Cloud から Kinesis サービスへデータ送信を可能にするための Amazon Kinesis コネクターを作成します。

  1. デプロイメント画面に移動し、左ナビゲーションメニューから Data Integration をクリックします。

  2. 初めてコネクターを作成する場合は、Data Forward カテゴリの中から Amazon Kinesis を選択します。
    既存のコネクターがある場合は、New Connector をクリックし、Data Forward から Amazon Kinesis を選択してください。

  3. New Connector ページで以下の項目を設定します。

    • Amazon Kinesis Endpoint
      • NAT Gateway を利用する場合は、https://kinesis.<region>.amazonaws.com の形式で入力します。<region> は AWS Kinesis サービスがホストされているリージョンに置き換えてください。
      • AWS PrivateLink を利用する場合は、Kinesis サービスの PrivateLink アドレスを入力します。https:// で始まることを確認してください。
      • LocalStack を利用する場合は、http://localhost:4566 を使用します。
    • AWS Access Key IDアクセスキー ID を入力します。LocalStack 利用時は任意の値で構いません。
    • AWS Secret Access Keyシークレットアクセスキー を入力します。LocalStack 利用時は任意の値で構いません。
    • その他の設定はデフォルトのままか、ビジネスニーズに合わせて調整してください。
  4. Test ボタンをクリックし、Kinesis サービスへの接続を検証します。接続成功時は確認メッセージが表示されます。

    Kinesis コネクター

  5. New をクリックしてコネクターの作成を完了します。

ルールの作成

次に、書き込み対象のデータを指定し、処理済みデータを Amazon Kinesis に転送するアクションをルールに追加します。

  1. ルールエリアで New Rule をクリック、または作成したコネクターの Actions 列にある新規ルールアイコンをクリックします。

  2. SQL エディターにルールマッチング用の SQL 文を入力します。以下の例は、temp_hum/emqx トピックに送信されたメッセージから報告時刻 up_timestamp、クライアント ID、メッセージ本文(ペイロード)を読み取り、温度と湿度を抽出します。

    sql
    SELECT 
    timestamp,
    clientid, 
    payload.temp as temp, 
    payload.hum as hum
    
    FROM
    "temp_hum/emqx"

    Try It Out を使ってデータ入力をシミュレートし、結果をテストできます。

  3. Next をクリックしてアクションを追加します。

  4. Connector ドロップダウンから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します。

    • Action Name:システムが自動生成します。
    • Amazon Kinesis Stream:Amazon Kinesis Data Streams で作成したストリーム名を入力します。
    • Partition Key:このストリームに送信されるレコードに関連付けるパーティションキーを入力します。${variable_name} 形式のプレースホルダーも使用可能です(次のステップで例を示します)。
    • Payload Template フィールドは空欄のままにするか、テンプレートを定義します。
      • 空欄の場合、MQTT メッセージの可視入力(clientid、topic、payload など)を JSON 形式でエンコードします。

      • テンプレートを定義する場合、${variable_name} 形式のプレースホルダーが MQTT コンテキストの対応値に置換されます。

        例えば、MQTT メッセージのトピックが my/topic の場合、${topic} はそれに置換されます。テンプレートは要件に応じて柔軟に調整可能です。以下は温度と湿度の値を転送する例です。

        bash
        {"timestamp": ${timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
    • 高度な設定(任意):バッファキューやバッチモードの使用を必要に応じて選択します。
  6. Confirm ボタンをクリックしてルール作成を完了します。

  7. Successful new rule ポップアップで Back to Rules をクリックし、データ統合の設定チェーンを完了します。

ルールのテスト

温度・湿度データの報告をシミュレートするために MQTTX の使用を推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTX を使いデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {"temp":"23.5","hum":"32.6"}
  2. コンソールで運用データを確認します。ルール一覧でルール ID をクリックすると、ルールの統計情報およびそのルールに属するすべてのアクションの統計が表示されます。

  3. Amazon Kinesis Data Viewer にアクセスし、レコード取得時にメッセージが表示されることを確認します。

    Amazon Kinesis 結果

  4. LocalStack を使った確認方法

    LocalStack を利用している場合は、以下の手順で受信データを確認します。

    • EMQX にメッセージ送信前に、以下のコマンドで ShardIterator を取得します。
    bash
    awslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST
    {
    "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    }
    • MQTTX でトピック temp_hum/emqx にメッセージを送信します。

    • 以下のコマンドでレコードを読み取り、受信データをデコードします。

    bash
    awslocal kinesis get-records --shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    {
        "Records": [
            {
                "SequenceNumber": "49642650476690467334495639799144299020426020544120356866",
                "ApproximateArrivalTimestamp": 1689389148.261,
                "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9",
                "PartitionKey": "key",
                "EncryptionType": "NONE"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=",
        "MillisBehindLatest": 0
    }
    
    echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d
    {"temp":"23.5","hum":"32.6"}