From 77b41ce17090008470c261d1f185da43f76ec0c2 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 8 Mar 2026 09:35:18 +0200 Subject: task: propagate dropped processing and flush errors (task 379) --- internal/io/fs/readfile_processor.go | 4 +++- internal/io/fs/readfile_processor_optimized.go | 4 +++- internal/server/handlers/turbo_writer.go | 3 ++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 94a28dc..c68048d 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -156,7 +156,9 @@ func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd * if len(message.Bytes()) > 0 { // Process the last line if it doesn't end with newline f.updatePosition() - processor.ProcessFilteredLine(message) + if processErr := processor.ProcessFilteredLine(message); processErr != nil { + return abortReading, processErr + } } return abortReading, nil } diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 3e460ed..6447f89 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -368,7 +368,9 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - filterProcessor.ProcessFilteredLine(lineBuf) + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } } return nil default: diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index c687edf..af124e9 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "errors" "fmt" "io" "sync" @@ -170,7 +171,7 @@ func (w *DirectTurboWriter) Flush() error { if w.serverless { // Ensure writer is flushed if it supports it if flusher, ok := w.writer.(interface{ Flush() error }); ok { - flusher.Flush() + err = errors.Join(err, flusher.Flush()) } } -- cgit v1.2.3