summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 22:44:34 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 22:44:34 +0200
commit91296d85e8a6f1aca5beaeeecf648683c83c75bc (patch)
treed6bf0b0be51a72d0a597402e84b3664145d8e041 /internal/server
parent1b34e1f2501b8def0a0fb4eae28bf6c19a8adde2 (diff)
Fix mapreduce integration drain race
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/generation_output_test.go45
-rw-r--r--internal/server/handlers/serverhandler.go3
2 files changed, 48 insertions, 0 deletions
diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go
index 6020c09..393afac 100644
--- a/internal/server/handlers/generation_output_test.go
+++ b/internal/server/handlers/generation_output_test.go
@@ -2,8 +2,10 @@ package handlers
import (
"bytes"
+ "context"
"strings"
"testing"
+ "time"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/line"
@@ -48,6 +50,49 @@ func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) {
}
}
+func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) {
+ handler := &ServerHandler{
+ baseHandler: baseHandler{
+ done: internal.NewDone(),
+ maprMessages: make(chan string),
+ },
+ }
+
+ generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7)
+ generated <- "final aggregate"
+
+ closed := make(chan struct{})
+ go func() {
+ closeGenerated()
+ close(closed)
+ }()
+
+ select {
+ case <-closed:
+ t.Fatal("closeGenerated returned before mapreduce payload was forwarded")
+ case <-time.After(20 * time.Millisecond):
+ }
+
+ select {
+ case message := <-handler.maprMessages:
+ generation, payload := decodeGeneratedMessage(message)
+ if generation != 7 {
+ t.Fatalf("unexpected generation: %d", generation)
+ }
+ if payload != "final aggregate" {
+ t.Fatalf("unexpected payload: %q", payload)
+ }
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for forwarded mapreduce payload")
+ }
+
+ select {
+ case <-closed:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for closeGenerated to finish")
+ }
+}
+
func TestBaseHandlerReadDropsStaleLine(t *testing.T) {
handler := newGenerationTestHandler(4)
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index ef64468..cd930f9 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -211,7 +211,9 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont
func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, generation uint64) (chan string, func()) {
maprMessages := make(chan string, 16)
+ done := make(chan struct{})
go func() {
+ defer close(done)
for {
select {
case message, ok := <-maprMessages:
@@ -228,5 +230,6 @@ func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, gen
}()
return maprMessages, func() {
close(maprMessages)
+ <-done
}
}