summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go27
-rw-r--r--internal/server/handlers/readcommand_cancellation_test.go69
-rw-r--r--internal/server/handlers/sessioncommand.go2
-rw-r--r--internal/server/handlers/turbo_writer.go69
-rw-r--r--internal/server/handlers/turbo_writer_test.go44
5 files changed, 183 insertions, 28 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 9c85889..9677718 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -88,7 +89,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
paths, err := filepath.Glob(glob)
if err != nil {
dlog.Server.Warn(r.server.LogContext(), glob, err)
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -101,7 +104,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
default:
}
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -132,6 +137,12 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths))
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
// In turbo mode, signal EOF once all pending file work is drained.
// Active command count may still include side-effect commands (for example AUTHKEY),
// so relying on "active == 1" can skip EOF signaling and lead to dropped output.
@@ -160,7 +171,9 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
if r.server.WaitForTurboEOFAck(timeout) {
dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged")
// Allow transport buffers to flush after acknowledgement.
- time.Sleep(r.server.ShutdownTurboSerializeWait())
+ if !ctxutil.Sleep(ctx, r.server.ShutdownTurboSerializeWait()) {
+ return
+ }
} else {
dlog.Server.Warn(
r.server.LogContext(),
@@ -305,7 +318,9 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
}
}
- time.Sleep(r.server.ReadRetryInterval())
+ if !ctxutil.Sleep(ctx, r.server.ReadRetryInterval()) {
+ return
+ }
dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -362,7 +377,9 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
// Skip this delay in serverless mode since data is written directly to stdout
if !r.server.Serverless() {
dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission")
- time.Sleep(r.server.TurboDataTransmissionDelay())
+ if !ctxutil.Sleep(ctx, r.server.TurboDataTransmissionDelay()) {
+ return startErr
+ }
}
return startErr
diff --git a/internal/server/handlers/readcommand_cancellation_test.go b/internal/server/handlers/readcommand_cancellation_test.go
new file mode 100644
index 0000000..4b77224
--- /dev/null
+++ b/internal/server/handlers/readcommand_cancellation_test.go
@@ -0,0 +1,69 @@
+package handlers
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/fs"
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+type retryOnlyFileReader struct{}
+
+func (retryOnlyFileReader) Start(context.Context, lcontext.LContext, chan<- *line.Line, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) StartWithProcessor(context.Context, lcontext.LContext, line.Processor, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) StartWithProcessorOptimized(context.Context, lcontext.LContext, line.Processor, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) FilePath() string {
+ return ""
+}
+
+func (retryOnlyFileReader) Retry() bool {
+ return true
+}
+
+var _ fs.FileReader = retryOnlyFileReader{}
+
+func TestExecuteReadLoopStopsPromptlyWhenContextCanceledDuringRetrySleep(t *testing.T) {
+ handler := newSessionTestHandler("readcommand-cancel-user")
+ handler.serverCfg.ReadRetryIntervalMs = 1000
+
+ command := newReadCommand(handler, omode.TailClient)
+ reader := retryOnlyFileReader{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
+ strategyCalls := 0
+
+ go func() {
+ command.executeReadLoop(ctx, lcontext.LContext{}, "/var/log/app.log", "app.log", regex.NewNoop(), reader,
+ func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error {
+ strategyCalls++
+ cancel()
+ return nil
+ })
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(150 * time.Millisecond):
+ t.Fatal("executeReadLoop did not stop promptly after cancellation")
+ }
+
+ if strategyCalls != 1 {
+ t.Fatalf("expected one read attempt before cancellation, got %d", strategyCalls)
+ }
+}
diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go
index 25b8d15..76c4109 100644
--- a/internal/server/handlers/sessioncommand.go
+++ b/internal/server/handlers/sessioncommand.go
@@ -253,7 +253,7 @@ func (h *ServerHandler) resetSessionAggregates() {
h.aggregate = nil
}
if h.turboAggregate != nil {
- h.turboAggregate.Shutdown()
+ h.turboAggregate.Abort()
h.turboAggregate = nil
}
}
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index fa12f72..f09a2af 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -399,23 +399,29 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
data := make([]byte, w.writeBuf.Len())
copy(data, w.writeBuf.Bytes())
+ encoded := encodeGeneratedBytes(w.generation, data)
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sending to turboLines channel", "dataLen", len(data))
- // Send data to turbo channel, retry once if full
- select {
- case w.turboLines <- encodeGeneratedBytes(w.generation, data):
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
- w.writeBuf.Reset()
- return nil
- default:
- // Channel full, wait a bit and retry
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
- time.Sleep(time.Millisecond)
- w.turboLines <- encodeGeneratedBytes(w.generation, data)
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
- w.writeBuf.Reset()
- return nil
+ for {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "generation became stale while waiting to send")
+ w.writeBuf.Reset()
+ return nil
+ }
+
+ select {
+ case w.turboLines <- encoded:
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
+ w.writeBuf.Reset()
+ return nil
+ default:
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, time.Millisecond) {
+ w.writeBuf.Reset()
+ return nil
+ }
+ }
}
}
@@ -449,12 +455,9 @@ func (w *TurboNetworkWriter) Flush() error {
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
if w.turboLines != nil {
- data := make([]byte, w.writeBuf.Len())
- copy(data, w.writeBuf.Bytes())
-
- // Force send the data
- w.turboLines <- encodeGeneratedBytes(w.generation, data)
- w.writeBuf.Reset()
+ if err := w.sendToTurboChannel(); err != nil {
+ return err
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
}
}
@@ -464,15 +467,22 @@ func (w *TurboNetworkWriter) Flush() error {
if w.turboLines != nil {
// Wait until channel has been drained somewhat
for i := 0; i < 100 && len(w.turboLines) > 900; i++ {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.turboLines))
- time.Sleep(10 * time.Millisecond)
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) {
+ return nil
+ }
}
dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.turboLines))
}
// Wait a bit to ensure data is processed
// This is crucial for integration tests
- time.Sleep(10 * time.Millisecond)
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) {
+ return nil
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "completed")
return nil
@@ -491,6 +501,21 @@ func shouldWriteGeneration(generation uint64, activeGeneration func() uint64) bo
return currentGeneration == generation
}
+func waitForGenerationRetry(generation uint64, activeGeneration func() uint64, delay time.Duration) bool {
+ if !shouldWriteGeneration(generation, activeGeneration) {
+ return false
+ }
+ if delay <= 0 {
+ return shouldWriteGeneration(generation, activeGeneration)
+ }
+
+ timer := time.NewTimer(delay)
+ defer timer.Stop()
+ <-timer.C
+
+ return shouldWriteGeneration(generation, activeGeneration)
+}
+
// DirectLineProcessor processes lines directly without channels in turbo mode
type DirectLineProcessor struct {
writer TurboWriter
diff --git a/internal/server/handlers/turbo_writer_test.go b/internal/server/handlers/turbo_writer_test.go
index d8dc8a9..23a07d4 100644
--- a/internal/server/handlers/turbo_writer_test.go
+++ b/internal/server/handlers/turbo_writer_test.go
@@ -3,8 +3,11 @@ package handlers
import (
"bytes"
"strings"
+ "sync/atomic"
"testing"
+ "time"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -353,6 +356,47 @@ func TestTurboChannelWriter_Stats(t *testing.T) {
}
}
+func TestTurboNetworkWriterStopsWaitingWhenGenerationBecomesStale(t *testing.T) {
+ originalLogger := dlog.Server
+ dlog.Server = &dlog.DLog{}
+ t.Cleanup(func() {
+ dlog.Server = originalLogger
+ })
+
+ turboLines := make(chan []byte, 1)
+ turboLines <- []byte("occupied")
+
+ var activeGeneration atomic.Uint64
+ activeGeneration.Store(1)
+
+ writer := &TurboNetworkWriter{
+ turboLines: turboLines,
+ generation: 1,
+ activeGeneration: func() uint64 {
+ return activeGeneration.Load()
+ },
+ }
+
+ go func() {
+ time.Sleep(10 * time.Millisecond)
+ activeGeneration.Store(2)
+ }()
+
+ done := make(chan error, 1)
+ go func() {
+ done <- writer.WriteLineData([]byte("stale line"), 1, "app.log")
+ }()
+
+ select {
+ case err := <-done:
+ if err != nil {
+ t.Fatalf("WriteLineData returned unexpected error: %v", err)
+ }
+ case <-time.After(150 * time.Millisecond):
+ t.Fatal("WriteLineData did not stop after the generation became stale")
+ }
+}
+
// TestDirectLineProcessor tests the line processor wrapper
// Note: Skipped because DirectLineProcessor uses dlog.Server which requires initialization
func TestDirectLineProcessor(t *testing.T) {