GCP Pub/Sub への MQTT データ取り込み
Google Cloud Pub/Sub は、非常に高い信頼性とスケーラビリティを実現する非同期メッセージングサービスです。EMQX は、MQTT データをリアルタイムで抽出、処理、分析するために Google Cloud Pub/Sub とシームレスに統合できます。Cloud Functions、App Engine、Cloud Run、Kubernetes Engine、Compute Engine などのさまざまな Google Cloud サービスへデータをプッシュ可能です。また、Google Cloud から MQTT へのデータ配信も可能で、ユーザーが GCP 上で迅速に IoT アプリケーションを構築するのに役立ちます。
本ページでは、EMQX と GCP Pub/Sub 間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作概要
GCP Pub/Sub データ統合は、EMQX の標準機能として提供されており、MQTT データストリームを Google Cloud とシームレスに連携させ、豊富なサービスと機能を活用して IoT アプリケーション開発を支援します。
EMQX はルールエンジンと Sink を介して MQTT データを GCP Pub/Sub に転送します。GCP Pub/Sub のプロデューサー役割の例を挙げると、全体の流れは以下の通りです。
- IoT デバイスがメッセージをパブリッシュ: デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
- ルールエンジンがメッセージを処理: 内蔵のルールエンジンは、特定のトピックにマッチする MQTT メッセージを処理します。ルールにマッチしたメッセージは、データ形式の変換、特定情報のフィルタリング、コンテキスト情報の付加などの処理が行われます。
- GCP Pub/Sub へのブリッジング: ルールがトリガーされると、メッセージを GCP Pub/Sub に転送するアクションが実行されます。データのプロパティ設定、オーダーキーの指定、MQTT トピックと GCP Pub/Sub トピックのマッピングが容易に設定可能です。これにより、より豊富なコンテキスト情報と順序保証を持つデータ統合が実現し、柔軟な IoT データ処理が可能となります。
MQTT メッセージデータが GCP Pub/Sub に書き込まれた後は、以下のような柔軟なアプリケーション開発が可能です。
- リアルタイムデータ処理・分析:Dataflow、BigQuery、Pub/Sub のストリーミング機能など、Google Cloud の強力なデータ処理・分析ツールを活用し、メッセージデータのリアルタイム処理や分析を行い、貴重な洞察や意思決定支援を得られます。
- イベント駆動型機能:Cloud Functions や Cloud Run などの Google Cloud イベント処理をトリガーし、動的かつ柔軟な機能トリガーと処理を実現します。
- データ保存・共有:Cloud Storage や Firestore などの Google Cloud ストレージサービスにメッセージデータを送信し、大量データの安全な保存・管理を行います。これにより他の Google Cloud サービスと連携してデータ共有や分析を行い、多様なビジネスニーズに対応可能です。
特長とメリット
GCP Pub/Sub とのデータ統合は以下のような特長とメリットを提供します。
- 堅牢なメッセージングサービス:EMQX と GCP Pub/Sub は共に高可用性とスケーラビリティを備え、大規模なメッセージストリームの信頼性の高い受信、配信、処理を保証します。IoT データの順序性、メッセージ品質保証、パーシステンスをサポートし、メッセージの確実な伝送と処理を実現します。
- 柔軟なルールエンジン:内蔵のルールエンジンにより、特定の送信元メッセージやイベントをトピックマッチングに基づいて処理可能です。データ形式変換、特定情報のフィルタリング、コンテキスト情報の付加などが行え、GCP Pub/Sub と組み合わせてさらなる処理や分析が可能です。
- 豊富なコンテキスト情報:GCP Pub/Sub データ統合を通じて、クライアント属性を Pub/Sub 属性にマッピングしたり、ソートキーを設定したりすることで、メッセージにより豊かなコンテキスト情報を付加できます。これにより、後続のアプリケーション開発やデータ処理でより精密な分析・処理が可能となります。
まとめると、EMQX と GCP Pub/Sub の統合により、高信頼かつスケーラブルなメッセージ配信と、データ分析・統合のための豊富なツール・サービスを活用できます。これにより、堅牢な IoT アプリケーションの構築や、イベント駆動型の柔軟なビジネスロジックの実装が可能となります。
はじめる前に
このセクションでは、GCP Pub/Sub データ統合の作成を開始する前に必要な準備について説明します。
前提条件
GCP でのサービスアカウントキーの作成
GCP Pub/Sub サービスを利用するには、サービスアカウントとサービスアカウントキーの作成が必要です。
GCP アカウントで サービスアカウント を作成します。サービスアカウントには、対象トピックへのメッセージの検査/読み取りおよびパブリッシュ権限(例:Pub/Sub Editor ロール)があることを確認してください。
作成したサービスアカウントのメールアドレスをクリックし、Key タブを開きます。Add key のドロップダウンリストから Create new key を選択し、そのアカウント用のサービスアカウントキーを作成して JSON 形式でダウンロードします。
TIP
サービスアカウントキーは後で使用するため、安全に保管してください。
GCP でのトピックの作成と管理
EMQX で GCP Pub/Sub データ統合を設定する前に、トピックを作成し、GCP での基本的な管理操作に慣れておく必要があります。
Google Cloud コンソールで Pub/Sub -> Topics ページに移動します。詳細な手順は トピックの作成と管理 を参照してください。
TIP
サービスアカウントには、そのトピックへのパブリッシュ権限が必要です。
Topic ID フィールドにトピックの ID を入力し、Create topic をクリックします。
Subscriptions ページに移動し、リストの Topic ID をクリックします。トピックに対するサブスクリプションを作成します。
- Delivery type で Pull を選択します。
- Message retention duration で
7
日を選択します。
詳細は GCP Pub/Sub サブスクリプション を参照してください。
Subscription ID -> Messages -> Pull をクリックすると、トピックに送信されたメッセージを確認できます。
GCP Pub/Sub プロデューサーコネクターの作成
GCP Pub/Sub プロデューサー Sink アクションを追加する前に、EMQX と GCP Pub/Sub 間の接続を確立するため、GCP Pub/Sub プロデューサーコネクターを作成する必要があります。
- EMQX ダッシュボードで Integration -> Connector をクリックします。
- ページ右上の Create をクリックし、コネクター選択画面で Google PubSub Producer を選択して Next をクリックします。
- 名前と説明を入力します(例:
my-pubsubproducer
)。この名前は GCP Pub/Sub プロデューサー Sink とコネクターを関連付けるために使用され、クラスター内で一意である必要があります。 - GCP Service Account Credentials にて、GCP でのサービスアカウントキーの作成 でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
- Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテスト可能です。
- ページ下部の Create ボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックして Sink を指定したルール作成を続行できます。詳細は GCP Pub/Sub プロデューサー Sink を用いたルール作成 を参照してください。
GCP Pub/Sub プロデューサー Sink を用いたルールの作成
このセクションでは、GCP Pub/Sub に保存するデータを指定するルールの作成方法を説明します。
EMQX ダッシュボードで Integration -> Rules をクリックします。
ページ右上の Create をクリックします。
ルール ID に
my_rule
を入力します。SQL Editor でルールを設定します。例えば、トピック
/devices/+/events
の MQTT メッセージを GCP Pub/Sub に保存したい場合、以下の SQL 文を使用します。注意:独自の SQL 文を指定する場合、
SELECT
部分に Sink のペイロードテンプレートで必要なすべてのフィールドが含まれていることを確認してください。sqlSELECT * FROM "/devices/+/events"
注意:初心者の場合は、SQL Examples と Enable Test をクリックして SQL ルールの学習とテストが可能です。
+ Add Action ボタンをクリックして、ルールでトリガーされるアクションを定義します。Type of Action ドロップダウンリストから
Google PubSub Producer
を選択すると、EMQX はルールで処理されたデータを GCP Pub/Sub に送信します。Action ドロップダウンは
Create Action
のままにします。あるいは、以前に作成した GCP Pub/Sub プロデューサー Sink を選択することも可能です。本デモでは新しい Sink を作成してルールに追加します。Name フィールドに Sink の名前を入力します。名前は英数字の組み合わせで指定してください。
Connector ドロップダウンから先ほど作成した
my_pubsubprodcer
を選択します。隣のボタンから新しいコネクターを作成することも可能です。設定パラメータの詳細は コネクターの作成 を参照してください。GCP PubSub Topic に、GCP でのトピックの作成と管理 で作成したトピック ID
my-iot-core
を入力します。Payload Template にテンプレートを定義するか空欄のままにします。
- 空欄の場合、MQTT メッセージのクライアントID、トピック、ペイロードなどの可視入力をすべて JSON 形式でエンコードします。
- 定義したテンプレートを使用する場合、
${variable_name}
形式のプレースホルダーは MQTT コンテキストの対応する値で置換されます。例:${topic}
は MQTT メッセージのトピックがmy/topic
ならmy/topic
に置換されます。
Attributes Template および Ordering Key Template にて、送信メッセージの属性やオーダーキーのフォーマットテンプレートを定義します(任意)。
- Attributes はキーと値の両方に
${variable_name}
形式のプレースホルダーを使用可能で、MQTT コンテキストから値が抽出されます。キーのテンプレートが空文字列になる場合、そのキーは GCP Pub/Sub 送信メッセージから除外されます。 - Ordering Key は
${variable_name}
形式のプレースホルダーを使用可能で、解決後の値が空文字列の場合は GCP Pub/Sub 送信メッセージのorderingKey
フィールドは設定されません。
- Attributes はキーと値の両方に
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細は フォールバックアクション を参照してください。
詳細設定(任意):詳細は Sink の機能 を参照してください。
Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテスト可能です。
Create ボタンをクリックして Sink の設定を完了すると、新しい Sink が Action Outputs タブに表示されます。
Create Rule ページに戻り、Create をクリックしてルールを作成します。
これでルールが正常に作成されました。Integration -> Rules ページで新規作成したルールを確認できます。Actions(Sink) タブをクリックすると、新しい Google PubSub Producer Sink が表示されます。
また、Integration -> Flow Designer をクリックするとトポロジーが表示され、トピック /devices/+/events
のメッセージがルール my_rule
によって解析され、GCP Pub/Sub に送信・保存されていることが確認できます。
プロデューサールールのテスト
MQTTX を使ってトピック
/devices/+/events
にメッセージを送信します。bashmqttx pub -i emqx_c -t /devices/+/events -m '{ "msg": "hello GCP PubSub" }'
Sink の稼働状況を確認すると、新規の受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。
GCP の Pub/Sub -> Subscriptions に移動し、MESSAGES タブをクリックするとメッセージが確認できます。
GCP Pub/Sub コンシューマーコネクターの作成
GCP Pub/Sub コンシューマー Sink を追加する前に、EMQX と GCP Pub/Sub 間の接続を確立するため、GCP Pub/Sub コンシューマーコネクターを作成する必要があります。
- EMQX ダッシュボードで Integration -> Connector をクリックします。
- ページ右上の Create をクリックし、コネクター選択画面で Google PubSub Consumer を選択して Next をクリックします。
- 名前と説明を入力します(例:
my-pubsubconsumer
)。この名前は GCP Pub/Sub コンシューマー Sink とコネクターを関連付けるために使用され、クラスター内で一意である必要があります。 - GCP Service Account Credentials にて、GCP でのサービスアカウントキーの作成 でエクスポートした JSON 形式のサービスアカウント認証情報をアップロードします。
- Create をクリックする前に、Test Connectivity をクリックしてコネクターが GCP Pub/Sub サーバーに接続できるかテスト可能です。
- ページ下部の Create ボタンをクリックしてコネクターの作成を完了します。ポップアップダイアログで Back to Connector List をクリックするか、Create Rule をクリックして GCP Pub/Sub コンシューマーソースを用いたルール作成を続行できます。詳細は GCP Pub/Sub コンシューマーソースを用いたルール作成 を参照してください。
GCP Pub/Sub コンシューマーソースを用いたルールの作成
このセクションでは、GCP Pub/Sub からメッセージを消費し、EMQX に転送するルールの作成方法を説明します。Google PubSub コンシューマーソースを作成・設定し、ルールのデータ入力として追加します。また、メッセージを EMQX に転送するために Republish アクションをルールに追加します。
EMQX ダッシュボードで Integration -> Rules をクリックします。
ページ右上の Create をクリックします。
ルール ID に
my_rule_source
を入力します。右側の Data Inputs タブでデフォルトの Input
Messages
を削除し、Add Input をクリックします。Input Type ドロップダウンから
Google PubSub Consumer
を選択します。Source ドロップダウンはデフォルトの
Create Source
のままにします。本デモでは新しいソースを作成してルールに追加します。ソースの Name と(任意の)Description を入力します。名前は英数字の組み合わせで、例:
my-gcppubsub-source
。Connector ドロップダウンから先ほど作成した
my_pubsubconsumer
を選択します。隣のボタンから新しいコネクターを作成することも可能です。設定パラメータの詳細は コネクターの作成 を参照してください。GCP Pub/Sub から EMQX へメッセージを消費するため、以下の情報を設定します。
- GCP PubSub Topic:消費対象の GCP Pub/Sub トピック名を入力します(例:
my-iot-core
)。 - Maximum Messages to Pull:1 回のプルリクエストで取得する最大メッセージ数を指定します。実際の取得数は指定値未満の場合があります。
- GCP PubSub Topic:消費対象の GCP Pub/Sub トピック名を入力します(例:
詳細設定(任意):詳細は Sink の機能 を参照してください。
Create をクリックする前に、Test Connectivity をクリックして GCP Pub/Sub サーバーへの接続が成功するかテスト可能です。
Create をクリックしてソースの作成を完了します。ソースはルールの Data Inputs タブに追加され、SQL Editor のルールは以下のようになります。
sqlSELECT * FROM "$bridges/gcppubsub:my-gcppubsub-source"
注意:初心者の場合は、SQL Examples と Enable Test をクリックして SQL ルールの学習とテストが可能です。
my-gcppubsub-source
から、ルール SQL は以下の GCP Pub/Sub から MQTT トピックへのマッピングテーブルに示す GCP Pub/Sub メッセージフィールドにアクセス可能です。ルール SQL を調整してデータ処理を行えます。本例ではデフォルトの SQL を使用します。フィールド名 説明 attributes
(任意)文字列のキー・バリューのペアを含むオブジェクト(存在する場合) message_id
GCP Pub/Sub がこのメッセージに割り当てたメッセージ ID ordering_key
(任意)メッセージの順序キー(存在する場合) publishing_time
GCP Pub/Sub によるメッセージのタイムスタンプ topic
元の GCP Pub/Sub トピック名 value
(任意)メッセージのペイロード(存在する場合) 注意:各 GCP Pub/Sub から MQTT トピックへのマッピングは、ユニークな GCP Pub/Sub トピック名を含む必要があります。つまり、同じ GCP Pub/Sub トピックが複数のマッピングに存在してはなりません。
これで GCP Pub/Sub コンシューマーソースの作成は完了しましたが、メッセージはまだ直接 EMQX にパブリッシュされません。次に、ルールへの Republish アクションの追加 を続けて、Republish アクションを作成しルールに追加してください。
ルールへの Republish アクションの追加
このセクションでは、GCP Pub/Sub コンシューマーソースから消費したメッセージを転送し、EMQX トピック t/1
にパブリッシュするための Republish アクションをルールに追加する方法を説明します。
ページ右側の Action Output タブを選択し、Add Action ボタンをクリックして、Type of Action ドロップダウンリストから
Republish
アクションを選択します。メッセージ再パブリッシュの設定を入力します。
Topic:MQTT にパブリッシュするトピックを指定します。ここでは
t/1
と入力します。QoS:
0
、1
、2
、${qos}
のいずれかを選択、または他のフィールドから QoS を設定するためのプレースホルダーを入力します。${qos}
を選択すると元のメッセージの QoS に従います。Retain:
true
またはfalse
を選択します。メッセージをリテインメッセージとしてパブリッシュするかどうかを決定します。プレースホルダーも使用可能です。本例ではfalse
を選択します。Payload:転送するメッセージペイロードのテンプレートを設定します。空欄の場合はルールの出力結果をそのまま転送します。ここでは
${payload}
を入力し、ペイロードのみを転送します。MQTT ペイロードテンプレートのデフォルト値は
${.}
で、利用可能なすべてのデータを JSON オブジェクトとしてエンコードします。例えば、すべてのオプションフィールドを含む GCP Pub/Sub メッセージに対して${.}
をテンプレートに指定すると、以下のような JSON が生成されます。json{ "attributes": {"attribute_key": "attribute_value"}, "message_id": "1679665968238", "ordering_key": "my-ordering-key", "topic": "my-pubsub-topic", "publishing_time": "2023-08-18T14:15:18.470Z", "value": "my payload" }
GCP Pub/Sub メッセージのサブフィールドはドット表記でアクセス可能です。例:
${.value}
は GCP Pub/Sub メッセージの値に解決され、${.attributes.h1}
はh1
というメッセージ属性キーの値に解決されます。存在しない値は空文字列に置換されます。MQTT 5.0 メッセージプロパティ:デフォルトで無効です。詳細設定は Republish アクションの追加 を参照してください。
Create をクリックしてアクション作成を完了します。作成成功後、ルール作成ページに戻り、Republish アクションが Action Outputs タブに追加されます。
ルール作成ページで Create ボタンをクリックしてルール全体の作成を完了します。
これでルールが正常に作成されました。Rules ページで新規作成したルールを確認できます。Sources タブで新しく作成した GCP Pub/Sub コンシューマーソースを確認できます。
また、Integrate -> Flow Designer をクリックするとトポロジーが表示され、GCP Pub/Sub コンシューマーソースからのメッセージが Republish を経由して t/1
にパブリッシュされる様子を直感的に確認できます。