summaryrefslogtreecommitdiff
path: root/internal/io/fs/directprocessor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io/fs/directprocessor.go')
-rw-r--r--internal/io/fs/directprocessor.go100
1 files changed, 50 insertions, 50 deletions
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index 762fa8f..9c564e7 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -63,9 +63,9 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
if catProcessor, ok := dp.processor.(*CatProcessor); ok && catProcessor.plain {
return dp.processReaderPreservingLineEndings(ctx, reader, filePath)
}
-
+
scanner := bufio.NewScanner(reader)
-
+
// Set buffer size respecting MaxLineLength configuration
maxLineLength := config.Server.MaxLineLength
initialBufSize := 64 * 1024
@@ -73,7 +73,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
initialBufSize = maxLineLength
}
scanner.Buffer(make([]byte, initialBufSize), maxLineLength)
-
+
lineNum := 0
for scanner.Scan() {
select {
@@ -81,35 +81,35 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
return ctx.Err()
default:
}
-
+
lineNum++
line := scanner.Bytes()
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
- // Process line directly
+
+ // Process line directly
if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
}
-
+
// Flush any buffered output
if final := dp.processor.Flush(); len(final) > 0 {
if _, err := dp.output.Write(final); err != nil {
return err
}
}
-
+
return scanner.Err()
}
@@ -121,20 +121,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
lineNum := 0
maxLineLength := config.Server.MaxLineLength
warnedAboutLongLine := false
-
-
+
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-
+
n, err := reader.Read(buf)
if n > 0 {
data := append(remaining, buf[:n]...)
remaining = remaining[:0]
-
+
// Process complete lines
for {
// Find next line ending (LF or CRLF)
@@ -153,24 +152,24 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
splitLine := make([]byte, maxLineLength+1)
copy(splitLine, data[:maxLineLength])
splitLine[maxLineLength] = '\n'
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process the split line
if result, shouldSend := dp.processor.ProcessLine(splitLine, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
-
+
// Continue with remaining data
data = data[maxLineLength:]
continue
@@ -180,19 +179,20 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
break
}
}
-
- line := data[:lfIndex+1] // Include the LF
- data = data[lfIndex+1:] // Continue with remaining data
-
+
+ // Extract the line including its original line ending (CRLF or LF)
+ line := data[:lfIndex+1] // Include the LF (and CR if present before it)
+ data = data[lfIndex+1:] // Continue with remaining data
+
// Reset warning flag for new line
warnedAboutLongLine = false
-
+
// Check if this line exceeds max length and needs to be split
if len(line) > maxLineLength {
// Split the long line into chunks
lineContent := line[:len(line)-1] // Remove the LF
lineEnding := line[len(line)-1:] // Keep the LF
-
+
for len(lineContent) > 0 {
lineNum++
var chunk []byte
@@ -207,18 +207,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
copy(chunk[len(lineContent):], lineEnding)
lineContent = nil
}
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process the chunk
if result, shouldSend := dp.processor.ProcessLine(chunk, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -228,18 +228,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
} else {
// Normal line processing
lineNum++
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process line directly (line includes original line ending)
if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -248,12 +248,12 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
}
}
}
-
+
if err == io.EOF {
// Process any remaining data as the last line, respecting line length limit
for len(remaining) > 0 {
lineNum++
-
+
var lineToProcess []byte
if len(remaining) > maxLineLength {
// Split the remaining data
@@ -266,17 +266,17 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
lineToProcess = remaining
remaining = nil
}
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
if result, shouldSend := dp.processor.ProcessLine(lineToProcess, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -285,19 +285,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
}
break
}
-
+
if err != nil {
return err
}
}
-
+
// Flush any buffered output
if final := dp.processor.Flush(); len(final) > 0 {
if _, err := dp.output.Write(final); err != nil {
return err
}
}
-
+
return nil
}
@@ -323,7 +323,7 @@ func (dp *DirectProcessor) ProcessFileWithTailing(ctx context.Context, filePath
func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) error {
// Track our current position in the file
var lastSize int64
-
+
for {
select {
case <-ctx.Done():
@@ -334,7 +334,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
if err != nil {
continue
}
-
+
currentSize := fileInfo.Size()
if currentSize > lastSize {
// File has new content, read it
@@ -342,19 +342,19 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
if err != nil {
continue
}
-
+
// Seek to where we left off
if _, err := file.Seek(lastSize, 0); err != nil {
file.Close()
continue
}
-
+
// Process new content
if err := dp.processNewContent(ctx, file, filePath); err != nil {
file.Close()
continue
}
-
+
lastSize = currentSize
file.Close()
}
@@ -365,35 +365,35 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
// processNewContent processes new content that was added to the file
func (dp *DirectProcessor) processNewContent(ctx context.Context, file *os.File, filePath string) error {
scanner := bufio.NewScanner(file)
-
+
// Start line counting from where we left off (simplified approach)
lineNum := 1
-
+
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-
+
lineBuf := scanner.Bytes()
if result, shouldSend := dp.processor.ProcessLine(lineBuf, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
lineNum++
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
}
-
+
return scanner.Err()
-} \ No newline at end of file
+}