summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-08 09:35:18 +0200
committerPaul Buetow <paul@buetow.org>2026-03-08 09:35:18 +0200
commit77b41ce17090008470c261d1f185da43f76ec0c2 (patch)
tree776f558ce855b52fc4b25281c08c93d81c69eafa
parentf7f98ccaffc1be88db6f9814fb3c88b5f0a6ea34 (diff)
task: propagate dropped processing and flush errors (task 379)
-rw-r--r--internal/io/fs/readfile_processor.go4
-rw-r--r--internal/io/fs/readfile_processor_optimized.go4
-rw-r--r--internal/server/handlers/turbo_writer.go3
3 files changed, 8 insertions, 3 deletions
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index 94a28dc..c68048d 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -156,7 +156,9 @@ func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd *
if len(message.Bytes()) > 0 {
// Process the last line if it doesn't end with newline
f.updatePosition()
- processor.ProcessFilteredLine(message)
+ if processErr := processor.ProcessFilteredLine(message); processErr != nil {
+ return abortReading, processErr
+ }
}
return abortReading, nil
}
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 3e460ed..6447f89 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -368,7 +368,9 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
f.updatePosition()
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(partialLine.Bytes())
- filterProcessor.ProcessFilteredLine(lineBuf)
+ if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
+ return err
+ }
}
return nil
default:
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index c687edf..af124e9 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -2,6 +2,7 @@ package handlers
import (
"bytes"
+ "errors"
"fmt"
"io"
"sync"
@@ -170,7 +171,7 @@ func (w *DirectTurboWriter) Flush() error {
if w.serverless {
// Ensure writer is flushed if it supports it
if flusher, ok := w.writer.(interface{ Flush() error }); ok {
- flusher.Flush()
+ err = errors.Join(err, flusher.Flush())
}
}