summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 07:40:22 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 07:40:22 +0200
commit2a665812a0c224ef32d37b2cca681512c5b7d6c1 (patch)
tree8e5cf0c2c28775bff5afeeea4d719c627442adbc
parent71e431af2e65196ad4e7bc3404e772b1726d3338 (diff)
task 399: add client session spec scaffolding
-rw-r--r--cmd/dcat/main.go2
-rw-r--r--cmd/dgrep/main.go2
-rw-r--r--cmd/dmap/main.go2
-rw-r--r--cmd/dtail/main.go2
-rw-r--r--internal/clients/baseclient.go9
-rw-r--r--internal/clients/catclient.go18
-rw-r--r--internal/clients/connectors/serverconnection_test.go12
-rw-r--r--internal/clients/grepclient.go18
-rw-r--r--internal/clients/handlers/basehandler.go79
-rw-r--r--internal/clients/handlers/basehandler_test.go39
-rw-r--r--internal/clients/handlers/clienthandler.go12
-rw-r--r--internal/clients/handlers/handler.go4
-rw-r--r--internal/clients/handlers/healthhandler.go12
-rw-r--r--internal/clients/handlers/maprhandler.go12
-rw-r--r--internal/clients/healthclient.go16
-rw-r--r--internal/clients/maker.go4
-rw-r--r--internal/clients/maprclient.go33
-rw-r--r--internal/clients/session_spec.go124
-rw-r--r--internal/clients/session_spec_test.go133
-rw-r--r--internal/clients/tailclient.go19
-rw-r--r--internal/config/args.go4
-rw-r--r--internal/protocol/capabilities.go9
22 files changed, 503 insertions, 62 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index 64c6f10..554887e 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -28,6 +28,7 @@ func main() {
flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
+ flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions")
flag.BoolVar(&args.Plain, "plain", false, "Plain output mode")
flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
@@ -36,6 +37,7 @@ func main() {
flag.IntVar(&args.SSHAgentKeyIndex, "agentKeyIndex", -1, "SSH agent key index to use (-1 for all keys)")
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
+ flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index aadf5c7..804b047 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -29,6 +29,7 @@ func main() {
flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex")
+ flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions")
flag.BoolVar(&args.Plain, "plain", false, "Plain output mode")
flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
@@ -40,6 +41,7 @@ func main() {
flag.IntVar(&args.SSHAgentKeyIndex, "agentKeyIndex", -1, "SSH agent key index to use (-1 for all keys)")
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
+ flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index e6665ee..088da68 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -32,6 +32,7 @@ func main() {
flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
+ flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions")
flag.BoolVar(&args.Plain, "plain", false, "Plain output mode")
flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
@@ -41,6 +42,7 @@ func main() {
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
+ flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index d3fe931..d4dbad0 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -37,6 +37,7 @@ func main() {
flag.BoolVar(&args.NoAuthKey, "no-auth-key", false, "Disable auth-key fast reconnect feature")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex")
+ flag.BoolVar(&args.InteractiveQuery, "interactive-query", false, "Enable interactive in-flight query control over supported sessions")
flag.BoolVar(&args.Plain, "plain", false, "Plain output mode")
flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&checkHealth, "checkHealth", false, "Deprecated, flag will be removed soon")
@@ -53,6 +54,7 @@ func main() {
flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection")
flag.IntVar(&shutdownAfter, "shutdownAfter", 3600*24, "Shutdown after so many seconds")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
+ flag.StringVar(&args.ControlTTYPath, "control-tty", "/dev/tty", "TTY device for interactive query control")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 7358cb3..999c0ed 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -38,6 +38,8 @@ type baseClient struct {
throttleCh chan struct{}
// Retry connection upon failure?
retry bool
+ // The current connection-wide session specification.
+ sessionSpec SessionSpec
// Connection maker helper.
maker maker
// Regex is the regular expresion object for line filtering
@@ -70,6 +72,13 @@ func (c *baseClient) init() {
func (c *baseClient) makeConnections(maker maker) {
c.maker = maker
+ if builder, ok := maker.(sessionSpecMaker); ok {
+ sessionSpec, err := builder.makeSessionSpec()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build session specification", err)
+ }
+ c.sessionSpec = sessionSpec
+ }
discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle)
for _, server := range discoveryService.ServerList() {
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index 0a62b9d..a258e64 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -2,9 +2,7 @@ package clients
import (
"errors"
- "fmt"
"runtime"
- "strings"
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
@@ -42,14 +40,18 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
+func (c CatClient) makeSessionSpec() (SessionSpec, error) {
+ return NewSessionSpec(c.Args), nil
+}
+
func (c CatClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
+ sessionSpec, err := c.makeSessionSpec()
if err != nil {
- dlog.Client.FatalPanic(err)
+ dlog.Client.FatalPanic("unable to build cat session spec", err)
}
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
+ commands, err = sessionSpec.Commands()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build cat commands from session spec", err)
}
- return
+ return commands
}
diff --git a/internal/clients/connectors/serverconnection_test.go b/internal/clients/connectors/serverconnection_test.go
index 227a1e9..be53cd4 100644
--- a/internal/clients/connectors/serverconnection_test.go
+++ b/internal/clients/connectors/serverconnection_test.go
@@ -175,6 +175,14 @@ func (m *mockHandler) SendMessage(command string) error {
return nil
}
+func (m *mockHandler) Capabilities() []string {
+ return nil
+}
+
+func (m *mockHandler) HasCapability(string) bool {
+ return false
+}
+
func (m *mockHandler) Server() string {
return "mock"
}
@@ -191,6 +199,10 @@ func (m *mockHandler) Done() <-chan struct{} {
return ch
}
+func (m *mockHandler) WaitForCapabilities(timeout time.Duration) bool {
+ return false
+}
+
func (m *mockHandler) Read(_ []byte) (int, error) {
return 0, nil
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index f0f08d4..2e234a7 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -2,9 +2,7 @@ package clients
import (
"errors"
- "fmt"
"runtime"
- "strings"
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
@@ -43,14 +41,18 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
+func (c GrepClient) makeSessionSpec() (SessionSpec, error) {
+ return NewSessionSpec(c.Args), nil
+}
+
func (c GrepClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
+ sessionSpec, err := c.makeSessionSpec()
if err != nil {
- dlog.Client.FatalPanic(err)
+ dlog.Client.FatalPanic("unable to build grep session spec", err)
}
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
+ commands, err = sessionSpec.Commands()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build grep commands from session spec", err)
}
- return
+ return commands
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 1debb98..923b24a 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -5,7 +5,9 @@ import (
"encoding/base64"
"fmt"
"io"
+ "sort"
"strings"
+ "sync"
"time"
"github.com/mimecast/dtail/internal"
@@ -20,6 +22,11 @@ type baseHandler struct {
commands chan string
receiveBuf bytes.Buffer
status int
+
+ capabilitiesMu sync.RWMutex
+ capabilities map[string]struct{}
+ capabilitiesCh chan struct{}
+ capabilitiesOk sync.Once
}
func (h *baseHandler) String() string {
@@ -40,6 +47,26 @@ func (h *baseHandler) Status() int {
return h.status
}
+func (h *baseHandler) Capabilities() []string {
+ h.capabilitiesMu.RLock()
+ defer h.capabilitiesMu.RUnlock()
+
+ capabilities := make([]string, 0, len(h.capabilities))
+ for capability := range h.capabilities {
+ capabilities = append(capabilities, capability)
+ }
+ sort.Strings(capabilities)
+ return capabilities
+}
+
+func (h *baseHandler) HasCapability(name string) bool {
+ h.capabilitiesMu.RLock()
+ defer h.capabilitiesMu.RUnlock()
+
+ _, ok := h.capabilities[name]
+ return ok
+}
+
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -148,16 +175,68 @@ func parseAuthKeyMessage(message string) (isAuthKeyMessage bool, ok bool, detail
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
+ case strings.HasPrefix(message, protocol.HiddenCapabilitiesPrefix):
+ h.handleCapabilitiesMessage(message)
case strings.HasPrefix(message, ".syn close connection"):
go h.SendMessage(".ack close connection")
h.Shutdown()
}
}
+func (h *baseHandler) handleCapabilitiesMessage(message string) {
+ capabilities := strings.Fields(strings.TrimPrefix(message, protocol.HiddenCapabilitiesPrefix))
+
+ h.capabilitiesMu.Lock()
+ defer h.capabilitiesMu.Unlock()
+
+ if h.capabilities == nil {
+ h.capabilities = make(map[string]struct{})
+ }
+ for _, capability := range capabilities {
+ if capability == "" {
+ continue
+ }
+ h.capabilities[capability] = struct{}{}
+ }
+
+ h.capabilitiesOk.Do(func() {
+ if h.capabilitiesCh != nil {
+ close(h.capabilitiesCh)
+ }
+ })
+}
+
func (h *baseHandler) Done() <-chan struct{} {
return h.done.Done()
}
+func (h *baseHandler) WaitForCapabilities(timeout time.Duration) bool {
+ if h.capabilitiesCh == nil {
+ return false
+ }
+
+ if timeout <= 0 {
+ select {
+ case <-h.capabilitiesCh:
+ return true
+ default:
+ return false
+ }
+ }
+
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+
+ select {
+ case <-h.capabilitiesCh:
+ return true
+ case <-h.Done():
+ return false
+ case <-timer.C:
+ return false
+ }
+}
+
func (h *baseHandler) Shutdown() {
h.done.Shutdown()
}
diff --git a/internal/clients/handlers/basehandler_test.go b/internal/clients/handlers/basehandler_test.go
index 996cd23..fddc890 100644
--- a/internal/clients/handlers/basehandler_test.go
+++ b/internal/clients/handlers/basehandler_test.go
@@ -3,7 +3,9 @@ package handlers
import (
"fmt"
"testing"
+ "time"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -57,3 +59,40 @@ func TestParseAuthKeyMessage(t *testing.T) {
})
}
}
+
+func TestHandleCapabilitiesMessage(t *testing.T) {
+ handler := baseHandler{
+ done: internal.NewDone(),
+ capabilities: make(map[string]struct{}),
+ capabilitiesCh: make(chan struct{}),
+ }
+
+ handler.handleHiddenMessage(".syn capabilities query-update-v1 feature-two")
+
+ if !handler.HasCapability(protocol.CapabilityQueryUpdateV1) {
+ t.Fatalf("expected handler to track %q", protocol.CapabilityQueryUpdateV1)
+ }
+ if !handler.HasCapability("feature-two") {
+ t.Fatalf("expected handler to track feature-two")
+ }
+ if handler.WaitForCapabilities(10*time.Millisecond) != true {
+ t.Fatalf("expected capabilities wait to succeed")
+ }
+
+ capabilities := handler.Capabilities()
+ if len(capabilities) != 2 {
+ t.Fatalf("unexpected capabilities: %#v", capabilities)
+ }
+}
+
+func TestWaitForCapabilitiesTimeout(t *testing.T) {
+ handler := baseHandler{
+ done: internal.NewDone(),
+ capabilities: make(map[string]struct{}),
+ capabilitiesCh: make(chan struct{}),
+ }
+
+ if handler.WaitForCapabilities(5 * time.Millisecond) {
+ t.Fatalf("expected capabilities wait to time out")
+ }
+}
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 4d29429..61bbc50 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -18,11 +18,13 @@ func NewClientHandler(server string) *ClientHandler {
return &ClientHandler{
baseHandler{
- server: server,
- shellStarted: false,
- commands: make(chan string),
- status: -1,
- done: internal.NewDone(),
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: -1,
+ done: internal.NewDone(),
+ capabilities: make(map[string]struct{}),
+ capabilitiesCh: make(chan struct{}),
},
}
}
diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go
index afa87e2..aebebaa 100644
--- a/internal/clients/handlers/handler.go
+++ b/internal/clients/handlers/handler.go
@@ -2,14 +2,18 @@ package handlers
import (
"io"
+ "time"
)
// Handler provides all methods which can be run on any client handler.
type Handler interface {
io.ReadWriter
+ Capabilities() []string
+ HasCapability(name string) bool
SendMessage(command string) error
Server() string
Status() int
Shutdown()
Done() <-chan struct{}
+ WaitForCapabilities(timeout time.Duration) bool
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 47b594e..cd5605a 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -19,11 +19,13 @@ func NewHealthHandler(server string) *HealthHandler {
dlog.Client.Debug(server, "Creating new health handler")
return &HealthHandler{
baseHandler: baseHandler{
- server: server,
- shellStarted: false,
- commands: make(chan string),
- status: 2, // Assume CRITICAL status by default.
- done: internal.NewDone(),
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: 2, // Assume CRITICAL status by default.
+ done: internal.NewDone(),
+ capabilities: make(map[string]struct{}),
+ capabilitiesCh: make(chan struct{}),
},
}
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 4c11470..9e9a0d1 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -25,11 +25,13 @@ func NewMaprHandler(server string, query *mapr.Query,
return &MaprHandler{
baseHandler: baseHandler{
- server: server,
- shellStarted: false,
- commands: make(chan string),
- status: -1,
- done: internal.NewDone(),
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: -1,
+ done: internal.NewDone(),
+ capabilities: make(map[string]struct{}),
+ capabilitiesCh: make(chan struct{}),
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index f699912..7fc2e97 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -7,6 +7,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -41,9 +42,20 @@ func (c HealthClient) makeHandler(server string) handlers.Handler {
return handlers.NewHealthHandler(server)
}
+func (c HealthClient) makeSessionSpec() (SessionSpec, error) {
+ return NewSessionSpec(c.Args), nil
+}
+
func (c HealthClient) makeCommands() (commands []string) {
- commands = append(commands, "health")
- return
+ sessionSpec, err := c.makeSessionSpec()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build health session spec", err)
+ }
+ commands, err = sessionSpec.Commands()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build health commands from session spec", err)
+ }
+ return commands
}
// Start the health client.
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index d5ffd8b..6d75fc1 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -11,3 +11,7 @@ type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
}
+
+type sessionSpecMaker interface {
+ makeSessionSpec() (SessionSpec, error)
+}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 2757229..4bf0cf0 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"runtime"
- "strings"
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
@@ -118,28 +117,22 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
+func (c MaprClient) makeSessionSpec() (SessionSpec, error) {
+ sessionSpec := NewSessionSpec(c.Args)
+ sessionSpec.Query = c.query.RawQuery
+ return sessionSpec, nil
+}
+
func (c MaprClient) makeCommands() (commands []string) {
- // Include options with the map command so serverless mode is properly detected
- commands = append(commands, fmt.Sprintf("map:%s %s", c.Args.SerializeOptions(), c.query.RawQuery))
- modeStr := "cat"
- if c.Mode == omode.TailClient {
- modeStr = "tail"
+ sessionSpec, err := c.makeSessionSpec()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build map session spec", err)
}
-
- for _, file := range strings.Split(c.What, ",") {
- regex, err := c.Regex.Serialize()
- if err != nil {
- dlog.Client.FatalPanic(err)
- }
- if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout,
- modeStr, file, regex))
- continue
- }
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- modeStr, c.Args.SerializeOptions(), file, regex))
+ commands, err = sessionSpec.Commands()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build map commands from session spec", err)
}
- return
+ return commands
}
func (c *MaprClient) periodicReportResults(ctx context.Context) {
diff --git a/internal/clients/session_spec.go b/internal/clients/session_spec.go
new file mode 100644
index 0000000..37b1803
--- /dev/null
+++ b/internal/clients/session_spec.go
@@ -0,0 +1,124 @@
+package clients
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+// SessionSpec captures the mutable, per-connection workload a DTail client wants to run.
+type SessionSpec struct {
+ Mode omode.Mode
+ Files []string
+ Options string
+ Query string
+ Regex string
+ RegexInvert bool
+ Timeout int
+}
+
+// NewSessionSpec returns a session specification from client args.
+func NewSessionSpec(args config.Args) SessionSpec {
+ return SessionSpec{
+ Mode: args.Mode,
+ Files: splitSessionFiles(args.What),
+ Options: args.SerializeOptions(),
+ Query: strings.TrimSpace(args.QueryStr),
+ Regex: args.RegexStr,
+ RegexInvert: args.RegexInvert,
+ Timeout: args.Timeout,
+ }
+}
+
+// Commands returns the legacy command stream for this session specification.
+func (s SessionSpec) Commands() ([]string, error) {
+ switch {
+ case s.Mode == omode.HealthClient:
+ return []string{"health"}, nil
+ case s.Query != "":
+ return s.queryCommands()
+ default:
+ return s.readCommands(s.Mode.String())
+ }
+}
+
+func (s SessionSpec) queryCommands() ([]string, error) {
+ if s.Mode != omode.MapClient && s.Mode != omode.TailClient {
+ return nil, fmt.Errorf("session spec query mode requires map or tail mode, got %s", s.Mode)
+ }
+
+ regexValue, err := s.serializedRegex()
+ if err != nil {
+ return nil, err
+ }
+
+ commands := []string{fmt.Sprintf("map:%s %s", s.Options, s.Query)}
+ readMode := "cat"
+ if s.Mode == omode.TailClient {
+ readMode = "tail"
+ }
+
+ for _, file := range s.Files {
+ if s.Timeout > 0 {
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", s.Timeout, readMode, file, regexValue))
+ continue
+ }
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", readMode, s.Options, file, regexValue))
+ }
+
+ return commands, nil
+}
+
+func (s SessionSpec) readCommands(mode string) ([]string, error) {
+ switch s.Mode {
+ case omode.TailClient, omode.CatClient, omode.GrepClient:
+ default:
+ return nil, fmt.Errorf("unsupported session mode %s", s.Mode)
+ }
+
+ regexValue, err := s.serializedRegex()
+ if err != nil {
+ return nil, err
+ }
+
+ var commands []string
+ for _, file := range s.Files {
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", mode, s.Options, file, regexValue))
+ }
+
+ return commands, nil
+}
+
+func (s SessionSpec) serializedRegex() (string, error) {
+ flag := regex.Default
+ if s.RegexInvert {
+ flag = regex.Invert
+ }
+
+ re, err := regex.New(s.Regex, flag)
+ if err != nil {
+ return "", err
+ }
+
+ return re.Serialize()
+}
+
+func splitSessionFiles(what string) []string {
+ if strings.TrimSpace(what) == "" {
+ return nil
+ }
+
+ rawFiles := strings.Split(what, ",")
+ files := make([]string, 0, len(rawFiles))
+ for _, file := range rawFiles {
+ file = strings.TrimSpace(file)
+ if file == "" {
+ continue
+ }
+ files = append(files, file)
+ }
+ return files
+}
diff --git a/internal/clients/session_spec_test.go b/internal/clients/session_spec_test.go
new file mode 100644
index 0000000..aa3c45d
--- /dev/null
+++ b/internal/clients/session_spec_test.go
@@ -0,0 +1,133 @@
+package clients
+
+import (
+ "testing"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/omode"
+)
+
+func TestSessionSpecCommands(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ spec SessionSpec
+ want []string
+ wantErr bool
+ }{
+ {
+ name: "tail read commands",
+ spec: SessionSpec{
+ Mode: omode.TailClient,
+ Files: []string{"/var/log/app.log"},
+ Options: "plain=true",
+ Regex: "ERROR",
+ },
+ want: []string{"tail:plain=true /var/log/app.log regex:default,literal ERROR"},
+ },
+ {
+ name: "map client commands",
+ spec: SessionSpec{
+ Mode: omode.MapClient,
+ Files: []string{"/var/log/app.log"},
+ Options: "plain=true",
+ Query: "from STATS select count(*)",
+ Regex: ".",
+ },
+ want: []string{
+ "map:plain=true from STATS select count(*)",
+ "cat:plain=true /var/log/app.log regex:noop ",
+ },
+ },
+ {
+ name: "tail query with timeout",
+ spec: SessionSpec{
+ Mode: omode.TailClient,
+ Files: []string{"/var/log/app.log"},
+ Options: "plain=true",
+ Query: "from STATS select count(*)",
+ Regex: "WARN",
+ Timeout: 15,
+ },
+ want: []string{
+ "map:plain=true from STATS select count(*)",
+ "timeout 15 tail /var/log/app.log regex:default,literal WARN",
+ },
+ },
+ {
+ name: "health command",
+ spec: SessionSpec{
+ Mode: omode.HealthClient,
+ },
+ want: []string{"health"},
+ },
+ {
+ name: "invalid regex returns error",
+ spec: SessionSpec{
+ Mode: omode.GrepClient,
+ Files: []string{"/var/log/app.log"},
+ Regex: "[",
+ },
+ wantErr: true,
+ },
+ {
+ name: "query with unsupported mode returns error",
+ spec: SessionSpec{
+ Mode: omode.GrepClient,
+ Files: []string{"/var/log/app.log"},
+ Query: "from STATS select count(*)",
+ Regex: ".",
+ },
+ wantErr: true,
+ },
+ }
+
+ for _, tc := range tests {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ got, err := tc.spec.Commands()
+ if tc.wantErr {
+ if err == nil {
+ t.Fatalf("expected error, got nil")
+ }
+ return
+ }
+ if err != nil {
+ t.Fatalf("Commands() error = %v", err)
+ }
+ if len(got) != len(tc.want) {
+ t.Fatalf("unexpected command count: got %d want %d (%v)", len(got), len(tc.want), got)
+ }
+ for i := range tc.want {
+ if got[i] != tc.want[i] {
+ t.Fatalf("command %d mismatch: got %q want %q", i, got[i], tc.want[i])
+ }
+ }
+ })
+ }
+}
+
+func TestNewSessionSpecSplitsFiles(t *testing.T) {
+ t.Parallel()
+
+ spec := NewSessionSpec(config.Args{
+ Mode: omode.GrepClient,
+ What: " a.log , , b.log ",
+ RegexStr: "ERROR",
+ RegexInvert: true,
+ Timeout: 10,
+ })
+
+ if len(spec.Files) != 2 || spec.Files[0] != "a.log" || spec.Files[1] != "b.log" {
+ t.Fatalf("unexpected files: %#v", spec.Files)
+ }
+ if !spec.RegexInvert {
+ t.Fatalf("expected RegexInvert to be true")
+ }
+ if spec.Timeout != 10 {
+ t.Fatalf("unexpected timeout: %d", spec.Timeout)
+ }
+}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index fff6646..a90d80a 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -1,9 +1,7 @@
package clients
import (
- "fmt"
"runtime"
- "strings"
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
@@ -37,15 +35,18 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
+func (c TailClient) makeSessionSpec() (SessionSpec, error) {
+ return NewSessionSpec(c.Args), nil
+}
+
func (c TailClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
+ sessionSpec, err := c.makeSessionSpec()
if err != nil {
- dlog.Client.FatalPanic(err)
+ dlog.Client.FatalPanic("unable to build tail session spec", err)
}
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
+ commands, err = sessionSpec.Commands()
+ if err != nil {
+ dlog.Client.FatalPanic("unable to build tail commands from session spec", err)
}
- dlog.Client.Debug(commands)
- return
+ return commands
}
diff --git a/internal/config/args.go b/internal/config/args.go
index d612e21..42aff85 100644
--- a/internal/config/args.go
+++ b/internal/config/args.go
@@ -18,7 +18,9 @@ type Args struct {
Arguments []string
ConfigFile string
ConnectionsPerCPU int
+ ControlTTYPath string
Discovery string
+ InteractiveQuery bool
LogDir string
Logger string
LogLevel string
@@ -52,7 +54,9 @@ func (a *Args) String() string {
sb.WriteString(fmt.Sprintf("%s:%v,", "Arguments", a.Arguments))
sb.WriteString(fmt.Sprintf("%s:%v,", "ConfigFile", a.ConfigFile))
sb.WriteString(fmt.Sprintf("%s:%v,", "ConnectionsPerCPU", a.ConnectionsPerCPU))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "ControlTTYPath", a.ControlTTYPath))
sb.WriteString(fmt.Sprintf("%s:%v,", "Discovery", a.Discovery))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "InteractiveQuery", a.InteractiveQuery))
sb.WriteString(fmt.Sprintf("%s:%v,", "LogDir", a.LogDir))
sb.WriteString(fmt.Sprintf("%s:%v,", "LogLevel", a.LogLevel))
sb.WriteString(fmt.Sprintf("%s:%v,", "Logger", a.Logger))
diff --git a/internal/protocol/capabilities.go b/internal/protocol/capabilities.go
new file mode 100644
index 0000000..24b2704
--- /dev/null
+++ b/internal/protocol/capabilities.go
@@ -0,0 +1,9 @@
+package protocol
+
+const (
+ // HiddenCapabilitiesPrefix identifies server capability advertisements sent over the hidden control channel.
+ HiddenCapabilitiesPrefix = ".syn capabilities "
+
+ // CapabilityQueryUpdateV1 marks support for in-flight query replacement over an existing session.
+ CapabilityQueryUpdateV1 = "query-update-v1"
+)