Skip to content

MQTTデータをディスクログに取り込む

ディスクログデータ統合により、EMQXはイベントデータをJSON Lines形式でディスクに永続化できます。これは従来のローテートログファイルに似ており、トラブルシューティングや履歴追跡のための長期的なイベント保持を可能にします。

本ページでは、EMQXとディスクログ間のデータ統合について詳しく解説し、ルールおよびSinkの作成方法を実践的に案内します。

動作概要

EMQXにはエラーや警告、システムアクティビティなどの運用イベントを監視するための組み込みシステムログがありますが、ディスクログ統合は異なる目的を持ちます。実際のMQTTメッセージデータやクライアントレベルのイベントをディスクに永続化し、保持およびオフライン処理を可能にします。

EMQXのルールエンジンとSink機構を用いて実装されており、ユーザーはどのデータをキャプチャし、どのように保存するかを正確に定義できます。

  1. ルールでMQTTメッセージやクライアントイベントから関心のあるデータをフィルター、変換、抽出します。
  2. ルールにディスクログSinkを紐付け、データの保存方法と場所を定義します。Sinkはフォーマット済みデータ(JSON形式)を対応するコネクターに転送します。
  3. ディスクログコネクターがファイルシステムへの物理的な書き込みを管理し、ログファイルパス設定やログローテーションポリシーなどを処理します。
  4. ルールがトリガーされデータがSinkに渡されると、Sinkは設定されたコネクターを呼び出してJSON Lines形式で指定のローカルディレクトリに書き込みます。これにより標準ツールや下流のデータシステムでの利用が容易になります。

ログローテーション

ディスクログ統合はローカルファイルシステム上の指定ディレクトリにメッセージを書き込みます。ストレージ使用量を管理するため、ログファイルはファイルサイズとファイル数の閾値に基づいてローテーションされます。

  • 設定された最大ファイルサイズに達すると、新しいファイルを開いて書き込みを続けます。
  • 設定された最大ファイル数に達すると、最も古いファイルが切り詰められ、新しいエントリの書き込みに使われます。
  • 各ログファイルには少なくとも1つの完全なエントリが含まれることが保証されます。たとえそのエントリが指定サイズを超えていてもです。

特長とメリット

ディスクログ統合はMQTTメッセージの永続化に対して柔軟で軽量、かつローカルファーストなソリューションを提供します。主な特長と利点は以下の通りです。

  • 細粒度なデータ制御:SQLベースのルールで必要なメッセージやイベントのみをログ化。ログ前に変換・フィルター・拡張が可能。
  • 構造化された出力形式:JSON Lines形式でデータを保存し、機械処理が容易。
  • 軽量かつ自己完結型:外部ストレージシステムやデータベースへの接続不要。
  • 可観測性とデバッグ:メッセージレベルの可視化を実現し、トラブルシューティングや監査に有用。システムログと補完的に動作し、システムイベントではなくデータフローを記録。

はじめる前に

このセクションでは、EMQXでディスクログSinkを作成する前の準備について説明します。

前提条件

ログディレクトリの作成

EMQXホスト上にログファイルを保存するための書き込み可能なディレクトリを作成してください。EMQXのシステムユーザーがこのディレクトリに対して読み書き権限を持っている必要があります。

コネクターの作成

ディスクログSinkを追加する前に、対応するコネクターを作成する必要があります。

  1. ダッシュボードの Integration -> Connector ページに移動します。
  2. 右上の Create ボタンをクリックします。
  3. コネクタータイプとして Disk Log を選択し、Next をクリックします。
  4. コネクター名を入力します。英数字の組み合わせで、ここでは my-disk-log と入力します。
  5. コネクターのパラメーターを入力します。
    • Log Filepath:ログを保存するディレクトリのパス。
    • Maximum File Size:ローテーション前の各ファイルの最大サイズ。注意:各ログには少なくとも1つのエントリが書き込まれるため、単一のログエントリがこの値を超える場合、最終的なファイルサイズはこの最大値を超えることがあります。
    • Maximum Number of Files:古いログを上書きする前に保持するファイルの最大数。
  6. Create をクリックする前に、Test Connectivity をクリックして、設定したパスにログを書き込めるかテストできます。
  7. 画面下部の Create ボタンをクリックしてコネクター作成を完了します。

これでコネクターの作成が完了しました。次に、ディスクログに書き込むデータを指定するためのルールとSinkの作成に進みます。

ディスクログSink付きルールの作成

このセクションでは、EMQXでソースMQTTトピック t/# からメッセージを処理し、処理結果を設定済みのSinkを通じてローカルログファイルに書き込むルールの作成方法を示します。

  1. ダッシュボードの Integration -> Rules ページに移動します。

  2. 右上の Create ボタンをクリックします。

  3. ルールIDに my_rule を入力し、SQLエディターに以下のルールSQLを入力します。

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    SQLに不慣れな場合は、SQL ExamplesEnable Debug をクリックしてルールSQLの学習や結果のテストが可能です。

  4. アクションを追加し、Action Type ドロップダウンから Disk Log を選択します。アクションのドロップダウンはデフォルトの create action のままにするか、既存のDisk Logアクションを選択します。ここでは新しいSinkを作成し、ルールに追加します。

  5. Sinkの名前と説明を入力します。

  6. コネクタードロップダウンから先ほど作成した my-disk-log コネクターを選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成できます。必要な設定パラメーターは コネクターの作成 を参照してください。

  7. 希望する Write Mode(非同期または同期)を選択します。

  8. 有効なJSONオブジェクトにレンダリングされる必要がある Message Template を設定します。

  9. フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のために、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらのアクションがトリガーされます。詳細は フォールバックアクション を参照してください。

  10. 詳細設定 を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細は 詳細設定 をご覧ください。

  11. 残りの設定はデフォルト値のままにし、Create ボタンをクリックしてSink作成を完了します。作成成功後、ルール作成画面に戻り、新しいSinkがルールアクションに追加されます。

  12. ルール作成画面で Create ボタンをクリックし、ルール作成全体を完了します。

これでルールの作成に成功しました。Rules ページで新規作成されたルールを確認でき、Actions (Sink) タブで新しいディスクログSinkも確認できます。

また、Integration -> Flow Designer をクリックするとトポロジーを視覚的に確認できます。トポロジーはトピック t/# のメッセージがルール my_rule によって解析され、ディスクログに書き込まれる流れを示します。

ルールのテスト

このセクションでは、直接アップロード方式で設定したルールのテスト方法を示します。

MQTTXを使ってトピック t/1 にメッセージをパブリッシュします。

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Disk Log" }'

数件メッセージを送信した後、設定したディスクログディレクトリ内の最終更新ファイルを確認し、生成されたイベント内容を確認してください。

詳細設定

このセクションでは、ディスクログSinkの高度な設定オプションについて説明します。ダッシュボードのSink設定画面で Advanced Settings を展開し、用途に応じて以下のパラメーターを調整できます。

フィールド名説明デフォルト値
Buffer Pool SizeEMQXとディスクログ間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保存・処理し、ディスクログへの送信を最適化するために重要です。16
Request TTLリクエストTTL(Time To Live)は、リクエストがバッファに入ってから有効とみなされる最大時間(秒)を指定します。この時間を超えるか、ディスクログへの永続化応答やアックがタイムリーに得られない場合、リクエストは期限切れと見なされます。
Health Check IntervalSinkがディスクログの自動ヘルスチェックを行う間隔(秒)を指定します。15
Max Buffer Queue Size各バッファワーカープロセスがディスクログSinkでバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時保存し、効率的なデータストリーム処理を行います。システム性能やデータ転送要件に応じて調整してください。256
Query Mode同期(synchronous)または非同期(asynchronous)のリクエストモードを選択し、メッセージ送信を最適化します。非同期モードではディスクログへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受信してからディスクログへの書き込みが完了するまでのタイムラグが生じる可能性があります。Asynchronous
Batch SizeEMQXからディスクログへ一度に書き込むデータバッチの最大サイズを指定します。サイズ調整によりデータ転送の効率と性能を微調整できます。
「Batch Size」を「1」に設定すると、データレコードはバッチ化せず個別に送信されます。このアクションは大きめのバッチサイズで効果を発揮します。
1000
Inflight Window「インフライトキューリクエスト」とは、開始されたがまだ応答やアックを受け取っていないリクエストを指します。この設定はSinkとディスクログ間の通信で同時に存在可能なインフライトリクエストの最大数を制御します。
Request Modeasynchronous の場合、このパラメーターは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、この値を 1 に設定してください。
100