diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 08:57:01 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 08:57:01 +0200 |
| commit | 9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (patch) | |
| tree | 7e516251fc751c1f4c643eedec5e0333ffd23f2e /internal/server/handlers/sessioncommand.go | |
| parent | 4b15cf31f069bb47a47f3fced9322091997edf15 (diff) | |
task 58076a44: enable query session workloads
Diffstat (limited to 'internal/server/handlers/sessioncommand.go')
| -rw-r--r-- | internal/server/handlers/sessioncommand.go | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go index 351f27e..0d54963 100644 --- a/internal/server/handlers/sessioncommand.go +++ b/internal/server/handlers/sessioncommand.go @@ -11,6 +11,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/session" ) @@ -105,6 +106,11 @@ func validateSessionSpec(spec session.Spec) error { if spec.Query == "" && spec.Mode == omode.MapClient { return fmt.Errorf("missing session query") } + if spec.Query != "" { + if _, err := mapr.NewQuery(spec.Query); err != nil { + return fmt.Errorf("invalid session spec") + } + } if err := validateSessionOptions(spec.Options); err != nil { return err @@ -135,6 +141,7 @@ func (s *sessionCommandState) start(handler *ServerHandler, spec session.Spec) ( s.cancel = cancel s.mu.Unlock() + handler.resetSessionAggregates() if err := handler.dispatchSessionCommands(ctx, commands); err != nil { cancel() s.reset() @@ -170,6 +177,7 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec, oldCancel() } + handler.resetSessionAggregates() if err := handler.dispatchSessionCommands(ctx, commands); err != nil { cancel() s.reset() @@ -180,10 +188,6 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec, } func prepareSessionCommands(spec session.Spec) ([]string, error) { - if spec.Query != "" { - return nil, fmt.Errorf("query sessions not supported yet") - } - commands, err := spec.Commands() if err != nil { return nil, fmt.Errorf("invalid session spec") @@ -234,3 +238,14 @@ func (h *ServerHandler) dispatchSessionCommands(ctx context.Context, commands [] } return nil } + +func (h *ServerHandler) resetSessionAggregates() { + if h.aggregate != nil { + h.aggregate.Shutdown() + h.aggregate = nil + } + if h.turboAggregate != nil { + h.turboAggregate.Shutdown() + h.turboAggregate = nil + } +} |
