summaryrefslogtreecommitdiff
path: root/internal/server/handlers/sessioncommand.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/sessioncommand.go')
-rw-r--r--internal/server/handlers/sessioncommand.go23
1 files changed, 19 insertions, 4 deletions
diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go
index 351f27e..0d54963 100644
--- a/internal/server/handlers/sessioncommand.go
+++ b/internal/server/handlers/sessioncommand.go
@@ -11,6 +11,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/session"
)
@@ -105,6 +106,11 @@ func validateSessionSpec(spec session.Spec) error {
if spec.Query == "" && spec.Mode == omode.MapClient {
return fmt.Errorf("missing session query")
}
+ if spec.Query != "" {
+ if _, err := mapr.NewQuery(spec.Query); err != nil {
+ return fmt.Errorf("invalid session spec")
+ }
+ }
if err := validateSessionOptions(spec.Options); err != nil {
return err
@@ -135,6 +141,7 @@ func (s *sessionCommandState) start(handler *ServerHandler, spec session.Spec) (
s.cancel = cancel
s.mu.Unlock()
+ handler.resetSessionAggregates()
if err := handler.dispatchSessionCommands(ctx, commands); err != nil {
cancel()
s.reset()
@@ -170,6 +177,7 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec,
oldCancel()
}
+ handler.resetSessionAggregates()
if err := handler.dispatchSessionCommands(ctx, commands); err != nil {
cancel()
s.reset()
@@ -180,10 +188,6 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec,
}
func prepareSessionCommands(spec session.Spec) ([]string, error) {
- if spec.Query != "" {
- return nil, fmt.Errorf("query sessions not supported yet")
- }
-
commands, err := spec.Commands()
if err != nil {
return nil, fmt.Errorf("invalid session spec")
@@ -234,3 +238,14 @@ func (h *ServerHandler) dispatchSessionCommands(ctx context.Context, commands []
}
return nil
}
+
+func (h *ServerHandler) resetSessionAggregates() {
+ if h.aggregate != nil {
+ h.aggregate.Shutdown()
+ h.aggregate = nil
+ }
+ if h.turboAggregate != nil {
+ h.turboAggregate.Shutdown()
+ h.turboAggregate = nil
+ }
+}