Skip to content

GCP Pub/Sub 統合

Google Cloud Pub/Sub は、非常に高い信頼性とスケーラビリティを実現するために設計された非同期メッセージングサービスです。EMQX Cloud は、MQTT データのリアルタイム抽出、処理、分析のために Google Cloud Pub/Sub とのシームレスな統合をサポートしています。Cloud Functions、App Engine、Cloud Run、Kubernetes Engine、Compute Engine などのさまざまな Google Cloud サービスへデータをプッシュできます。また、Google Cloud から MQTT へのデータ配信も可能で、ユーザーが GCP 上で迅速に IoT アプリケーションを構築できるよう支援します。

本ページでは、EMQX Cloud と GCP Pub/Sub 間のデータ統合について包括的に紹介し、プロデューサー(Sink)およびコンシューマー(Source)統合の作成と検証に関する実践的な手順を提供します。

動作概要

GCP Pub/Sub データ統合は、EMQX Cloud の標準機能として提供されており、MQTT データストリームを Google Cloud とシームレスに統合し、IoT アプリケーション開発における豊富なサービスや機能を活用できるよう設計されています。

data_integration_gcp_pubsub

EMQX Cloud はルールエンジンと Sink を介して MQTT データを GCP Pub/Sub に転送します。GCP Pub/Sub プロデューサーの例を挙げると、全体の流れは以下の通りです。

  1. IoT デバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリやステータスデータをパブリッシュし、ルールエンジンをトリガーします。
  2. ルールエンジンがメッセージを処理:組み込みのルールエンジンは特定のソースからの MQTT メッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールをマッチさせ、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
  3. GCP Pub/Sub へのブリッジング:ルールはメッセージを GCP Pub/Sub に転送するアクションをトリガーし、データプロパティやオーダーキーの設定、MQTT トピックと GCP Pub/Sub トピックのマッピングを簡単に構成できます。これにより、より豊富なコンテキスト情報と順序保証を伴うデータ統合が可能となり、柔軟な IoT データ処理を実現します。

MQTT メッセージデータが GCP Pub/Sub に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。

  • リアルタイムデータ処理と分析:Dataflow、BigQuery、Pub/Sub のストリーミング機能など、強力な Google Cloud のデータ処理・分析ツールを活用し、メッセージデータのリアルタイム処理と分析を行い、有益なインサイトや意思決定支援を得られます。
  • イベント駆動型機能:Cloud Functions や Cloud Run などの Google Cloud イベント処理をトリガーし、動的かつ柔軟な機能トリガーと処理を実現します。
  • データ保存と共有:Cloud Storage や Firestore などの Google Cloud ストレージサービスにメッセージデータを送信し、大量データの安全な保存と管理を行います。これにより、他の Google Cloud サービスとデータを共有・分析し、多様なビジネスニーズに対応できます。

特徴と利点

GCP Pub/Sub とのデータ統合は、以下のような特徴と利点を提供します。

  • 堅牢なメッセージングサービス:EMQX と GCP Pub/Sub はともに高可用性とスケーラビリティを備え、大規模なメッセージストリームの確実な受信、配信、処理を保証します。IoT データの順序付け、メッセージの品質保証、パーシステンス(永続化)をサポートし、信頼性の高いメッセージ伝送と処理を実現します。
  • 柔軟なルールエンジン:組み込みのルールエンジンにより、特定のソースメッセージやイベントをトピックマッチングに基づいて処理できます。データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの操作が可能で、GCP Pub/Sub と組み合わせてさらなる処理や分析が可能です。
  • 豊富なコンテキスト情報:GCP Pub/Sub データ統合を通じて、メッセージにより豊かなコンテキスト情報を付加できます。クライアント属性を Pub/Sub 属性やソーティングキーにマッピングすることで、後続のアプリケーション開発やデータ処理においてより精密な分析や処理を支援します。

まとめると、EMQX Cloud と GCP Pub/Sub の統合により、高信頼性・高スケーラビリティのメッセージ配信と、データ分析・統合のための豊富なツールやサービスを活用可能となり、堅牢な IoT アプリケーション構築とイベント駆動型の柔軟なビジネスロジック実装を実現します。

はじめる前に

このセクションでは、GCP Pub/Sub データ統合の作成を開始する前に必要な準備について説明します。

前提条件

ネットワークの設定

開始前に、EMQX Cloud 上にデプロイメント(EMQX クラスター)を作成し、ネットワークを設定する必要があります。

  • Dedicated Flex デプロイメントユーザーの場合:VPC が Google Cloud Platform(GCP)上にある場合、VPC ピアリング接続を確立せずに直接データ転送が可能です。その他のクラウドプラットフォーム上の VPC の場合は、NAT ゲートウェイ を設定し、パブリック IP 経由でターゲットコネクターにアクセスしてください。
  • BYOC(Bring Your Own Cloud)デプロイメントユーザーの場合:BYOC がデプロイされている VPC とターゲットコネクターが所在する VPC 間でピアリング接続を確立してください。ピアリング接続作成後は、内部ネットワーク IP 経由でターゲットコネクターにアクセス可能です。パブリック IP 経由でリソースにアクセスする必要がある場合は、BYOC がデプロイされている VPC に対してパブリッククラウドコンソールで NAT ゲートウェイを設定してください。

GCP でのサービスアカウントキーの作成

GCP Pub/Sub サービスを利用するには、サービスアカウントとサービスアカウントキーを作成する必要があります。

  1. GCP アカウントでサービスアカウントを作成します。サービスアカウントには対象トピックに対する Pub/Sub Editor 権限を付与してください。

  2. 作成したサービスアカウントのメールアドレスをクリックし、「キー」タブを選択します。「キーを追加」ドロップダウンリストから 新しいキーを作成 を選択し、サービスアカウントキーを JSON 形式で作成・ダウンロードします。

GCP でのトピックの作成と管理

EMQX で GCP Pub/Sub データ統合を設定する前に、トピックを作成し、GCP での基本的な管理操作に慣れておく必要があります。

  1. Google Cloud コンソールで Pub/Sub -> トピック ページに移動します。詳細な手順は トピックの作成と管理 を参照してください。

    TIP

    サービスアカウントには、そのトピックへのパブリッシュ権限が必要です。

  2. トピック ID フィールドにトピックの ID を入力し、トピックを作成 をクリックします。

  3. サブスクリプション ページに移動し、一覧から作成したトピックの トピック ID をクリックします。トピックに対するサブスクリプションを作成します。

    • 配信タイプ は Pull を選択します。
    • メッセージ保持期間 は 7 日を選択します。

    詳細は GCP Pub/Sub サブスクリプション を参照してください。

  4. サブスクリプション ID -> メッセージ -> Pull をクリックすると、トピックに送信されたメッセージを確認できます。

MQTT データを GCP Pub/Sub にストリームする

このセクションでは、EMQX Cloud デプロイメントから GCP Pub/Sub へ MQTT メッセージを転送するためのプロデューサー(Sink)コネクターの作成方法を示します。内容はコネクターの作成、ルールの作成、ルールのテストを含みます。

コネクターの作成

データ統合ルールを作成する前に、Google PubSub コネクターを作成してサーバーにアクセスできるようにします。

  1. デプロイメントに移動し、左ナビゲーションメニューから データ統合 をクリックします。
  2. 初めてコネクターを作成する場合は、データ転送 カテゴリの下にある Google PubSub を選択します。すでにコネクターを作成している場合は、新しいコネクター を選択し、続いて データ転送 カテゴリの下の Google PubSub を選択します。
  3. 新しいコネクター ページで以下のオプションを設定します:
    • コネクター名:システムが自動的にコネクター名を生成します。
    • GCP サービスアカウント認証情報GCP でサービスアカウントキーを作成 でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
    • その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。
  4. テスト をクリックして接続を検証します。Google PubSub サービスにアクセスできれば、成功メッセージが表示されます。
  5. 新規作成 をクリックしてコネクターの設定を完了します。作成に成功しました ダイアログが表示され、ルールを今すぐ作成するか尋ねられます。新しいルール をクリックするとルール作成画面に進み、コネクターに戻る をクリックすると後でルールを作成できます。

ルールの作成

次に、書き込むデータを指定し、処理済みデータを GCP PubSub に転送するアクションをルールに追加します。

  1. 前のステップで 新しいルール をクリックした場合はルール編集ページが自動で開きます。そうでない場合は、コネクターの 操作 列にある 新しいルール アイコンをクリックするか、ルール セクションの 新しいルール をクリックします。

  2. SQL エディター にルールマッチング用の SQL 文を入力します。以下の例は、temp_hum/emqx トピックに送信されたメッセージから報告時刻 up_timestamp、クライアント ID、メッセージ本文(ペイロード)の温度と湿度を抽出します。

    sql
     SELECT 
     timestamp as up_timestamp, 
     clientid as client_id, 
     payload.temp as temp,
     payload.hum as hum
     FROM
     "temp_hum/emqx"

    Try It Out を使ってデータ入力をシミュレートし、結果をテストできます。

  3. 次へ をクリックしてアクションを追加します。

  4. コネクター ドロップダウンから先ほど作成したコネクターを選択します。

  5. 以下の情報を設定します:

    • アクション名:システムが自動的に生成するか、自分で名前を付けられます。

    • GCP PubSub トピック:GCP で作成したトピック ID を入力します。

    • ペイロードテンプレート:空欄のままにするかテンプレートを定義します。

      • 空欄の場合、MQTT メッセージの可視入力(clientid、topic、payload など)を JSON 形式でエンコードします。
      • 定義したテンプレートを使う場合、${variable_name} 形式のプレースホルダーが MQTT コンテキストの対応値に置換されます。例えば ${topic} は MQTT メッセージのトピック my/topic に置換されます。

      本例では、以下の GCP Pub/Sub メッセージテンプレートを使用できます。

      text
           # GCP Pub/Sub メッセージテンプレート 
       {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
    • 属性テンプレートおよびオーダーキーのテンプレート(任意):同様に、送信メッセージの属性やオーダーキーの書式設定用テンプレートを定義可能です。

      • 属性ではキーと値の両方に ${variable_name} 形式のプレースホルダーが使え、MQTT コンテキストから値を抽出します。キーのテンプレートが空文字列になる場合、そのキーは送信メッセージから除外されます。
      • オーダーキーも ${variable_name} 形式のプレースホルダーが使え、空文字列の場合は GCP Pub/Sub 送信メッセージの orderingKey フィールドは設定されません。
    • 詳細設定(任意):その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。

  6. 確認 をクリックしてルール作成を完了します。

  7. 新しいルールの作成に成功しました ポップアップで ルールに戻る をクリックして終了します。

作成成功後、ルール 一覧に新しいルールが表示されます。関連する アクション(Sink) は操作欄で確認できます。

ルールのテスト

MQTTX を使って温度・湿度データの報告をシミュレートすることを推奨しますが、他のクライアントでも可能です。

  1. MQTTX でデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. GCP Pub/Sub -> サブスクリプションに移動し、MESSAGES タブをクリックするとメッセージを確認できます。

  3. コンソールで運用データを確認します。ルール一覧でルール ID をクリックすると、ルールの統計情報とそのルールに紐づくすべてのアクションの統計を確認できます。

GCP Pub/Sub からメッセージをコンシュームする

このセクションでは、EMQX Cloud デプロイメントが GCP Pub/Sub からメッセージをコンシュームし、設定したデータ統合を通じて MQTT トピックに再パブリッシュする方法を示します。

TIP

GCP PubSub コンシューマーは、EMQX バージョン 5.10.3 以降を実行する Dedicated Flex デプロイメントで利用可能です。

GCP PubSub コンシューマーコネクターの作成

コンシューマールールを追加する前に、EMQX Cloud デプロイメントと GCP Pub/Sub 間の接続を確立するための GCP PubSub コンシューマーコネクター を作成します。

  1. デプロイメントに移動し、左ナビゲーションメニューから データ統合 をクリックします。

  2. 初めてコネクターを作成する場合は、データ入力 カテゴリの下にある Google PubSub Consumer を探します。すでにコネクターを作成している場合は、+ 新しいコネクター をクリックし、続いて データ入力 カテゴリの下の Google PubSub Consumer を選択します。

    gcp_pubsub_consumer_connector

  3. 新しいコネクター ページで以下のオプションを設定します:

    • コネクター名:システムが自動的にコネクター名を生成します。
    • GCP サービスアカウント認証情報GCP でサービスアカウントキーを作成 でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。JSON 内容を直接貼り付けるか、ファイルを選択 してアップロードできます。
    • その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。
  4. テスト をクリックして接続を検証します。GCP Pub/Sub サービスにアクセスできれば、成功メッセージが表示されます。

  5. 新規作成 をクリックしてコネクターの設定を完了します。作成に成功しました ダイアログが表示され、ルールを今すぐ作成するか尋ねられます。新しいルール をクリックするとルール作成画面に進み、コネクターに戻る をクリックすると後でルールを作成できます。

ルールの作成

次に、データソースを指定し、コンシュームしたメッセージを MQTT トピックに転送する出力アクションを追加するルールを作成します。

  1. 前のステップで 新しいルール をクリックした場合はルール編集ページが自動で開きます。そうでない場合は、コネクターの 操作 列にある 新しいルール アイコンをクリックするか、ルール セクションの 新しいルール をクリックします。

  2. ルール編集ページで自動的にアクションソース設定パネルが開きます。ソースタイプとして Google PubSub Consumer を選択し、次へ をクリックします。

    gcp_pubsub_consumer_source_config

  3. ソースを設定します:

    • コネクター:先ほど作成した GCP PubSub コンシューマーコネクターを選択します。
    • GCP PubSub トピック:コンシュームする GCP Pub/Sub トピック名を入力します。例:my-iot-topic
    • 1 回あたりの最大メッセージ数:1 リクエストで取得する最大メッセージ数を設定します。デフォルト値を使用するか、スループットに応じて調整してください。
    • その他の設定はデフォルト値を使用するか、ビジネスニーズに応じて設定してください。

    確認 をクリックしてソース設定を完了します。

  4. SQL エディターは自動的にデータソースに合わせて更新されます。必要に応じて SELECT フィールドを調整できます。例:

    sql
    SELECT
      *
    FROM
      "$bridges/gcp_pubsub_consumer:<source-name>"

    GCP Pub/Sub メッセージから利用可能なフィールドは以下の通りです:

    フィールド名説明
    message_idGCP Pub/Sub によって割り当てられたメッセージ ID
    publish_timeメッセージがパブリッシュされたタイムスタンプ
    topicメッセージが読み取られた GCP Pub/Sub トピック
    valueメッセージのペイロード
    attributesメッセージに付随するキー・バリュー属性
    ordering_key設定されている場合のメッセージのオーダーキー
  5. 次へ をクリックし、出力アクションを追加します。

  6. アクションタイプとして 再パブリッシュ を選択し、以下を設定します:

    • トピック:パブリッシュ先の MQTT トピック。例:gcp/messages${topic} のようなプレースホルダーを使って動的にトピックを指定可能です。
    • QoS012 のいずれかを明示的に選択します。動的に QoS を設定したい場合は、ルール SQL に qos フィールドを追加し、アクションで参照してください。
    • Retaintrue または false を明示的に選択します。動的に Retain を設定したい場合は、ルール SQL に対応フィールドを追加し、アクションで参照してください。
    • メッセージテンプレート:空欄のままにするとルール出力のすべてのフィールドを転送します。${.} と入力するとすべてのフィールドを含め、${value} と入力するとメッセージペイロードのみを転送します。
  7. 確認 をクリックしてルール作成を完了します。

  8. 新しいルールの作成に成功しました ポップアップで ルールに戻る をクリックして終了します。

作成成功後、ルール 一覧に新しいルールが表示されます。関連する アクション(Source) は操作欄で確認できます。再パブリッシュアクションはルール編集ボタンをクリックして確認可能です。

gcp_pubsub_consumer_rule_list

ルールのテスト

GCP Pub/Sub トピックにメッセージをパブリッシュし、デプロイメント内の転送先 MQTT トピックをサブスクライブしてルールを検証できます。

  1. MQTTX(または任意の MQTT クライアント)を使い、再パブリッシュアクションで設定した MQTT トピック(例:gcp/messages)をサブスクライブします。

  2. GCP コンソールまたは gcloud CLI を使って GCP Pub/Sub トピックにメッセージをパブリッシュします。

    bash
    gcloud pubsub topics publish my-iot-topic --message='{"temp": 27.5, "hum": 41.8}'
  3. MQTT クライアントがサブスクライブしたトピックでメッセージを受信できることを確認します。

  4. コンソールでルール統計を確認します。ルール 一覧でルール ID をクリックすると、ルール実行統計およびそのルールに関連するすべてのアクションの統計が表示されます。