diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 07:40:22 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 07:40:22 +0200 |
| commit | 2a665812a0c224ef32d37b2cca681512c5b7d6c1 (patch) | |
| tree | 8e5cf0c2c28775bff5afeeea4d719c627442adbc | |
| parent | 71e431af2e65196ad4e7bc3404e772b1726d3338 (diff) | |
task 399: add client session spec scaffolding
| -rw-r--r-- | cmd/dcat/main.go | 2 | ||||
| -rw-r--r-- | cmd/dgrep/main.go | 2 | ||||
| -rw-r--r-- | cmd/dmap/main.go | 2 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 2 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 9 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 18 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection_test.go | 12 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 18 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 79 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler_test.go | 39 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 12 | ||||
| -rw-r--r-- | internal/clients/handlers/handler.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 12 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 12 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 16 | ||||
| -rw-r--r-- | internal/clients/maker.go | 4 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 33 | ||||
| -rw-r--r-- | internal/clients/session_spec.go | 124 | ||||
| -rw-r--r-- | internal/clients/session_spec_test.go | 133 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 19 | ||||
| -rw-r--r-- | internal/config/args.go | 4 | ||||
| -rw-r--r-- | internal/protocol/capabilities.go | 9 |
22 files changed, 503 insertions, 62 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index 64c6f10..554887e 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -28,6 +28,7 @@ func main() { flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors") flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature") flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") + flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions") flag.BoolVar(&args.Plain, "plain", false, "Plain output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") @@ -36,6 +37,7 @@ func main() { flag.IntVar(&args.SSHAgentKeyIndex, "agentKeyIndex", -1, "SSH agent key index to use (-1 for all keys)") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") + flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index aadf5c7..804b047 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -29,6 +29,7 @@ func main() { flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature") flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex") + flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions") flag.BoolVar(&args.Plain, "plain", false, "Plain output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") @@ -40,6 +41,7 @@ func main() { flag.IntVar(&args.SSHAgentKeyIndex, "agentKeyIndex", -1, "SSH agent key index to use (-1 for all keys)") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") + flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index e6665ee..088da68 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -32,6 +32,7 @@ func main() { flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors") flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature") flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") + flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions") flag.BoolVar(&args.Plain, "plain", false, "Plain output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") @@ -41,6 +42,7 @@ func main() { flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") + flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index d3fe931..d4dbad0 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -37,6 +37,7 @@ func main() { flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature") flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex") + flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions") flag.BoolVar(&args.Plain, "plain", false, "Plain output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&checkHealth, "checkHealth", false, "Deprecated, flag will be removed soon") @@ -53,6 +54,7 @@ func main() { flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.IntVar(&shutdownAfter, "shutdownAfter", 3600*24, "Shutdown after so many seconds") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") + flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 7358cb3..999c0ed 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -38,6 +38,8 @@ type baseClient struct { throttleCh chan struct{} // Retry connection upon failure? retry bool + // The current connection-wide session specification. + sessionSpec SessionSpec // Connection maker helper. maker maker // Regex is the regular expresion object for line filtering @@ -70,6 +72,13 @@ func (c *baseClient) init() { func (c *baseClient) makeConnections(maker maker) { c.maker = maker + if builder, ok := maker.(sessionSpecMaker); ok { + sessionSpec, err := builder.makeSessionSpec() + if err != nil { + dlog.Client.FatalPanic("unable to build session specification", err) + } + c.sessionSpec = sessionSpec + } discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle) for _, server := range discoveryService.ServerList() { diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 0a62b9d..a258e64 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -2,9 +2,7 @@ package clients import ( "errors" - "fmt" "runtime" - "strings" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" @@ -42,14 +40,18 @@ func (c CatClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } +func (c CatClient) makeSessionSpec() (SessionSpec, error) { + return NewSessionSpec(c.Args), nil +} + func (c CatClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() + sessionSpec, err := c.makeSessionSpec() if err != nil { - dlog.Client.FatalPanic(err) + dlog.Client.FatalPanic("unable to build cat session spec", err) } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) + commands, err = sessionSpec.Commands() + if err != nil { + dlog.Client.FatalPanic("unable to build cat commands from session spec", err) } - return + return commands } diff --git a/internal/clients/connectors/serverconnection_test.go b/internal/clients/connectors/serverconnection_test.go index 227a1e9..be53cd4 100644 --- a/internal/clients/connectors/serverconnection_test.go +++ b/internal/clients/connectors/serverconnection_test.go @@ -175,6 +175,14 @@ func (m *mockHandler) SendMessage(command string) error { return nil } +func (m *mockHandler) Capabilities() []string { + return nil +} + +func (m *mockHandler) HasCapability(string) bool { + return false +} + func (m *mockHandler) Server() string { return "mock" } @@ -191,6 +199,10 @@ func (m *mockHandler) Done() <-chan struct{} { return ch } +func (m *mockHandler) WaitForCapabilities(timeout time.Duration) bool { + return false +} + func (m *mockHandler) Read(_ []byte) (int, error) { return 0, nil } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index f0f08d4..2e234a7 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -2,9 +2,7 @@ package clients import ( "errors" - "fmt" "runtime" - "strings" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" @@ -43,14 +41,18 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } +func (c GrepClient) makeSessionSpec() (SessionSpec, error) { + return NewSessionSpec(c.Args), nil +} + func (c GrepClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() + sessionSpec, err := c.makeSessionSpec() if err != nil { - dlog.Client.FatalPanic(err) + dlog.Client.FatalPanic("unable to build grep session spec", err) } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) + commands, err = sessionSpec.Commands() + if err != nil { + dlog.Client.FatalPanic("unable to build grep commands from session spec", err) } - return + return commands } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 1debb98..923b24a 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -5,7 +5,9 @@ import ( "encoding/base64" "fmt" "io" + "sort" "strings" + "sync" "time" "github.com/mimecast/dtail/internal" @@ -20,6 +22,11 @@ type baseHandler struct { commands chan string receiveBuf bytes.Buffer status int + + capabilitiesMu sync.RWMutex + capabilities map[string]struct{} + capabilitiesCh chan struct{} + capabilitiesOk sync.Once } func (h *baseHandler) String() string { @@ -40,6 +47,26 @@ func (h *baseHandler) Status() int { return h.status } +func (h *baseHandler) Capabilities() []string { + h.capabilitiesMu.RLock() + defer h.capabilitiesMu.RUnlock() + + capabilities := make([]string, 0, len(h.capabilities)) + for capability := range h.capabilities { + capabilities = append(capabilities, capability) + } + sort.Strings(capabilities) + return capabilities +} + +func (h *baseHandler) HasCapability(name string) bool { + h.capabilitiesMu.RLock() + defer h.capabilitiesMu.RUnlock() + + _, ok := h.capabilities[name] + return ok +} + // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -148,16 +175,68 @@ func parseAuthKeyMessage(message string) (isAuthKeyMessage bool, ok bool, detail // to the end user. func (h *baseHandler) handleHiddenMessage(message string) { switch { + case strings.HasPrefix(message, protocol.HiddenCapabilitiesPrefix): + h.handleCapabilitiesMessage(message) case strings.HasPrefix(message, ".syn close connection"): go h.SendMessage(".ack close connection") h.Shutdown() } } +func (h *baseHandler) handleCapabilitiesMessage(message string) { + capabilities := strings.Fields(strings.TrimPrefix(message, protocol.HiddenCapabilitiesPrefix)) + + h.capabilitiesMu.Lock() + defer h.capabilitiesMu.Unlock() + + if h.capabilities == nil { + h.capabilities = make(map[string]struct{}) + } + for _, capability := range capabilities { + if capability == "" { + continue + } + h.capabilities[capability] = struct{}{} + } + + h.capabilitiesOk.Do(func() { + if h.capabilitiesCh != nil { + close(h.capabilitiesCh) + } + }) +} + func (h *baseHandler) Done() <-chan struct{} { return h.done.Done() } +func (h *baseHandler) WaitForCapabilities(timeout time.Duration) bool { + if h.capabilitiesCh == nil { + return false + } + + if timeout <= 0 { + select { + case <-h.capabilitiesCh: + return true + default: + return false + } + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-h.capabilitiesCh: + return true + case <-h.Done(): + return false + case <-timer.C: + return false + } +} + func (h *baseHandler) Shutdown() { h.done.Shutdown() } diff --git a/internal/clients/handlers/basehandler_test.go b/internal/clients/handlers/basehandler_test.go index 996cd23..fddc890 100644 --- a/internal/clients/handlers/basehandler_test.go +++ b/internal/clients/handlers/basehandler_test.go @@ -3,7 +3,9 @@ package handlers import ( "fmt" "testing" + "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/protocol" ) @@ -57,3 +59,40 @@ func TestParseAuthKeyMessage(t *testing.T) { }) } } + +func TestHandleCapabilitiesMessage(t *testing.T) { + handler := baseHandler{ + done: internal.NewDone(), + capabilities: make(map[string]struct{}), + capabilitiesCh: make(chan struct{}), + } + + handler.handleHiddenMessage(".syn capabilities query-update-v1 feature-two") + + if !handler.HasCapability(protocol.CapabilityQueryUpdateV1) { + t.Fatalf("expected handler to track %q", protocol.CapabilityQueryUpdateV1) + } + if !handler.HasCapability("feature-two") { + t.Fatalf("expected handler to track feature-two") + } + if handler.WaitForCapabilities(10*time.Millisecond) != true { + t.Fatalf("expected capabilities wait to succeed") + } + + capabilities := handler.Capabilities() + if len(capabilities) != 2 { + t.Fatalf("unexpected capabilities: %#v", capabilities) + } +} + +func TestWaitForCapabilitiesTimeout(t *testing.T) { + handler := baseHandler{ + done: internal.NewDone(), + capabilities: make(map[string]struct{}), + capabilitiesCh: make(chan struct{}), + } + + if handler.WaitForCapabilities(5 * time.Millisecond) { + t.Fatalf("expected capabilities wait to time out") + } +} diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 4d29429..61bbc50 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -18,11 +18,13 @@ func NewClientHandler(server string) *ClientHandler { return &ClientHandler{ baseHandler{ - server: server, - shellStarted: false, - commands: make(chan string), - status: -1, - done: internal.NewDone(), + server: server, + shellStarted: false, + commands: make(chan string), + status: -1, + done: internal.NewDone(), + capabilities: make(map[string]struct{}), + capabilitiesCh: make(chan struct{}), }, } } diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go index afa87e2..aebebaa 100644 --- a/internal/clients/handlers/handler.go +++ b/internal/clients/handlers/handler.go @@ -2,14 +2,18 @@ package handlers import ( "io" + "time" ) // Handler provides all methods which can be run on any client handler. type Handler interface { io.ReadWriter + Capabilities() []string + HasCapability(name string) bool SendMessage(command string) error Server() string Status() int Shutdown() Done() <-chan struct{} + WaitForCapabilities(timeout time.Duration) bool } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 47b594e..cd5605a 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,11 +19,13 @@ func NewHealthHandler(server string) *HealthHandler { dlog.Client.Debug(server, "Creating new health handler") return &HealthHandler{ baseHandler: baseHandler{ - server: server, - shellStarted: false, - commands: make(chan string), - status: 2, // Assume CRITICAL status by default. - done: internal.NewDone(), + server: server, + shellStarted: false, + commands: make(chan string), + status: 2, // Assume CRITICAL status by default. + done: internal.NewDone(), + capabilities: make(map[string]struct{}), + capabilitiesCh: make(chan struct{}), }, } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 4c11470..9e9a0d1 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -25,11 +25,13 @@ func NewMaprHandler(server string, query *mapr.Query, return &MaprHandler{ baseHandler: baseHandler{ - server: server, - shellStarted: false, - commands: make(chan string), - status: -1, - done: internal.NewDone(), + server: server, + shellStarted: false, + commands: make(chan string), + status: -1, + done: internal.NewDone(), + capabilities: make(map[string]struct{}), + capabilitiesCh: make(chan struct{}), }, query: query, aggregate: client.NewAggregate(server, query, globalGroup), diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index f699912..7fc2e97 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -41,9 +42,20 @@ func (c HealthClient) makeHandler(server string) handlers.Handler { return handlers.NewHealthHandler(server) } +func (c HealthClient) makeSessionSpec() (SessionSpec, error) { + return NewSessionSpec(c.Args), nil +} + func (c HealthClient) makeCommands() (commands []string) { - commands = append(commands, "health") - return + sessionSpec, err := c.makeSessionSpec() + if err != nil { + dlog.Client.FatalPanic("unable to build health session spec", err) + } + commands, err = sessionSpec.Commands() + if err != nil { + dlog.Client.FatalPanic("unable to build health commands from session spec", err) + } + return commands } // Start the health client. diff --git a/internal/clients/maker.go b/internal/clients/maker.go index d5ffd8b..6d75fc1 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -11,3 +11,7 @@ type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) } + +type sessionSpecMaker interface { + makeSessionSpec() (SessionSpec, error) +} diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 2757229..4bf0cf0 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "runtime" - "strings" "time" "github.com/mimecast/dtail/internal/clients/handlers" @@ -118,28 +117,22 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } +func (c MaprClient) makeSessionSpec() (SessionSpec, error) { + sessionSpec := NewSessionSpec(c.Args) + sessionSpec.Query = c.query.RawQuery + return sessionSpec, nil +} + func (c MaprClient) makeCommands() (commands []string) { - // Include options with the map command so serverless mode is properly detected - commands = append(commands, fmt.Sprintf("map:%s %s", c.Args.SerializeOptions(), c.query.RawQuery)) - modeStr := "cat" - if c.Mode == omode.TailClient { - modeStr = "tail" + sessionSpec, err := c.makeSessionSpec() + if err != nil { + dlog.Client.FatalPanic("unable to build map session spec", err) } - - for _, file := range strings.Split(c.What, ",") { - regex, err := c.Regex.Serialize() - if err != nil { - dlog.Client.FatalPanic(err) - } - if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, - modeStr, file, regex)) - continue - } - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - modeStr, c.Args.SerializeOptions(), file, regex)) + commands, err = sessionSpec.Commands() + if err != nil { + dlog.Client.FatalPanic("unable to build map commands from session spec", err) } - return + return commands } func (c *MaprClient) periodicReportResults(ctx context.Context) { diff --git a/internal/clients/session_spec.go b/internal/clients/session_spec.go new file mode 100644 index 0000000..37b1803 --- /dev/null +++ b/internal/clients/session_spec.go @@ -0,0 +1,124 @@ +package clients + +import ( + "fmt" + "strings" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/regex" +) + +// SessionSpec captures the mutable, per-connection workload a DTail client wants to run. +type SessionSpec struct { + Mode omode.Mode + Files []string + Options string + Query string + Regex string + RegexInvert bool + Timeout int +} + +// NewSessionSpec returns a session specification from client args. +func NewSessionSpec(args config.Args) SessionSpec { + return SessionSpec{ + Mode: args.Mode, + Files: splitSessionFiles(args.What), + Options: args.SerializeOptions(), + Query: strings.TrimSpace(args.QueryStr), + Regex: args.RegexStr, + RegexInvert: args.RegexInvert, + Timeout: args.Timeout, + } +} + +// Commands returns the legacy command stream for this session specification. +func (s SessionSpec) Commands() ([]string, error) { + switch { + case s.Mode == omode.HealthClient: + return []string{"health"}, nil + case s.Query != "": + return s.queryCommands() + default: + return s.readCommands(s.Mode.String()) + } +} + +func (s SessionSpec) queryCommands() ([]string, error) { + if s.Mode != omode.MapClient && s.Mode != omode.TailClient { + return nil, fmt.Errorf("session spec query mode requires map or tail mode, got %s", s.Mode) + } + + regexValue, err := s.serializedRegex() + if err != nil { + return nil, err + } + + commands := []string{fmt.Sprintf("map:%s %s", s.Options, s.Query)} + readMode := "cat" + if s.Mode == omode.TailClient { + readMode = "tail" + } + + for _, file := range s.Files { + if s.Timeout > 0 { + commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", s.Timeout, readMode, file, regexValue)) + continue + } + commands = append(commands, fmt.Sprintf("%s:%s %s %s", readMode, s.Options, file, regexValue)) + } + + return commands, nil +} + +func (s SessionSpec) readCommands(mode string) ([]string, error) { + switch s.Mode { + case omode.TailClient, omode.CatClient, omode.GrepClient: + default: + return nil, fmt.Errorf("unsupported session mode %s", s.Mode) + } + + regexValue, err := s.serializedRegex() + if err != nil { + return nil, err + } + + var commands []string + for _, file := range s.Files { + commands = append(commands, fmt.Sprintf("%s:%s %s %s", mode, s.Options, file, regexValue)) + } + + return commands, nil +} + +func (s SessionSpec) serializedRegex() (string, error) { + flag := regex.Default + if s.RegexInvert { + flag = regex.Invert + } + + re, err := regex.New(s.Regex, flag) + if err != nil { + return "", err + } + + return re.Serialize() +} + +func splitSessionFiles(what string) []string { + if strings.TrimSpace(what) == "" { + return nil + } + + rawFiles := strings.Split(what, ",") + files := make([]string, 0, len(rawFiles)) + for _, file := range rawFiles { + file = strings.TrimSpace(file) + if file == "" { + continue + } + files = append(files, file) + } + return files +} diff --git a/internal/clients/session_spec_test.go b/internal/clients/session_spec_test.go new file mode 100644 index 0000000..aa3c45d --- /dev/null +++ b/internal/clients/session_spec_test.go @@ -0,0 +1,133 @@ +package clients + +import ( + "testing" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/omode" +) + +func TestSessionSpecCommands(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + spec SessionSpec + want []string + wantErr bool + }{ + { + name: "tail read commands", + spec: SessionSpec{ + Mode: omode.TailClient, + Files: []string{"/var/log/app.log"}, + Options: "plain=true", + Regex: "ERROR", + }, + want: []string{"tail:plain=true /var/log/app.log regex:default,literal ERROR"}, + }, + { + name: "map client commands", + spec: SessionSpec{ + Mode: omode.MapClient, + Files: []string{"/var/log/app.log"}, + Options: "plain=true", + Query: "from STATS select count(*)", + Regex: ".", + }, + want: []string{ + "map:plain=true from STATS select count(*)", + "cat:plain=true /var/log/app.log regex:noop ", + }, + }, + { + name: "tail query with timeout", + spec: SessionSpec{ + Mode: omode.TailClient, + Files: []string{"/var/log/app.log"}, + Options: "plain=true", + Query: "from STATS select count(*)", + Regex: "WARN", + Timeout: 15, + }, + want: []string{ + "map:plain=true from STATS select count(*)", + "timeout 15 tail /var/log/app.log regex:default,literal WARN", + }, + }, + { + name: "health command", + spec: SessionSpec{ + Mode: omode.HealthClient, + }, + want: []string{"health"}, + }, + { + name: "invalid regex returns error", + spec: SessionSpec{ + Mode: omode.GrepClient, + Files: []string{"/var/log/app.log"}, + Regex: "[", + }, + wantErr: true, + }, + { + name: "query with unsupported mode returns error", + spec: SessionSpec{ + Mode: omode.GrepClient, + Files: []string{"/var/log/app.log"}, + Query: "from STATS select count(*)", + Regex: ".", + }, + wantErr: true, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + got, err := tc.spec.Commands() + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("Commands() error = %v", err) + } + if len(got) != len(tc.want) { + t.Fatalf("unexpected command count: got %d want %d (%v)", len(got), len(tc.want), got) + } + for i := range tc.want { + if got[i] != tc.want[i] { + t.Fatalf("command %d mismatch: got %q want %q", i, got[i], tc.want[i]) + } + } + }) + } +} + +func TestNewSessionSpecSplitsFiles(t *testing.T) { + t.Parallel() + + spec := NewSessionSpec(config.Args{ + Mode: omode.GrepClient, + What: " a.log , , b.log ", + RegexStr: "ERROR", + RegexInvert: true, + Timeout: 10, + }) + + if len(spec.Files) != 2 || spec.Files[0] != "a.log" || spec.Files[1] != "b.log" { + t.Fatalf("unexpected files: %#v", spec.Files) + } + if !spec.RegexInvert { + t.Fatalf("expected RegexInvert to be true") + } + if spec.Timeout != 10 { + t.Fatalf("unexpected timeout: %d", spec.Timeout) + } +} diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index fff6646..a90d80a 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -1,9 +1,7 @@ package clients import ( - "fmt" "runtime" - "strings" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" @@ -37,15 +35,18 @@ func (c TailClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } +func (c TailClient) makeSessionSpec() (SessionSpec, error) { + return NewSessionSpec(c.Args), nil +} + func (c TailClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() + sessionSpec, err := c.makeSessionSpec() if err != nil { - dlog.Client.FatalPanic(err) + dlog.Client.FatalPanic("unable to build tail session spec", err) } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) + commands, err = sessionSpec.Commands() + if err != nil { + dlog.Client.FatalPanic("unable to build tail commands from session spec", err) } - dlog.Client.Debug(commands) - return + return commands } diff --git a/internal/config/args.go b/internal/config/args.go index d612e21..42aff85 100644 --- a/internal/config/args.go +++ b/internal/config/args.go @@ -18,7 +18,9 @@ type Args struct { Arguments []string ConfigFile string ConnectionsPerCPU int + ControlTTYPath string Discovery string + InteractiveQuery bool LogDir string Logger string LogLevel string @@ -52,7 +54,9 @@ func (a *Args) String() string { sb.WriteString(fmt.Sprintf("%s:%v,", "Arguments", a.Arguments)) sb.WriteString(fmt.Sprintf("%s:%v,", "ConfigFile", a.ConfigFile)) sb.WriteString(fmt.Sprintf("%s:%v,", "ConnectionsPerCPU", a.ConnectionsPerCPU)) + sb.WriteString(fmt.Sprintf("%s:%v,", "ControlTTYPath", a.ControlTTYPath)) sb.WriteString(fmt.Sprintf("%s:%v,", "Discovery", a.Discovery)) + sb.WriteString(fmt.Sprintf("%s:%v,", "InteractiveQuery", a.InteractiveQuery)) sb.WriteString(fmt.Sprintf("%s:%v,", "LogDir", a.LogDir)) sb.WriteString(fmt.Sprintf("%s:%v,", "LogLevel", a.LogLevel)) sb.WriteString(fmt.Sprintf("%s:%v,", "Logger", a.Logger)) diff --git a/internal/protocol/capabilities.go b/internal/protocol/capabilities.go new file mode 100644 index 0000000..24b2704 --- /dev/null +++ b/internal/protocol/capabilities.go @@ -0,0 +1,9 @@ +package protocol + +const ( + // HiddenCapabilitiesPrefix identifies server capability advertisements sent over the hidden control channel. + HiddenCapabilitiesPrefix = ".syn capabilities " + + // CapabilityQueryUpdateV1 marks support for in-flight query replacement over an existing session. + CapabilityQueryUpdateV1 = "query-update-v1" +) |
