diff options
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 25 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 25 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 15 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer_test.go | 55 |
4 files changed, 99 insertions, 21 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 66c2cb7..06943b3 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -339,25 +339,28 @@ func (h *baseHandler) flush() { return lineCount + serverCount + maprCount + turboCount } - // Increase iterations for turbo mode to handle large file batches - maxIterations := 100 - if h.turbo.enabled() { - maxIterations = 300 // Give more time for turbo mode to drain + maxWait := time.Second + if h.turbo.enabled() || h.turboAggregate != nil || h.aggregate != nil { + maxWait = 3 * time.Second } - // Also increase iterations if we have MapReduce messages - if h.turboAggregate != nil || h.aggregate != nil { - maxIterations = 300 // Give more time for MapReduce results + if h.serverless && maxWait < 5*time.Second { + maxWait = 5 * time.Second } - for i := 0; i < maxIterations; i++ { - if numUnsentMessages() == 0 { + deadline := time.Now().Add(maxWait) + for i := 0; ; i++ { + unsent := numUnsentMessages() + if unsent == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return } - dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", numUnsentMessages()) + if time.Now().After(deadline) { + dlog.Server.Warn(h.user, "Some lines remain unsent", unsent) + return + } + dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", unsent, "deadline", deadline.Sub(time.Now())) time.Sleep(time.Millisecond * 10) } - dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) } func (h *baseHandler) shutdown() { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 9677718..d4c9c30 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -328,23 +328,24 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext func (r *readCommand) readViaChannels() readStrategy { return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error { var linesCh chan *line.Line - closeLines := false + var closeLines func() if r.server.HasRegularAggregate() { // For MapReduce operations, create a new channel that goes only to the aggregate. linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) r.server.RegisterAggregateLines(linesCh) - closeLines = true + closeLines = func() { + close(linesCh) + } } else { // For non-MapReduce operations, forward lines through a generation-aware channel. - linesCh = r.newGeneratedLinesChannel(ctx) - closeLines = true + linesCh, closeLines = r.newGeneratedLinesChannel(ctx) } err := reader.Start(ctx, ltx, linesCh, re) - if closeLines { + if closeLines != nil { // Closing the aggregate line channel triggers flush. - close(linesCh) + closeLines() } return err @@ -463,7 +464,9 @@ func (r *readCommand) sendServerMessage(message string) { func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) { serverMessages := make(chan string, 16) + done := make(chan struct{}) go func() { + defer close(done) for { select { case message, ok := <-serverMessages: @@ -482,12 +485,15 @@ func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (ch }() return serverMessages, func() { close(serverMessages) + <-done } } -func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line { +func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) (chan *line.Line, func()) { linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) + done := make(chan struct{}) go func() { + defer close(done) for { select { case generatedLine, ok := <-linesCh: @@ -512,7 +518,10 @@ func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.L } } }() - return linesCh + return linesCh, func() { + close(linesCh) + <-done + } } func (r *readCommand) isInputFromPipe() bool { diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index f09a2af..3cd347b 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -205,10 +205,21 @@ func (w *DirectTurboWriter) flushBuffer() error { // In serverless mode with colors, data is already processed line by line // so we don't need to do any additional formatting here - _, err := w.writer.Write(data) + for len(data) > 0 { + n, err := w.writer.Write(data) + if err != nil { + w.writeBuf.Reset() + return err + } + if n <= 0 { + w.writeBuf.Reset() + return io.ErrShortWrite + } + data = data[n:] + } w.writeBuf.Reset() - return err + return nil } // Stats returns writing statistics diff --git a/internal/server/handlers/turbo_writer_test.go b/internal/server/handlers/turbo_writer_test.go index 23a07d4..13460a5 100644 --- a/internal/server/handlers/turbo_writer_test.go +++ b/internal/server/handlers/turbo_writer_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "io" "strings" "sync/atomic" "testing" @@ -221,6 +222,60 @@ func TestDirectTurboWriter_MultipleLines(t *testing.T) { } } +type shortWriter struct { + maxChunk int + buf bytes.Buffer +} + +func (w *shortWriter) Write(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + n := len(p) + if w.maxChunk > 0 && n > w.maxChunk { + n = w.maxChunk + } + w.buf.Write(p[:n]) + return n, nil +} + +func TestDirectTurboWriter_FlushHandlesShortWrites(t *testing.T) { + writer := &shortWriter{maxChunk: 5} + w := NewDirectTurboWriter(writer, "testhost", true, true) + + if err := w.WriteLineData([]byte("abcdefghij"), 1, "source.log"); err != nil { + t.Fatalf("WriteLineData failed: %v", err) + } + + if err := w.Flush(); err != nil { + t.Fatalf("Flush failed: %v", err) + } + + if got, want := writer.buf.String(), "abcdefghij\n"; got != want { + t.Fatalf("expected full output %q, got %q", want, got) + } +} + +type zeroWriter struct{} + +func (zeroWriter) Write(p []byte) (int, error) { + return 0, nil +} + +func TestDirectTurboWriter_FlushFailsOnZeroProgress(t *testing.T) { + w := NewDirectTurboWriter(zeroWriter{}, "testhost", true, true) + + if err := w.WriteLineData([]byte("data"), 1, "source.log"); err != nil { + t.Fatalf("WriteLineData failed: %v", err) + } + + if err := w.Flush(); err == nil { + t.Fatal("expected Flush to fail on zero-progress writes") + } else if err != io.ErrShortWrite { + t.Fatalf("expected io.ErrShortWrite, got %v", err) + } +} + // TestTurboChannelWriter_WriteLineData tests channel writer line data func TestTurboChannelWriter_WriteLineData(t *testing.T) { ch := make(chan []byte, 10) |
