diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
| commit | f6e23930da2900c43a5389a2e7d1e38d8221a76f (patch) | |
| tree | 3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/io | |
| parent | 1fc24f9affed5128702e4de80572cac8c82d399e (diff) | |
Refactor server-side config singleton reads
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/catfile.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 19 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 7 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_test.go | 39 | ||||
| -rw-r--r-- | internal/io/fs/tailfile.go | 5 |
6 files changed, 47 insertions, 31 deletions
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index e4676f3..1f35a95 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + maxLineLength int) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, @@ -15,6 +17,7 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string) Ca retry: false, canSkipLines: false, seekEOF: false, + maxLineLength: maxLineLength, }, } } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index ee486bc..47a999d 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -26,9 +25,10 @@ import ( type readStatus int const ( - nothing readStatus = iota - abortReading readStatus = iota - continueReading readStatus = iota + nothing readStatus = iota + abortReading readStatus = iota + continueReading readStatus = iota + defaultMaxLineLength = 1024 * 1024 ) // Used to tail and filter a local log file. @@ -49,6 +49,8 @@ type readFile struct { seekEOF bool // Warned already about a long line. warnedAboutLongLine bool + // Maximum line length before a line is split. + maxLineLength int } // String returns the string representation of the readFile @@ -72,6 +74,13 @@ func (f readFile) Retry() bool { return f.retry } +func (f *readFile) lineLimit() int { + if f.maxLineLength <= 0 { + return defaultMaxLineLength + } + return f.maxLineLength +} + // Start tailing a log file. func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error { @@ -328,7 +337,7 @@ func (f *readFile) handleReadByte(ctx context.Context, b byte, return abortReading, message } default: - if message.Len() >= config.Server.MaxLineLength { + if message.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index c68048d..8f56bdd 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -8,7 +8,6 @@ import ( "os" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -112,7 +111,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, return continueReading default: - if message.Len() >= config.Server.MaxLineLength { + if message.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 6447f89..2e880e7 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -8,7 +8,6 @@ import ( "os" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -106,7 +105,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in return 0, nil, nil } - maxLineLen := config.Server.MaxLineLength + maxLineLen := f.lineLimit() // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { @@ -154,7 +153,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, return 0, nil, nil } - maxLineLen := config.Server.MaxLineLength + maxLineLen := f.lineLimit() // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { @@ -312,7 +311,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, partialLine.Write(data) // Check if line is too long - if partialLine.Len() >= config.Server.MaxLineLength { + if partialLine.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor_test.go b/internal/io/fs/readfile_processor_test.go index 2e3cb80..ae34d77 100644 --- a/internal/io/fs/readfile_processor_test.go +++ b/internal/io/fs/readfile_processor_test.go @@ -9,7 +9,6 @@ import ( "reflect" "testing" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" @@ -41,12 +40,10 @@ func (p *captureProcessor) Close() error { } func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) { - setServerConfigForProcessorTests(t) - filePath := writeProcessorTestFile(t, "alpha\nbeta\n") re := regex.NewNoop() - cat := NewCatFile(filePath, "glob-id", make(chan string, 1)) + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength) processor := &captureProcessor{} if err := cat.readFile.StartWithProcessorOptimized( @@ -65,8 +62,6 @@ func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) { } func TestProcessorVariantsReturnOpenError(t *testing.T) { - setServerConfigForProcessorTests(t) - re := regex.NewNoop() missingFile := filepath.Join(t.TempDir(), "missing.log") @@ -90,7 +85,7 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cat := NewCatFile(missingFile, "glob-id", make(chan string, 1)) + cat := NewCatFile(missingFile, "glob-id", make(chan string, 1), defaultMaxLineLength) err := tt.start(&cat.readFile, context.Background(), lcontext.LContext{}, &captureProcessor{}, re) if err == nil { t.Fatalf("expected error for missing file") @@ -100,13 +95,11 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) { } func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) { - setServerConfigForProcessorTests(t) - filePath := writeProcessorTestFile(t, "alpha\nbeta\n") re := regex.NewNoop() expectedErr := errors.New("processor failure") - cat := NewCatFile(filePath, "glob-id", make(chan string, 1)) + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength) processor := &captureProcessor{ errAtLine: 1, processErr: expectedErr, @@ -123,16 +116,26 @@ func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) { } } -func setServerConfigForProcessorTests(t *testing.T) { - t.Helper() +func TestStartWithProcessorOptimizedUsesInjectedMaxLineLength(t *testing.T) { + filePath := writeProcessorTestFile(t, "abcdef\n") + re := regex.NewNoop() + + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), 3) + processor := &captureProcessor{} + + if err := cat.readFile.StartWithProcessorOptimized( + context.Background(), + lcontext.LContext{}, + processor, + re, + ); err != nil { + t.Fatalf("optimized reader start failed: %v", err) + } - previousServer := config.Server - config.Server = &config.ServerConfig{ - MaxLineLength: 1024 * 1024, + want := []string{"abc", "def\n"} + if !reflect.DeepEqual(processor.lines, want) { + t.Fatalf("unexpected processed lines: got=%v want=%v", processor.lines, want) } - t.Cleanup(func() { - config.Server = previousServer - }) } func writeProcessorTestFile(t *testing.T, content string) string { diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 7a40ac4..b2e9910 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + maxLineLength int) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, @@ -15,6 +17,7 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string) T retry: true, canSkipLines: true, seekEOF: true, + maxLineLength: maxLineLength, }, } } |
