Skip to content

BigQueryへのMQTTデータ取り込み

BigQueryは、大量のリレーショナル構造化データを対象としたエンタープライズ向けデータウェアハウスです。大規模かつアドホックなSQLベースの分析やレポートに最適化されており、組織の洞察を得るのに適しています。EMQXは、MQTTデータのリアルタイム抽出、処理、分析のためにBigQueryとのシームレスな統合をサポートしています。

本ページでは、EMQXとBigQuery間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。

動作の仕組み

BigQueryデータ統合は、EMQXの標準機能として提供されており、MQTTデータストリームをGoogle Cloudとシームレスに連携させ、IoTアプリケーション開発のための豊富なサービスや機能を活用できるよう設計されています。

bigquery_architecture

EMQXはルールエンジンとSinkを介してMQTTデータをBigQueryに転送します。全体の流れは以下の通りです。

  1. IoTデバイスがメッセージをパブリッシュ:デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
  2. ルールエンジンがメッセージを処理:組み込みのルールエンジンは、特定のソースからのMQTTメッセージをトピックマッチングに基づいて処理します。ルールエンジンは対応するルールにマッチし、データ形式の変換、特定情報のフィルタリング、メッセージへのコンテキスト情報の付加などを行います。
  3. BigQueryへのブリッジング:ルールがトリガーされると、メッセージをBigQueryに転送するアクションが実行されます。データプロパティ、オーダーキー、MQTTトピックからBigQueryトピックへのマッピングを簡単に設定可能です。これにより、より豊かなコンテキスト情報とデータ統合の順序保証が提供され、柔軟なIoTデータ処理が可能となります。

特長とメリット

EMQXとBigQueryの統合は、MQTTデータのための堅牢でスケーラブルかつリアルタイムなデータパイプラインを提供します。以下の特長とメリットにより、IoT分析やデータ駆動型の意思決定を簡素化します。

  • リアルタイムデータ取り込み:EMQXからBigQueryへMQTTメッセージを低レイテンシでシームレスにストリーム配信。即時処理と分析が必要なタイムセンシティブなアプリケーションに対応。
  • 柔軟なデータマッピング:MQTTトピックおよびメッセージペイロードをBigQueryのテーブルやフィールドにカスタマイズしてマッピング可能。
  • スケーラブルかつサーバーレスな分析:BigQueryのフルマネージドかつサーバーレスなアーキテクチャを活用し、大規模なIoTデータ分析を実現。
  • Google Cloudエコシステムとの簡単連携:Data Studio、Looker、AI PlatformなどのGoogle Cloudサービスとネイティブに連携し、可視化や機械学習を容易に。データ収集から洞察生成までのエンドツーエンドパイプライン構築を簡素化。

はじめる前に

このセクションでは、BigQueryデータ統合の作成を開始する前に必要な準備について説明します。

前提条件

GCPでのサービスアカウントキー作成

EMQXがBigQueryに接続できるように、Google Cloudでサービスアカウントを作成し、JSON形式のキーを生成する必要があります。

  1. GCPアカウントでサービスアカウントを作成します。サービスアカウントには、使用するデータセットやテーブルへのアクセス権限が必要です。例えば、「BigQuery Data Editor」ロールを付与して対象データセットやテーブルの読み書きを許可するか、少なくともデータの読み書き権限を持たせてください。

  2. 作成したサービスアカウントのメールアドレスをクリックします。

  3. Keyタブをクリックし、Add keyのドロップダウンからCreate new keyを選択してサービスアカウントキーを作成し、JSON形式でダウンロードします。

    TIP

    ダウンロードしたサービスアカウントキーは後でEMQXのBigQuery認証に使用するため、安全に保管してください。

    サービスアカウントキー

GCPでのWorkload Identity Federation設定

Workload Identity Federation(WIF)を使うと、長期間有効なサービスアカウントキーのファイルを使わずにEMQXがGCPリソースにアクセスできます。EMQXは外部IDプロバイダー(例:Microsoft Azure)からのトークンをGCPのセキュリティトークンサービスで一時的なGCPトークンに交換し、そのトークンでサービスアカウントをなりすまします。トークンの更新は自動で行われます。

WIFを使用するには、コネクター作成前にGCPプロジェクトで以下を完了してください。

  1. Google CloudコンソールでIAM & Admin -> Workload Identity Federationに移動し、ワークロードアイデンティティプールを作成し、Pool IDProject Numberを控えます。

  2. プールにプロバイダーを追加し、Provider IDを控えます。OIDC認証の場合、外部IDプロバイダーからOAuth 2.0クライアント認証情報(クライアントID、クライアントシークレット、トークンエンドポイントURI)を取得します。

  3. ワークロードアイデンティティプールに、BigQueryのデータセットやテーブルにアクセスできるGCPサービスアカウントをなりすます権限を付与します。コネクター設定時にサービスアカウントのメールアドレスが必要です。

    TIP

    詳細な手順はWorkload Identity Federationの設定を参照してください。

例:Microsoft Azure (Entra ID)

Microsoft Entra IDでAPIを公開するアプリケーションを登録し、クライアントシークレットを作成します。コネクター設定時に以下の値を使用します。

コネクター項目
Endpoint URIhttps://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token
OAuth Client IDアプリケーション(クライアント)ID、形式は api://<application-id>
OAuth Client Secretアプリケーション用に生成したクライアントシークレット
OAuth Request Scopeapi://<application-id>/.default

注意

scopeはアプリケーションのaudience(aud)と完全に一致させる必要があります。そうしないとGCP STSとのトークン交換に失敗します。詳細はMicrosoftのOAuth 2.0クライアントクレデンシャルフローを参照してください。

サービスアカウントにWIFプールへのアクセス権を付与する際は、Subject値にApplication IDではなくObject IDを使用してください。Object IDはAzureポータルのアプリケーションの概要ページのEnterprise applicationsに表示されます。

GCPでのデータセットとテーブルの作成・管理

EMQXでBigQueryデータ統合を設定する前に、GCPで必要なデータセットとテーブルを作成してください。

  1. Google CloudコンソールでBigQuery -> Studioページに移動します。詳細はデータのロードとクエリクイックスタートガイドを参照してください。

    TIP

    使用するサービスアカウントには、対象テーブルへの書き込み権限が必要です。

  2. Explorerペインでケバブメニュー(⋮)をクリックし、Create datasetを選択します。データセット名を指定し、Create datasetをクリックします。

  3. データセット作成後、Explorerペインでデータセットを選択し、(+) Create tableをクリックします。

    • ソースは「Empty Table」を選択します。

    • テーブル名を入力します。

    • テーブルスキーマを定義します。例えば、Edit as textトグルをクリックし、以下のスキーマ定義をテキストフィールドに貼り付けます。

      clientid:string,payload:bytes,topic:string,publish_received_at:timestamp
    • Create tableをクリックして設定を完了します。

  4. EMQXが書き込み可能なように権限を設定します。

    • データセットを選択し、Shareをクリックします。

    • サービスアカウントのメールアドレスをプリンシパルとして追加します。

    • 適切なロールを割り当てます。例:

      • データセットに対して「BigQuery Data Viewer」(読み取りアクセス)

      • テーブルに対して「Editor」(読み書きアクセス)

  5. テーブル作成後、クエリを実行して確認できます。

    • テーブルをクリックし、Queryをクリックします。

    • 以下の簡単なSQL文を実行してテーブルにアクセスできることを確認します。

      sql
      SELECT * FROM `my_project.my_dataset.my_tab` LIMIT 1000

BigQueryコネクターの作成

BigQuery Producer Sinkアクションを追加する前に、EMQXとBigQuery間の接続を確立するためのBigQueryコネクターを作成する必要があります。

  1. EMQXダッシュボードでIntegration -> Connectorをクリックします。

  2. ページ右上のCreateをクリックし、コネクター選択画面でBigQueryを選択してNextをクリックします。

  3. 名前と説明を入力します(例:my_bigquery)。この名前はBigQuery Sinkとコネクターを紐づけるために使用され、クラスター内で一意である必要があります。

  4. Authenticationドロップダウンから以下の認証方法のいずれかを選択し、対応する項目を入力します。

    • Service Account JSON:前述のサービスアカウントキー作成でエクスポートしたJSON形式のサービスアカウント認証情報をアップロードします。

    • Workload Identity Federation (WIF):以下の項目を入力します。この方法はサービスアカウントJSONファイルを使用しません。詳細はWorkload Identity Federation設定を参照してください。

      • GCP Project ID:コネクターがアクセスするリソースのプロジェクトID。

      • GCP Project Number:コネクターがアクセスするリソースのプロジェクト番号。

      • Service Account Email:なりすますサービスアカウントのメールアドレス。

      • Workload Identity Pool ID:WIFトークン交換に使用するワークロードアイデンティティプールのID。

      • Workload Identity Provider ID:WIFトークン交換に使用するワークロードアイデンティティプロバイダーのID。

      • Initial Token Configurationでは、認証情報タイプを選択し、対応する項目を入力します。現在サポートされているのはOIDC with Client Credentials Grant Typeのみです。

        • Endpoint URI:OIDCプロバイダーのOAuthトークンエンドポイントURI。

        • OAuth Client ID:OAuthサーバーからトークンをリクエストするためのクライアントID。

        • OAuth Client Secret:OAuthサーバーからトークンをリクエストするためのクライアントシークレット。

        • OAuth Request Scope:OAuthアクセストークン取得時に指定するscope(プロバイダーによっては必須)。

  5. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがBigQueryサーバーに接続できるかテストできます。

  6. ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてBigQuery Sinkを指定するルール作成に進めます。詳細はBigQuery Sink付きルールの作成を参照してください。

BigQuery Sink付きルールの作成

このセクションでは、BigQueryに保存するデータを指定するルールの作成方法を説明します。

  1. EMQXダッシュボードでIntegration -> Rulesをクリックします。

  2. ページ右上のCreateをクリックします。

  3. ルールIDにmy_ruleを入力します。

  4. SQL Editorでルールを設定します。例えば、トピックt/bqのMQTTメッセージをBigQueryに保存したい場合、以下のSQL構文を使用します。

    注意:独自のSQL構文を指定する場合、Sinkのペイロードテンプレートで必要なすべてのフィールドをSELECT句に含めてください。

    sql
    SELECT
      clientid,
      topic,
      payload,
      publish_received_at
    FROM
      "t/bq"

    注意

    BigQueryテーブルのカラムに存在するフィールドのみを選択してください。存在しないフィールドはBigQueryで認識されません。

    TIP

    初心者の場合は、SQL Examplesをクリックし、Enable Testを有効にしてSQLルールの学習とテストを行うことを推奨します。

  5. Add Actionボタンをクリックし、ルールでトリガーされるアクションを定義します。Type of ActionドロップダウンからBigQueryを選択すると、ルールで処理されたデータがBigQueryに送信されます。

  6. ActionドロップダウンはCreate Actionのままにするか、既存のBigQuery Sinkを選択できます。本例では新しいSinkを作成してルールに追加します。

  7. NameフィールドにSinkの名前を入力します。名前は英数字の組み合わせで指定してください。

  8. Connectorドロップダウンから先ほど作成したmy_bigqueryを選択します。新しいコネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメーターの詳細はコネクターの作成を参照してください。

  9. DatasetおよびTableに、GCPでのデータセットとテーブルの作成・管理で作成したデータセット名とテーブル名をそれぞれ入力します。

  10. フォールバックアクション(オプション):メッセージ配信失敗時の信頼性向上のために、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらのアクションがトリガーされます。詳細はフォールバックアクションを参照してください。

  11. 詳細設定(オプション):必要に応じて詳細設定を行います。詳細は詳細設定を参照してください。

  12. Createをクリックする前に、Test ConnectivityをクリックしてコネクターがBigQueryサーバーに接続できるかテスト可能です。

  13. CreateボタンをクリックしてSink設定を完了すると、新しいSinkがAction Outputsタブに表示されます。

  14. Create Ruleページに戻り、Createをクリックしてルールを作成します。

これでルールが正常に作成されました。Integration -> Rulesページで新規作成したルールを確認できます。**Actions(Sink)**タブをクリックすると新しいBigQuery Sinkが表示されます。

また、Integration -> Flow Designerをクリックするとトポロジーが表示され、トピックt/bqのメッセージがルールmy_ruleで解析されてBigQueryに送信・保存されている様子を確認できます。

ルールのテスト

  1. MQTTXを使ってトピックt/bqにメッセージを送信します。

    bash
    mqttx pub -i emqx_c -t t/bq -m '{ "msg": "hello BigQuery" }'
  2. Sinkの稼働状況を確認し、新規の受信メッセージと送信メッセージが1件ずつあることを確認します。

  3. GCPのBigQuery -> Studioに移動し、対象テーブルをクリックしてQueryをクリックします。クエリを実行するとメッセージが確認できます。

詳細設定

このセクションでは、BigQuery Producer Sinkの詳細設定オプションについて説明します。ダッシュボードのSink設定画面でAdvanced Settingsを展開し、用途に応じて以下のパラメーターを調整できます。

項目名説明デフォルト値
Buffer Pool SizeEMQXとBigQuery間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保持・処理し、ターゲットサービスへの送信を最適化しスムーズなデータ伝送を実現します。16
Request TTLリクエストがバッファに入ってから有効とみなされる最大時間(秒)を指定します。このタイマーはリクエストがバッファに入った瞬間からカウントされます。TTLを超えてバッファに滞留するか、送信後にBigQueryからの応答やアックがタイムリーに得られない場合、リクエストは期限切れとみなされます。45
Health Check IntervalSinkがBigQueryとの接続状態を自動的にヘルスチェックする間隔(秒)を指定します。15
Health Check Interval Jitter複数ノードが同時にヘルスチェックを開始する確率を減らすため、基本のヘルスチェック間隔に加える一様ランダム遅延です。複数のアクションやソースが同じコネクターを共有する場合、ジッターを有効にするとヘルスチェック開始時刻がずれます。0ミリ秒
Health Check TimeoutコネクターがBigQueryとの接続状態をヘルスチェックする際のタイムアウト時間を指定します。60
Max Buffer Queue SizeBigQuery Sinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時的に保持し、BigQueryへの送信を効率化します。システム性能やデータ伝送要件に応じて調整してください。256
Query Modesynchronousまたはasynchronousのリクエストモードを選択し、メッセージ送信の最適化を図ります。非同期モードではBigQueryへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがBigQuery到達前にメッセージを受信する可能性があります。Async
Batch SizeEMQXからBigQueryへ一度に転送するデータバッチの最大サイズを指定します。サイズを調整することでデータ転送の効率と性能を最適化可能です。Batch Sizeを「1」に設定すると、データレコードはバッチ化されず個別に送信されます。1000
Inflight Window「インフライトキューリクエスト」とは、開始されたがまだ応答やアックを受け取っていないリクエストを指します。この設定はSinkがBigQueryと通信中に同時に存在可能なインフライトリクエストの最大数を制御します。Request Modeasynchronousの場合に特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理したい場合、この値は1に設定してください。100