diff options
| author | Paul Buetow <git@mx.buetow.org> | 2020-12-26 10:48:51 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2020-12-26 10:48:51 +0000 |
| commit | ab676c2b484225ed22765b23d8f0545088ecd610 (patch) | |
| tree | 5292e21339fef551f19e8fdd90beeb35d676381d | |
| parent | b4db37d8cbae8f0c3dec289b2e1b0cfe83731415 (diff) | |
code cleanup and minor refactorings
| -rw-r--r-- | internal/clients/grepclient.go | 1 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 18 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 1 | ||||
| -rw-r--r-- | internal/clients/maker.go | 3 | ||||
| -rw-r--r-- | internal/clients/runclient.go | 87 | ||||
| -rw-r--r-- | internal/clients/stats.go | 7 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 1 | ||||
| -rw-r--r-- | internal/color/colorfy.go | 12 | ||||
| -rw-r--r-- | internal/config/common.go | 10 | ||||
| -rw-r--r-- | internal/discovery/comma.go | 3 | ||||
| -rw-r--r-- | internal/discovery/file.go | 3 | ||||
| -rw-r--r-- | internal/io/line/line.go | 6 | ||||
| -rw-r--r-- | internal/io/prompt/prompt.go | 6 | ||||
| -rw-r--r-- | internal/io/run/run.go | 150 | ||||
| -rw-r--r-- | internal/mapr/funcs/function.go | 1 | ||||
| -rw-r--r-- | internal/mapr/query.go | 6 | ||||
| -rw-r--r-- | internal/mapr/selectcondition.go | 1 | ||||
| -rw-r--r-- | internal/mapr/token.go | 3 | ||||
| -rw-r--r-- | internal/mapr/wherecondition.go | 2 | ||||
| -rw-r--r-- | internal/omode/mode.go | 3 | ||||
| -rw-r--r-- | internal/server/handlers/runcommand.go | 111 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 50 | ||||
| -rw-r--r-- | internal/server/server.go | 24 |
23 files changed, 68 insertions, 441 deletions
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 4024083..e6fc94a 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -44,5 +44,6 @@ func (c GrepClient) makeCommands() (commands []string) { for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) } + return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index b5045e2..f07fd90 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "fmt" "io" - "strconv" "strings" "time" @@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } + return } @@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { case <-h.Done(): return } - - case strings.HasPrefix(message, ".run exitstatus"): - splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") - if len(splitted) != 3 { - logger.Error("Unable to retrieve exitstatus", message) - return - } - i, err := strconv.Atoi(splitted[2]) - if err != nil { - logger.Error("Unable to retrieve exitstatus", message, err) - return - } - logger.Debug("Retrieved exitstatus", h.status) - if i > h.status { - h.status = i - } } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 08ed137..0440706 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,6 +19,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + // The return status. status int } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 1ba6482..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -4,6 +4,9 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" ) +// maker interface helps to re-use code in all DTail client implementations. +// All clients share the baseClient but have different connection handlers +// and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go deleted file mode 100644 index 5464d54..0000000 --- a/internal/clients/runclient.go +++ /dev/null @@ -1,87 +0,0 @@ -package clients - -import ( - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "fmt" - "runtime" - "strings" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/omode" -) - -// RunClient is a client to run various commands on the server. -type RunClient struct { - baseClient - jobName string - background string -} - -// NewRunClient returns a new run client to execute commands on the remote server. -func NewRunClient(args Args, background, jobName string) (*RunClient, error) { - args.Mode = omode.RunClient - - if jobName == "" { - jobName = hash(strings.Join(args.Arguments, " ")) - } - - c := RunClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, - jobName: jobName, - background: background, - } - - c.init() - c.makeConnections(c) - - return &c, nil -} - -func (c RunClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c RunClient) makeCommands() (commands []string) { - if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What)) - return - } - - commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What)) - return -} - -func (c RunClient) options() string { - var sb strings.Builder - - logger.Debug("options", fmt.Sprintf(":background=%s", c.background)) - sb.WriteString(fmt.Sprintf(":background=%s", c.background)) - - logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName)) - sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName)) - - if len(c.Arguments) > 0 { - logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " "))) - sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " ")))) - } - - return sb.String() -} - -func encode64(str string) string { - return base64.StdEncoding.EncodeToString([]byte(str)) -} - -func hash(str string) string { - h := sha256.New() - h.Write([]byte(str)) - - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 17343b5..2ec6f22 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -45,7 +45,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < case message := <-statsCh: messages = append(messages, message) force = true - case <-time.After(time.Second * 10): + case <-time.After(time.Second * 3): case <-ctx.Done(): return } @@ -63,7 +63,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < switch force { case true: messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) - s.printStatsOnInterrupt(messages) + s.printStatsDueInterrupt(messages) default: logger.Info(stats) } @@ -75,7 +75,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < } } -func (s *stats) printStatsOnInterrupt(messages []string) { +func (s *stats) printStatsDueInterrupt(messages []string) { logger.Pause() for _, message := range messages { fmt.Println(fmt.Sprintf(" %s", message)) @@ -107,5 +107,6 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } + return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 53b5ba4..ff2f46e 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) + return &c, nil } diff --git a/internal/color/colorfy.go b/internal/color/colorfy.go index 9ae46f5..a2beb7a 100644 --- a/internal/color/colorfy.go +++ b/internal/color/colorfy.go @@ -41,16 +41,14 @@ func paintClientStats(line string) string { // Colorfy a given line based on the line's content. func Colorfy(line string) string { - if strings.HasPrefix(line, "REMOTE") { + switch { + case strings.HasPrefix(line, "REMOTE"): return paintRemote(line) - } - if strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|") { + case strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|"): return paintClientStats(line) - } - if strings.Contains(line, "ERROR") { + case strings.Contains(line, "ERROR"): return Paint(Magenta, line) - } - if strings.Contains(line, "WARN") { + case strings.Contains(line, "WARN"): return Paint(Magenta, line) } diff --git a/internal/config/common.go b/internal/config/common.go index 103b390..c3e203e 100644 --- a/internal/config/common.go +++ b/internal/config/common.go @@ -2,10 +2,14 @@ package config // CommonConfig stores configuration keys shared by DTail server and client. type CommonConfig struct { - SSHPort int + // The SSH port number + SSHPort int + // Enable experimental features (mainly for dev purposes) ExperimentalFeaturesEnable bool `json:",omitempty"` - DebugEnable bool `json:",omitempty"` - TraceEnable bool `json:",omitempty"` + // Enable debug logging. Don't enable in production. + DebugEnable bool `json:",omitempty"` + // Enable trace logging. Don't enable in production. + TraceEnable bool `json:",omitempty"` // The log strategy to use, one of // stdout: only log to stdout (useful when used with systemd) // daily: create a log file for every day diff --git a/internal/discovery/comma.go b/internal/discovery/comma.go index 94276c7..4344240 100644 --- a/internal/discovery/comma.go +++ b/internal/discovery/comma.go @@ -1,8 +1,9 @@ package discovery import ( - "github.com/mimecast/dtail/internal/io/logger" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromCOMMA retrieves a list of servers from comma separated input list. diff --git a/internal/discovery/file.go b/internal/discovery/file.go index c04173e..1250755 100644 --- a/internal/discovery/file.go +++ b/internal/discovery/file.go @@ -2,8 +2,9 @@ package discovery import ( "bufio" - "github.com/mimecast/dtail/internal/io/logger" "os" + + "github.com/mimecast/dtail/internal/io/logger" ) // ServerListFromFILE retrieves a list of servers from a file. diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 9db93c0..715be34 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -15,7 +15,11 @@ type Line struct { // lines if that happens but it will signal to the client how // many log lines in % could be transmitted to the client. TransmittedPerc int - SourceID string + // Contains the unique identifier of the source log file. + // It could be the name of the log or it could be one of the parent + // directories in case multiple log files with the same basename are + // followed. + SourceID string } // Return a human readable representation of the followed line. diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index a438d33..36ebdb5 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -3,9 +3,10 @@ package prompt import ( "bufio" "fmt" - "github.com/mimecast/dtail/internal/io/logger" "os" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // Answer is a user input of a prompt question. @@ -18,8 +19,7 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - - AskAgain bool + AskAgain bool } // Prompt used for interactive user input. diff --git a/internal/io/run/run.go b/internal/io/run/run.go deleted file mode 100644 index 2bb3756..0000000 --- a/internal/io/run/run.go +++ /dev/null @@ -1,150 +0,0 @@ -package run - -import ( - "bufio" - "context" - "io" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" -) - -// Run is for execute a command. -type Run struct { - command string - args []string - cmd *exec.Cmd - pgroupKilled chan struct{} -} - -// New returns a new command runner. -func New(command string, args []string) Run { - return Run{ - command: command, - args: args, - pgroupKilled: make(chan struct{}), - } -} - -// StartBackground starts running the command in background. -func (r Run) StartBackground(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, lines chan<- line.Line) (pid int, err error) { - pid = -1 - - if len(r.args) > 0 { - logger.Debug(r.command, r.args, " ") - r.cmd = exec.CommandContext(ctx, r.command, r.args...) - } else { - logger.Debug(r.command) - r.cmd = exec.CommandContext(ctx, r.command) - } - - // Create a new process group, so that kill() will only kill this command + pgroup. - r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdoutPipe, myErr := r.cmd.StdoutPipe() - if err != nil { - wg.Done() - err = myErr - return - } - - stderrPipe, myErr := r.cmd.StderrPipe() - if myErr != nil { - wg.Done() - err = myErr - return - } - - if myErr := r.cmd.Start(); err != nil { - wg.Done() - err = myErr - return - } - - if r.cmd.Process != nil { - pid = r.cmd.Process.Pid - } - - commandExited := make(chan struct{}) - - var pipeWg sync.WaitGroup - pipeWg.Add(2) - - go r.killPgroup(ctx, commandExited, pid) - go r.pipeToLines(commandExited, &pipeWg, pid, stdoutPipe, "STDOUT", lines) - go r.pipeToLines(commandExited, &pipeWg, pid, stderrPipe, "STDERR", lines) - - go func() { - exitCode := 255 - if waitErr := r.cmd.Wait(); waitErr != nil { - if exitError, ok := waitErr.(*exec.ExitError); ok { - exitCode = exitError.ExitCode() - } - } - ec <- exitCode - - // Tell pipes we are done - close(commandExited) - // Wait for process group to be killed - <-r.pgroupKilled - // Wait for the pipes to flush the contents - pipeWg.Wait() - // Now the job is truly done - wg.Done() - }() - - return -} - -func (r Run) pipeToLines(commandExited chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) { - defer wg.Done() - bufReader := bufio.NewReader(reader) - - for { - time.Sleep(time.Millisecond * 10) - lineStr, err := bufReader.ReadString('\n') - - if err != nil { - select { - case <-commandExited: - return - default: - } - continue - } - - newLine := line.Line{ - Content: []byte(lineStr), - Count: uint64(pid), - TransmittedPerc: 100, - SourceID: what, - } - - select { - case lines <- newLine: - case <-commandExited: - return - } - } -} - -func (r Run) killPgroup(ctx context.Context, commandExited chan struct{}, pid int) { - if pid == -1 { - close(r.pgroupKilled) - return - } - - if pgid, err := syscall.Getpgid(pid); err == nil { - // Kill process group when done - select { - case <-ctx.Done(): - case <-commandExited: - } - syscall.Kill(-pgid, syscall.SIGKILL) - close(r.pgroupKilled) - } -} diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 52aaa98..1a89c3a 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -12,6 +12,7 @@ type CallbackFunc func(text string) string type Function struct { // Name of the callback function Name string + // The Go-callback function to call for this DTail function. call CallbackFunc } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 7f6b63c..01852da 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -177,12 +177,6 @@ func (q *Query) parse(tokens []token) error { } } - // Comment out for empty table support, which is "all" log lines. - /* - if q.Table == "" { - return errors.New(invalidQuery + "Empty table specified in 'from' clause") - } - */ if len(q.Select) < 1 { return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index 1882b7e..d6aa0d4 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -92,5 +92,6 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } + return sel, nil } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index d337bd2..8972188 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -22,6 +22,7 @@ func (t token) isKeyword() bool { return true } } + return false } @@ -94,6 +95,7 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } + return tokens, strings } @@ -104,5 +106,6 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } + return tokens } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index 70e9c32..7a60dba 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -170,6 +170,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { default: logger.Error("Unknown float operation", lValue, wc.Operation, rValue) } + return false } @@ -194,5 +195,6 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { default: logger.Error("Unknown string operation", lValue, wc.Operation, rValue) } + return false } diff --git a/internal/omode/mode.go b/internal/omode/mode.go index e29aacc..1aafcfc 100644 --- a/internal/omode/mode.go +++ b/internal/omode/mode.go @@ -12,7 +12,6 @@ const ( GrepClient Mode = iota MapClient Mode = iota HealthClient Mode = iota - RunClient Mode = iota ) func (m Mode) String() string { @@ -29,8 +28,6 @@ func (m Mode) String() string { return "map" case HealthClient: return "health" - case RunClient: - return "run" default: return "unknown" } diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go deleted file mode 100644 index 8e5895b..0000000 --- a/internal/server/handlers/runcommand.go +++ /dev/null @@ -1,111 +0,0 @@ -package handlers - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/run" -) - -type runCommand struct { - server *ServerHandler - run run.Run -} - -func newRunCommand(server *ServerHandler) runCommand { - return runCommand{ - server: server, - } -} - -func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error { - if argc < 2 { - return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc) - } - - ec := make(chan int, 1) - var pid int - var err error - - command := strings.Join(args[1:], " ") - if strings.Contains(command, ";") || strings.Contains(command, "\n") { - if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - return nil - } - - if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - - exitCode := <-ec - r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode)) - r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode))) - - return nil -} - -func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) { - if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) { - return -1, err - } - - timestamp := time.Now().UnixNano() - scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp) - - // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so) - logger.Debug(r.server.user, "Writing temp script", scriptPath) - - script = fmt.Sprintf("#!/bin/sh\n%s", script) - if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil { - return -1, err - } - - pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs) - go func() { - wg.Wait() - logger.Debug("Deleting script", scriptPath) - os.Remove(scriptPath) - }() - - return pid, err -} - -func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) { - if len(command) == 0 { - return -1, errors.New("Empty command provided") - } - - splitted := strings.Split(command, " ") - path := splitted[0] - args := splitted[1:] - args = append(args, outerArgs...) - - qualifiedPath, err := exec.LookPath(path) - if err != nil { - return -1, err - } - - if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") { - return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath) - } - - r.run = run.New(qualifiedPath, args) - pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines) - if err != nil { - return pid, err - } - return pid, nil -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5cf8041..3d1a53d 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -29,36 +29,34 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - payload []byte - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - globalServerWaitFor chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - activeReaders int32 + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + aggregatedMessages chan string + serverMessages chan string + payload []byte + hostname string + user *user.User + catLimiter chan struct{} + tailLimiter chan struct{} + ackCloseReceived chan struct{} + activeCommands int32 + activeReaders int32 } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - aggregatedMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - globalServerWaitFor: globalServerWaitFor, - regex: ".", - user: user, + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + aggregatedMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + user: user, } fqdn, err := os.Hostname() diff --git a/internal/server/server.go b/internal/server/server.go index 31fa85d..a20737e 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -7,7 +7,6 @@ import ( "io" "net" "strings" - "time" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" @@ -33,9 +32,6 @@ type Server struct { sched *scheduler // Mointor log files for pattern (if configured) cont *continuous - // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. - // TODO: Remove this counter. - shutdownWaitFor chan struct{} } // New returns a new server. @@ -46,7 +42,6 @@ func New() *Server { sshServerConfig: &gossh.ServerConfig{}, catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats), tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), - shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), } @@ -80,27 +75,12 @@ func (s *Server) Start(ctx context.Context) int { go s.cont.start(ctx) go s.listenerLoop(ctx, listener) - select { - case <-ctx.Done(): - // Wait until all commands/jobs/children are no more! - s.wait() - } + <-ctx.Done() // For future use. return 0 } -func (s *Server) wait() { - for { - num := len(s.shutdownWaitFor) - logger.Debug("Waiting for stuff to finish", num) - if num <= 0 { - return - } - time.Sleep(time.Second) - } -} - func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { logger.Debug("Starting listener loop") @@ -180,7 +160,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } terminate := func() { |
