MQTTデータをディスクログに取り込む
ディスクログのデータ統合により、EMQXはイベントデータをJSON Lines形式でディスクに永続化できます。これは従来のローテーティングログファイルに似ており、トラブルシューティングや履歴追跡のための長期的なイベント保持を可能にします。
本ページでは、EMQXとディスクログ間のデータ統合について詳細に解説し、ルールおよびSinkの作成方法について実践的なガイダンスを提供します。
動作概要
EMQXにはエラー、警告、システムアクティビティなどの運用イベントを監視するための組み込みシステムログがありますが、ディスクログ統合は異なる目的を持ちます。実際のMQTTメッセージデータやクライアントレベルのイベントをディスクに永続化し、保持およびオフライン処理を可能にします。
ディスクログ統合はEMQXのルールエンジンとSink機構を利用して実装されており、ユーザーはどのデータをキャプチャし、どのように保存するかを正確に定義できます。
- ルールでMQTTメッセージやクライアントイベントから関心のあるデータをフィルタリング、変換、抽出します。
- ルールにディスクログSinkを紐付け、データの保存方法と場所を定義します。Sinkは整形済みデータ(JSON形式)を対応するコネクターへ転送します。
- ディスクログコネクターがファイルシステムへの物理的な書き込みを管理します。ログファイルのパス設定やローテーションポリシーなどを処理します。
- ルールがトリガーされデータがSinkに渡されると、Sinkは設定されたコネクターを呼び出し、JSON Lines形式で指定されたローカルディレクトリに書き込みます。これにより標準ツールや下流のデータシステムでの利用が容易になります。
ログローテーション
ディスクログ統合はローカルファイルシステムの指定ディレクトリにメッセージを書き込みます。ストレージ使用量を管理するため、各ログファイルはファイルサイズとファイル数の閾値に基づいてローテーションされます。
- 設定された最大ファイルサイズに達すると、EMQXは新しいファイルを開き書き込みを続けます。
- 最大ファイル数に達すると、最も古いファイルが切り詰められ、新しいエントリの書き込みに使用されます。
- 各ログファイルには少なくとも1つの完全なエントリが含まれます。たとえそのエントリが指定サイズを超える場合でも保証されます。
特長と利点
ディスクログ統合はMQTTメッセージの永続化に対して柔軟で軽量、かつローカルファーストなソリューションを提供します。主な特長と利点は以下の通りです。
- 細粒度のデータ制御:SQLベースのルールを使い、必要なメッセージやイベントのみをログに記録。ログ前に変換、フィルタリング、データの付加も可能です。
- 構造化された出力フォーマット:JSON Lines形式でデータを保存し、機械処理が容易です。
- 軽量かつ自己完結型:外部ストレージシステムやデータベースへの接続は不要です。
- 可観測性とデバッグ:メッセージレベルの可視化によりトラブルシューティングや監査を支援。EMQXのシステムログと補完的に動作し、システムイベントではなくデータフローを記録します。
はじめる前に
このセクションでは、EMQXでディスクログSinkを作成する前の準備について説明します。
前提条件
ログディレクトリの作成
EMQXホスト上にログファイル保存用の書き込み可能なディレクトリを作成してください。EMQXのシステムユーザーがこのディレクトリに対して読み書き権限を持っている必要があります。
コネクターの作成
ディスクログSinkを追加する前に、対応するコネクターを作成する必要があります。
- ダッシュボードの Integration -> Connector ページに移動します。
- 右上の Create ボタンをクリックします。
- コネクタータイプとして Disk Log を選択し、Next をクリックします。
- コネクター名を入力します。英数字の組み合わせで、ここでは
my-disk-logと入力します。 - コネクターのパラメーターを入力します。
- Log Filepath:ログを保存するディレクトリのパス。
- Maximum File Size:ローテーション前の各ファイルの最大サイズ。注意:各ログには少なくとも1エントリが書き込まれるため、単一エントリがこの値を超える場合は最終ファイルサイズがこの最大値を超えることがあります。
- Maximum Number of Files:古いログをローテーションする前に保持する最大ファイル数。
- Create をクリックする前に、Test Connectivity をクリックして、設定したパスにログを書き込めるかテストできます。
- 画面下部の Create ボタンをクリックしてコネクター作成を完了します。
これでコネクター作成が完了しました。次に、ディスクログに書き込むデータを指定するためのルールとSinkを作成します。
ディスクログSink付きルールの作成
このセクションでは、EMQXでソースMQTTトピック t/# のメッセージを処理し、処理結果を設定済みSink経由でローカルログファイルに書き込むルールの作成方法を示します。
ダッシュボードの Integration -> Rules ページに移動します。
右上の Create ボタンをクリックします。
ルールIDに
my_ruleを入力し、SQLエディターに以下のルールSQLを入力します。sqlSELECT * FROM "t/#"TIP
SQLに不慣れな場合は、SQL Examples と Enable Debug をクリックしてルールSQLの結果を学習・テストできます。
アクションを追加し、Action Type ドロップダウンから
Disk Logを選択します。アクションのドロップダウンはデフォルトのcreate actionのままにするか、既存のDisk Logアクションを選択します。ここでは新しいSinkを作成してルールに追加します。Sinkの名前と説明を入力します。
コネクタードロップダウンから先ほど作成した
my-disk-logコネクターを選択します。ドロップダウン横の作成ボタンをクリックするとポップアップで新規コネクターを素早く作成できます。必要な設定パラメーターはコネクターの作成を参照してください。希望の Write Mode(非同期または同期)を選択します。
有効なJSONオブジェクトにレンダリングされる必要がある Message Template を設定します。
フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。プライマリSinkがメッセージ処理に失敗した場合にこれらのアクションがトリガーされます。詳細はフォールバックアクションを参照してください。
詳細設定を展開し、必要に応じて高度な設定オプションを構成します(任意)。詳細は詳細設定を参照してください。
残りの設定はデフォルト値のままにし、Create ボタンをクリックしてSink作成を完了します。作成成功後、ルール作成画面に戻り、新しいSinkがルールのアクションに追加されます。
ルール作成画面で Create ボタンをクリックし、ルール作成全体を完了します。
これでルールの作成が完了しました。Rules ページで新規作成したルールを確認でき、Actions (Sink) タブで新しいディスクログSinkを確認できます。
また、Integration -> Flow Designer をクリックするとトポロジーを確認できます。トポロジーはトピック t/# のメッセージがルール my_rule によって解析され、ディスクログに書き込まれる流れを視覚的に示します。
ルールのテスト
このセクションでは、直接アップロード方式で設定したルールのテスト方法を示します。
MQTTXを使い、トピック t/1 にメッセージをパブリッシュします。
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Disk Log" }'数件メッセージを送信した後、設定したディスクログディレクトリの最終更新ファイルを確認し、生成されたイベント内容をチェックしてください。
詳細設定
このセクションでは、ディスクログSinkの詳細設定オプションについて説明します。ダッシュボードのSink設定画面で詳細設定を展開し、用途に応じて以下のパラメーターを調整できます。
| フィールド名 | 説明 | デフォルト値 |
|---|---|---|
| Buffer Pool Size | EMQXとディスクログ間のデータフローを管理するバッファワーカープロセスの数を指定します。これらのワーカーはデータを一時的に保持・処理し、ディスクログへの送信を最適化するために重要です。 | 16 |
| Request TTL | バッファに入ったリクエストが有効とみなされる最大時間(秒)を指定します。リクエストはバッファに入った時点からTTLがカウントされ、TTLを超えるか、ディスクログからの応答やアック(ACK)がタイムリーに得られない場合は期限切れと見なされます。 | |
| Health Check Interval | Sinkがディスクログの自動ヘルスチェックを行う間隔(秒)を指定します。 | 15 |
| Max Buffer Queue Size | ディスクログSinkの各バッファワーカーがバッファリング可能な最大バイト数を指定します。バッファワーカーはデータを一時的に保持し、効率的にディスクログへ送信するための仲介役を担います。システム性能やデータ転送要件に応じて調整してください。 | 256 |
| Query Mode | 同期(synchronous)または非同期(asynchronous)のリクエストモードを選択し、メッセージ送信を最適化します。非同期モードではディスクログへの書き込みがMQTTメッセージのパブリッシュ処理をブロックしませんが、クライアントがメッセージを受け取るタイミングがディスクログ書き込み前になる可能性があります。 | Asynchronous |
| Batch Size | EMQXからディスクログへ一度に書き込むデータの最大バッチサイズを指定します。サイズを調整することでデータ転送の効率と性能を微調整できます。 「Batch Size」が「1」の場合、データレコードはバッチ化されず個別に送信されます。このアクションは十分なバッチサイズを設定することで効果を発揮します。 | 1000 |
| Inflight Window | 「インフライトキューリクエスト」とは、開始されたがまだ応答やアック(ACK)を受け取っていないリクエストを指します。この設定はSinkとディスクログ間の通信で同時に存在可能なインフライトリクエストの最大数を制御します。 Request Modeが asynchronousの場合、このパラメーターは特に重要です。同一MQTTクライアントからのメッセージを厳密に順序処理する必要がある場合は、この値を1に設定してください。 | 100 |