From 799b9b69ba08b898e13026b7ecab9f9f58580a82 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 8 Dec 2020 14:49:41 +0000 Subject: merge develop --- internal/clients/baseclient.go | 4 +- internal/clients/client.go | 2 +- internal/clients/handlers/basehandler.go | 23 +++- internal/clients/handlers/clienthandler.go | 5 +- internal/clients/handlers/handler.go | 3 +- internal/clients/handlers/healthhandler.go | 19 ++- internal/clients/handlers/maprhandler.go | 5 +- internal/clients/healthclient.go | 2 +- internal/clients/maprclient.go | 4 +- internal/clients/remote/connection.go | 7 +- internal/clients/stats.go | 42 +++++-- internal/config/config.go | 3 + internal/config/server.go | 1 + internal/io/logger/logger.go | 2 +- internal/io/signal/signal.go | 29 +++-- internal/mapr/server/aggregate.go | 81 +++++++------ internal/regex/flag.go | 2 + internal/regex/regex.go | 11 ++ internal/server/continuous.go | 2 +- internal/server/handlers/controlhandler.go | 28 +++-- internal/server/handlers/handler.go | 2 + internal/server/handlers/serverhandler.go | 182 +++++++---------------------- internal/server/scheduler.go | 2 +- internal/server/server.go | 44 +++---- internal/version/version.go | 2 +- 25 files changed, 241 insertions(+), 266 deletions(-) (limited to 'internal') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 008a01e..69055a3 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -66,7 +66,7 @@ func (c *baseClient) makeConnections(maker maker) { c.stats = newTailStats(len(c.connections)) } -func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. @@ -99,7 +99,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con defer func() { <-active }() for { - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) defer cancel() conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh) diff --git a/internal/clients/client.go b/internal/clients/client.go index eb8452d..4a547e8 100644 --- a/internal/clients/client.go +++ b/internal/clients/client.go @@ -4,5 +4,5 @@ import "context" // Client is the interface for the end user command line client. type Client interface { - Start(ctx context.Context, statsCh <-chan struct{}) int + Start(ctx context.Context, statsCh <-chan string) int } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 65bbfd7..b5045e2 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -8,12 +8,13 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/version" ) type baseHandler struct { - withCancel + done *internal.Done server string shellStarted bool commands chan string @@ -29,6 +30,14 @@ func (h *baseHandler) Status() int { return h.status } +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -38,7 +47,8 @@ func (h *baseHandler) SendMessage(command string) error { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) - case <-h.ctx.Done(): + case <-h.Done(): + return nil } return nil @@ -65,7 +75,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { select { case command := <-h.commands: n = copy(p, []byte(command)) - case <-h.ctx.Done(): + case <-h.Done(): return 0, io.EOF } return @@ -95,10 +105,11 @@ func (h *baseHandler) handleHiddenMessage(message string) { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") select { - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): logger.Debug("Shutting down client after timeout and sending ack to server") - h.withCancel.shutdown() - case <-h.ctx.Done(): + h.Shutdown() + case <-h.Done(): + return } case strings.HasPrefix(message, ".run exitstatus"): diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index fcd8052..2bcb038 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -1,6 +1,7 @@ package handlers import ( + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" ) @@ -19,9 +20,7 @@ func NewClientHandler(server string) *ClientHandler { shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), }, } } diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go index c53ca34..afa87e2 100644 --- a/internal/clients/handlers/handler.go +++ b/internal/clients/handlers/handler.go @@ -1,7 +1,6 @@ package handlers import ( - "context" "io" ) @@ -11,6 +10,6 @@ type Handler interface { SendMessage(command string) error Server() string Status() int - WithCancel(ctx context.Context) (context.Context, context.CancelFunc) + Shutdown() Done() <-chan struct{} } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 9051015..08ed137 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -4,11 +4,13 @@ import ( "errors" "fmt" "time" + + "github.com/mimecast/dtail/internal" ) // HealthHandler implements the handler required for health checks. type HealthHandler struct { - withCancel + done *internal.Done // Buffer of incoming data from server. receiveBuf []byte // To send commands to the server. @@ -27,9 +29,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler { receive: receive, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), } return &h @@ -45,12 +45,23 @@ func (h *HealthHandler) Status() int { return h.status } +// Done returns done channel of the handler. +func (h *HealthHandler) Done() <-chan struct{} { + return h.done.Done() +} + +// Shutdown the handler. +func (h *HealthHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage sends a DTail command to the server. func (h *HealthHandler) SendMessage(command string) error { select { case h.commands <- fmt.Sprintf("%s;", command): case <-time.NewTimer(time.Second * 10).C: return errors.New("Timed out sending command " + command) + case <-h.Done(): } return nil diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index b908f3b..fb71c8f 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -3,6 +3,7 @@ package handlers import ( "strings" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" @@ -24,9 +25,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), }, query: query, aggregate: client.NewAggregate(server, query, globalGroup), diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 7313583..e93f6be 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -50,7 +50,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) { conn.Handler = handlers.NewHealthHandler(c.server, receive) conn.Commands = []string{c.mode.String()} - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) go conn.Start(connCtx, cancel, throttleCh, statsCh) for { diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 581db44..6aadd6b 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -94,7 +94,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* } // Start starts the mapreduce client. -func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) { go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx, statsCh) @@ -123,7 +123,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) } return diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go index 2d97d14..b29ffed 100644 --- a/internal/clients/remote/connection.go +++ b/internal/clients/remote/connection.go @@ -177,21 +177,21 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } go func() { - defer cancel() io.Copy(stdinPipe, c.Handler) + cancel() }() go func() { - defer cancel() io.Copy(c.Handler, stdoutPipe) + cancel() }() go func() { - defer cancel() select { case <-c.Handler.Done(): case <-ctx.Done(): } + cancel() }() // Send all commands to client. @@ -207,5 +207,6 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } <-ctx.Done() + c.Handler.Shutdown() return nil } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index a6ac0c5..17343b5 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "runtime" + "strings" "sync" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" ) @@ -32,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) { var connectedLast int for { var force bool + var messages []string select { - case <-statsCh: + case message := <-statsCh: + messages = append(messages, message) force = true - case <-time.After(time.Second * 2): + case <-time.After(time.Second * 10): case <-ctx.Done(): return } @@ -54,7 +58,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) if connected == connectedLast && !force { continue } - s.log(connected, newConnections, throttle) + + stats := s.statsLine(connected, newConnections, throttle) + switch force { + case true: + messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) + s.printStatsOnInterrupt(messages) + default: + logger.Info(stats) + } connectedLast = connected s.mutex.Lock() @@ -63,15 +75,25 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) } } -func (s *stats) log(connected, newConnections int, throttle int) { +func (s *stats) printStatsOnInterrupt(messages []string) { + logger.Pause() + for _, message := range messages { + fmt.Println(fmt.Sprintf(" %s", message)) + } + time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) + logger.Resume() +} + +func (s *stats) statsLine(connected, newConnections int, throttle int) string { percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) - connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)) - newConnStr := fmt.Sprintf("new=%d", newConnections) - throttleStr := fmt.Sprintf("throttle=%d", throttle) - cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()) + var stats []string + stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))) + stats = append(stats, fmt.Sprintf("new=%d", newConnections)) + stats = append(stats, fmt.Sprintf("throttle=%d", throttle)) + stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())) - logger.Info("stats", connectedStr, newConnStr, throttleStr, cpusGoroutinesStr) + return strings.Join(stats, "|") } func (s *stats) numConnected() int { diff --git a/internal/config/config.go b/internal/config/config.go index dc96d6b..276ddcf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,9 @@ const ScheduleUser string = "DTAIL-SCHEDULE" // ContinuousUser is used for non-interactive continuous mapreduce queries. const ContinuousUser string = "DTAIL-CONTINUOUS" +// InterruptTimeoutS is used to terminate DTail when Ctrl+C was pressed twice within a given interval. +const InterruptTimeoutS int = 3 + // Client holds a DTail client configuration. var Client *ClientConfig diff --git a/internal/config/server.go b/internal/config/server.go index db12cec..dc0d587 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -61,6 +61,7 @@ type ServerConfig struct { Continuous []Continuous `json:",omitempty"` } +// ServerRelaxedAuthEnable should be used for development and testing purposes only. var ServerRelaxedAuthEnable bool // Create a new default server configuration. diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index d059cbb..b7db0a7 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -224,7 +224,7 @@ func log(what string, severity string, args []interface{}) string { return "" } - messages := []string{severity} + messages := []string{} for _, arg := range args { switch v := arg.(type) { diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index bca7e6e..500c530 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -5,24 +5,37 @@ import ( "os" gosignal "os/signal" "syscall" + "time" + + "github.com/mimecast/dtail/internal/config" ) -// StatsCh returns a channel for "please print stats" signalling. -func StatsCh(ctx context.Context) <-chan struct{} { - sigCh := make(chan os.Signal) - gosignal.Notify(sigCh, syscall.SIGINFO, syscall.SIGUSR1) +// InterruptCh returns a channel for "please print stats" signalling. +func InterruptCh(ctx context.Context) <-chan string { + sigIntCh := make(chan os.Signal) + gosignal.Notify(sigIntCh, os.Interrupt) + + sigOtherCh := make(chan os.Signal) + gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) - statsCh := make(chan struct{}) + statsCh := make(chan string) go func() { for { select { - case <-sigCh: + case <-sigIntCh: select { - case statsCh <- struct{}{}: + case statsCh <- "Hint: Hit Ctrl+C again to exit": + select { + case <-sigIntCh: + os.Exit(0) + case <-time.After(time.Second * time.Duration(config.InterruptTimeoutS)): + } default: - // Stats currently already printed. + // Stats already printed. } + case <-sigOtherCh: + os.Exit(0) case <-ctx.Done(): return } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 1028943..28bb074 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" @@ -15,6 +16,7 @@ import ( // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { + done *internal.Done // Log lines to process (parsing MAPREDUCE lines). Lines chan line.Line // Hostname of the current server (used to populate $hostname field). @@ -23,12 +25,12 @@ type Aggregate struct { serialize chan struct{} // Signals to flush data. flush chan struct{} + // Signals that data has been flushed + flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser parser *logformat.Parser - cancel context.CancelFunc - ctx context.Context } // NewAggregate return a new server side aggregator. @@ -64,56 +66,64 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } } - ctx, cancel := context.WithCancel(context.Background()) - a := Aggregate{ + done: internal.NewDone(), Lines: make(chan line.Line, 100), serialize: make(chan struct{}), flush: make(chan struct{}), + flushed: make(chan struct{}), hostname: s[0], query: query, parser: logParser, - ctx: ctx, - cancel: cancel, } return &a, nil } +// Shutdown the aggregation engine. +func (a *Aggregate) Shutdown() { + a.Flush() + a.done.Shutdown() +} + // Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - defer a.cancel() - fieldsCh := a.linesToFields(ctx) + myCtx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + select { + case <-myCtx.Done(): + a.done.Shutdown() + case <-a.done.Done(): + cancel() + } + }() + + fieldsCh := a.makeFields(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addMoreFields(ctx, fieldsCh) + fieldsCh = a.addFields(myCtx, fieldsCh) } - go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) - a.periodicAggregateTimer(ctx) + go a.aggregateTimer(myCtx) + a.makeMaprLines(myCtx, fieldsCh, maprLines) } -// Cancel the aggregation. -func (a *Aggregate) Cancel() { - a.cancel() -} - -func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { +func (a *Aggregate) aggregateTimer(ctx context.Context) { for { select { case <-time.After(a.query.Interval): a.Serialize(ctx) case <-ctx.Done(): return - case <-a.ctx.Done(): - return } } } -func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { +func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { ch := make(chan map[string]string) go func() { @@ -144,8 +154,6 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } case <-ctx.Done(): return - case <-a.ctx.Done(): - return } } }() @@ -153,14 +161,14 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string return ch } -func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { +func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { ch := make(chan map[string]string) go func() { defer close(ch) for { - // fieldsCh will be closed via 'linesToFields' if ctx is done + // fieldsCh will be closed via 'makeFields' if ctx is done fields, ok := <-fieldsCh if !ok { return @@ -179,7 +187,7 @@ func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[strin return ch } -func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { +func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { group := mapr.NewGroupSet() serialize := func() { @@ -200,18 +208,10 @@ func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[s case <-a.serialize: serialize() case <-a.flush: - logger.Info("Flushing mapreduce result") serialize() - a.flush <- struct{}{} - logger.Info("Done flushing mapreduce result") + a.flushed <- struct{}{} case <-ctx.Done(): return - case <-a.ctx.Done(): - logger.Info("Flushing mapreduce result") - serialize() - a.flush <- struct{}{} - logger.Info("Done flushing mapreduce result") - return } } } @@ -254,6 +254,8 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: + case <-time.After(time.Minute): + logger.Warn("Starting to serialize mapredice data takes over a minute") case <-ctx.Done(): } } @@ -261,15 +263,20 @@ func (a *Aggregate) Serialize(ctx context.Context) { // Flush all data. func (a *Aggregate) Flush() { select { - case <-a.ctx.Done(): - return case a.flush <- struct{}{}: + logger.Info("Flushing mapreduce data") case <-time.After(time.Minute): + logger.Warn("Starting to flush mapreduce data takes over a minute") + return + case <-a.done.Done(): return } select { - case <-a.flush: + case <-a.flushed: + logger.Info("Done flushing") case <-time.After(time.Minute): + logger.Warn("Waiting for data to be flushed takes over a minute") + case <-a.done.Done(): } } diff --git a/internal/regex/flag.go b/internal/regex/flag.go index d3ff712..396bda0 100644 --- a/internal/regex/flag.go +++ b/internal/regex/flag.go @@ -2,6 +2,7 @@ package regex import "fmt" +// Flag for regex. type Flag int const ( @@ -15,6 +16,7 @@ const ( Noop Flag = iota ) +// NewFlag returns a new regex flag. func NewFlag(str string) (Flag, error) { switch str { case "default": diff --git a/internal/regex/regex.go b/internal/regex/regex.go index 707cb48..2561659 100644 --- a/internal/regex/regex.go +++ b/internal/regex/regex.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" ) +// Regex for filtering lines. type Regex struct { // The original regex string regexStr string @@ -24,6 +25,7 @@ func (r Regex) String() string { r.regexStr, r.flags, r.initialized, r.re == nil) } +// NewNoop is a noop regex (doing nothing). func NewNoop() Regex { return Regex{ flags: []Flag{Noop}, @@ -31,6 +33,7 @@ func NewNoop() Regex { } } +// New returns a new regex object. func New(regexStr string, flag Flag) (Regex, error) { if regexStr == "" || regexStr == "." || regexStr == ".*" { return NewNoop(), nil @@ -39,6 +42,10 @@ func New(regexStr string, flag Flag) (Regex, error) { } func new(regexStr string, flags []Flag) (Regex, error) { + if len(flags) == 0 { + flags = append(flags, Default) + } + r := Regex{ regexStr: regexStr, flags: flags, @@ -55,6 +62,7 @@ func new(regexStr string, flags []Flag) (Regex, error) { return r, nil } +// Match a byte string. func (r Regex) Match(bytes []byte) bool { switch r.flags[0] { case Default: @@ -68,6 +76,7 @@ func (r Regex) Match(bytes []byte) bool { } } +// MatchString matches a string. func (r Regex) MatchString(str string) bool { switch r.flags[0] { case Default: @@ -81,6 +90,7 @@ func (r Regex) MatchString(str string) bool { } } +// Serialize the regex. func (r Regex) Serialize() string { var flags []string for _, flag := range r.flags { @@ -94,6 +104,7 @@ func (r Regex) Serialize() string { return fmt.Sprintf("regex:%s %s", strings.Join(flags, ","), r.regexStr) } +// Deserialize the regex. func Deserialize(str string) (Regex, error) { // Get regex string s := strings.SplitN(str, " ", 2) diff --git a/internal/server/continuous.go b/internal/server/continuous.go index 583d136..f75c732 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -92,7 +92,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { } logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx, make(chan struct{})) + status := client.Start(jobCtx, make(chan string)) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go index daa9835..8cc5a40 100644 --- a/internal/server/handlers/controlhandler.go +++ b/internal/server/handlers/controlhandler.go @@ -1,20 +1,19 @@ package handlers import ( - "context" "fmt" "io" "os" "strings" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" user "github.com/mimecast/dtail/internal/user/server" ) // ControlHandler is used for control functions and health monitoring. type ControlHandler struct { - ctx context.Context - done chan struct{} + done *internal.Done hostname string payload []byte serverMessages chan string @@ -22,12 +21,11 @@ type ControlHandler struct { } // NewControlHandler returns a new control handler. -func NewControlHandler(ctx context.Context, user *user.User) (*ControlHandler, <-chan struct{}) { +func NewControlHandler(user *user.User) *ControlHandler { logger.Debug(user, "Creating control handler") h := ControlHandler{ - ctx: ctx, - done: make(chan struct{}), + done: internal.NewDone(), serverMessages: make(chan string, 10), user: user, } @@ -40,7 +38,17 @@ func NewControlHandler(ctx context.Context, user *user.User) (*ControlHandler, < s := strings.Split(fqdn, ".") h.hostname = s[0] - return &h, h.done + return &h +} + +// Shutdown the handler. +func (h *ControlHandler) Shutdown() { + h.done.Shutdown() +} + +// Done channel of the handler. +func (h *ControlHandler) Done() <-chan struct{} { + return h.done.Done() } // Read is to send data to the client via the Reader interface. @@ -51,7 +59,7 @@ func (h *ControlHandler) Read(p []byte) (n int, err error) { wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) n = copy(p, wholePayload) return - case <-h.ctx.Done(): + case <-h.done.Done(): return 0, io.EOF } } @@ -63,7 +71,7 @@ func (h *ControlHandler) Write(p []byte) (n int, err error) { switch c { case ';': wholePayload := strings.TrimSpace(string(h.payload)) - h.handleCommand(h.ctx, wholePayload) + h.handleCommand(wholePayload) h.payload = nil default: @@ -75,7 +83,7 @@ func (h *ControlHandler) Write(p []byte) (n int, err error) { return } -func (h *ControlHandler) handleCommand(ctx context.Context, command string) { +func (h *ControlHandler) handleCommand(command string) { logger.Info(h.user, command) s := strings.Split(command, " ") logger.Debug(h.user, "Receiving command", command, s) diff --git a/internal/server/handlers/handler.go b/internal/server/handlers/handler.go index c42ceb9..b04e854 100644 --- a/internal/server/handlers/handler.go +++ b/internal/server/handlers/handler.go @@ -5,4 +5,6 @@ import "io" // Handler interface for server side functionality. type Handler interface { io.ReadWriter + Shutdown() + Done() <-chan struct{} } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 7017f3e..5cf8041 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -7,18 +7,16 @@ import ( "fmt" "io" "os" - "strconv" "strings" - "sync" "sync/atomic" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -31,33 +29,27 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - payload []byte - hostname string - user *user.User - // TODO: Move all these channels into a separate struct for readability! + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + aggregatedMessages chan string + serverMessages chan string + payload []byte + hostname string + user *user.User catLimiter chan struct{} tailLimiter chan struct{} globalServerWaitFor chan struct{} ackCloseReceived chan struct{} - serverCtx context.Context - handlerCtx context.Context - done chan struct{} activeCommands int32 activeReaders int32 - background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) (*ServerHandler, <-chan struct{}) { +func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { h := ServerHandler{ - serverCtx: serverCtx, - handlerCtx: handlerCtx, - done: make(chan struct{}), + done: internal.NewDone(), lines: make(chan line.Line, 100), serverMessages: make(chan string, 10), aggregatedMessages: make(chan string, 10), @@ -67,7 +59,6 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background, } fqdn, err := os.Hostname() @@ -78,7 +69,17 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca s := strings.Split(fqdn, ".") h.hostname = s[0] - return &h, h.done + return &h +} + +// Shutdown the handler. +func (h *ServerHandler) Shutdown() { + h.done.Shutdown() +} + +// Done channel of the handler. +func (h *ServerHandler) Done() <-chan struct{} { + return h.done.Done() } // Read is to send data to the dtail client via Reader interface. @@ -120,7 +121,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case <-time.After(time.Second): // Once in a while check whether we are done. select { - case <-h.handlerCtx.Done(): + case <-h.done.Done(): return 0, io.EOF default: } @@ -134,7 +135,7 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { switch c { case ';': commandStr := strings.TrimSpace(string(h.payload)) - h.handleCommand(h.handlerCtx, commandStr) + h.handleCommand(commandStr) h.payload = nil default: h.payload = append(h.payload, c) @@ -145,9 +146,9 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { return } -func (h *ServerHandler) handleCommand(ctx context.Context, commandStr string) { +func (h *ServerHandler) handleCommand(commandStr string) { logger.Debug(h.user, commandStr) - var timeout time.Duration + ctx := context.Background() args, argc, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) if err != nil { @@ -161,30 +162,18 @@ func (h *ServerHandler) handleCommand(ctx context.Context, commandStr string) { return } - args, argc, timeout, err = h.handleTimeout(args, argc) - if err != nil { - h.send(h.serverMessages, logger.Error(h.user, err)) - return - } - if h.user.Name == config.ControlUser { h.handleControlCommand(argc, args) return } - if timeout > 0 { - logger.Info(h.user, "Command with timeout context", argc, args, timeout) - commandCtx, cancel := context.WithTimeout(ctx, timeout) - go func() { - <-commandCtx.Done() - logger.Info(h.user, "Command timed out, canceling it", args, args, timeout) - cancel() - }() - h.handleUserCommand(commandCtx, argc, args, timeout) - return - } + ctx, cancel := context.WithCancel(ctx) + go func() { + <-h.done.Done() + cancel() + }() - h.handleUserCommand(ctx, argc, args, timeout) + h.handleUserCommand(ctx, argc, args) } func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, error) { @@ -222,16 +211,6 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er return args, argc, nil } -func (h *ServerHandler) handleTimeout(args []string, argc int) ([]string, int, time.Duration, error) { - if argc <= 2 || args[0] != "timeout" { - // No timeout specified - return args, argc, time.Duration(0) * time.Second, nil - } - - timeout, err := strconv.Atoi(args[1]) - return args[2:], argc - 2, time.Duration(timeout) * time.Second, err -} - func (h *ServerHandler) handleControlCommand(argc int, args []string) { switch args[0] { case "debug": @@ -241,7 +220,7 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) { } } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, timeout time.Duration) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { logger.Debug(h.user, "handleUserCommand", argc, args) h.incrementActiveCommands() @@ -255,7 +234,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] if h.aggregate == nil { return } - h.aggregate.Cancel() + h.aggregate.Shutdown() } } @@ -303,86 +282,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() }() - case "run": - // TODO: Refactor this "run" case, move code to runcommand.go - command := newRunCommand(h) - - jobName, _ := options["jobName"] - logger.Debug(h.user, "run", options) - - if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") { - if err := h.background.Cancel(h.user.Name, jobName); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - } else { - h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) - } - commandFinished() - return - } - - if val, ok := options["background"]; ok && val == "list" { - h.sendServerMessage("Listing jobs") - count := 0 - for jobName := range h.background.ListJobsC(h.user.Name) { - h.sendServerMessage(jobName) - count++ - } - h.sendServerMessage(fmt.Sprintf("Found %d jobs", count)) - commandFinished() - return - } - - str, _ := options["outerArgs"] - outerArgs := strings.Split(str, " ") - - var background bool - if val, ok := options["background"]; ok && val == "start" { - background = true - } - - var wg sync.WaitGroup - wg.Add(1) - - if background { - if timeout == 0 { - // Set default background timeout. - timeout = time.Hour * 1 - } - // Use a new context based on the server context, so that background job does not get - // terminated when handler/SSH connection terminates. - commandCtx, cancel := context.WithTimeout(h.serverCtx, timeout) - - if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - commandFinished() - return - } - ctx = commandCtx - } - - if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil { - h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err)) - commandFinished() - return - } - - // Make sure that server waits for all sub-processes to finish on shutdown - go func() { h.globalServerWaitFor <- struct{}{} }() - go func() { - wg.Wait() - <-h.globalServerWaitFor - }() - - if background { - h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) - commandFinished() - return - } - - // Command run in foreground, wait for it to complete before finishing the connection. - wg.Wait() - commandFinished() - case "ack", ".ack": h.handleAckCommand(argc, args) commandFinished() @@ -406,7 +305,7 @@ func (h *ServerHandler) handleAckCommand(argc int, args []string) { func (h *ServerHandler) send(ch chan<- string, message string) { select { case ch <- message: - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } } @@ -447,7 +346,7 @@ func (h *ServerHandler) shutdown() { go func() { select { case h.serverMessageC() <- ".syn close connection": - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } }() @@ -455,13 +354,10 @@ func (h *ServerHandler) shutdown() { case <-h.ackCloseReceived: case <-time.After(time.Second * 5): logger.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } - select { - case h.done <- struct{}{}: - default: - } + h.done.Shutdown() } func (h *ServerHandler) incrementActiveCommands() { diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 9d76a3b..a1e9e36 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -93,7 +93,7 @@ func (s *scheduler) runJobs(ctx context.Context) { defer cancel() logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx, make(chan struct{})) + status := client.Start(jobCtx, make(chan string)) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { diff --git a/internal/server/server.go b/internal/server/server.go index a446738..31fa85d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -35,9 +34,8 @@ type Server struct { // Mointor log files for pattern (if configured) cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. + // TODO: Remove this counter. shutdownWaitFor chan struct{} - // Background jobs - background background.Background } // New returns a new server. @@ -51,7 +49,6 @@ func New() *Server { shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), - background: background.New(), } s.sshServerConfig.PasswordCallback = s.Callback @@ -178,53 +175,46 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch switch req.Type { case "shell": - handlerCtx, cancel := context.WithCancel(ctx) - var handler handlers.Handler - var done <-chan struct{} - switch user.Name { case config.ControlUser: - handler, done = handlers.NewControlHandler(handlerCtx, user) + handler = handlers.NewControlHandler(user) default: - handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) } - go func() { - // Handler finished work, cancel all remaining routines - defer cancel() - - <-done - }() + terminate := func() { + handler.Shutdown() + sshConn.Close() + } go func() { // Broken pipe, cancel - defer cancel() - io.Copy(channel, handler) + terminate() }() go func() { // Broken pipe, cancel - defer cancel() - io.Copy(handler, channel) + terminate() }() go func() { - defer cancel() + select { + case <-ctx.Done(): + case <-handler.Done(): + } + terminate() + }() + go func() { if err := sshConn.Wait(); err != nil && err != io.EOF { logger.Error(user, err) } s.stats.decrementConnections() logger.Info(user, "Good bye Mister!") - }() - - go func() { - <-handlerCtx.Done() - sshConn.Close() - logger.Info(user, "Closed SSH connection") + terminate() }() // Only serving shell type diff --git a/internal/version/version.go b/internal/version/version.go index 36ef62c..b513b40 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -13,7 +13,7 @@ const ( // Version of DTail. Version string = "3.1.0" // Additional information for DTail - Additional string = "develop" + Additional string = "" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) -- cgit v1.2.3 From e470159911c0b218ddb760fcb253c2b3523a0d27 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 8 Dec 2020 15:16:12 +0000 Subject: add done --- internal/done.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 internal/done.go (limited to 'internal') diff --git a/internal/done.go b/internal/done.go new file mode 100644 index 0000000..54e5e8e --- /dev/null +++ b/internal/done.go @@ -0,0 +1,36 @@ +package internal + +import ( + "sync" +) + +// Done is a cleanup/shutdown helper. +type Done struct { + ch chan struct{} + mutex sync.Mutex +} + +// NewDone returns a new cleanup/shutdown helper. +func NewDone() *Done { + return &Done{ + ch: make(chan struct{}), + } +} + +// Done returns the done channel (closed when done) +func (d *Done) Done() <-chan struct{} { + return d.ch +} + +// Shutdown closes the done channel. It can be called multiple times. +func (d *Done) Shutdown() { + d.mutex.Lock() + defer d.mutex.Unlock() + + select { + case <-d.ch: + return + default: + close(d.ch) + } +} -- cgit v1.2.3 From 44041db5db3d630413067606ae2402d10a61774e Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 10 Dec 2020 14:37:19 +0000 Subject: add hasprefix, nhasprefix, hassuffix, nhassuffix operation support to where clause --- internal/mapr/wherecondition.go | 22 ++++++++++++++++++++++ internal/version/version.go | 4 ++-- 2 files changed, 24 insertions(+), 2 deletions(-) (limited to 'internal') diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index ff1b489..70e9c32 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -19,6 +19,10 @@ const ( StringNe QueryOperation = iota StringContains QueryOperation = iota StringNotContains QueryOperation = iota + StringHasPrefix QueryOperation = iota + StringNotHasPrefix QueryOperation = iota + StringHasSuffix QueryOperation = iota + StringNotHasSuffix QueryOperation = iota FloatOperation QueryOperation = iota FloatEq QueryOperation = iota FloatNe QueryOperation = iota @@ -78,7 +82,17 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { case "contains": wc.Operation = StringContains case "lacks": + fallthrough + case "ncontains": wc.Operation = StringNotContains + case "hasprefix": + wc.Operation = StringHasPrefix + case "nhasprefix": + wc.Operation = StringNotHasPrefix + case "hassuffix": + wc.Operation = StringHasSuffix + case "nhassuffix": + wc.Operation = StringNotHasSuffix default: return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) } @@ -169,6 +183,14 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { return strings.Contains(lValue, rValue) case StringNotContains: return !strings.Contains(lValue, rValue) + case StringHasPrefix: + return strings.HasPrefix(lValue, rValue) + case StringNotHasPrefix: + return !strings.HasPrefix(lValue, rValue) + case StringHasSuffix: + return strings.HasSuffix(lValue, rValue) + case StringNotHasSuffix: + return !strings.HasSuffix(lValue, rValue) default: logger.Error("Unknown string operation", lValue, wc.Operation, rValue) } diff --git a/internal/version/version.go b/internal/version/version.go index b513b40..0c21e26 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -11,9 +11,9 @@ const ( // Name of DTail. Name string = "DTail" // Version of DTail. - Version string = "3.1.0" + Version string = "3.2.0" // Additional information for DTail - Additional string = "" + Additional string = "develop" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) -- cgit v1.2.3 From b4db37d8cbae8f0c3dec289b2e1b0cfe83731415 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 10 Dec 2020 15:08:56 +0000 Subject: bump verison --- internal/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal') diff --git a/internal/version/version.go b/internal/version/version.go index 0c21e26..4fa9890 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -13,7 +13,7 @@ const ( // Version of DTail. Version string = "3.2.0" // Additional information for DTail - Additional string = "develop" + Additional string = "develop2" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) -- cgit v1.2.3 From ab676c2b484225ed22765b23d8f0545088ecd610 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 26 Dec 2020 10:48:51 +0000 Subject: code cleanup and minor refactorings --- internal/clients/grepclient.go | 1 + internal/clients/handlers/basehandler.go | 18 +--- internal/clients/handlers/healthhandler.go | 1 + internal/clients/maker.go | 3 + internal/clients/runclient.go | 87 ----------------- internal/clients/stats.go | 7 +- internal/clients/tailclient.go | 1 + internal/color/colorfy.go | 12 +-- internal/config/common.go | 10 +- internal/discovery/comma.go | 3 +- internal/discovery/file.go | 3 +- internal/io/line/line.go | 6 +- internal/io/prompt/prompt.go | 6 +- internal/io/run/run.go | 150 ----------------------------- internal/mapr/funcs/function.go | 1 + internal/mapr/query.go | 6 -- internal/mapr/selectcondition.go | 1 + internal/mapr/token.go | 3 + internal/mapr/wherecondition.go | 2 + internal/omode/mode.go | 3 - internal/server/handlers/runcommand.go | 111 --------------------- internal/server/handlers/serverhandler.go | 50 +++++----- internal/server/server.go | 24 +---- 23 files changed, 68 insertions(+), 441 deletions(-) delete mode 100644 internal/clients/runclient.go delete mode 100644 internal/io/run/run.go delete mode 100644 internal/server/handlers/runcommand.go (limited to 'internal') diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 4024083..e6fc94a 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -44,5 +44,6 @@ func (c GrepClient) makeCommands() (commands []string) { for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) } + return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index b5045e2..f07fd90 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "fmt" "io" - "strconv" "strings" "time" @@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } + return } @@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { case <-h.Done(): return } - - case strings.HasPrefix(message, ".run exitstatus"): - splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") - if len(splitted) != 3 { - logger.Error("Unable to retrieve exitstatus", message) - return - } - i, err := strconv.Atoi(splitted[2]) - if err != nil { - logger.Error("Unable to retrieve exitstatus", message, err) - return - } - logger.Debug("Retrieved exitstatus", h.status) - if i > h.status { - h.status = i - } } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 08ed137..0440706 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,6 +19,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + // The return status. status int } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 1ba6482..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -4,6 +4,9 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" ) +// maker interface helps to re-use code in all DTail client implementations. +// All clients share the baseClient but have different connection handlers +// and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go deleted file mode 100644 index 5464d54..0000000 --- a/internal/clients/runclient.go +++ /dev/null @@ -1,87 +0,0 @@ -package clients - -import ( - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "fmt" - "runtime" - "strings" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/omode" -) - -// RunClient is a client to run various commands on the server. -type RunClient struct { - baseClient - jobName string - background string -} - -// NewRunClient returns a new run client to execute commands on the remote server. -func NewRunClient(args Args, background, jobName string) (*RunClient, error) { - args.Mode = omode.RunClient - - if jobName == "" { - jobName = hash(strings.Join(args.Arguments, " ")) - } - - c := RunClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, - jobName: jobName, - background: background, - } - - c.init() - c.makeConnections(c) - - return &c, nil -} - -func (c RunClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c RunClient) makeCommands() (commands []string) { - if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What)) - return - } - - commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What)) - return -} - -func (c RunClient) options() string { - var sb strings.Builder - - logger.Debug("options", fmt.Sprintf(":background=%s", c.background)) - sb.WriteString(fmt.Sprintf(":background=%s", c.background)) - - logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName)) - sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName)) - - if len(c.Arguments) > 0 { - logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " "))) - sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " ")))) - } - - return sb.String() -} - -func encode64(str string) string { - return base64.StdEncoding.EncodeToString([]byte(str)) -} - -func hash(str string) string { - h := sha256.New() - h.Write([]byte(str)) - - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 17343b5..2ec6f22 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -45,7 +45,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < case message := <-statsCh: messages = append(messages, message) force = true - case <-time.After(time.Second * 10): + case <-time.After(time.Second * 3): case <-ctx.Done(): return } @@ -63,7 +63,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < switch force { case true: messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) - s.printStatsOnInterrupt(messages) + s.printStatsDueInterrupt(messages) default: logger.Info(stats) } @@ -75,7 +75,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < } } -func (s *stats) printStatsOnInterrupt(messages []string) { +func (s *stats) printStatsDueInterrupt(messages []string) { logger.Pause() for _, message := range messages { fmt.Println(fmt.Sprintf(" %s", message)) @@ -107,5 +107,6 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } + return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 53b5ba4..ff2f46e 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) + return &c, nil } diff --git a/internal/color/colorfy.go b/internal/color/colorfy.go index 9ae46f5..a2beb7a 100644 --- a/internal/color/colorfy.go +++ b/internal/color/colorfy.go @@ -41,16 +41,14 @@ func paintClientStats(line string) string { // Colorfy a given line based on the line's content. func Colorfy(line string) string { - if strings.HasPrefix(line, "REMOTE") { + switch { + case strings.HasPrefix(line, "REMOTE"): return paintRemote(line) - } - if strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|") { + case strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|"): return paintClientStats(line) - } - if strings.Contains(line, "ERROR") { + case strings.Contains(line, "ERROR"): return Paint(Magenta, line) - } - if strings.Contains(line, "WARN") { + case strings.Contains(line, "WARN"): return Paint(Magenta, line) } diff --git a/internal/config/common.go b/internal/config/common.go index 103b390..c3e203e 100644 --- a/internal/config/common.go +++ b/internal/config/common.go @@ -2,10 +2,14 @@ package config // CommonConfig stores configuration keys shared by DTail server and client. type CommonConfig struct { - SSHPort int + // The SSH port number + SSHPort int + // Enable experimental features (mainly for dev purposes) ExperimentalFeaturesEnable bool `json:",omitempty"` - DebugEnable bool `json:",omitempty"` - TraceEnable bool `json:",omitempty"` + // Enable debug logging. Don't enable in production. + DebugEnable bool `json:",omitempty"` + // Enable trace logging. Don't enable in production. + TraceEnable bool `json:",omitempty"` // The log strategy to use, one of // stdout: only log to stdout (useful when used with systemd) // daily: create a log file for every day diff --git a/internal/discovery/comma.go b/internal/discovery/comma.go index 94276c7..4344240 100644 --- a/internal/discovery/comma.go +++ b/internal/discovery/comma.go @@ -1,8 +1,9 @@ package discovery import ( - "github.com/mimecast/dtail/internal/io/logger" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromCOMMA retrieves a list of servers from comma separated input list. diff --git a/internal/discovery/file.go b/internal/discovery/file.go index c04173e..1250755 100644 --- a/internal/discovery/file.go +++ b/internal/discovery/file.go @@ -2,8 +2,9 @@ package discovery import ( "bufio" - "github.com/mimecast/dtail/internal/io/logger" "os" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromFILE retrieves a list of servers from a file. diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 9db93c0..715be34 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -15,7 +15,11 @@ type Line struct { // lines if that happens but it will signal to the client how // many log lines in % could be transmitted to the client. TransmittedPerc int - SourceID string + // Contains the unique identifier of the source log file. + // It could be the name of the log or it could be one of the parent + // directories in case multiple log files with the same basename are + // followed. + SourceID string } // Return a human readable representation of the followed line. diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index a438d33..36ebdb5 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -3,9 +3,10 @@ package prompt import ( "bufio" "fmt" - "github.com/mimecast/dtail/internal/io/logger" "os" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // Answer is a user input of a prompt question. @@ -18,8 +19,7 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - - AskAgain bool + AskAgain bool } // Prompt used for interactive user input. diff --git a/internal/io/run/run.go b/internal/io/run/run.go deleted file mode 100644 index 2bb3756..0000000 --- a/internal/io/run/run.go +++ /dev/null @@ -1,150 +0,0 @@ -package run - -import ( - "bufio" - "context" - "io" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" -) - -// Run is for execute a command. -type Run struct { - command string - args []string - cmd *exec.Cmd - pgroupKilled chan struct{} -} - -// New returns a new command runner. -func New(command string, args []string) Run { - return Run{ - command: command, - args: args, - pgroupKilled: make(chan struct{}), - } -} - -// StartBackground starts running the command in background. -func (r Run) StartBackground(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, lines chan<- line.Line) (pid int, err error) { - pid = -1 - - if len(r.args) > 0 { - logger.Debug(r.command, r.args, " ") - r.cmd = exec.CommandContext(ctx, r.command, r.args...) - } else { - logger.Debug(r.command) - r.cmd = exec.CommandContext(ctx, r.command) - } - - // Create a new process group, so that kill() will only kill this command + pgroup. - r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdoutPipe, myErr := r.cmd.StdoutPipe() - if err != nil { - wg.Done() - err = myErr - return - } - - stderrPipe, myErr := r.cmd.StderrPipe() - if myErr != nil { - wg.Done() - err = myErr - return - } - - if myErr := r.cmd.Start(); err != nil { - wg.Done() - err = myErr - return - } - - if r.cmd.Process != nil { - pid = r.cmd.Process.Pid - } - - commandExited := make(chan struct{}) - - var pipeWg sync.WaitGroup - pipeWg.Add(2) - - go r.killPgroup(ctx, commandExited, pid) - go r.pipeToLines(commandExited, &pipeWg, pid, stdoutPipe, "STDOUT", lines) - go r.pipeToLines(commandExited, &pipeWg, pid, stderrPipe, "STDERR", lines) - - go func() { - exitCode := 255 - if waitErr := r.cmd.Wait(); waitErr != nil { - if exitError, ok := waitErr.(*exec.ExitError); ok { - exitCode = exitError.ExitCode() - } - } - ec <- exitCode - - // Tell pipes we are done - close(commandExited) - // Wait for process group to be killed - <-r.pgroupKilled - // Wait for the pipes to flush the contents - pipeWg.Wait() - // Now the job is truly done - wg.Done() - }() - - return -} - -func (r Run) pipeToLines(commandExited chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) { - defer wg.Done() - bufReader := bufio.NewReader(reader) - - for { - time.Sleep(time.Millisecond * 10) - lineStr, err := bufReader.ReadString('\n') - - if err != nil { - select { - case <-commandExited: - return - default: - } - continue - } - - newLine := line.Line{ - Content: []byte(lineStr), - Count: uint64(pid), - TransmittedPerc: 100, - SourceID: what, - } - - select { - case lines <- newLine: - case <-commandExited: - return - } - } -} - -func (r Run) killPgroup(ctx context.Context, commandExited chan struct{}, pid int) { - if pid == -1 { - close(r.pgroupKilled) - return - } - - if pgid, err := syscall.Getpgid(pid); err == nil { - // Kill process group when done - select { - case <-ctx.Done(): - case <-commandExited: - } - syscall.Kill(-pgid, syscall.SIGKILL) - close(r.pgroupKilled) - } -} diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 52aaa98..1a89c3a 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -12,6 +12,7 @@ type CallbackFunc func(text string) string type Function struct { // Name of the callback function Name string + // The Go-callback function to call for this DTail function. call CallbackFunc } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 7f6b63c..01852da 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -177,12 +177,6 @@ func (q *Query) parse(tokens []token) error { } } - // Comment out for empty table support, which is "all" log lines. - /* - if q.Table == "" { - return errors.New(invalidQuery + "Empty table specified in 'from' clause") - } - */ if len(q.Select) < 1 { return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index 1882b7e..d6aa0d4 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -92,5 +92,6 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } + return sel, nil } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index d337bd2..8972188 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -22,6 +22,7 @@ func (t token) isKeyword() bool { return true } } + return false } @@ -94,6 +95,7 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } + return tokens, strings } @@ -104,5 +106,6 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } + return tokens } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index 70e9c32..7a60dba 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -170,6 +170,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { default: logger.Error("Unknown float operation", lValue, wc.Operation, rValue) } + return false } @@ -194,5 +195,6 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { default: logger.Error("Unknown string operation", lValue, wc.Operation, rValue) } + return false } diff --git a/internal/omode/mode.go b/internal/omode/mode.go index e29aacc..1aafcfc 100644 --- a/internal/omode/mode.go +++ b/internal/omode/mode.go @@ -12,7 +12,6 @@ const ( GrepClient Mode = iota MapClient Mode = iota HealthClient Mode = iota - RunClient Mode = iota ) func (m Mode) String() string { @@ -29,8 +28,6 @@ func (m Mode) String() string { return "map" case HealthClient: return "health" - case RunClient: - return "run" default: return "unknown" } diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go deleted file mode 100644 index 8e5895b..0000000 --- a/internal/server/handlers/runcommand.go +++ /dev/null @@ -1,111 +0,0 @@ -package handlers - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/run" -) - -type runCommand struct { - server *ServerHandler - run run.Run -} - -func newRunCommand(server *ServerHandler) runCommand { - return runCommand{ - server: server, - } -} - -func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error { - if argc < 2 { - return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc) - } - - ec := make(chan int, 1) - var pid int - var err error - - command := strings.Join(args[1:], " ") - if strings.Contains(command, ";") || strings.Contains(command, "\n") { - if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - return nil - } - - if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - - exitCode := <-ec - r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode)) - r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode))) - - return nil -} - -func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) { - if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) { - return -1, err - } - - timestamp := time.Now().UnixNano() - scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp) - - // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so) - logger.Debug(r.server.user, "Writing temp script", scriptPath) - - script = fmt.Sprintf("#!/bin/sh\n%s", script) - if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil { - return -1, err - } - - pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs) - go func() { - wg.Wait() - logger.Debug("Deleting script", scriptPath) - os.Remove(scriptPath) - }() - - return pid, err -} - -func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) { - if len(command) == 0 { - return -1, errors.New("Empty command provided") - } - - splitted := strings.Split(command, " ") - path := splitted[0] - args := splitted[1:] - args = append(args, outerArgs...) - - qualifiedPath, err := exec.LookPath(path) - if err != nil { - return -1, err - } - - if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") { - return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath) - } - - r.run = run.New(qualifiedPath, args) - pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines) - if err != nil { - return pid, err - } - return pid, nil -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5cf8041..3d1a53d 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -29,36 +29,34 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - payload []byte - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - globalServerWaitFor chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - activeReaders int32 + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + aggregatedMessages chan string + serverMessages chan string + payload []byte + hostname string + user *user.User + catLimiter chan struct{} + tailLimiter chan struct{} + ackCloseReceived chan struct{} + activeCommands int32 + activeReaders int32 } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - aggregatedMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - globalServerWaitFor: globalServerWaitFor, - regex: ".", - user: user, + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + aggregatedMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + user: user, } fqdn, err := os.Hostname() diff --git a/internal/server/server.go b/internal/server/server.go index 31fa85d..a20737e 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -7,7 +7,6 @@ import ( "io" "net" "strings" - "time" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" @@ -33,9 +32,6 @@ type Server struct { sched *scheduler // Mointor log files for pattern (if configured) cont *continuous - // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. - // TODO: Remove this counter. - shutdownWaitFor chan struct{} } // New returns a new server. @@ -46,7 +42,6 @@ func New() *Server { sshServerConfig: &gossh.ServerConfig{}, catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats), tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), - shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), } @@ -80,27 +75,12 @@ func (s *Server) Start(ctx context.Context) int { go s.cont.start(ctx) go s.listenerLoop(ctx, listener) - select { - case <-ctx.Done(): - // Wait until all commands/jobs/children are no more! - s.wait() - } + <-ctx.Done() // For future use. return 0 } -func (s *Server) wait() { - for { - num := len(s.shutdownWaitFor) - logger.Debug("Waiting for stuff to finish", num) - if num <= 0 { - return - } - time.Sleep(time.Second) - } -} - func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { logger.Debug("Starting listener loop") @@ -180,7 +160,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } terminate := func() { -- cgit v1.2.3 From 60fa324cd5296b088c24d8db1d334a25ca955788 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 26 Dec 2020 11:44:31 +0000 Subject: initial spartan mode support --- internal/clients/args.go | 1 + internal/clients/baseclient.go | 2 +- internal/clients/catclient.go | 3 ++- internal/clients/grepclient.go | 3 ++- internal/clients/maprclient.go | 3 ++- internal/clients/stats.go | 4 ++-- internal/clients/tailclient.go | 3 ++- internal/server/handlers/serverhandler.go | 7 +++++++ internal/user/server/user.go | 4 ++++ 9 files changed, 23 insertions(+), 7 deletions(-) (limited to 'internal') diff --git a/internal/clients/args.go b/internal/clients/args.go index 34fcfa2..cd0f174 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -22,4 +22,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string + Spartan bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 69055a3..1e40ff2 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -70,7 +70,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. - go c.stats.Start(ctx, c.throttleCh, statsCh) + go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Spartan) // Keep count of active connections active := make(chan struct{}, len(c.connections)) diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index d8e9196..50a8d18 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { + options := fmt.Sprintf("spartan=%v", c.Args.Spartan) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } return } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index e6fc94a..5fa0ae0 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,8 +41,9 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { + options := fmt.Sprintf("spartan=%v", c.Args.Spartan) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } return diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 6aadd6b..d201d40 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) + options := fmt.Sprintf("spartan=%v", c.Args.Spartan) modeStr := "cat" if c.Mode == omode.TailClient { @@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) } return diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 2ec6f22..3ea59b7 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -34,7 +34,7 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, spartan bool) { var connectedLast int for { @@ -55,7 +55,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < newConnections := connected - connectedLast - if connected == connectedLast && !force { + if (connected == connectedLast || spartan) && !force { continue } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index ff2f46e..853bdf1 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -38,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { + options := fmt.Sprintf("spartan=%v", c.Args.Spartan) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } logger.Debug(commands) diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 3d1a53d..5b948b3 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -43,6 +43,7 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 + spartan bool } // NewServerHandler returns the server handler. @@ -245,6 +246,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() return } + if spartan, ok := options["spartan"]; ok { + if spartan == "true" { + logger.Debug(h.user, "Enabling spartan mode") + h.spartan = true + } + } switch commandName { case "grep", "cat": diff --git a/internal/user/server/user.go b/internal/user/server/user.go index c4e8b7b..637945c 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -40,6 +40,10 @@ func (u *User) String() string { // HasFilePermission is used to determine whether user is alowed to read a file. func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission bool) { logger.Debug(u, filePath, permissionType, "Checking config permissions") + if config.ServerRelaxedAuthEnable { + logger.Fatal(u, filePath, permissionType, "Server releaxed auth enabled") + return true + } if u.Name == config.ScheduleUser || u.Name == config.ContinuousUser { // Background user has same permissions as dtail process itself. -- cgit v1.2.3 From b05ae938aa6ada831d19de076861e246a03a9d7d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 26 Dec 2020 14:00:41 +0000 Subject: rename spartan to quiet --- internal/clients/args.go | 2 +- internal/clients/baseclient.go | 2 +- internal/clients/catclient.go | 2 +- internal/clients/grepclient.go | 2 +- internal/clients/maprclient.go | 2 +- internal/clients/stats.go | 4 ++-- internal/clients/tailclient.go | 2 +- internal/server/handlers/serverhandler.go | 10 +++++----- 8 files changed, 13 insertions(+), 13 deletions(-) (limited to 'internal') diff --git a/internal/clients/args.go b/internal/clients/args.go index cd0f174..7f782f1 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -22,5 +22,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string - Spartan bool + Quiet bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 1e40ff2..572680c 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -70,7 +70,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. - go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Spartan) + go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) // Keep count of active connections active := make(chan struct{}, len(c.connections)) diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 50a8d18..b7b6131 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,7 +42,7 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { - options := fmt.Sprintf("spartan=%v", c.Args.Spartan) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 5fa0ae0..652c31b 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,7 +41,7 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { - options := fmt.Sprintf("spartan=%v", c.Args.Spartan) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index d201d40..9beea13 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -112,7 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - options := fmt.Sprintf("spartan=%v", c.Args.Spartan) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 3ea59b7..d8163d4 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -34,7 +34,7 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, spartan bool) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { var connectedLast int for { @@ -55,7 +55,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < newConnections := connected - connectedLast - if (connected == connectedLast || spartan) && !force { + if (connected == connectedLast || quiet) && !force { continue } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 853bdf1..cefbaa7 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -38,7 +38,7 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { - options := fmt.Sprintf("spartan=%v", c.Args.Spartan) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5b948b3..3212ee1 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -43,7 +43,7 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 - spartan bool + quiet bool } // NewServerHandler returns the server handler. @@ -246,10 +246,10 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() return } - if spartan, ok := options["spartan"]; ok { - if spartan == "true" { - logger.Debug(h.user, "Enabling spartan mode") - h.spartan = true + if quiet, ok := options["quiet"]; ok { + if quiet == "true" { + logger.Debug(h.user, "Enabling quiet mode") + h.quiet = true } } -- cgit v1.2.3 From f9415259bc5d74a89801be75587da87dd37518af Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 26 Dec 2020 14:28:31 +0000 Subject: initial quiet switch --- internal/clients/baseclient.go | 4 ++-- internal/clients/maprclient.go | 6 +++--- internal/io/logger/logger.go | 22 ++++++++++++++-------- internal/io/logger/modes.go | 1 + internal/server/handlers/controlhandler.go | 4 +--- internal/server/handlers/readcommand.go | 11 +++++------ internal/server/handlers/serverhandler.go | 11 +++++++++-- 7 files changed, 35 insertions(+), 24 deletions(-) (limited to 'internal') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 572680c..f20156f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,7 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Info("Initiating base client") + logger.Debug("Initiating base client") flag := regex.Default if c.Args.RegexInvert { @@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Info("Terminated connection") + defer logger.Debug("Terminated connection") // We want to have at least one active connection <-active diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 9beea13..1c0c2cc 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -99,7 +99,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Info("Received final mapreduce result") + logger.Debug("Received final mapreduce result") c.reportResults() } @@ -134,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") + logger.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -166,7 +166,7 @@ func (c *MaprClient) printResults() { } if numLines == 0 { - logger.Info("Empty result set this time...") + logger.Warn("Empty result set this time...") return } diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index b7db0a7..7674dd1 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -72,12 +72,15 @@ type buf struct { func Start(ctx context.Context, mode Modes) { Mode = mode - if Mode.Nothing { + switch { + case Mode.Nothing: return - } - - if Mode.Trace { + case Mode.Quiet: + Mode.Trace = false + Mode.Debug = false + case Mode.Trace: Mode.Debug = true + default: } strategy := logStrategy() @@ -87,7 +90,7 @@ func Start(ctx context.Context, mode Modes) { case DailyStrategy: _, err := os.Stat(config.Common.LogDir) Mode.logToFile = !os.IsNotExist(err) - Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace + Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet case StdoutStrategy: fallthrough default: @@ -131,11 +134,14 @@ func Info(args ...interface{}) string { // Warn message logging. func Warn(args ...interface{}) string { - if Mode.Server { - return log(serverStr, warnStr, args) + if !Mode.Quiet { + if Mode.Server { + return log(serverStr, warnStr, args) + } + return log(clientStr, warnStr, args) } - return log(clientStr, warnStr, args) + return "" } // Error message logging. diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go index 47dadfe..8864179 100644 --- a/internal/io/logger/modes.go +++ b/internal/io/logger/modes.go @@ -6,6 +6,7 @@ type Modes struct { Trace bool Debug bool Nothing bool + Quiet bool logToStdout bool logToFile bool } diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go index 8cc5a40..1e17c78 100644 --- a/internal/server/handlers/controlhandler.go +++ b/internal/server/handlers/controlhandler.go @@ -92,9 +92,7 @@ func (h *ControlHandler) handleCommand(command string) { case "health": h.serverMessages <- "OK: DTail SSH Server seems fine" h.serverMessages <- "done;" - case "debug": - h.serverMessages <- logger.Debug(h.user, "Receiving debug command", command, s) default: - h.serverMessages <- logger.Warn(h.user, "Received unknown control command", command, s) + h.serverMessages <- logger.Error(h.user, "Received unknown control command", command, s) } } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 0f9207d..5b8ce3a 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -31,14 +31,13 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string) { if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - logger.Error(err) r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err)) return } re = deserializedRegex } if argc < 3 { - r.server.sendServerMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } r.readGlob(ctx, args[1], re) @@ -52,7 +51,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) for { maxRetries-- if maxRetries < 0 { - r.server.sendServerMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) return } @@ -65,7 +64,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) if numPaths := len(paths); numPaths == 0 { logger.Error(r.server.user, "No such file(s) to read", glob) - r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -97,7 +96,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr if !r.server.user.HasFilePermission(path, "readfiles") { logger.Error(r.server.user, "No permission to read file", path, globID) - r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) return } @@ -161,6 +160,6 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.sendServerMessage(logger.Error("Empty file path given?", path, glob)) + r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob)) return "" } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 3212ee1..681598c 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -43,7 +43,7 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 - quiet bool + quiet bool } // NewServerHandler returns the server handler. @@ -299,7 +299,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] func (h *ServerHandler) handleAckCommand(argc int, args []string) { if argc < 3 { - h.sendServerMessage(logger.Warn(h.user, commandParseWarning, args, argc)) + h.sendServerWarnMessage(logger.Warn(h.user, commandParseWarning, args, argc)) return } if args[1] == "close" && args[2] == "connection" { @@ -318,6 +318,13 @@ func (h *ServerHandler) sendServerMessage(message string) { h.send(h.serverMessageC(), message) } +func (h *ServerHandler) sendServerWarnMessage(message string) { + if h.quiet { + return + } + h.send(h.serverMessageC(), message) +} + func (h *ServerHandler) serverMessageC() chan<- string { return h.serverMessages } -- cgit v1.2.3 From eca9c65b7c9e33cba8cc1ea5afe016bfc59f8918 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 26 Dec 2020 14:34:32 +0000 Subject: correctly format server messages --- internal/io/logger/logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal') diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 7674dd1..4254eef 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -248,7 +248,7 @@ func log(what string, severity string, args []interface{}) string { message := strings.Join(messages, "|") write(what, severity, message) - return message + return fmt.Sprintf("%s|%s", severity, message) } // Raw message logging. -- cgit v1.2.3 From 94e37105c5a8c0ce22104add751e9938f239261e Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 27 Dec 2020 16:31:03 +0000 Subject: only try to read a file once in cat and grep mode but 10 times in tail mode --- internal/server/handlers/readcommand.go | 20 ++++++++------------ internal/server/handlers/serverhandler.go | 4 ++-- internal/version/version.go | 2 +- 3 files changed, 11 insertions(+), 15 deletions(-) (limited to 'internal') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 5b8ce3a..5bab26f 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -25,7 +25,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string) { +func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { @@ -40,21 +40,14 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string) { r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } - r.readGlob(ctx, args[1], re) + r.readGlob(ctx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) { +func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) - maxRetries := 10 - for { - maxRetries-- - if maxRetries < 0 { - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) - return - } - + for retryCount := 0; retryCount < retries; retryCount++ { paths, err := filepath.Glob(glob) if err != nil { logger.Warn(r.server.user, glob, err) @@ -75,8 +68,11 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) } r.readFiles(ctx, paths, glob, re, retryInterval) - break + return } + + r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + return } func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 681598c..185e7c2 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -258,7 +258,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.CatClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args) + command.Start(ctx, argc, args, 1) readerFinished() commandFinished() }() @@ -267,7 +267,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.TailClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args) + command.Start(ctx, argc, args, 10) readerFinished() commandFinished() }() diff --git a/internal/version/version.go b/internal/version/version.go index 4fa9890..a64417f 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -13,7 +13,7 @@ const ( // Version of DTail. Version string = "3.2.0" // Additional information for DTail - Additional string = "develop2" + Additional string = "develop-3" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) -- cgit v1.2.3 From 2c7bdd09e8b7c58d98d631e32a24e4bd34d5bec9 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 27 Dec 2020 16:42:03 +0000 Subject: make lint happy --- internal/io/fs/permissions/permission_linux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal') diff --git a/internal/io/fs/permissions/permission_linux.go b/internal/io/fs/permissions/permission_linux.go index feae729..bbc039b 100644 --- a/internal/io/fs/permissions/permission_linux.go +++ b/internal/io/fs/permissions/permission_linux.go @@ -11,7 +11,7 @@ import ( "unsafe" ) -// To check whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a given file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) -- cgit v1.2.3