Skip to content

GCP Pub/Sub への MQTT データ取り込み

Google Cloud Pub/Sub は、非常に高い信頼性とスケーラビリティを実現するために設計された非同期メッセージングサービスです。EMQX は、MQTT データのリアルタイム抽出、処理、分析のために Google Cloud Pub/Sub とのシームレスな統合をサポートしています。Cloud Functions、App Engine、Cloud Run、Kubernetes Engine、Compute Engine などのさまざまな Google Cloud サービスにデータをプッシュできます。また、Google Cloud から MQTT へのデータ配信も可能で、ユーザーが GCP 上で迅速に IoT アプリケーションを構築できるよう支援します。

本ページでは、EMQX と GCP Pub/Sub 間のデータ統合について、作成および検証の実践的な手順を含めて包括的に紹介します。

動作概要

GCP Pub/Sub データ統合は、EMQX の標準機能として提供されており、MQTT データストリームを Google Cloud とシームレスに連携させ、IoT アプリケーション開発のための豊富なサービスと機能を活用できるよう設計されています。

GCP_bridge_architect

EMQX はルールエンジンと Sink を介して MQTT データを GCP Pub/Sub に転送します。GCP Pub/Sub のプロデューサー役割の例を挙げると、全体の流れは以下の通りです。

  1. IoT デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
  2. ルールエンジンがメッセージを処理:組み込みのルールエンジンは、特定のソースからの MQTT メッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールをマッチングし、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理を行います。
  3. GCP Pub/Sub へのブリッジング:ルールはメッセージを GCP Pub/Sub に転送するアクションをトリガーし、データプロパティ、オーダーキー、MQTT トピックと GCP Pub/Sub トピックのマッピングを簡単に設定できます。これにより、より豊富なコンテキスト情報と順序保証を持つデータ統合が可能となり、柔軟な IoT データ処理を実現します。

MQTT メッセージデータが GCP Pub/Sub に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

  • リアルタイムデータ処理と分析:Dataflow、BigQuery、Pub/Sub のストリーミング機能など、強力な Google Cloud のデータ処理・分析ツールを活用し、メッセージデータのリアルタイム処理と分析を行い、価値ある洞察や意思決定支援を得られます。
  • イベント駆動の機能実装:Cloud Functions や Cloud Run などの Google Cloud イベント処理をトリガーし、動的かつ柔軟な機能の起動と処理を実現します。
  • データ保存と共有:Cloud Storage や Firestore などの Google Cloud ストレージサービスにメッセージデータを送信し、大量データの安全な保存と管理を行います。これにより、他の Google Cloud サービスと連携してデータの共有や分析が可能となり、多様なビジネスニーズに対応します。

特長とメリット

GCP Pub/Sub とのデータ統合は以下のような特長とメリットを提供します。

  • 堅牢なメッセージングサービス:EMQX と GCP Pub/Sub は共に高可用性とスケーラビリティを備え、大規模なメッセージストリームの信頼性の高い受信、配信、処理を保証します。IoT データの順序制御、メッセージの品質保証、パーシステンス(永続化)をサポートし、メッセージの確実な伝送と処理を実現します。
  • 柔軟なルールエンジン:組み込みのルールエンジンにより、特定のソースメッセージやイベントをトピックマッチングに基づいて処理可能です。データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの操作が可能で、GCP Pub/Sub と組み合わせてさらなる処理や分析を行えます。
  • 豊富なコンテキスト情報:GCP Pub/Sub データ統合を通じて、メッセージにより豊かなコンテキスト情報を付加できます。クライアント属性を Pub/Sub 属性やソーティングキーにマッピングすることで、後続のアプリケーション開発やデータ処理においてより精緻な分析や処理が可能となります。

まとめると、EMQX と GCP Pub/Sub の統合により、高信頼・高スケーラビリティなメッセージ配信と、データ分析・統合のための豊富なツールとサービスを活用できます。これにより、堅牢な IoT アプリケーションの構築や、イベント駆動型の柔軟なビジネスロジックの実装が可能となります。

はじめる前に

このセクションでは、GCP Pub/Sub データ統合の作成を開始する前に完了すべき準備について説明します。

前提条件

GCP でサービスアカウントキーを作成する

GCP Pub/Sub サービスを利用するには、サービスアカウントとサービスアカウントキーを作成する必要があります。

  1. GCP アカウントでサービスアカウントを作成します。サービスアカウントには、対象トピック(例:Pub/Sub Editor ロール)へのメッセージの検査/読み取りおよびパブリッシュ権限が必要です。

  2. 作成したサービスアカウントのメールアドレスをクリックし、Key タブを選択します。Add key のドロップダウンリストから Create new key を選択してサービスアカウントキーを作成し、JSON 形式でダウンロードします。

    TIP

    サービスアカウントキーは後で使用するため、安全に保管してください。

    サービスアカウントキー

GCP で Workload Identity Federation を設定する

Workload Identity Federation(WIF)を利用すると、EMQX は長期間有効なサービスアカウントキーを使わずに GCP リソースにアクセスできます。EMQX は外部 ID プロバイダー(例:Microsoft Azure)からトークンを取得し、GCP の Security Token Service を介して一時的な GCP トークンと交換し、そのトークンを使って GCP サービスアカウントをなりすまします。トークンの更新は自動で行われます。

WIF を利用するには、コネクター作成前に GCP プロジェクトで以下を完了してください。

  1. Google Cloud コンソールで IAM & Admin -> Workload Identity Federation に移動し、ワークロードアイデンティティプールを作成します。Pool IDProject Number を控えておきます。

  2. プールにプロバイダーを追加し、Provider ID を控えます。OIDC ベースの認証の場合、外部 ID プロバイダーから OAuth 2.0 クライアント認証情報(クライアント ID、クライアントシークレット、トークンエンドポイント URI)を取得します。

  3. Pub/Sub トピックにアクセス可能な GCP サービスアカウントをなりすますために、ワークロードアイデンティティプールに権限を付与します。コネクター設定時にサービスアカウントのメールアドレスが必要です。

    TIP

    詳細な手順はWorkload Identity Federation の設定を参照してください。

例:Microsoft Azure (Entra ID)

Microsoft Entra IDで API を公開するアプリケーションを登録し、クライアントシークレットを作成します。コネクター設定時に以下の値を使用します。

コネクター項目
Endpoint URIhttps://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token
OAuth Client IDアプリケーション(クライアント)ID、形式は api://<application-id>
OAuth Client Secretアプリケーション用に生成したクライアントシークレット
OAuth Request Scopeapi://<application-id>/.default

補足

scope はアプリケーションのオーディエンス(aud)と完全に一致させる必要があります。そうしないと GCP STS とのトークン交換に失敗します。詳細は Microsoft のOAuth 2.0 クライアント認証フローを参照してください。

サービスアカウントに WIF プールへのアクセス権を付与する際は、アプリケーション ID ではなく Object ID を Subject 値として使用してください。Object ID は Azure ポータルのアプリケーションの概要ページの Enterprise applications セクションで確認できます。

GCP でトピックの作成と管理

EMQX で GCP Pub/Sub データ統合を設定する前に、GCP でトピックを作成し、基本的な管理操作に慣れておく必要があります。

  1. Google Cloud コンソールで Pub/Sub -> Topics ページに移動します。詳細な手順はトピックの作成と管理を参照してください。

    TIP

    サービスアカウントには該当トピックへのパブリッシュ権限が必要です。

  2. Topic ID フィールドにトピックの ID を入力し、Create topic をクリックします。

    GCP コンソールでのトピック作成
  3. Subscriptions ページに移動し、リストの中から作成したトピックの Topic ID をクリックします。トピックに対してサブスクリプションを作成します。

    • Delivery typePull を選択します。
    • Message retention duration7 日を選択します。

    詳細はGCP Pub/Sub サブスクリプションを参照してください。

    トピックへのサブスクリプション追加
  4. Subscription ID をクリックし、Messages -> Pull でトピックに送信されたメッセージを確認できます。

    サブスクリプション IDサブスクリプションメッセージのプル

GCP Pub/Sub プロデューサーコネクターの作成

GCP Pub/Sub プロデューサー Sink アクションを追加する前に、EMQX と GCP Pub/Sub 間の接続を確立するための GCP Pub/Sub プロデューサーコネクターを作成します。

  1. EMQX ダッシュボードで Integration -> Connector をクリックします。
  2. ページ右上の Create をクリックし、コネクター選択ページで Google PubSub Producer を選択して Next をクリックします。
  3. 名前と説明を入力します(例:my-pubsubproducer)。名前は GCP Pub/Sub プロデューサー Sink とコネクターを関連付けるために使用され、クラスター内で一意である必要があります。
  4. Authentication ドロップダウンから以下の認証方法のいずれかを選択し、対応するフィールドに入力します。
    • Service Account JSON:前述のサービスアカウントキーの作成でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
    • Workload Identity Federation (WIF):以下のフィールドに入力します。前提条件はWorkload Identity Federation の設定を参照してください。
      • GCP Project ID:コネクターがアクセスするリソースのプロジェクト ID。
      • GCP Project Number:コネクターがアクセスするリソースのプロジェクト番号。
      • Service Account Email:なりすますサービスアカウントのメールアドレス。
      • Workload Identity Pool ID:WIF トークン交換に使用するワークロードアイデンティティプールの ID。
      • Workload Identity Provider ID:WIF トークン交換に使用するワークロードアイデンティティプロバイダーの ID。
      • Initial Token Configuration で認証タイプを選択し、対応するフィールドに入力します。現在サポートされているのは OIDC with Client Credentials Grant Type のみです。
        • Endpoint URI:OIDC プロバイダーの OAuth トークンエンドポイント URI。
        • OAuth Client ID:OAuth サーバーからトークンを取得するためのクライアント ID。
        • OAuth Client Secret:OAuth サーバーからトークンを取得するためのクライアントシークレット。
        • OAuth Request Scope:OAuth アクセストークン取得時に指定する scope(プロバイダーにより必要な場合)。
  5. Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテストできます。
  6. ページ下部の Create ボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックして Sink を指定したルール作成を続行できます。詳細はGoogle Pub/Sub プロデューサー Sink を使ったルールの作成を参照してください。

Google Pub/Sub プロデューサー Sink を使ったルールの作成

このセクションでは、GCP Pub/Sub に保存するデータを指定するルールの作成方法を示します。

  1. EMQX ダッシュボードで Integration -> Rules をクリックします。

  2. ページ右上の Create をクリックします。

  3. ルール ID に my_rule を入力します。

  4. SQL Editor でルールを設定します。例えば、トピック /devices/+/events の MQTT メッセージを GCP Pub/Sub に保存する場合、以下の SQL を使用できます。

    注意:独自の SQL を指定する場合、SELECT 部分に Sink のペイロードテンプレートで必要なすべてのフィールドを含めてください。

    sql
    SELECT
      *
    FROM
      "/devices/+/events"

    注意:初心者の方は SQL ExamplesEnable Test をクリックして SQL ルールを学習・テストできます。

  5. + Add Action ボタンをクリックして、ルールでトリガーされるアクションを定義します。Type of Action ドロップダウンから Google PubSub Producer を選択し、ルールで処理したデータを GCP Pub/Sub に送信するようにします。

  6. Action ドロップダウンは Create Action のままにします。既存の GCP Pub/Sub プロデューサー Sink を選択することも可能です。本例では新しい Sink を作成してルールに追加します。

  7. Name フィールドに Sink の名前を入力します。名前は英数字の組み合わせにしてください。

  8. Connector ドロップダウンから先ほど作成した my_pubsubprodcer を選択します。隣のボタンから新しいコネクターを作成することも可能です。設定パラメータはコネクターの作成を参照してください。

  9. GCP PubSub Topic に、GCP でのトピック作成と管理で作成したトピック ID my-iot-core を入力します。

  10. Payload Template にテンプレートを定義するか、空欄のままにします。

    • 空欄の場合、MQTT メッセージのクライアント ID、トピック、ペイロードなどの可視フィールドを JSON 形式でエンコードします。
    • 定義済みテンプレートを使用する場合、${variable_name} 形式のプレースホルダーは MQTT コンテキストの対応値に置き換えられます。例えば ${topic} は MQTT メッセージのトピック my/topic に置き換わります。
  11. Attributes TemplateOrdering Key Template に、送信メッセージの属性およびオーダーキーのフォーマット用テンプレートを定義します(任意)。

    • Attributes はキー・値ともに ${variable_name} 形式のプレースホルダーを使用可能で、MQTT コンテキストから値を抽出します。キーのテンプレートが空文字列になる場合、そのキーは GCP Pub/Sub 送信メッセージから省略されます。
    • Ordering Key${variable_name} 形式のプレースホルダーを使用可能で、解決結果が空文字列の場合は GCP Pub/Sub 送信メッセージの orderingKey フィールドは設定されません。
  12. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。詳細はフォールバックアクションを参照してください。

  13. 詳細設定(任意):詳細はSink の機能を参照してください。

  14. Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテストできます。

  15. Create ボタンをクリックして Sink の設定を完了すると、新しい Sink が Action Outputs タブに表示されます。

  16. Create Rule ページに戻り、Create をクリックしてルールを作成します。

これでルールが正常に作成されました。Integration -> Rules ページで新規ルールを確認できます。Actions(Sink) タブで新しい Google PubSub プロデューサー Sink を確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーが表示され、トピック /devices/+/events のメッセージがルール my_rule によって解析され、GCP Pub/Sub に送信・保存されていることを確認できます。

プロデューサールールのテスト

  1. MQTTX を使ってトピック /devices/+/events にメッセージを送信します。

    bash
    mqttx pub -i emqx_c -t /devices/+/events -m '{ "msg": "hello GCP PubSub" }'
  2. Sink の稼働状況を確認し、新しい受信メッセージと送信メッセージがそれぞれ1件あることを確認します。

  3. GCP の Pub/Sub -> Subscriptions に移動し、MESSAGES タブをクリックするとメッセージが表示されます。

GCP Pub/Sub コンシューマーコネクターの作成

GCP Pub/Sub コンシューマー Sink を追加する前に、EMQX と GCP Pub/Sub 間の接続を確立するための GCP Pub/Sub コンシューマーコネクターを作成します。

  1. EMQX ダッシュボードで Integration -> Connector をクリックします。
  2. ページ右上の Create をクリックし、コネクター選択ページで Google PubSub Consumer を選択して Next をクリックします。
  3. 名前と説明を入力します(例:my-pubsubconsumer)。名前は GCP Pub/Sub コンシューマー Sink とコネクターを関連付けるために使用され、クラスター内で一意である必要があります。
  4. Authentication ドロップダウンから以下の認証方法のいずれかを選択し、対応するフィールドに入力します。
    • Service Account JSON:前述のサービスアカウントキーの作成でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
    • Workload Identity Federation (WIF):以下のフィールドに入力します。前提条件はWorkload Identity Federation の設定を参照してください。
      • GCP Project ID:コネクターがアクセスするリソースのプロジェクト ID。
      • GCP Project Number:コネクターがアクセスするリソースのプロジェクト番号。
      • Service Account Email:なりすますサービスアカウントのメールアドレス。
      • Workload Identity Pool ID:WIF トークン交換に使用するワークロードアイデンティティプールの ID。
      • Workload Identity Provider ID:WIF トークン交換に使用するワークロードアイデンティティプロバイダーの ID。
      • Initial Token Configuration で認証タイプを選択し、対応するフィールドに入力します。現在サポートされているのは OIDC with Client Credentials Grant Type のみです。
        • Endpoint URI:OIDC プロバイダーの OAuth トークンエンドポイント URI。
        • OAuth Client ID:OAuth サーバーからトークンを取得するためのクライアント ID。
        • OAuth Client Secret:OAuth サーバーからトークンを取得するためのクライアントシークレット。
        • OAuth Request Scope:OAuth アクセストークン取得時に指定する scope(プロバイダーにより必要な場合)。
  5. Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテストできます。
  6. ページ下部の Create ボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックして GCP Pub/Sub コンシューマーソースを使ったルール作成を続行できます。詳細はGoogle Pub/Sub コンシューマーソースを使ったルールの作成を参照してください。

Google Pub/Sub コンシューマーソースを使ったルールの作成

このセクションでは、GCP Pub/Sub からメッセージを消費し、EMQX に転送するルールの作成方法を示します。Google PubSub コンシューマーソースを作成・設定し、ルールのデータ入力として追加します。また、GCP Pub/Sub から受信したメッセージを EMQX に転送するために Republish アクションをルールに追加します。

  1. EMQX ダッシュボードで Integration -> Rules をクリックします。

  2. ページ右上の Create をクリックします。

  3. ルール ID に my_rule_source を入力します。

  4. 右側の Data Inputs タブでデフォルトの Input Messages を削除し、Add Input をクリックします。

  5. Input Type ドロップダウンから Google PubSub Consumer を選択します。

  6. Source ドロップダウンはデフォルトの Create Source のままにします。本例では新しいソースを作成してルールに追加します。

  7. ソースの Name と(任意で)Description を入力します。名前は英数字の組み合わせにしてください(例:my-gcppubsub-source)。

  8. Connector ドロップダウンから先ほど作成した my_pubsubconsumer を選択します。隣のボタンから新しいコネクターを作成することも可能です。設定パラメータはコネクターの作成を参照してください。

  9. GCP Pub/Sub からメッセージを消費するために、以下の情報を設定します。

    • GCP PubSub Topic:消費対象の GCP Pub/Sub トピック名を入力します(例:my-iot-core)。
    • Maximum Messages to Pull:1回のプルリクエストで GCP Pub/Sub から取得する最大メッセージ数を指定します。実際の取得数は指定値より少ない場合があります。
  10. 詳細設定(任意):詳細はSink の機能を参照してください。

  11. Create をクリックする前に、Test Connectivity をクリックして GCP Pub/Sub サーバーへの接続が成功するかテストできます。

  12. Create をクリックしてソース作成を完了します。ソースはルールの Data Inputs タブに追加され、SQL Editor のルールは以下のようになります。

    sql
    SELECT
      *
    FROM
      "$bridges/gcppubsub:my-gcppubsub-source"

    注意:初心者の方は SQL ExamplesEnable Test をクリックして SQL ルールを学習・テストできます。

    my-gcppubsub-source からのメッセージは以下の GCP Pub/Sub から MQTT トピックへのマッピング表に示すフィールドにアクセス可能です。ルール SQL を調整してデータ処理が可能です。本例ではデフォルトの SQL を使用します。

    フィールド名説明
    attributes(任意)文字列のキー・バリューのペアを含むオブジェクト(存在する場合)
    message_idGCP Pub/Sub がこのメッセージに割り当てたメッセージ ID
    ordering_key(任意)メッセージの順序付けキー(存在する場合)
    publishing_timeGCP Pub/Sub によって定義されたメッセージのタイムスタンプ
    topic発信元の GCP Pub/Sub トピック
    value(任意)メッセージのペイロード(存在する場合)

    注意:各 GCP Pub/Sub から MQTT トピックへのマッピングは、ユニークな GCP Pub/Sub トピック名を含む必要があります。すなわち、同じ GCP Pub/Sub トピックが複数のマッピングに存在してはいけません。

これで GCP Pub/Sub コンシューマーソースの作成は完了しましたが、メッセージはまだ直接 EMQX にパブリッシュされません。次に、ルールへの Republish アクションの追加 の手順を続けて、Republish アクションを作成しルールに追加します。

ルールへの Republish アクションの追加

このセクションでは、GCP Pub/Sub コンシューマーソースから消費したメッセージを転送し、EMQX トピック t/1 にパブリッシュするための Republish アクションをルールに追加する方法を示します。

  1. ページ右側の Action Output タブを選択し、Add Action ボタンをクリックして、Type of Action ドロップダウンから Republish アクションを選択します。

  2. メッセージ再パブリッシュの設定を入力します。

    • Topic:MQTT にパブリッシュするトピックを入力します。ここでは t/1 とします。

    • QoS012${qos} のいずれかを選択するか、他のフィールドから QoS を設定するためのプレースホルダーを入力します。${qos} を選択すると元のメッセージの QoS に従います。

    • Retaintrue または false を選択します。メッセージをリテインメッセージとしてパブリッシュするかを決定します。プレースホルダーを入力して他のフィールドからリテインフラグを設定することも可能です。本例では false を選択します。

    • Payload:転送するメッセージペイロードのテンプレートを設定します。空欄の場合はルールの出力結果をそのまま転送します。ここでは ${payload} を入力し、ペイロードのみを転送します。

      MQTT ペイロードテンプレートのデフォルト値は ${.} で、利用可能なすべてのデータを JSON オブジェクトとしてエンコードします。例えば、すべてのオプションフィールドを含む GCP Pub/Sub メッセージに対して ${.} をテンプレートにすると、以下のようになります。

      json
      {
        "attributes": {"attribute_key": "attribute_value"},
        "message_id": "1679665968238",
        "ordering_key": "my-ordering-key",
        "topic": "my-pubsub-topic",
        "publishing_time": "2023-08-18T14:15:18.470Z",
        "value": "my payload"
      }

      GCP Pub/Sub メッセージのサブフィールドにはドット表記でアクセス可能です。例えば ${.value} は GCP Pub/Sub メッセージの値に解決され、${.attributes.h1}h1 というメッセージ属性キーの値に解決されます。存在しない値は空文字列に置き換えられます。

    • MQTT 5.0 メッセージプロパティ:デフォルトで無効です。詳細設定はRepublish アクションの追加を参照してください。

  3. Create をクリックしてアクション作成を完了します。作成成功後、ルール作成ページに戻り、Republish アクションが Action Outputs タブに追加されます。

  4. ルール作成ページで Create ボタンをクリックし、ルール全体の作成を完了します。

これでルールが正常に作成されました。Rules ページで新規ルールを確認できます。Sources タブで新しい GCP Pub/Sub コンシューマーソースを確認できます。

また、Integrate -> Flow Designer をクリックするとトポロジーが表示され、GCP Pub/Sub コンシューマーソースからのメッセージが Republish を経由して t/1 にパブリッシュされる様子を直感的に確認できます。