diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
| commit | 6c3bc11f736040a09fd839832a6be01e434e8aab (patch) | |
| tree | 6b856c2f79d2f75ccd8ba89c638ee18839b4d061 /internal/mapr/server | |
| parent | a5a405d79fe3d9e0c6ea081b425d36bd67d8671d (diff) | |
Stop stale query work promptly on generation cancel
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 45 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate_test.go | 19 |
2 files changed, 61 insertions, 3 deletions
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index 9b5afe7..188be1c 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -56,6 +56,21 @@ type rawLine struct { sourceID string } +func (a *TurboAggregate) stopping() bool { + select { + case <-a.done.Done(): + return true + default: + return false + } +} + +func (a *TurboAggregate) stopSerializeTicker() { + if a.serializeTicker != nil { + a.serializeTicker.Stop() + } +} + // NewTurboAggregate returns a new turbo mode aggregator. func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) { query, err := mapr.NewQuery(queryStr) @@ -130,9 +145,7 @@ func (a *TurboAggregate) Shutdown() { a.done.Shutdown() // Stop the ticker - if a.serializeTicker != nil { - a.serializeTicker.Stop() - } + a.stopSerializeTicker() // Wait for active processors to finish for a.activeProcessors.Load() > 0 { @@ -185,6 +198,19 @@ func (a *TurboAggregate) Shutdown() { } } +// Abort stops background processing without waiting for final serialization. +// Session generation replacement uses this to preempt old query work immediately. +func (a *TurboAggregate) Abort() { + dlog.Server.Info("TurboAggregate: Abort called", + "linesProcessed", a.linesProcessed.Load(), + "filesProcessed", a.filesProcessed.Load(), + "activeProcessors", a.activeProcessors.Load(), + "currentGroups", a.countGroups()) + + a.done.Shutdown() + a.stopSerializeTicker() +} + // Start the turbo aggregation. func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { a.maprMessages = maprMessages @@ -206,6 +232,10 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) // ProcessLineDirect processes a line directly without channels. // This is called from the TurboAggregateProcessor. func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { + if a.stopping() { + return nil + } + // Increment counter first a.linesProcessed.Add(1) @@ -642,6 +672,11 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo // ProcessLine processes a line directly to the turbo aggregate. func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { + if p.aggregate.stopping() { + pool.RecycleBytesBuffer(lineContent) + return nil + } + // Debug: Log when ProcessLine is called if lineNum == 1 || lineNum%1000 == 0 { dlog.Server.Info("TurboAggregateProcessor: ProcessLine called", @@ -661,6 +696,10 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum // Flush ensures all buffered data is processed. func (p *TurboAggregateProcessor) Flush() error { + if p.aggregate.stopping() { + return nil + } + // Log flush call for debugging dlog.Server.Info("TurboAggregateProcessor: Flush called", "globID", p.globID, diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go index f556f50..7ae4b5a 100644 --- a/internal/mapr/server/turbo_aggregate_test.go +++ b/internal/mapr/server/turbo_aggregate_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" @@ -311,3 +312,21 @@ func TestTurboAggregateConcurrency(t *testing.T) { t.Error("Did not find expected count of 1000 in results") } } + +func TestTurboAggregateAbortReturnsPromptlyWithActiveProcessors(t *testing.T) { + aggregate := &TurboAggregate{} + aggregate.done = internal.NewDone() + aggregate.activeProcessors.Store(1) + + done := make(chan struct{}) + go func() { + aggregate.Abort() + close(done) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatal("Abort did not return promptly while processors were still active") + } +} |
