diff options
| author | Paul Buetow <git@mx.buetow.org> | 2020-12-28 09:49:10 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2020-12-28 09:49:10 +0000 |
| commit | a5a91623ee60f33dafc16e1752f3fb1f6798ee76 (patch) | |
| tree | c6433ef4a3415cc7206b5fbe733c0539d0e5a60f | |
| parent | ae8ffc84331ca72f0d33fff69edd85d6e03d29ae (diff) | |
| parent | 495e9f38220a6d448b15882a235e7a9c21f21d18 (diff) | |
merge
48 files changed, 227 insertions, 129 deletions
@@ -6,16 +6,16 @@ build: ${GO} build -o dgrep ./cmd/dgrep/main.go ${GO} build -o dmap ./cmd/dmap/main.go ${GO} build -o dtail ./cmd/dtail/main.go +install: + ${GO} install ./cmd/dserver/main.go + ${GO} install ./cmd/dcat/main.go + ${GO} install ./cmd/dgrep/main.go + ${GO} install ./cmd/dmap/main.go + ${GO} install ./cmd/dtail/main.go clean: ls ./cmd/ | while read cmd; do \ test -f $$cmd && rm $$cmd; \ done -install: build - cp -pv dserver ${GOPATH}/bin/dserver - cp -pv dcat ${GOPATH}/bin/dcat - cp -pv dgrep ${GOPATH}/bin/dgrep - cp -pv dmap ${GOPATH}/bin/dmap - cp -pv dtail ${GOPATH}/bin/dtail vet: find . -type d | egrep -v '(./samples|./log|./doc)' | while read dir; do \ echo ${GO} vet $$dir; \ @@ -24,8 +24,8 @@ vet: lint: ${GO} get golang.org/x/lint/golint find . -type d | while read dir; do \ - echo ${GOPATH}/bin/golint $$dir; \ - ${GOPATH}/bin/golint $$dir; \ + echo golint $$dir; \ + golint $$dir; \ done test: ${GO} test ./... -v diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index 05e46ab..c59a242 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -29,6 +29,7 @@ func main() { flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages") flag.BoolVar(&displayVersion, "version", false, "Display version") flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") + flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") flag.IntVar(&sshPort, "port", 2222, "SSH server port") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index 133631f..276dff4 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -31,6 +31,7 @@ func main() { flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages") flag.BoolVar(&displayVersion, "version", false, "Display version") flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") + flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") flag.IntVar(&sshPort, "port", 2222, "SSH server port") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index 9f9ca9d..a954e45 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -34,6 +34,7 @@ func main() { flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages") flag.BoolVar(&displayVersion, "version", false, "Display version") flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") + flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.IntVar(&sshPort, "port", 2222, "SSH server port") diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index aefaa6a..5d4bad1 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -42,6 +42,7 @@ func main() { flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages") flag.BoolVar(&displayVersion, "version", false, "Display version") flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") + flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode") flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") @@ -4,7 +4,8 @@ go 1.15 require ( github.com/DataDog/zstd v1.4.5 - golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de - golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect - golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d // indirect + golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c + golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect + golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect + golang.org/x/term v0.0.0-20201207232118-ee85cb95a76b // indirect ) @@ -8,10 +8,14 @@ golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 h1:sKJQZMuxjOAR/Uo2LBfU90 golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c h1:9HhBz5L/UjnK9XLtiZhYAdue5BVKep3PMmS2LuPDt8k= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/lint v0.0.0-20200130185559-910be7a94367 h1:0IiAsCRByjO2QjX7ZPkw5oU9x+n1YqRL802rjC0c3Aw= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -19,8 +23,16 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d h1:QQrM/CCYEzTs91GZylDCQjGHudbPTxF/1fvXdVh5lMo= golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d h1:MiWWjyhUzZ+jvhZvloX6ZrUsdEghn8a64Upd8EMHglE= +golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201207232118-ee85cb95a76b h1:a0ErnNnPKmhDyIXQvdZr+Lq8dc8xpMeqkF8y5PgQU4Q= +golang.org/x/term v0.0.0-20201207232118-ee85cb95a76b/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/internal/clients/args.go b/internal/clients/args.go index 34fcfa2..7f782f1 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -22,4 +22,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string + Quiet bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d8d4fde..f20156f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,7 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Info("Initiating base client") + logger.Debug("Initiating base client") flag := regex.Default if c.Args.RegexInvert { @@ -66,11 +66,11 @@ func (c *baseClient) makeConnections(maker maker) { c.stats = newTailStats(len(c.connections)) } -func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. - go c.stats.Start(ctx, c.throttleCh, statsCh) + go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) // Keep count of active connections active := make(chan struct{}, len(c.connections)) @@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Info("Terminated connection") + defer logger.Debug("Terminated connection") // We want to have at least one active connection <-active diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index d8e9196..b7b6131 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } return } diff --git a/internal/clients/client.go b/internal/clients/client.go index eb8452d..4a547e8 100644 --- a/internal/clients/client.go +++ b/internal/clients/client.go @@ -4,5 +4,5 @@ import "context" // Client is the interface for the end user command line client. type Client interface { - Start(ctx context.Context, statsCh <-chan struct{}) int + Start(ctx context.Context, statsCh <-chan string) int } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 4024083..652c31b 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,8 +41,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } + return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index b5045e2..f07fd90 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "fmt" "io" - "strconv" "strings" "time" @@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } + return } @@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { case <-h.Done(): return } - - case strings.HasPrefix(message, ".run exitstatus"): - splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") - if len(splitted) != 3 { - logger.Error("Unable to retrieve exitstatus", message) - return - } - i, err := strconv.Atoi(splitted[2]) - if err != nil { - logger.Error("Unable to retrieve exitstatus", message, err) - return - } - logger.Debug("Retrieved exitstatus", h.status) - if i > h.status { - h.status = i - } } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 95693ab..0440706 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,6 +19,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + // The return status. status int } @@ -45,10 +46,12 @@ func (h *HealthHandler) Status() int { return h.status } +// Done returns done channel of the handler. func (h *HealthHandler) Done() <-chan struct{} { return h.done.Done() } +// Shutdown the handler. func (h *HealthHandler) Shutdown() { h.done.Shutdown() } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 1ba6482..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -4,6 +4,9 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" ) +// maker interface helps to re-use code in all DTail client implementations. +// All clients share the baseClient but have different connection handlers +// and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 149129d..1c0c2cc 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -94,12 +94,12 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* } // Start starts the mapreduce client. -func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) { go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Info("Received final mapreduce result") + logger.Debug("Received final mapreduce result") c.reportResults() } @@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { @@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) } return @@ -133,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") + logger.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -165,7 +166,7 @@ func (c *MaprClient) printResults() { } if numLines == 0 { - logger.Info("Empty result set this time...") + logger.Warn("Empty result set this time...") return } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index e7eabd8..d8163d4 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" ) @@ -33,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { var connectedLast int for { var force bool + var messages []string select { - case <-statsCh: + case message := <-statsCh: + messages = append(messages, message) force = true - case <-time.After(time.Second * 2): + case <-time.After(time.Second * 3): case <-ctx.Done(): return } @@ -52,11 +55,18 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) newConnections := connected - connectedLast - if connected == connectedLast && !force { + if (connected == connectedLast || quiet) && !force { continue } - logger.Info(s.statsLine(connected, newConnections, throttle)) + stats := s.statsLine(connected, newConnections, throttle) + switch force { + case true: + messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) + s.printStatsDueInterrupt(messages) + default: + logger.Info(stats) + } connectedLast = connected s.mutex.Lock() @@ -65,6 +75,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) } } +func (s *stats) printStatsDueInterrupt(messages []string) { + logger.Pause() + for _, message := range messages { + fmt.Println(fmt.Sprintf(" %s", message)) + } + time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) + logger.Resume() +} + func (s *stats) statsLine(connected, newConnections int, throttle int) string { percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) @@ -88,5 +107,6 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } + return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 53b5ba4..cefbaa7 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) + return &c, nil } @@ -37,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } logger.Debug(commands) diff --git a/internal/color/colorfy.go b/internal/color/colorfy.go index 9ae46f5..a2beb7a 100644 --- a/internal/color/colorfy.go +++ b/internal/color/colorfy.go @@ -41,16 +41,14 @@ func paintClientStats(line string) string { // Colorfy a given line based on the line's content. func Colorfy(line string) string { - if strings.HasPrefix(line, "REMOTE") { + switch { + case strings.HasPrefix(line, "REMOTE"): return paintRemote(line) - } - if strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|") { + case strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|"): return paintClientStats(line) - } - if strings.Contains(line, "ERROR") { + case strings.Contains(line, "ERROR"): return Paint(Magenta, line) - } - if strings.Contains(line, "WARN") { + case strings.Contains(line, "WARN"): return Paint(Magenta, line) } diff --git a/internal/config/common.go b/internal/config/common.go index 103b390..c3e203e 100644 --- a/internal/config/common.go +++ b/internal/config/common.go @@ -2,10 +2,14 @@ package config // CommonConfig stores configuration keys shared by DTail server and client. type CommonConfig struct { - SSHPort int + // The SSH port number + SSHPort int + // Enable experimental features (mainly for dev purposes) ExperimentalFeaturesEnable bool `json:",omitempty"` - DebugEnable bool `json:",omitempty"` - TraceEnable bool `json:",omitempty"` + // Enable debug logging. Don't enable in production. + DebugEnable bool `json:",omitempty"` + // Enable trace logging. Don't enable in production. + TraceEnable bool `json:",omitempty"` // The log strategy to use, one of // stdout: only log to stdout (useful when used with systemd) // daily: create a log file for every day diff --git a/internal/config/config.go b/internal/config/config.go index dc96d6b..276ddcf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,9 @@ const ScheduleUser string = "DTAIL-SCHEDULE" // ContinuousUser is used for non-interactive continuous mapreduce queries. const ContinuousUser string = "DTAIL-CONTINUOUS" +// InterruptTimeoutS is used to terminate DTail when Ctrl+C was pressed twice within a given interval. +const InterruptTimeoutS int = 3 + // Client holds a DTail client configuration. var Client *ClientConfig diff --git a/internal/config/server.go b/internal/config/server.go index db12cec..dc0d587 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -61,6 +61,7 @@ type ServerConfig struct { Continuous []Continuous `json:",omitempty"` } +// ServerRelaxedAuthEnable should be used for development and testing purposes only. var ServerRelaxedAuthEnable bool // Create a new default server configuration. diff --git a/internal/discovery/comma.go b/internal/discovery/comma.go index 94276c7..4344240 100644 --- a/internal/discovery/comma.go +++ b/internal/discovery/comma.go @@ -1,8 +1,9 @@ package discovery import ( - "github.com/mimecast/dtail/internal/io/logger" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromCOMMA retrieves a list of servers from comma separated input list. diff --git a/internal/discovery/file.go b/internal/discovery/file.go index c04173e..1250755 100644 --- a/internal/discovery/file.go +++ b/internal/discovery/file.go @@ -2,8 +2,9 @@ package discovery import ( "bufio" - "github.com/mimecast/dtail/internal/io/logger" "os" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromFILE retrieves a list of servers from a file. diff --git a/internal/io/fs/permissions/permission_linux.go b/internal/io/fs/permissions/permission_linux.go index feae729..bbc039b 100644 --- a/internal/io/fs/permissions/permission_linux.go +++ b/internal/io/fs/permissions/permission_linux.go @@ -11,7 +11,7 @@ import ( "unsafe" ) -// To check whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a given file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 9db93c0..715be34 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -15,7 +15,11 @@ type Line struct { // lines if that happens but it will signal to the client how // many log lines in % could be transmitted to the client. TransmittedPerc int - SourceID string + // Contains the unique identifier of the source log file. + // It could be the name of the log or it could be one of the parent + // directories in case multiple log files with the same basename are + // followed. + SourceID string } // Return a human readable representation of the followed line. diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index b7db0a7..4254eef 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -72,12 +72,15 @@ type buf struct { func Start(ctx context.Context, mode Modes) { Mode = mode - if Mode.Nothing { + switch { + case Mode.Nothing: return - } - - if Mode.Trace { + case Mode.Quiet: + Mode.Trace = false + Mode.Debug = false + case Mode.Trace: Mode.Debug = true + default: } strategy := logStrategy() @@ -87,7 +90,7 @@ func Start(ctx context.Context, mode Modes) { case DailyStrategy: _, err := os.Stat(config.Common.LogDir) Mode.logToFile = !os.IsNotExist(err) - Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace + Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet case StdoutStrategy: fallthrough default: @@ -131,11 +134,14 @@ func Info(args ...interface{}) string { // Warn message logging. func Warn(args ...interface{}) string { - if Mode.Server { - return log(serverStr, warnStr, args) + if !Mode.Quiet { + if Mode.Server { + return log(serverStr, warnStr, args) + } + return log(clientStr, warnStr, args) } - return log(clientStr, warnStr, args) + return "" } // Error message logging. @@ -242,7 +248,7 @@ func log(what string, severity string, args []interface{}) string { message := strings.Join(messages, "|") write(what, severity, message) - return message + return fmt.Sprintf("%s|%s", severity, message) } // Raw message logging. diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go index 47dadfe..8864179 100644 --- a/internal/io/logger/modes.go +++ b/internal/io/logger/modes.go @@ -6,6 +6,7 @@ type Modes struct { Trace bool Debug bool Nothing bool + Quiet bool logToStdout bool logToFile bool } diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index a438d33..36ebdb5 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -3,9 +3,10 @@ package prompt import ( "bufio" "fmt" - "github.com/mimecast/dtail/internal/io/logger" "os" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // Answer is a user input of a prompt question. @@ -18,8 +19,7 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - - AskAgain bool + AskAgain bool } // Prompt used for interactive user input. diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index 3785d71..500c530 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -4,33 +4,38 @@ import ( "context" "os" gosignal "os/signal" + "syscall" "time" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/config" ) -// StatsCh returns a channel for "please print stats" signalling. -func InterruptCh(ctx context.Context) <-chan struct{} { - sigCh := make(chan os.Signal) - gosignal.Notify(sigCh, os.Interrupt) +// InterruptCh returns a channel for "please print stats" signalling. +func InterruptCh(ctx context.Context) <-chan string { + sigIntCh := make(chan os.Signal) + gosignal.Notify(sigIntCh, os.Interrupt) - statsCh := make(chan struct{}) + sigOtherCh := make(chan os.Signal) + gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) + + statsCh := make(chan string) go func() { for { select { - case <-sigCh: + case <-sigIntCh: select { - case statsCh <- struct{}{}: - logger.Info("Hit Ctrl+C twice to exit") + case statsCh <- "Hint: Hit Ctrl+C again to exit": select { - case <-sigCh: + case <-sigIntCh: os.Exit(0) - case <-time.After(time.Second): + case <-time.After(time.Second * time.Duration(config.InterruptTimeoutS)): } default: - // Stats currently already printed. + // Stats already printed. } + case <-sigOtherCh: + os.Exit(0) case <-ctx.Done(): return } diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 52aaa98..1a89c3a 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -12,6 +12,7 @@ type CallbackFunc func(text string) string type Function struct { // Name of the callback function Name string + // The Go-callback function to call for this DTail function. call CallbackFunc } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 7f6b63c..01852da 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -177,12 +177,6 @@ func (q *Query) parse(tokens []token) error { } } - // Comment out for empty table support, which is "all" log lines. - /* - if q.Table == "" { - return errors.New(invalidQuery + "Empty table specified in 'from' clause") - } - */ if len(q.Select) < 1 { return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index 1882b7e..d6aa0d4 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -92,5 +92,6 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } + return sel, nil } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index cd59b63..28bb074 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -80,6 +80,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &a, nil } +// Shutdown the aggregation engine. func (a *Aggregate) Shutdown() { a.Flush() a.done.Shutdown() diff --git a/internal/mapr/token.go b/internal/mapr/token.go index d337bd2..8972188 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -22,6 +22,7 @@ func (t token) isKeyword() bool { return true } } + return false } @@ -94,6 +95,7 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } + return tokens, strings } @@ -104,5 +106,6 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } + return tokens } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index ff1b489..7a60dba 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -19,6 +19,10 @@ const ( StringNe QueryOperation = iota StringContains QueryOperation = iota StringNotContains QueryOperation = iota + StringHasPrefix QueryOperation = iota + StringNotHasPrefix QueryOperation = iota + StringHasSuffix QueryOperation = iota + StringNotHasSuffix QueryOperation = iota FloatOperation QueryOperation = iota FloatEq QueryOperation = iota FloatNe QueryOperation = iota @@ -78,7 +82,17 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { case "contains": wc.Operation = StringContains case "lacks": + fallthrough + case "ncontains": wc.Operation = StringNotContains + case "hasprefix": + wc.Operation = StringHasPrefix + case "nhasprefix": + wc.Operation = StringNotHasPrefix + case "hassuffix": + wc.Operation = StringHasSuffix + case "nhassuffix": + wc.Operation = StringNotHasSuffix default: return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) } @@ -156,6 +170,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { default: logger.Error("Unknown float operation", lValue, wc.Operation, rValue) } + return false } @@ -169,8 +184,17 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { return strings.Contains(lValue, rValue) case StringNotContains: return !strings.Contains(lValue, rValue) + case StringHasPrefix: + return strings.HasPrefix(lValue, rValue) + case StringNotHasPrefix: + return !strings.HasPrefix(lValue, rValue) + case StringHasSuffix: + return strings.HasSuffix(lValue, rValue) + case StringNotHasSuffix: + return !strings.HasSuffix(lValue, rValue) default: logger.Error("Unknown string operation", lValue, wc.Operation, rValue) } + return false } diff --git a/internal/omode/mode.go b/internal/omode/mode.go index e29aacc..1aafcfc 100644 --- a/internal/omode/mode.go +++ b/internal/omode/mode.go @@ -12,7 +12,6 @@ const ( GrepClient Mode = iota MapClient Mode = iota HealthClient Mode = iota - RunClient Mode = iota ) func (m Mode) String() string { @@ -29,8 +28,6 @@ func (m Mode) String() string { return "map" case HealthClient: return "health" - case RunClient: - return "run" default: return "unknown" } diff --git a/internal/regex/flag.go b/internal/regex/flag.go index d3ff712..396bda0 100644 --- a/internal/regex/flag.go +++ b/internal/regex/flag.go @@ -2,6 +2,7 @@ package regex import "fmt" +// Flag for regex. type Flag int const ( @@ -15,6 +16,7 @@ const ( Noop Flag = iota ) +// NewFlag returns a new regex flag. func NewFlag(str string) (Flag, error) { switch str { case "default": diff --git a/internal/regex/regex.go b/internal/regex/regex.go index f668c38..2561659 100644 --- a/internal/regex/regex.go +++ b/internal/regex/regex.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" ) +// Regex for filtering lines. type Regex struct { // The original regex string regexStr string @@ -24,6 +25,7 @@ func (r Regex) String() string { r.regexStr, r.flags, r.initialized, r.re == nil) } +// NewNoop is a noop regex (doing nothing). func NewNoop() Regex { return Regex{ flags: []Flag{Noop}, @@ -31,6 +33,7 @@ func NewNoop() Regex { } } +// New returns a new regex object. func New(regexStr string, flag Flag) (Regex, error) { if regexStr == "" || regexStr == "." || regexStr == ".*" { return NewNoop(), nil @@ -59,6 +62,7 @@ func new(regexStr string, flags []Flag) (Regex, error) { return r, nil } +// Match a byte string. func (r Regex) Match(bytes []byte) bool { switch r.flags[0] { case Default: @@ -72,6 +76,7 @@ func (r Regex) Match(bytes []byte) bool { } } +// MatchString matches a string. func (r Regex) MatchString(str string) bool { switch r.flags[0] { case Default: @@ -85,6 +90,7 @@ func (r Regex) MatchString(str string) bool { } } +// Serialize the regex. func (r Regex) Serialize() string { var flags []string for _, flag := range r.flags { @@ -98,6 +104,7 @@ func (r Regex) Serialize() string { return fmt.Sprintf("regex:%s %s", strings.Join(flags, ","), r.regexStr) } +// Deserialize the regex. func Deserialize(str string) (Regex, error) { // Get regex string s := strings.SplitN(str, " ", 2) diff --git a/internal/server/continuous.go b/internal/server/continuous.go index 583d136..f75c732 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -92,7 +92,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { } logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx, make(chan struct{})) + status := client.Start(jobCtx, make(chan string)) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go index 9a8eb75..1e17c78 100644 --- a/internal/server/handlers/controlhandler.go +++ b/internal/server/handlers/controlhandler.go @@ -41,10 +41,12 @@ func NewControlHandler(user *user.User) *ControlHandler { return &h } +// Shutdown the handler. func (h *ControlHandler) Shutdown() { h.done.Shutdown() } +// Done channel of the handler. func (h *ControlHandler) Done() <-chan struct{} { return h.done.Done() } @@ -90,9 +92,7 @@ func (h *ControlHandler) handleCommand(command string) { case "health": h.serverMessages <- "OK: DTail SSH Server seems fine" h.serverMessages <- "done;" - case "debug": - h.serverMessages <- logger.Debug(h.user, "Receiving debug command", command, s) default: - h.serverMessages <- logger.Warn(h.user, "Received unknown control command", command, s) + h.serverMessages <- logger.Error(h.user, "Received unknown control command", command, s) } } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 0f9207d..5bab26f 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -25,37 +25,29 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string) { +func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - logger.Error(err) r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err)) return } re = deserializedRegex } if argc < 3 { - r.server.sendServerMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } - r.readGlob(ctx, args[1], re) + r.readGlob(ctx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) { +func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) - maxRetries := 10 - for { - maxRetries-- - if maxRetries < 0 { - r.server.sendServerMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) - return - } - + for retryCount := 0; retryCount < retries; retryCount++ { paths, err := filepath.Glob(glob) if err != nil { logger.Warn(r.server.user, glob, err) @@ -65,7 +57,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) if numPaths := len(paths); numPaths == 0 { logger.Error(r.server.user, "No such file(s) to read", glob) - r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -76,8 +68,11 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) } r.readFiles(ctx, paths, glob, re, retryInterval) - break + return } + + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + return } func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { @@ -97,7 +92,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr if !r.server.user.HasFilePermission(path, "readfiles") { logger.Error(r.server.user, "No permission to read file", path, globID) - r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) return } @@ -161,6 +156,6 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.sendServerMessage(logger.Error("Empty file path given?", path, glob)) + r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob)) return "" } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index db917bd..185e7c2 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -43,6 +43,7 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 + quiet bool } // NewServerHandler returns the server handler. @@ -70,10 +71,12 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } +// Shutdown the handler. func (h *ServerHandler) Shutdown() { h.done.Shutdown() } +// Done channel of the handler. func (h *ServerHandler) Done() <-chan struct{} { return h.done.Done() } @@ -243,13 +246,19 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() return } + if quiet, ok := options["quiet"]; ok { + if quiet == "true" { + logger.Debug(h.user, "Enabling quiet mode") + h.quiet = true + } + } switch commandName { case "grep", "cat": command := newReadCommand(h, omode.CatClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args) + command.Start(ctx, argc, args, 1) readerFinished() commandFinished() }() @@ -258,7 +267,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.TailClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args) + command.Start(ctx, argc, args, 10) readerFinished() commandFinished() }() @@ -290,7 +299,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] func (h *ServerHandler) handleAckCommand(argc int, args []string) { if argc < 3 { - h.sendServerMessage(logger.Warn(h.user, commandParseWarning, args, argc)) + h.sendServerWarnMessage(logger.Warn(h.user, commandParseWarning, args, argc)) return } if args[1] == "close" && args[2] == "connection" { @@ -309,6 +318,13 @@ func (h *ServerHandler) sendServerMessage(message string) { h.send(h.serverMessageC(), message) } +func (h *ServerHandler) sendServerWarnMessage(message string) { + if h.quiet { + return + } + h.send(h.serverMessageC(), message) +} + func (h *ServerHandler) serverMessageC() chan<- string { return h.serverMessages } diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 9d76a3b..a1e9e36 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -93,7 +93,7 @@ func (s *scheduler) runJobs(ctx context.Context) { defer cancel() logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx, make(chan struct{})) + status := client.Start(jobCtx, make(chan string)) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { diff --git a/internal/server/server.go b/internal/server/server.go index d8d43c9..a20737e 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -75,9 +75,7 @@ func (s *Server) Start(ctx context.Context) int { go s.cont.start(ctx) go s.listenerLoop(ctx, listener) - select { - case <-ctx.Done(): - } + <-ctx.Done() // For future use. return 0 diff --git a/internal/user/server/user.go b/internal/user/server/user.go index c4e8b7b..637945c 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -40,6 +40,10 @@ func (u *User) String() string { // HasFilePermission is used to determine whether user is alowed to read a file. func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission bool) { logger.Debug(u, filePath, permissionType, "Checking config permissions") + if config.ServerRelaxedAuthEnable { + logger.Fatal(u, filePath, permissionType, "Server releaxed auth enabled") + return true + } if u.Name == config.ScheduleUser || u.Name == config.ContinuousUser { // Background user has same permissions as dtail process itself. diff --git a/internal/version/version.go b/internal/version/version.go index 36ef62c..a64417f 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -11,9 +11,9 @@ const ( // Name of DTail. Name string = "DTail" // Version of DTail. - Version string = "3.1.0" + Version string = "3.2.0" // Additional information for DTail - Additional string = "develop" + Additional string = "develop-3" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) diff --git a/samples/dtail.json.sample b/samples/dtail.json.sample index fa799ce..33d445f 100644 --- a/samples/dtail.json.sample +++ b/samples/dtail.json.sample @@ -10,19 +10,16 @@ "HostKeyBits" : 2048, "Permissions": { "Default": [ - "readfiles:^/.*$", - "runcommands:^/.*$" + "readfiles:^/.*$" ], "Users": { "pbuetow": [ - "readfiles:^/.*$", - "runcommands:^/.*$" + "readfiles:^/.*$" ], "jblake": [ "readfiles:^/tmp/foo.log$", "readfiles:^/.*$", - "readfiles:!^/tmp/bar.log$", - "runcommands:!^/.*$" + "readfiles:!^/tmp/bar.log$" ] } } |
