summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/handlers/maprhandler.go10
-rw-r--r--internal/clients/handlers/maprhandler_test.go55
2 files changed, 65 insertions, 0 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index d4e171c..4391a34 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -75,3 +75,13 @@ func (h *MaprHandler) handleAggregateMessage(message string) {
dlog.Client.Error("Unable to aggregate data", h.server, message, err)
}
}
+
+// Shutdown flushes any pending aggregate state before marking the handler done.
+func (h *MaprHandler) Shutdown() {
+ if h.aggregate != nil {
+ if err := h.aggregate.Flush(); err != nil {
+ dlog.Client.Error("Unable to flush aggregate data on shutdown", h.server, err)
+ }
+ }
+ h.baseHandler.Shutdown()
+}
diff --git a/internal/clients/handlers/maprhandler_test.go b/internal/clients/handlers/maprhandler_test.go
new file mode 100644
index 0000000..7b7b211
--- /dev/null
+++ b/internal/clients/handlers/maprhandler_test.go
@@ -0,0 +1,55 @@
+package handlers
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/mimecast/dtail/internal/mapr"
+ maprclient "github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+func TestMaprHandlerShutdownFlushesPendingAggregateState(t *testing.T) {
+ query, err := mapr.NewQuery("select status,count(status) from stats group by status")
+ if err != nil {
+ t.Fatalf("NewQuery() error = %v", err)
+ }
+
+ session := maprclient.NewSessionState(query)
+ handler := NewMaprHandler("srv1", session)
+ countStorage := handlerCountStorage(t, query)
+
+ message := strings.Join([]string{
+ "ERROR",
+ "2",
+ countStorage + protocol.AggregateKVDelimiter + "2",
+ "",
+ }, protocol.AggregateDelimiter)
+ if err := handler.aggregate.Aggregate(message); err != nil {
+ t.Fatalf("Aggregate() error = %v", err)
+ }
+
+ handler.Shutdown()
+
+ result, numRows, err := session.Snapshot().GlobalGroup.Result(query, 10, nil)
+ if err != nil {
+ t.Fatalf("Result() error = %v", err)
+ }
+ if numRows != 1 {
+ t.Fatalf("numRows = %d, want 1", numRows)
+ }
+ if !strings.Contains(result, "2") {
+ t.Fatalf("expected flushed aggregate row, got %q", result)
+ }
+}
+
+func handlerCountStorage(t *testing.T, query *mapr.Query) string {
+ t.Helper()
+ for _, selectCondition := range query.Select {
+ if selectCondition.Operation == mapr.Count {
+ return selectCondition.FieldStorage
+ }
+ }
+ t.Fatalf("query %q does not contain count() storage", query.RawQuery)
+ return ""
+}