diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:35:18 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:35:18 +0200 |
| commit | 77b41ce17090008470c261d1f185da43f76ec0c2 (patch) | |
| tree | 776f558ce855b52fc4b25281c08c93d81c69eafa | |
| parent | f7f98ccaffc1be88db6f9814fb3c88b5f0a6ea34 (diff) | |
task: propagate dropped processing and flush errors (task 379)
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 3 |
3 files changed, 8 insertions, 3 deletions
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 94a28dc..c68048d 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -156,7 +156,9 @@ func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd * if len(message.Bytes()) > 0 { // Process the last line if it doesn't end with newline f.updatePosition() - processor.ProcessFilteredLine(message) + if processErr := processor.ProcessFilteredLine(message); processErr != nil { + return abortReading, processErr + } } return abortReading, nil } diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 3e460ed..6447f89 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -368,7 +368,9 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - filterProcessor.ProcessFilteredLine(lineBuf) + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } } return nil default: diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index c687edf..af124e9 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "errors" "fmt" "io" "sync" @@ -170,7 +171,7 @@ func (w *DirectTurboWriter) Flush() error { if w.serverless { // Ensure writer is flushed if it supports it if flusher, ok := w.writer.(interface{ Flush() error }); ok { - flusher.Flush() + err = errors.Join(err, flusher.Flush()) } } |
