diff options
Diffstat (limited to 'internal/io/fs/directprocessor.go')
| -rw-r--r-- | internal/io/fs/directprocessor.go | 100 |
1 files changed, 50 insertions, 50 deletions
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go index 762fa8f..9c564e7 100644 --- a/internal/io/fs/directprocessor.go +++ b/internal/io/fs/directprocessor.go @@ -63,9 +63,9 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, if catProcessor, ok := dp.processor.(*CatProcessor); ok && catProcessor.plain { return dp.processReaderPreservingLineEndings(ctx, reader, filePath) } - + scanner := bufio.NewScanner(reader) - + // Set buffer size respecting MaxLineLength configuration maxLineLength := config.Server.MaxLineLength initialBufSize := 64 * 1024 @@ -73,7 +73,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, initialBufSize = maxLineLength } scanner.Buffer(make([]byte, initialBufSize), maxLineLength) - + lineNum := 0 for scanner.Scan() { select { @@ -81,35 +81,35 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, return ctx.Err() default: } - + lineNum++ line := scanner.Bytes() - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - - // Process line directly + + // Process line directly if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } } - + // Flush any buffered output if final := dp.processor.Flush(); len(final) > 0 { if _, err := dp.output.Write(final); err != nil { return err } } - + return scanner.Err() } @@ -121,20 +121,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex lineNum := 0 maxLineLength := config.Server.MaxLineLength warnedAboutLongLine := false - - + for { select { case <-ctx.Done(): return ctx.Err() default: } - + n, err := reader.Read(buf) if n > 0 { data := append(remaining, buf[:n]...) remaining = remaining[:0] - + // Process complete lines for { // Find next line ending (LF or CRLF) @@ -153,24 +152,24 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex splitLine := make([]byte, maxLineLength+1) copy(splitLine, data[:maxLineLength]) splitLine[maxLineLength] = '\n' - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process the split line if result, shouldSend := dp.processor.ProcessLine(splitLine, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } - + // Continue with remaining data data = data[maxLineLength:] continue @@ -180,19 +179,20 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex break } } - - line := data[:lfIndex+1] // Include the LF - data = data[lfIndex+1:] // Continue with remaining data - + + // Extract the line including its original line ending (CRLF or LF) + line := data[:lfIndex+1] // Include the LF (and CR if present before it) + data = data[lfIndex+1:] // Continue with remaining data + // Reset warning flag for new line warnedAboutLongLine = false - + // Check if this line exceeds max length and needs to be split if len(line) > maxLineLength { // Split the long line into chunks lineContent := line[:len(line)-1] // Remove the LF lineEnding := line[len(line)-1:] // Keep the LF - + for len(lineContent) > 0 { lineNum++ var chunk []byte @@ -207,18 +207,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex copy(chunk[len(lineContent):], lineEnding) lineContent = nil } - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process the chunk if result, shouldSend := dp.processor.ProcessLine(chunk, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -228,18 +228,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } else { // Normal line processing lineNum++ - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process line directly (line includes original line ending) if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -248,12 +248,12 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } } } - + if err == io.EOF { // Process any remaining data as the last line, respecting line length limit for len(remaining) > 0 { lineNum++ - + var lineToProcess []byte if len(remaining) > maxLineLength { // Split the remaining data @@ -266,17 +266,17 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex lineToProcess = remaining remaining = nil } - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + if result, shouldSend := dp.processor.ProcessLine(lineToProcess, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -285,19 +285,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } break } - + if err != nil { return err } } - + // Flush any buffered output if final := dp.processor.Flush(); len(final) > 0 { if _, err := dp.output.Write(final); err != nil { return err } } - + return nil } @@ -323,7 +323,7 @@ func (dp *DirectProcessor) ProcessFileWithTailing(ctx context.Context, filePath func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) error { // Track our current position in the file var lastSize int64 - + for { select { case <-ctx.Done(): @@ -334,7 +334,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro if err != nil { continue } - + currentSize := fileInfo.Size() if currentSize > lastSize { // File has new content, read it @@ -342,19 +342,19 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro if err != nil { continue } - + // Seek to where we left off if _, err := file.Seek(lastSize, 0); err != nil { file.Close() continue } - + // Process new content if err := dp.processNewContent(ctx, file, filePath); err != nil { file.Close() continue } - + lastSize = currentSize file.Close() } @@ -365,35 +365,35 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro // processNewContent processes new content that was added to the file func (dp *DirectProcessor) processNewContent(ctx context.Context, file *os.File, filePath string) error { scanner := bufio.NewScanner(file) - + // Start line counting from where we left off (simplified approach) lineNum := 1 - + for scanner.Scan() { select { case <-ctx.Done(): return ctx.Err() default: } - + lineBuf := scanner.Bytes() if result, shouldSend := dp.processor.ProcessLine(lineBuf, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } lineNum++ - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } } - + return scanner.Err() -}
\ No newline at end of file +} |
