summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
commitf6e23930da2900c43a5389a2e7d1e38d8221a76f (patch)
tree3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/io
parent1fc24f9affed5128702e4de80572cac8c82d399e (diff)
Refactor server-side config singleton reads
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/fs/catfile.go5
-rw-r--r--internal/io/fs/readfile.go19
-rw-r--r--internal/io/fs/readfile_processor.go3
-rw-r--r--internal/io/fs/readfile_processor_optimized.go7
-rw-r--r--internal/io/fs/readfile_processor_test.go39
-rw-r--r--internal/io/fs/tailfile.go5
6 files changed, 47 insertions, 31 deletions
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index e4676f3..1f35a95 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,7 +6,9 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile {
+func NewCatFile(filePath string, globID string, serverMessages chan<- string,
+ maxLineLength int) CatFile {
+
return CatFile{
readFile: readFile{
filePath: filePath,
@@ -15,6 +17,7 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string) Ca
retry: false,
canSkipLines: false,
seekEOF: false,
+ maxLineLength: maxLineLength,
},
}
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index ee486bc..47a999d 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -13,7 +13,6 @@ import (
"sync"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -26,9 +25,10 @@ import (
type readStatus int
const (
- nothing readStatus = iota
- abortReading readStatus = iota
- continueReading readStatus = iota
+ nothing readStatus = iota
+ abortReading readStatus = iota
+ continueReading readStatus = iota
+ defaultMaxLineLength = 1024 * 1024
)
// Used to tail and filter a local log file.
@@ -49,6 +49,8 @@ type readFile struct {
seekEOF bool
// Warned already about a long line.
warnedAboutLongLine bool
+ // Maximum line length before a line is split.
+ maxLineLength int
}
// String returns the string representation of the readFile
@@ -72,6 +74,13 @@ func (f readFile) Retry() bool {
return f.retry
}
+func (f *readFile) lineLimit() int {
+ if f.maxLineLength <= 0 {
+ return defaultMaxLineLength
+ }
+ return f.maxLineLength
+}
+
// Start tailing a log file.
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
lines chan<- *line.Line, re regex.Regex) error {
@@ -328,7 +337,7 @@ func (f *readFile) handleReadByte(ctx context.Context, b byte,
return abortReading, message
}
default:
- if message.Len() >= config.Server.MaxLineLength {
+ if message.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index c68048d..8f56bdd 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -8,7 +8,6 @@ import (
"os"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -112,7 +111,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
return continueReading
default:
- if message.Len() >= config.Server.MaxLineLength {
+ if message.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 6447f89..2e880e7 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -8,7 +8,6 @@ import (
"os"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -106,7 +105,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
return 0, nil, nil
}
- maxLineLen := config.Server.MaxLineLength
+ maxLineLen := f.lineLimit()
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
@@ -154,7 +153,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
return 0, nil, nil
}
- maxLineLen := config.Server.MaxLineLength
+ maxLineLen := f.lineLimit()
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
@@ -312,7 +311,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
partialLine.Write(data)
// Check if line is too long
- if partialLine.Len() >= config.Server.MaxLineLength {
+ if partialLine.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor_test.go b/internal/io/fs/readfile_processor_test.go
index 2e3cb80..ae34d77 100644
--- a/internal/io/fs/readfile_processor_test.go
+++ b/internal/io/fs/readfile_processor_test.go
@@ -9,7 +9,6 @@ import (
"reflect"
"testing"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
@@ -41,12 +40,10 @@ func (p *captureProcessor) Close() error {
}
func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
filePath := writeProcessorTestFile(t, "alpha\nbeta\n")
re := regex.NewNoop()
- cat := NewCatFile(filePath, "glob-id", make(chan string, 1))
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength)
processor := &captureProcessor{}
if err := cat.readFile.StartWithProcessorOptimized(
@@ -65,8 +62,6 @@ func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) {
}
func TestProcessorVariantsReturnOpenError(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
re := regex.NewNoop()
missingFile := filepath.Join(t.TempDir(), "missing.log")
@@ -90,7 +85,7 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- cat := NewCatFile(missingFile, "glob-id", make(chan string, 1))
+ cat := NewCatFile(missingFile, "glob-id", make(chan string, 1), defaultMaxLineLength)
err := tt.start(&cat.readFile, context.Background(), lcontext.LContext{}, &captureProcessor{}, re)
if err == nil {
t.Fatalf("expected error for missing file")
@@ -100,13 +95,11 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) {
}
func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
filePath := writeProcessorTestFile(t, "alpha\nbeta\n")
re := regex.NewNoop()
expectedErr := errors.New("processor failure")
- cat := NewCatFile(filePath, "glob-id", make(chan string, 1))
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength)
processor := &captureProcessor{
errAtLine: 1,
processErr: expectedErr,
@@ -123,16 +116,26 @@ func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) {
}
}
-func setServerConfigForProcessorTests(t *testing.T) {
- t.Helper()
+func TestStartWithProcessorOptimizedUsesInjectedMaxLineLength(t *testing.T) {
+ filePath := writeProcessorTestFile(t, "abcdef\n")
+ re := regex.NewNoop()
+
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), 3)
+ processor := &captureProcessor{}
+
+ if err := cat.readFile.StartWithProcessorOptimized(
+ context.Background(),
+ lcontext.LContext{},
+ processor,
+ re,
+ ); err != nil {
+ t.Fatalf("optimized reader start failed: %v", err)
+ }
- previousServer := config.Server
- config.Server = &config.ServerConfig{
- MaxLineLength: 1024 * 1024,
+ want := []string{"abc", "def\n"}
+ if !reflect.DeepEqual(processor.lines, want) {
+ t.Fatalf("unexpected processed lines: got=%v want=%v", processor.lines, want)
}
- t.Cleanup(func() {
- config.Server = previousServer
- })
}
func writeProcessorTestFile(t *testing.T, content string) string {
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index 7a40ac4..b2e9910 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,7 +6,9 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile {
+func NewTailFile(filePath string, globID string, serverMessages chan<- string,
+ maxLineLength int) TailFile {
+
return TailFile{
readFile: readFile{
filePath: filePath,
@@ -15,6 +17,7 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string) T
retry: true,
canSkipLines: true,
seekEOF: true,
+ maxLineLength: maxLineLength,
},
}
}