diff options
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 3 |
3 files changed, 8 insertions, 3 deletions
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 94a28dc..c68048d 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -156,7 +156,9 @@ func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd * if len(message.Bytes()) > 0 { // Process the last line if it doesn't end with newline f.updatePosition() - processor.ProcessFilteredLine(message) + if processErr := processor.ProcessFilteredLine(message); processErr != nil { + return abortReading, processErr + } } return abortReading, nil } diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 3e460ed..6447f89 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -368,7 +368,9 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - filterProcessor.ProcessFilteredLine(lineBuf) + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } } return nil default: diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index c687edf..af124e9 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "errors" "fmt" "io" "sync" @@ -170,7 +171,7 @@ func (w *DirectTurboWriter) Flush() error { if w.serverless { // Ensure writer is flushed if it supports it if flusher, ok := w.writer.(interface{ Flush() error }); ok { - flusher.Flush() + err = errors.Join(err, flusher.Flush()) } } |
