Apache KafkaへのMQTTデータストリーミング
Apache Kafkaは、アプリケーションやシステム間でのリアルタイムなデータストリーム転送を処理できる、広く利用されているオープンソースの分散イベントストリーミングプラットフォームです。しかし、KafkaはエッジIoT通信向けに設計されておらず、Kafkaクライアントは安定したネットワーク接続とより多くのハードウェアリソースを必要とします。IoTの領域では、デバイスやアプリケーションから生成されるデータは軽量なMQTTプロトコルを用いて送信されます。EMQXのKafkaとの統合により、ユーザーはMQTTデータをKafkaにシームレスにストリーミングできます。MQTTのデータストリームはKafkaのトピックに取り込まれ、リアルタイムの処理、保存、分析が可能です。逆に、KafkaのトピックデータはMQTTデバイスによって消費され、タイムリーなアクションを実現します。

本ページでは、EMQXとKafka間のデータ統合について包括的に紹介し、データ統合の作成および検証方法を実践的に解説します。
動作概要
Apache Kafkaとのデータ統合は、MQTTベースのIoTデータとKafkaの強力なデータ処理機能のギャップを埋めるためにEMQXに標準搭載された機能です。組み込みのルールエンジンコンポーネントにより、両プラットフォーム間のデータストリーミングと処理を簡素化し、複雑なコーディングを不要にします。
以下の図は、自動車IoTで利用されるEMQXとKafka間のデータ統合の典型的なアーキテクチャを示しています。

Apache Kafkaへのデータの流入および流出には、それぞれKafka Sink(Kafkaへメッセージを送信)とKafka Source(Kafkaからメッセージを受信)を作成する必要があります。ここではSinkを例に、その処理フローを説明します。
- メッセージのパブリッシュと受信:接続された車載IoTデバイスはMQTTプロトコルを介してEMQXに正常に接続し、定期的に状態データを含むメッセージをMQTTでパブリッシュします。EMQXがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:ブローカーと一体化した組み込みルールエンジンにより、これらのMQTTメッセージはトピックマッチングルールに基づいて処理されます。メッセージが到着するとルールエンジンを通過し、定義されたルールが評価されます。ペイロード変換を指定するルールがあれば、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの変換が適用されます。
- Kafkaへのブリッジ:ルールエンジンで定義されたルールがトリガーされると、メッセージはKafkaに転送されます。Kafkaブリッジ機能を用いて、MQTTトピックは事前定義されたKafkaトピックにマッピングされ、処理済みのすべてのメッセージとデータがKafkaトピックに書き込まれます。
車両データがKafkaに取り込まれた後は、以下のように柔軟にデータへアクセスし活用できます。
- サービスはKafkaクライアントと直接連携し、特定トピックからリアルタイムのデータストリームを消費してカスタマイズされたビジネス処理を実行可能です。
- Kafka Streamsを利用してストリーム処理を行い、車両状態をメモリ上で集約・相関させてリアルタイム監視を実現します。
- Kafka Connectコンポーネントを活用し、MySQLやElasticSearchなど外部システムへのデータ出力用コネクターを選択して保存できます。
特長とメリット
Apache Kafkaとのデータ統合は、以下の特長とメリットをビジネスにもたらします。
- 信頼性の高い双方向IoTデータメッセージング:不安定なモバイルネットワーク上で動作するリソース制約のあるIoTデバイスとKafka間のデータ通信は、不確実なネットワークでのメッセージングに優れたMQTTプロトコルで処理されます。EMQXはMQTTメッセージをバッチでKafkaに転送するだけでなく、バックエンドシステムからのKafkaメッセージをサブスクライブして接続されたIoTクライアントに配信します。
- ペイロード変換:メッセージのペイロードは転送中に定義されたSQLルールで処理可能です。例えば、総メッセージ数、成功/失敗配信数、メッセージレートなどのリアルタイムメトリクスを含むペイロードは、Kafkaに取り込む前にデータ抽出、フィルタリング、強化、変換を経ることができます。
- 効果的なトピックマッピング:多数のIoTビジネストピックは設定されたKafka統合によりKafkaトピックにマッピングされます。EMQXはMQTTユーザープロパティのKafkaヘッダーへのマッピングをサポートし、1対1、1対多、多対多の柔軟なトピックマッピング方法を採用し、MQTTトピックフィルター(ワイルドカード)にも対応しています。
- 柔軟なパーティション選択戦略:MQTTトピックやクライアントに基づいて同じKafkaパーティションにメッセージを転送することをサポートします。
- 高スループット環境での処理能力:EMQX Kafkaプロデューサーは同期・非同期の両書き込みモードをサポートし、リアルタイム優先とパフォーマンス優先のデータ書き込み戦略を区別可能で、シナリオに応じてレイテンシとスループットの柔軟なバランス調整を実現します。
- ランタイムメトリクス:各SinkおよびSourceの総メッセージ数、成功/失敗数、現在のレートなどのランタイムメトリクスの閲覧をサポートします。
- 動的設定:Dashboardまたは設定ファイルでSinkおよびSourceを動的に設定可能です。
これらの機能は統合能力と柔軟性を高め、効果的で堅牢なIoTプラットフォームアーキテクチャの構築を支援します。増大するIoTデータは安定したネットワーク接続のもとで送信され、さらに効果的に保存・管理されます。
はじめる前に
このセクションでは、EMQX DashboardでKafka SinkおよびSourceを作成する前に必要な準備について説明します。
前提条件
Kafkaサーバーのセットアップ
ここではmacOSを例にインストールと起動手順を示します。以下のコマンドでKafkaをインストールし起動できます。
wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
# KRaftモードでKafkaを起動
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
詳細な手順はKafka公式ドキュメントのクイックスタートを参照してください。
Kafkaトピックの作成
EMQXでのデータ統合作成前に、Kafkaトピックを作成しておく必要があります。以下のコマンドでSink用のtesttopic-in
とSource用のtesttopic-out
の2つのトピックを作成します。
bin/kafka-topics.sh --create --topic testtopic-in --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic testtopic-out --bootstrap-server localhost:9092
Kafkaプロデューサーコネクターの作成
Kafka Sinkアクションを追加する前に、EMQXとKafka間の接続を確立するためのKafkaプロデューサーコネクターを作成します。
EMQX DashboardでIntegration -> Connectorをクリックします。
ページ右上のCreateをクリックし、コネクター選択画面でKafka Producerを選択してNextをクリックします。
名前と説明を入力します。例:
my-kafka
。名前はKafka Sinkとコネクターを関連付けるために使用され、クラスター内で一意である必要があります。Kafka接続に必要なパラメータを設定します。
Bootstrap Hosts:
127.0.0.1:9092
を入力します。デモではEMQXとKafkaをローカルで実行している想定です。リモート環境の場合は適宜調整してください。Authentication:Kafkaクラスターで必要な認証方式を選択します。以下の方式をサポートしています。
None
:認証なし。AWS IAM for MSK
:EMQXがEC2インスタンス上で動作し、AWS MSKクラスターを利用する場合。Basic Auth
:mechanism(plain
、scram_sha_256
、scram_sha_512
のいずれか)を選択し、usernameとpasswordを入力。Kerberos
:Kerberos PrincipalとKerberos Keytab fileを指定。
詳細は認証方式を参照してください。
その他のオプションはデフォルトのままか、ビジネス要件に応じて設定してください。
暗号化接続を確立する場合は、Enable TLSのトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
Createをクリックする前に、Test ConnectionをクリックしてKafkaサーバーへの接続が成功するか確認できます。
Createをクリックしてコネクターの作成を完了します。
作成後、コネクターは自動的にKafkaに接続します。次に、このコネクターを基にルールを作成し、Kafkaクラスターへのデータ転送を設定します。
認証方式
EMQXでKafkaコネクターを作成する際、Kafkaクラスターのセキュリティ設定に応じて以下の認証方式を選択できます。
None:認証不要。
MSK IAM:EMQXがAmazon EC2インスタンス上で動作し、Amazon MSKクラスターに接続する場合に使用。
AWS EC2インスタンスのメタデータサービスを利用し、IAMポリシーに基づく認証トークンを生成します。
重要
MSK IAM認証は、EMQXがEC2インスタンス上で動作しMSKクラスターに接続する場合のみサポートされます。AWS Metadata APIに依存しているためです。
Basic Auth:ユーザー名とパスワードによる認証。
選択時は以下を指定する必要があります。
- Mechanism:
plain
、scram_sha_256
、scram_sha_512
のいずれかを選択。 - UsernameとPassword:Kafkaクラスター認証用の資格情報。
- Mechanism:
Kerberos:Kerberos GSSAPIによる認証。
以下を指定する必要があります。
- Kerberos Principal:認証に使用するKerberosのプリンシパル。
- Kerberos Keytab File:非対話認証用のkeytabファイルのパス。
重要
KerberosのkeytabファイルはすべてのEMQXノードで同じパスに配置し、EMQXサービスユーザーが読み取り権限を持つ必要があります。
Kafka Sinkを用いたルールの作成
このセクションでは、MQTTトピックt/#
からのメッセージを処理し、Kafka Sinkを使ってKafkaのtesttopic-in
トピックに送信するルールの作成方法を示します。
EMQX DashboardでIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDを入力します。例:
my_rule
。SQL Editorに以下のステートメントを入力します。これはトピック
t/#
のMQTTメッセージをKafkaに転送する例です。注意:独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを
SELECT
句に含めてください。sqlSELECT * FROM "t/#"
TIP
初心者の方はSQL ExamplesやEnable TestをクリックしてSQLルールの学習とテストが可能です。
TIP
EMQX v5.7.2からルールSQLで環境変数を読み取る機能が追加されました。詳細はルールSQLで環境変数を使うを参照してください。
- Add Actionボタンをクリックしてルールでトリガーされるアクションを定義します。Type of Actionドロップダウンから
Kafka Producer
を選択し、ActionはデフォルトのCreate Action
のままか、既存のKafka Producerアクションを選択します。ここでは新規作成してルールに追加します。
- Add Actionボタンをクリックしてルールでトリガーされるアクションを定義します。Type of Actionドロップダウンから
Sinkの名前と説明を対応するテキストボックスに入力します。
Connectorドロップダウンから先ほど作成した
my-kafka
コネクターを選択します。隣のボタンをクリックするとポップアップで新規コネクター作成も可能です。設定パラメータはKafkaプロデューサーコネクターの作成を参照してください。Sinkのデータ送信方法を設定します。
Kafka Topic:
testtopic-in
を入力します。EMQX v5.7.2以降、このフィールドは動的トピック設定もサポートしています。詳細は変数テンプレートの使用を参照してください。Kafka Headers:Kafkaメッセージに関連するメタデータやコンテキスト情報を入力します(任意)。プレースホルダーの値はオブジェクトである必要があります。ヘッダー値のエンコードタイプはKafka Header Value Encod Typeドロップダウンから選択可能です。Addをクリックしてキー・バリューのペアを追加できます。
Message Key:Kafkaメッセージのキー。純粋な文字列または
${var}
を含む文字列を入力可能です。Message Value:Kafkaメッセージの値。純粋な文字列または
${var}
を含む文字列を入力可能です。Partition Strategy:プロデューサーがKafkaパーティションにメッセージを分配する方法を選択します。
Compression:Kafkaメッセージのレコード圧縮/解凍に使用する圧縮アルゴリズムを指定します。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義可能です。詳細はフォールバックアクションを参照してください。
詳細設定(任意):詳細設定を参照してください。
CreateをクリックしてSinkの作成を完了します。作成後はCreate Rule画面に戻り、新しいSinkがルールアクションに追加されます。
Createをクリックしてルール作成を完了します。
これでルールが正常に作成され、Integration -> Rulesページで新規ルールを確認でき、**Actions(Sink)**タブには新規KafkaプロデューサーSinkが表示されます。
また、Integration -> Flow Designerをクリックするとトポロジーを確認できます。トポロジーでは、トピックt/#
のメッセージがルールmy_rule
で解析されKafkaに送信・保存される様子が直感的に把握できます。
Kafkaの動的トピック設定
EMQX v5.7.2以降、Kafka Producer Sink設定で環境変数や変数テンプレートを用いてKafkaトピックを動的に設定可能です。本節ではこれら2つの動的トピック設定のユースケースを紹介します。
環境変数の利用
EMQX v5.7.2では、ルールSQL処理中に環境変数から取得した値をメッセージ内のフィールドに動的に割り当てる機能が追加されました。この機能はルールエンジンの組み込みSQL関数getenvを利用し、EMQXの環境変数を取得してSQL処理結果にセットします。この機能を応用し、Kafka SinkルールアクションのKafkaトピック設定でルール出力結果のフィールドを参照してKafkaトピックを設定できます。以下はその例です。
注意
ルールエンジンが読み取る環境変数名は、他のシステム環境変数の漏洩を防ぐために固定プレフィックスEMQXVAR_
を付ける必要があります。例えば、getenv
関数で読み取る変数名がKAFKA_TOPIC
の場合、環境変数名はEMQXVAR_KAFKA_TOPIC
と設定してください。
Kafkaを起動し、
testtopic-in
トピックを事前作成します。はじめる前にの手順を参照してください。EMQXを起動し環境変数を設定します。zip版インストールの場合、起動時に直接環境変数を指定可能です。例としてKafkaトピック
testtopic-in
を環境変数EMQXVAR_KAFKA_TOPIC
に設定します。bashEMQXVAR_KAFKA_TOPIC=testtopic-in bin/emqx start
コネクターを作成します。Kafkaプロデューサーコネクターの作成を参照してください。
Kafka Sinkルールを設定します。SQL Editorに以下を入力します。
sqlSELECT getenv('KAFKA_TOPIC') as kafka_topic, payload FROM "t/#"
SQLテストを有効にし、環境変数値
testtopic-in
が正しく取得できることを確認します。Kafka Producer Sinkのアクションを追加します。ルール右側のAction OutputsでAdd Actionをクリックします。
- Connector:先ほど作成した
test-kafka
を選択。 - Kafka Topic:SQLルール出力に基づき変数テンプレート
${kafka_topic}
形式で設定。
- Connector:先ほど作成した
Kafka Sinkを用いたルールの作成を参照して残りの設定を完了し、最後にCreateをクリックしてルール作成を完了します。
Kafkaプロデューサールールのテストの手順に従い、Kafkaにメッセージを送信します。
bashmqttx pub -h 127.0.0.1 -p 1883 -i pub -t t/Connection -q 1 -m 'payload string'
メッセージはKafkaトピック
testtopic-in
で受信されるはずです。bashbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \ --topic testtopic-in {"payload":"payload string","kafka_topic":"testtopic-in"} {"payload":"payload string","kafka_topic":"testtopic-in"}
変数テンプレートの利用
Kafka Topicフィールドに静的なトピック名を設定する代わりに、変数テンプレートを用いて動的にトピックを生成することも可能です。これによりメッセージ内容に基づいてKafkaトピックを構築でき、柔軟なメッセージ処理と振り分けが実現します。例えば、device-${payload.device}
のように指定すると、特定デバイスからのメッセージをdevice-1
などデバイスIDを付加したトピックに簡単に送信できます。
この例では、Kafkaに送信するメッセージのペイロードにdevice
キーが含まれている必要があります。例:
{
"topic": "t/devices/data",
"payload": {
"device": "1",
"temperature": 25.6,
"humidity": 60.2
}
}
このキーがないとトピックのレンダリングに失敗し、メッセージは回復不能なドロップとなります。
また、Kafka側で解決されるすべてのトピック(例:device-1
、device-2
など)を事前に作成しておく必要があります。存在しないトピック名に解決された場合もメッセージは回復不能なエラーでドロップされます。
Kafkaプロデューサールールのテスト
Kafkaプロデューサールールが期待通りに動作するかをテストするには、MQTTXを使ってEMQXにMQTTメッセージをパブリッシュするクライアントをシミュレートします。
- MQTTXでトピック
t/1
にメッセージを送信します。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Kafka" }'
**Actions(Sink)**ページでSink名をクリックし統計情報を確認します。Sinkの稼働状況をチェックし、新規受信メッセージ数と送信メッセージ数が1件ずつ増えていることを確認します。
以下のコマンドでメッセージが
testtopic-in
トピックに書き込まれているか確認します。bashbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testtopic-in
Kafkaコンシューマーコネクターの作成
Kafka Sourceアクションを追加する前に、EMQXとKafka間の接続を確立するKafkaコンシューマーコネクターを作成します。
EMQX DashboardでIntegration -> Connectorをクリックします。
ページ右上のCreateをクリックします。
Create ConnectorページでKafka Consumerを選択し、Nextをクリックします。
ソースの名前を入力します。英数字の組み合わせで例:
my-kafka-source
。ソースの接続情報を入力します。
Bootstrap Hosts:
127.0.0.1:9092
を入力します。デモではEMQXとKafkaをローカルで実行している想定です。リモート環境の場合は適宜調整してください。Authentication:Kafkaクラスターで必要な認証方式を選択します。以下をサポートしています。
None
:認証なし。authentication_msk_iam
:EMQXがEC2インスタンス上で動作し、AWS MSKクラスターを利用する場合。Basic Auth
:Mechanism(plain
、scram_sha_256
、scram_sha_512
のいずれか)を選択し、UsernameとPasswordを入力。Kerberos
:Kerberos PrincipalとKerberos Keytab Fileを指定。
詳細は認証方式を参照してください。
その他のオプションはデフォルトのままか、ビジネス要件に応じて設定してください。
暗号化接続を確立する場合は、Enable TLSのトグルをオンにします。TLS接続の詳細は外部リソースアクセスのTLSを参照してください。
詳細設定(任意):詳細設定を参照してください。
Createをクリックする前に、Test ConnectionをクリックしてKafkaサーバーへの接続が成功するか確認できます。
Createをクリックしてソースの作成を完了します。関連ルールの作成オプションが表示されます。KafkaコンシューマーSourceを用いたルールの作成を参照してください。
KafkaコンシューマーSourceを用いたルールの作成
このセクションでは、KafkaコンシューマーSourceで転送されたメッセージをさらに処理し、MQTTトピックに再パブリッシュするルールの作成方法を示します。
ルールSQLの作成
EMQX DashboardでIntegration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDを入力します。例:
my_rule
。Kafkaソース
$bridges/kafka_consumer:<sourceName>
から変換されたメッセージをEMQXに転送する場合、SQL Editorに以下のステートメントを入力します。注意:独自のSQL構文を指定する場合は、後続の再パブリッシュアクションで必要なすべてのフィールドを
SELECT
句に含めてください。Kafka SourceのSELECT
文では、ts_type
、topic
、ts
、event
、headers
、key
、metadata
、value
、timestamp
、offset
、node
などのフィールドが使用可能です。sqlSELECT * FROM "$bridges/kafka_consumer:<sourceName>"
初心者の方はSQL ExamplesやEnable TestをクリックしてSQLルールの学習とテストが可能です。
KafkaコンシューマーSourceをデータ入力に追加
ルール作成画面右側のData Inputsタブを選択し、Add Inputをクリックします。
Input TypeドロップダウンからKafka Consumerを選択します。Sourceはデフォルトの
Create Source
のままか、既存のKafka Consumerソースを選択します。ここでは新規作成してルールに追加します。ソースの名前と説明を対応するテキストボックスに入力します。
Connectorドロップダウンから先ほど作成した
my-kafka-consumer
コネクターを選択します。隣のボタンをクリックするとポップアップで新規コネクター作成も可能です。設定パラメータはKafkaコンシューマーコネクターの作成を参照してください。以下のフィールドを設定します。
- Kafka Topic:コンシューマーソースがサブスクライブするKafkaトピックを指定します。
- Group ID:このソースのコンシューマーグループIDを指定します。未指定の場合はソース名に基づいて自動生成されます。
- Key Encoding ModeおよびValue Encoding Mode:Kafkaメッセージのキーと値のエンコードモードを選択します。
Offset Reset Policy:Kafkaコンシューマーがオフセットを持たないか無効な場合に読み取り開始位置を決定するポリシーを選択します。
latest
を選択すると、コンシューマーは最新のオフセットから読み始め、開始前のメッセージはスキップします。earliest
を選択すると、コンシューマーはパーティションの先頭から読み始め、開始前のメッセージも含めてすべての履歴データを読み取ります。
詳細設定(任意):詳細設定を参照してください。
Createをクリックする前に、Test ConnectivityをクリックしてKafkaサーバーへの接続が成功するか確認できます。
Createをクリックしてソースの作成を完了します。ルール作成画面のData Inputsタブに新しいソースが表示されます。
再パブリッシュアクションの追加
Action Outputsタブを選択し、+ Add Actionボタンをクリックしてルールでトリガーされるアクションを定義します。
Type of ActionドロップダウンからRepublishを選択します。
TopicおよびPayloadフィールドに再パブリッシュするメッセージのトピックとペイロードを入力します。例として
Topic
にt/1
、Payload
に${.}
を入力します。${}
を用いてMQTTトピックを動的指定することも可能です。例:t/${key}
(${}
内のパラメータはSQLのSELECT
文に含まれている必要があります)。
Addをクリックしてアクションをルールに追加します。
ルール作成画面に戻り、Saveをクリックします。
Kafka Sourceルールのテスト
Kafka Sourceとルールが期待通りに動作するかをテストするには、MQTTXを使ってEMQXのトピックをサブスクライブするクライアントをシミュレートし、KafkaプロデューサーでKafkaトピックにデータを生成します。その後、KafkaからのデータがEMQXによってクライアントがサブスクライブするトピックに再パブリッシュされているか確認します。
MQTTXでトピック
t/1
をサブスクライブします。bashmqttx sub -t t/1 -v
新しいコマンドラインウィンドウを開き、以下のコマンドでKafkaプロデューサーを起動します。
bashbin/kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic testtopic-out
メッセージ入力待ちになります。
{"msg": "Hello EMQX"}
を入力してtesttopic-out
トピックにメッセージを生成し、Enterキーを押します。MQTTXのサブスクリプションを確認します。Kafkaからの以下のメッセージがトピック
t/1
で受信されるはずです。json{ "value": "{\"msg\": \"Hello EMQX\"}", "ts_type": "create", "ts": 1679665968238, "topic": "testtopic-out", "offset": 2, "key": "key", "headers": { "header_key": "header_value" } }
詳細設定
このセクションでは、データ統合のパフォーマンス最適化やシナリオに応じたカスタマイズを可能にする詳細設定オプションを説明します。コネクター、Sink、Source作成時にAdvanced Settingsを展開し、ビジネス要件に応じて以下の設定を行えます。
フィールド名 | 説明 | 推奨値 |
---|---|---|
Min Metadata Refresh Interval | クライアントがKafkaブローカーおよびトピックのメタデータを更新する最小間隔。小さすぎるとKafkaサーバーの負荷が増加する可能性があります。 | 3 秒 |
Metadata Request Timeout | Kafkaからメタデータを要求する際の最大待機時間。 | 5 秒 |
Connect Timeout | TCP接続確立の最大待機時間。認証有効時は認証時間も含みます。 | 5 秒 |
Max Wait Time (Source) | Kafkaブローカーからのフェッチ応答を待つ最大時間。 | 1 秒 |
Fetch Bytes (Source) | Kafkaから1回のフェッチで取得するバイト数。設定値がメッセージサイズより小さいとフェッチ性能に悪影響があります。 | 896 KB |
Max Batch Bytes (Sink) | Kafkaバッチ内で収集するメッセージの最大バイト数。Kafkaブローカーのデフォルトは1MBですが、EMQXはメッセージエンコードのオーバーヘッドを考慮しやや小さめに設定。単一メッセージが制限を超える場合は別バッチで送信。 | 896 KB |
Offset Commit Interval (Source) | コンシューマーグループごとに送信するオフセットコミット要求の間隔。 | 5 秒 |
Required Acks (Sink) | Kafkaパーティションリーダーがフォロワーから待つアックの種類:all_isr :全てのインシンクレプリカからのアックを要求。leader_only :リーダーのみからのアックを要求。none :Kafkaからのアック不要。 | all_isr |
Partition Count Refresh Interval (Source) | Kafkaプロデューサーがパーティション数増加を検知する間隔。増加検知後、指定のpartition_strategy に基づき新パーティションをメッセージ送信に組み込みます。 | 60 秒 |
Max Inflight (Sink) | Kafkaプロデューサーがアック受信前に送信可能な最大バッチ数(パーティションごと)。大きいほどスループット向上。ただし1より大きいとメッセージ順序が入れ替わるリスクあり。 | 10 |
Query Mode (Source) | 非同期または同期クエリモードを選択し、メッセージ送信を最適化。非同期モードではKafka書き込みがMQTTパブリッシュをブロックしませんが、クライアントがKafka到着前にメッセージを受信する可能性があります。 | Async |
Synchronous Query Timeout (Sink) | 同期クエリモード時の最大待機時間。メッセージ送信完了をタイムリーに保証し、長時間待機を防止。同期モード設定時のみ有効。 | 5 秒 |
Buffer Mode (Sink) | メッセージ送信前のバッファリング方式。メモリバッファリングは送信速度向上に寄与。memory :メモリにバッファ。EMQXノード再起動時にメッセージは失われる。disk :ディスクにバッファ。再起動後もメッセージ保持。hybrid :初めはメモリバッファ。一定サイズ超過時に徐々にディスクにオフロード。メモリモード同様、再起動時はメッセージ失われる。 | memory |
Per-partition Buffer Limit (Sink) | Kafkaパーティションごとの最大バッファサイズ(バイト)。上限到達時は古いメッセージを破棄しバッファ領域を確保。メモリ使用量と性能のバランス調整に有効。 | 2 GB |
Segment File Bytes (Sink) | バッファモードがdisk またはhybrid の場合に適用。メッセージ保存用のセグメントファイルサイズを制御し、ディスクストレージの最適化に影響。 | 100 MB |
Memory Overload Protection (Sink) | バッファモードがmemory の場合に適用。メモリ圧迫時に古いバッファメッセージを自動破棄し、システムの安定性を確保。Linuxのみ有効。 | Enabled |
Socket Send / Receive Buffer Size | ネットワーク送受信のソケットバッファサイズを管理し、通信性能を最適化。 | 1024 KB |
TCP Keepalive | Kafkaブリッジ接続のTCPキープアライブを有効化し、長時間のアイドルによる接続切断を防止。値はIdle, Interval, Probes の3つの数値をカンマ区切りで指定。Idle:サーバーがキープアライブプローブを送るまでのアイドル秒数(Linuxデフォルト7200秒)。 Interval:プローブ間隔秒数(Linuxデフォルト75秒)。 Probes:応答なしと判断するまでの最大プローブ回数(Linuxデフォルト9回)。 例: 240,30,5 は240秒のアイドル後にプローブ開始、30秒間隔で最大5回送信。 | none |
Max Linger Time | パーティションごとのプロデューサーがバッチ収集のためにメッセージを待つ最大時間。デフォルト0 は待機なし。メモリバッファ以外では5ms がIOPS削減に効果的だがレイテンシ増加のトレードオフあり。 | 0 ミリ秒 |
Max Linger Bytes | パーティションごとのプロデューサーがバッチ収集のために待つ最大バイト数。 | 10 MB |
Health Check Interval | コネクターの稼働状況チェック間隔。 | 15 秒 |
さらに詳しく
EMQXはApache Kafkaとのデータ統合に関する豊富な学習リソースを提供しています。以下のリンクから詳細を学べます。
ブログ:
- MQTTとKafkaによるコネクテッドビークルのストリーミングデータパイプライン構築:3分ガイド
- MQTTとKafka:IoTデータ統合の強化
- MQTTパフォーマンスベンチマークテスト:EMQX-Kafka統合
ベンチマークレポート:
動画:
- EMQX Cloudルールエンジンを使ったデバイスデータのKafkaブリッジ(本動画はCloudルールエンジンに関するもので、将来的により適切な動画に差し替え予定)