Skip to content

ExProto ゲートウェイ

Extension Protocol(ExProto)は、gRPC通信を用いて実装されたカスタムプロトコル解析ゲートウェイです。Java、Python、Goなど、ユーザーが好むプログラミング言語でgRPCサービスを開発できるようにします。これらのサービスは、デバイスのネットワークプロトコルを解析し、デバイス接続、認証、メッセージ送信などの機能を実現します。

本ページでは、ExProtoゲートウェイの動作原理と、EMQXにおけるExProtoゲートウェイの設定および使用方法を紹介します。

重要なお知らせ

ExProtoゲートウェイはEMQX 6.2.0以降非推奨となり、EMQX 7で削除される予定です。

ExProtoゲートウェイとgRPCサービスの動作

EMQXでExProtoゲートウェイを有効にすると、特定のポート(例:7993)でデバイス接続を待ち受けます。クライアントデバイスからの接続を受け取ると、クライアントデバイスから生成されたバイトデータやイベントをユーザーのgRPCサービスに渡します。これには、ExProtoゲートウェイ内にgRPCクライアントが必要で、ユーザーのgRPCサーバーで実装されたConnectionUnaryHandlerサービスのメソッドを呼び出します。

ユーザーのgRPCサーバー内のgRPCサービスは、ExProtoゲートウェイから受け取ったバイトデータやイベントを解析し、クライアントのネットワークプロトコルを解釈してバイトデータやイベントをPub/Subリクエストに変換し、ExProtoゲートウェイに返送します。ExProtoゲートウェイで実装されたConnectionAdapterサービスは、ユーザーのgRPCサーバーとやり取りするためのインターフェースを提供します。これにより、クライアントデバイスはEMQXにメッセージをパブリッシュし、トピックをサブスクライブし、クライアント接続を管理できます。

以下の図は、ExProtoゲートウェイとgRPCサービスの動作アーキテクチャを示しています。

exproto-gateway-architecture

exproto.proto ファイル

exproto.protoファイルは、ExProtoゲートウェイとユーザーのgRPCサービス間のインターフェースを定義します。ファイルには以下の2つのサービスが指定されています。

  • ConnectionAdapterサービス:ExProtoゲートウェイによって実装され、gRPCサーバーへのインターフェースを提供します。
  • ConnectionUnaryHandlerサービス:ユーザーのgRPCサーバーによって実装され、クライアントソケットの接続処理およびバイト解析のメソッドを定義します。

ConnectionUnaryHandler サービス

ConnectionUnaryHandlerサービスは、ユーザーのgRPCサーバーで実装され、クライアントソケットの接続処理およびバイト解析を担当します。

このサービスには以下のメソッドが含まれます。

メソッド名説明
OnSocketCreated新しいソケットがExProtoゲートウェイに接続されるたびに呼び出されます。
OnSocketClosedソケットが閉じられるたびに呼び出されます。
OnReceivedBytesクライアントのソケットからデータを受信するたびに呼び出されます。
OnTimerTimeoutタイマーがタイムアウトするたびに呼び出されます。
OnReceivedMessagesサブスクライブされたトピックにメッセージが届くたびに呼び出されます。

ExProtoゲートウェイがこれらのメソッドを呼び出す際、どのソケットがイベントを送出したかを識別するために、パラメータに一意の識別子connを渡します。例えば、OnSocketCreated関数のパラメータは以下のようになります。

message SocketCreatedRequest {
  string conn = 1;
  ConnInfo conninfo = 2;
}

TIP

ExProtoゲートウェイはプライベートプロトコルのメッセージフレームの開始と終了を認識できないため、TCPパケットのスティッキングやスプリッティングが発生する場合は、OnReceivedBytesコールバック内で処理する必要があります。

ConnectionAdapter サービス

ConnectionAdapterサービスはExProtoゲートウェイによって実装され、gRPCサービスがサブスクリプションの開始、メッセージのパブリッシュ、タイマーの開始、接続のクローズなどの接続管理機能を呼び出せるようにします。以下のメソッドを含みます。

メソッド名説明
Send指定された接続にバイトデータを送信します。
Close指定された接続を閉じます。
AuthenticateクライアントをExProtoゲートウェイに登録し、認証を完了します。
StartTimer指定された接続のタイマーを開始します。通常は生存確認に使用されます。
Publish指定された接続からEMQXにメッセージをパブリッシュします。
Subscribe指定された接続のサブスクリプションを作成します。
Unsubscribe指定された接続のサブスクリプションを削除します。
RawPublishEMQXにメッセージをパブリッシュします。

ExProtoゲートウェイの有効化

EMQXのExProtoゲートウェイは、ダッシュボード、REST API、または設定ファイルbase.hoconを通じて設定および有効化できます。本節では、ダッシュボードを使ったExProtoゲートウェイの有効化方法を説明します。

EMQXダッシュボードの左側ナビゲーションメニューから Management -> Gateways をクリックします。Gatewaysページにはサポートされているすべてのゲートウェイが一覧表示されます。ExProtoを探し、Actions列のSetupをクリックします。すると、Initialize ExProtoページに遷移します。

TIP

EMQXをクラスターで運用している場合、ダッシュボードやREST APIで行った設定はクラスター全体に影響します。特定のノードだけ設定を変更したい場合は、base.hoconでゲートウェイを設定してください。

設定を簡略化するために、EMQXはGatewaysページのすべての必須フィールドにデフォルト値を用意しています。大幅なカスタマイズが不要な場合は、以下の3クリックでExProtoゲートウェイを有効化できます。

  1. Basic ConfigurationステップページでNextをクリックし、すべてのデフォルト設定を受け入れます。
  2. 次にListenersステップページに遷移し、EMQXはポート7993でTCPリスナーを事前設定しています。設定を確認してNextをクリックします。
  3. EnableボタンをクリックしてExProtoゲートウェイを有効化します。

ゲートウェイの有効化が完了すると、Gatewaysページに戻り、ExProtoゲートウェイのステータスがEnabledと表示されます。

Enabled ExProto gateway

上記の設定はREST APIでも行えます。

例:

bash
curl -X 'PUT' 'http://127.0.0.1:18083/api/v5/gateway/exproto' \
  -u <your-application-key>:<your-security-key> \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "exproto",
  "enable": true,
  "mountpoint": "exproto/",
  "server": {
    "bind": "0.0.0.0:9100"
  }
  "handler": {
    "address": "http://127.0.0.1:9001"
    "ssl_options": {"enable": false}
  }
  "listeners": [
    {
      "type": "tcp",
      "bind": "7993",
      "name": "default",
      "max_conn_rate": 1000,
      "max_connections": 1024000
    }
  ]
}'

詳細なREST APIの説明はREST APIを参照してください。

カスタマイズが必要な場合やリスナーの追加、認証ルールの追加を行いたい場合は、ExProtoゲートウェイのカスタマイズを参照してください。

ExProtoゲートウェイのカスタマイズ

デフォルト設定に加え、EMQXはさまざまな設定オプションを提供し、特定のビジネス要件に柔軟に対応できます。本節では、Gatewaysページで利用可能な設定オプションを詳しく解説します。

基本設定

GatewaysページでExProtoを探し、Actions列のSettingsをクリックします。Settingsタブで、ConnectionUnaryHandlerサービスのアドレス、ConnectionAdapterのリスニングポート、ゲートウェイのMountPoint文字列をカスタマイズできます。

Basic Configuration
  • Enable Statistics:ゲートウェイが統計情報を収集・報告するかどうかを設定します。デフォルトはtrueで、選択肢はtrueまたはfalseです。
  • Idle Timeout:非アクティブ状態が続いた後にクライアントを切断とみなすまでの秒数を設定します。デフォルトは30秒です。
  • MountPoint:パブリッシュやサブスクライブ時にすべてのトピックの前に付加される文字列を設定します。これにより異なるプロトコル間でメッセージルーティングの分離を実現できます(例:mqttsn/)。このトピックプレフィックスはゲートウェイが管理し、クライアントは明示的に付加する必要はありません。
  • gRPC ConnectionAdapterConnectionAdapterサービス起動のための設定を行います。
    • Bind:gRPCサーバーのリッスンアドレスとポート。デフォルトは0.0.0.0:9100です。
      • TLS Verify Client:ピア認証の有効・無効。デフォルトは無効。有効にすると、関連するTLS CertTLS KeyCA Certをファイル内容の入力またはSelect Fileボタンによるアップロードで設定できます。詳細はSSL/TLS接続の有効化を参照してください。
  • gRPC ConnectionHandlerConnectionUnaryHandlerを実装したコールバックサーバーの設定。
    • Server:コールバックgRPCサーバーのアドレス。
      • Enable TLS:gRPCサーバーでTLS接続を有効にするかどうか。デフォルトは無効。有効にすると以下の設定が可能です。
        • TLS Verify:ピア認証の有効・無効。デフォルトは無効。有効にすると、関連するTLS CertTLS KeyCA Certをファイル内容の入力またはアップロードで設定できます。
        • SNI:TLSのServer Name Indication拡張で使用するホスト名を指定します。

リスナーの追加

デフォルトで、ポート7993に名前defaultのTCPリスナーが1つ設定されており、1秒あたり最大1,000接続、最大1,024,000同時接続をサポートします。より詳細な設定を行うには、Listenersタブをクリックし、編集、削除、新規リスナーの追加が可能です。

exproto-listener

+ Add ListenerをクリックするとAdd Listenerページが開き、以下の設定項目を入力できます。

基本設定

  • Name:リスナーの一意識別子を設定します。
  • Type:プロトコルタイプを選択します。ExProtoの場合、udpまたはdtlsが選択可能です。
  • Bind:リスナーが接続を受け付けるポート番号を設定します。
  • MountPoint(任意):パブリッシュやサブスクライブ時にすべてのトピックの前に付加される文字列を設定し、異なるプロトコル間のメッセージルーティング分離を実現します。

リスナー設定

  • Acceptor:アクセプタープールのサイズを設定します。デフォルトは16です。
  • Max Connections:リスナーが処理可能な最大同時接続数を設定します。デフォルトは1024000です。
  • Max Connection Rate:リスナーが1秒あたり受け入れ可能な新規接続の最大レートを設定します。デフォルトは1000です。
  • Proxy Protocol:EMQXクラスターがHAProxyやNGINXの背後にある場合にProxy Protocol V1/V2を有効にします。デフォルトはfalseです。
  • Proxy Protocol Timeout:プロキシプロトコルのタイムアウト時間。タイムアウト内にプロキシプロトコルパケットを受信しない場合、EMQXはTCP接続を閉じます。デフォルトは3秒です。

TCP設定

  • ActiveN:ソケットの{active, N}オプションを設定します。これはソケットが能動的に処理可能な受信パケット数を意味します。詳細はErlangドキュメント - setopts/2を参照してください。
  • Buffer:受信および送信パケットを格納するバッファサイズをKB単位で設定します。
  • TCP_NODELAY:接続に対してTCP_NODELAYフラグを設定します。デフォルトはfalseです。
  • SO_REUSEADDR:ローカルのポート番号再利用を許可するかどうかを設定します。デフォルトはtrueです。
  • Send Timeout:接続のTCP送信タイムアウト時間を秒単位で設定します。デフォルトは15秒です。
  • Send Timeout Close:送信タイムアウト時に接続を閉じるかどうかを設定します。デフォルトはtrueです。

TLS設定(SSLリスナーのみ)

TLS Verifyの有効化はトグルスイッチで設定できますが、その前に関連するTLS CertTLS KeyCA Certをファイル内容の入力またはアップロードで設定する必要があります。詳細はSSL/TLS接続の有効化を参照してください。

続いて以下の設定が可能です。

  • SSL Versions:サポートするTLSバージョンを設定します。デフォルトはtlsv1tlsv1.1tlsv1.2tlsv1.3です。
  • SSL Fail If No Peer Cert:クライアントが空の証明書を送信した場合にEMQXが接続を拒否するかどうかを設定します。デフォルトはfalseで、trueまたはfalseが選択可能です。
  • CACert Depth:ピア証明書に続く有効な認証パスに含まれる自己発行でない中間証明書の最大数を設定します。デフォルトは10です。
  • Key File Passphrase:秘密鍵がパスワード保護されている場合に使用するパスワードを設定します。

認証の設定

ExProtoゲートウェイは以下のような多様な認証方式をサポートしています。

クライアント情報のClient ID、Username、PasswordはすべてConnectionAdapterAuthenticateメソッドで渡されるパラメータから取得されます。

本節ではダッシュボードを例に認証設定方法を説明します。

ExProtoページでAuthenticationタブをクリックします。

+ Create Authenticationをクリックし、MechanismPassword-Basedを選択、BackendHTTP Serverを選択してNextをクリックします。Configurationで認証ルールを設定できます。各項目の詳細はHTTPサーバー認証を参照してください。

mqttsn authentication

上記設定はREST APIでも行えます。

例:

bash
curl -X 'POST' 'http://127.0.0.1:18083/api/v5/gateway/exproto/authentication' \
  -u <your-application-key>:<your-security-key> \
  -H 'Content-Type: application/json' \
  -d '{
  "method": "post",
  "url": "http://127.0.0.1:8080",
  "headers": {
    "content-type": "application/json"
  },
  "body": {
    "username": "${username}",
    "password": "${password}"
  },
  "pool_size": 8,
  "connect_timeout": "5s",
  "request_timeout": "5s",
  "enable_pipelining": 100,
  "ssl": {
    "enable": false,
    "verify": "verify_none"
  },
  "backend": "http",
  "mechanism": "password_based",
  "enable": true
}'

テスト用のサンプルgRPCサービスの起動

本節では、ExProtoゲートウェイとgRPCサービスがどのように連携するかを示すために、サンプルgRPCサービスの起動方法を紹介します。

このデモでは、telnetコマンドを用いてTCPプロトコルでメッセージを送受信するクライアントをシミュレートします。実際の環境では、カスタムプライベートプロトコルを実装したデバイスがポート7993のTCPリスナーに接続します。ExProtoゲートウェイはポート7993でクライアント接続を受け付け、ポート9100でexproto.protoファイルで定義されたConnectionAdapterサービスを提供します。

emqx-extension-examplesにはさまざまな言語のサンプルgRPCサービスが用意されています。本デモでは、PythonでConnectionUnaryHandlerサービスを実装したエコープログラムexproto-svr-pythonを例に使います。これはTCPクライアントから受信したデータをそのまま返すだけのシンプルなサービスです。実際の環境では、これらのアップストリームメッセージをEMQXにパブリッシュしたり、トピックをサブスクライブしてEMQXからのメッセージをクライアント接続に届けたりします。

以下はexproto-svr-pythonを例にした手順です。

前提条件

開始前に以下を完了していることを確認してください。

  • EMQX 5.1.0以降を実行し、デフォルト設定でExProtoゲートウェイを有効化していること。

  • Python 3.7以降をインストールし、以下の依存関係をインストールしていること。

    python -m pip install grpcio
    python -m pip install grpcio-tools
  1. EMQXが動作している同一マシンで、サンプルコードをクローンし、exproto-svr-pythonディレクトリに移動します。

    bash
    git clone https://github.com/emqx/emqx-extension-examples
    cd exproto-svr-python
  2. 以下のコマンドでgRPCサーバーを起動します。

    python exproto_server.py

    起動に成功すると、以下のような出力が表示されます。

    ConnectionUnaryHandler started successfully, listening on 9001
    
    Tips: If the Listener of EMQX ExProto gateway listen on 7993:
          You can use the telnet to test the server, for example:
    
          telnet 127.0.0.1 7993
    
    Waiting for client connections...
  3. telnetを使ってExProtoゲートウェイがリッスンしているポート7993にアクセスします。Hi, this is tcp client!と入力し、gRPCサーバーが正常に動作しているかを確認します。例:

    $ telnet 127.0.0.1 7993
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    Hi, this is tcp client!
    Hi, this is tcp client!
  4. EMQXダッシュボードで左側ナビゲーションメニューの Management -> Gateways をクリックし、ExProtoのClientsをクリックします。ExProtoページで、telnetで接続したクライアントが表示されます。

    Connected ExProto Client

サンプルのシーケンス図

以下の図は、本サンプルにおける接続およびメッセージ配信のシーケンスを示しています。

exproto-sequence-diagram > exproto-svr-python: Succeed

exproto-svr-python ->> ExProto Gateway: Call 'Subscribe' to subscribe 'test/echo' ExProto Gateway -->> exproto-svr-python: Succeed exproto-svr-python ->> ExProto Gateway: Call 'StartTimer' to start keepalive timer ExProto Gateway -->> exproto-svr-python: Succeed exproto-svr-python -->> ExProto Gateway: OnSocketCreated return end Telnet ->> ExProto Gateway: Send 'Hi, this is...' rect rgb(100,150, 240) ExProto Gateway ->> exproto-svr-python: Call OnReceivedBytes exproto-svr-python --> exproto-svr-python: Use 'Hi, this is...' to create a message exproto-svr-python ->> ExProto Gateway: Call Publish to publish message to 'test/echo' ExProto Gateway -->> ExProto Gateway: Route the message ExProto Gateway -->> exproto-svr-python: Succeed exproto-svr-python -->> ExProto Gateway: OnReceivedBytes return end rect rgb(100, 150, 200) ExProto Gateway ->> exproto-svr-python: Call OnReceivedMessages exproto-svr-python -->> exproto-svr-python: Use message payload exproto-svr-python ->> ExProto Gateway: Call Send to deliver bytes 'Hi, this is ...' ExProto Gateway -->> exproto-svr-python: Succeed ExProto Gateway ->> Telnet: Deliver 'Hi, this is...' exproto-svr-python -->> ExProto Gateway: OnReceivedMessages return end ```-->