summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2020-12-26 10:48:51 +0000
committerPaul Buetow <git@mx.buetow.org>2020-12-26 10:48:51 +0000
commitab676c2b484225ed22765b23d8f0545088ecd610 (patch)
tree5292e21339fef551f19e8fdd90beeb35d676381d
parentb4db37d8cbae8f0c3dec289b2e1b0cfe83731415 (diff)
code cleanup and minor refactorings
-rw-r--r--internal/clients/grepclient.go1
-rw-r--r--internal/clients/handlers/basehandler.go18
-rw-r--r--internal/clients/handlers/healthhandler.go1
-rw-r--r--internal/clients/maker.go3
-rw-r--r--internal/clients/runclient.go87
-rw-r--r--internal/clients/stats.go7
-rw-r--r--internal/clients/tailclient.go1
-rw-r--r--internal/color/colorfy.go12
-rw-r--r--internal/config/common.go10
-rw-r--r--internal/discovery/comma.go3
-rw-r--r--internal/discovery/file.go3
-rw-r--r--internal/io/line/line.go6
-rw-r--r--internal/io/prompt/prompt.go6
-rw-r--r--internal/io/run/run.go150
-rw-r--r--internal/mapr/funcs/function.go1
-rw-r--r--internal/mapr/query.go6
-rw-r--r--internal/mapr/selectcondition.go1
-rw-r--r--internal/mapr/token.go3
-rw-r--r--internal/mapr/wherecondition.go2
-rw-r--r--internal/omode/mode.go3
-rw-r--r--internal/server/handlers/runcommand.go111
-rw-r--r--internal/server/handlers/serverhandler.go50
-rw-r--r--internal/server/server.go24
23 files changed, 68 insertions, 441 deletions
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 4024083..e6fc94a 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -44,5 +44,6 @@ func (c GrepClient) makeCommands() (commands []string) {
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
}
+
return
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index b5045e2..f07fd90 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"io"
- "strconv"
"strings"
"time"
@@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
+
return
}
@@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case <-h.Done():
return
}
-
- case strings.HasPrefix(message, ".run exitstatus"):
- splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ")
- if len(splitted) != 3 {
- logger.Error("Unable to retrieve exitstatus", message)
- return
- }
- i, err := strconv.Atoi(splitted[2])
- if err != nil {
- logger.Error("Unable to retrieve exitstatus", message, err)
- return
- }
- logger.Debug("Retrieved exitstatus", h.status)
- if i > h.status {
- h.status = i
- }
}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 08ed137..0440706 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -19,6 +19,7 @@ type HealthHandler struct {
receive chan<- string
// The remote server address
server string
+ // The return status.
status int
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index 1ba6482..d5ffd8b 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -4,6 +4,9 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
)
+// maker interface helps to re-use code in all DTail client implementations.
+// All clients share the baseClient but have different connection handlers
+// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go
deleted file mode 100644
index 5464d54..0000000
--- a/internal/clients/runclient.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package clients
-
-import (
- "crypto/sha256"
- "encoding/base64"
- "encoding/hex"
- "fmt"
- "runtime"
- "strings"
-
- "github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/omode"
-)
-
-// RunClient is a client to run various commands on the server.
-type RunClient struct {
- baseClient
- jobName string
- background string
-}
-
-// NewRunClient returns a new run client to execute commands on the remote server.
-func NewRunClient(args Args, background, jobName string) (*RunClient, error) {
- args.Mode = omode.RunClient
-
- if jobName == "" {
- jobName = hash(strings.Join(args.Arguments, " "))
- }
-
- c := RunClient{
- baseClient: baseClient{
- Args: args,
- throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: false,
- },
- jobName: jobName,
- background: background,
- }
-
- c.init()
- c.makeConnections(c)
-
- return &c, nil
-}
-
-func (c RunClient) makeHandler(server string) handlers.Handler {
- return handlers.NewClientHandler(server)
-}
-
-func (c RunClient) makeCommands() (commands []string) {
- if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What))
- return
- }
-
- commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What))
- return
-}
-
-func (c RunClient) options() string {
- var sb strings.Builder
-
- logger.Debug("options", fmt.Sprintf(":background=%s", c.background))
- sb.WriteString(fmt.Sprintf(":background=%s", c.background))
-
- logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName))
- sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName))
-
- if len(c.Arguments) > 0 {
- logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " ")))
- sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " "))))
- }
-
- return sb.String()
-}
-
-func encode64(str string) string {
- return base64.StdEncoding.EncodeToString([]byte(str))
-}
-
-func hash(str string) string {
- h := sha256.New()
- h.Write([]byte(str))
-
- return hex.EncodeToString(h.Sum(nil))
-}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 17343b5..2ec6f22 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -45,7 +45,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
case message := <-statsCh:
messages = append(messages, message)
force = true
- case <-time.After(time.Second * 10):
+ case <-time.After(time.Second * 3):
case <-ctx.Done():
return
}
@@ -63,7 +63,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
switch force {
case true:
messages = append(messages, fmt.Sprintf("Connection stats: %s", stats))
- s.printStatsOnInterrupt(messages)
+ s.printStatsDueInterrupt(messages)
default:
logger.Info(stats)
}
@@ -75,7 +75,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
}
}
-func (s *stats) printStatsOnInterrupt(messages []string) {
+func (s *stats) printStatsDueInterrupt(messages []string) {
logger.Pause()
for _, message := range messages {
fmt.Println(fmt.Sprintf(" %s", message))
@@ -107,5 +107,6 @@ func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
return 100
}
+
return value / (total / 100.0)
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 53b5ba4..ff2f46e 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) {
c.init()
c.makeConnections(c)
+
return &c, nil
}
diff --git a/internal/color/colorfy.go b/internal/color/colorfy.go
index 9ae46f5..a2beb7a 100644
--- a/internal/color/colorfy.go
+++ b/internal/color/colorfy.go
@@ -41,16 +41,14 @@ func paintClientStats(line string) string {
// Colorfy a given line based on the line's content.
func Colorfy(line string) string {
- if strings.HasPrefix(line, "REMOTE") {
+ switch {
+ case strings.HasPrefix(line, "REMOTE"):
return paintRemote(line)
- }
- if strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|") {
+ case strings.HasPrefix(line, "CLIENT") && strings.Contains(line, "|stats|"):
return paintClientStats(line)
- }
- if strings.Contains(line, "ERROR") {
+ case strings.Contains(line, "ERROR"):
return Paint(Magenta, line)
- }
- if strings.Contains(line, "WARN") {
+ case strings.Contains(line, "WARN"):
return Paint(Magenta, line)
}
diff --git a/internal/config/common.go b/internal/config/common.go
index 103b390..c3e203e 100644
--- a/internal/config/common.go
+++ b/internal/config/common.go
@@ -2,10 +2,14 @@ package config
// CommonConfig stores configuration keys shared by DTail server and client.
type CommonConfig struct {
- SSHPort int
+ // The SSH port number
+ SSHPort int
+ // Enable experimental features (mainly for dev purposes)
ExperimentalFeaturesEnable bool `json:",omitempty"`
- DebugEnable bool `json:",omitempty"`
- TraceEnable bool `json:",omitempty"`
+ // Enable debug logging. Don't enable in production.
+ DebugEnable bool `json:",omitempty"`
+ // Enable trace logging. Don't enable in production.
+ TraceEnable bool `json:",omitempty"`
// The log strategy to use, one of
// stdout: only log to stdout (useful when used with systemd)
// daily: create a log file for every day
diff --git a/internal/discovery/comma.go b/internal/discovery/comma.go
index 94276c7..4344240 100644
--- a/internal/discovery/comma.go
+++ b/internal/discovery/comma.go
@@ -1,8 +1,9 @@
package discovery
import (
- "github.com/mimecast/dtail/internal/io/logger"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
// ServerListFromCOMMA retrieves a list of servers from comma separated input list.
diff --git a/internal/discovery/file.go b/internal/discovery/file.go
index c04173e..1250755 100644
--- a/internal/discovery/file.go
+++ b/internal/discovery/file.go
@@ -2,8 +2,9 @@ package discovery
import (
"bufio"
- "github.com/mimecast/dtail/internal/io/logger"
"os"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
// ServerListFromFILE retrieves a list of servers from a file.
diff --git a/internal/io/line/line.go b/internal/io/line/line.go
index 9db93c0..715be34 100644
--- a/internal/io/line/line.go
+++ b/internal/io/line/line.go
@@ -15,7 +15,11 @@ type Line struct {
// lines if that happens but it will signal to the client how
// many log lines in % could be transmitted to the client.
TransmittedPerc int
- SourceID string
+ // Contains the unique identifier of the source log file.
+ // It could be the name of the log or it could be one of the parent
+ // directories in case multiple log files with the same basename are
+ // followed.
+ SourceID string
}
// Return a human readable representation of the followed line.
diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go
index a438d33..36ebdb5 100644
--- a/internal/io/prompt/prompt.go
+++ b/internal/io/prompt/prompt.go
@@ -3,9 +3,10 @@ package prompt
import (
"bufio"
"fmt"
- "github.com/mimecast/dtail/internal/io/logger"
"os"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
// Answer is a user input of a prompt question.
@@ -18,8 +19,7 @@ type Answer struct {
Callback func()
// Runs after Callback and after logging resumes
EndCallback func()
-
- AskAgain bool
+ AskAgain bool
}
// Prompt used for interactive user input.
diff --git a/internal/io/run/run.go b/internal/io/run/run.go
deleted file mode 100644
index 2bb3756..0000000
--- a/internal/io/run/run.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package run
-
-import (
- "bufio"
- "context"
- "io"
- "os/exec"
- "sync"
- "syscall"
- "time"
-
- "github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
-)
-
-// Run is for execute a command.
-type Run struct {
- command string
- args []string
- cmd *exec.Cmd
- pgroupKilled chan struct{}
-}
-
-// New returns a new command runner.
-func New(command string, args []string) Run {
- return Run{
- command: command,
- args: args,
- pgroupKilled: make(chan struct{}),
- }
-}
-
-// StartBackground starts running the command in background.
-func (r Run) StartBackground(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, lines chan<- line.Line) (pid int, err error) {
- pid = -1
-
- if len(r.args) > 0 {
- logger.Debug(r.command, r.args, " ")
- r.cmd = exec.CommandContext(ctx, r.command, r.args...)
- } else {
- logger.Debug(r.command)
- r.cmd = exec.CommandContext(ctx, r.command)
- }
-
- // Create a new process group, so that kill() will only kill this command + pgroup.
- r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-
- stdoutPipe, myErr := r.cmd.StdoutPipe()
- if err != nil {
- wg.Done()
- err = myErr
- return
- }
-
- stderrPipe, myErr := r.cmd.StderrPipe()
- if myErr != nil {
- wg.Done()
- err = myErr
- return
- }
-
- if myErr := r.cmd.Start(); err != nil {
- wg.Done()
- err = myErr
- return
- }
-
- if r.cmd.Process != nil {
- pid = r.cmd.Process.Pid
- }
-
- commandExited := make(chan struct{})
-
- var pipeWg sync.WaitGroup
- pipeWg.Add(2)
-
- go r.killPgroup(ctx, commandExited, pid)
- go r.pipeToLines(commandExited, &pipeWg, pid, stdoutPipe, "STDOUT", lines)
- go r.pipeToLines(commandExited, &pipeWg, pid, stderrPipe, "STDERR", lines)
-
- go func() {
- exitCode := 255
- if waitErr := r.cmd.Wait(); waitErr != nil {
- if exitError, ok := waitErr.(*exec.ExitError); ok {
- exitCode = exitError.ExitCode()
- }
- }
- ec <- exitCode
-
- // Tell pipes we are done
- close(commandExited)
- // Wait for process group to be killed
- <-r.pgroupKilled
- // Wait for the pipes to flush the contents
- pipeWg.Wait()
- // Now the job is truly done
- wg.Done()
- }()
-
- return
-}
-
-func (r Run) pipeToLines(commandExited chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) {
- defer wg.Done()
- bufReader := bufio.NewReader(reader)
-
- for {
- time.Sleep(time.Millisecond * 10)
- lineStr, err := bufReader.ReadString('\n')
-
- if err != nil {
- select {
- case <-commandExited:
- return
- default:
- }
- continue
- }
-
- newLine := line.Line{
- Content: []byte(lineStr),
- Count: uint64(pid),
- TransmittedPerc: 100,
- SourceID: what,
- }
-
- select {
- case lines <- newLine:
- case <-commandExited:
- return
- }
- }
-}
-
-func (r Run) killPgroup(ctx context.Context, commandExited chan struct{}, pid int) {
- if pid == -1 {
- close(r.pgroupKilled)
- return
- }
-
- if pgid, err := syscall.Getpgid(pid); err == nil {
- // Kill process group when done
- select {
- case <-ctx.Done():
- case <-commandExited:
- }
- syscall.Kill(-pgid, syscall.SIGKILL)
- close(r.pgroupKilled)
- }
-}
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go
index 52aaa98..1a89c3a 100644
--- a/internal/mapr/funcs/function.go
+++ b/internal/mapr/funcs/function.go
@@ -12,6 +12,7 @@ type CallbackFunc func(text string) string
type Function struct {
// Name of the callback function
Name string
+ // The Go-callback function to call for this DTail function.
call CallbackFunc
}
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index 7f6b63c..01852da 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -177,12 +177,6 @@ func (q *Query) parse(tokens []token) error {
}
}
- // Comment out for empty table support, which is "all" log lines.
- /*
- if q.Table == "" {
- return errors.New(invalidQuery + "Empty table specified in 'from' clause")
- }
- */
if len(q.Select) < 1 {
return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none")
}
diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go
index 1882b7e..d6aa0d4 100644
--- a/internal/mapr/selectcondition.go
+++ b/internal/mapr/selectcondition.go
@@ -92,5 +92,6 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) {
}
sel = append(sel, sc)
}
+
return sel, nil
}
diff --git a/internal/mapr/token.go b/internal/mapr/token.go
index d337bd2..8972188 100644
--- a/internal/mapr/token.go
+++ b/internal/mapr/token.go
@@ -22,6 +22,7 @@ func (t token) isKeyword() bool {
return true
}
}
+
return false
}
@@ -94,6 +95,7 @@ func tokensConsumeStr(tokens []token) ([]token, []string) {
for _, token := range found {
strings = append(strings, token.str)
}
+
return tokens, strings
}
@@ -104,5 +106,6 @@ func tokensConsumeOptional(tokens []token, optional string) []token {
if strings.ToLower(tokens[0].str) == strings.ToLower(optional) {
return tokens[1:]
}
+
return tokens
}
diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go
index 70e9c32..7a60dba 100644
--- a/internal/mapr/wherecondition.go
+++ b/internal/mapr/wherecondition.go
@@ -170,6 +170,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
default:
logger.Error("Unknown float operation", lValue, wc.Operation, rValue)
}
+
return false
}
@@ -194,5 +195,6 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
default:
logger.Error("Unknown string operation", lValue, wc.Operation, rValue)
}
+
return false
}
diff --git a/internal/omode/mode.go b/internal/omode/mode.go
index e29aacc..1aafcfc 100644
--- a/internal/omode/mode.go
+++ b/internal/omode/mode.go
@@ -12,7 +12,6 @@ const (
GrepClient Mode = iota
MapClient Mode = iota
HealthClient Mode = iota
- RunClient Mode = iota
)
func (m Mode) String() string {
@@ -29,8 +28,6 @@ func (m Mode) String() string {
return "map"
case HealthClient:
return "health"
- case RunClient:
- return "run"
default:
return "unknown"
}
diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go
deleted file mode 100644
index 8e5895b..0000000
--- a/internal/server/handlers/runcommand.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package handlers
-
-import (
- "context"
- "errors"
- "fmt"
- "io/ioutil"
- "os"
- "os/exec"
- "strings"
- "sync"
- "time"
-
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/io/run"
-)
-
-type runCommand struct {
- server *ServerHandler
- run run.Run
-}
-
-func newRunCommand(server *ServerHandler) runCommand {
- return runCommand{
- server: server,
- }
-}
-
-func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error {
- if argc < 2 {
- return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc)
- }
-
- ec := make(chan int, 1)
- var pid int
- var err error
-
- command := strings.Join(args[1:], " ")
- if strings.Contains(command, ";") || strings.Contains(command, "\n") {
- if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil {
- r.server.sendServerMessage(".run exitstatus 255")
- return err
- }
- return nil
- }
-
- if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil {
- r.server.sendServerMessage(".run exitstatus 255")
- return err
- }
-
- exitCode := <-ec
- r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode))
- r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode)))
-
- return nil
-}
-
-func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) {
- if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) {
- return -1, err
- }
-
- timestamp := time.Now().UnixNano()
- scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp)
-
- // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so)
- logger.Debug(r.server.user, "Writing temp script", scriptPath)
-
- script = fmt.Sprintf("#!/bin/sh\n%s", script)
- if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil {
- return -1, err
- }
-
- pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs)
- go func() {
- wg.Wait()
- logger.Debug("Deleting script", scriptPath)
- os.Remove(scriptPath)
- }()
-
- return pid, err
-}
-
-func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) {
- if len(command) == 0 {
- return -1, errors.New("Empty command provided")
- }
-
- splitted := strings.Split(command, " ")
- path := splitted[0]
- args := splitted[1:]
- args = append(args, outerArgs...)
-
- qualifiedPath, err := exec.LookPath(path)
- if err != nil {
- return -1, err
- }
-
- if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") {
- return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath)
- }
-
- r.run = run.New(qualifiedPath, args)
- pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines)
- if err != nil {
- return pid, err
- }
- return pid, nil
-}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 5cf8041..3d1a53d 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -29,36 +29,34 @@ const (
// the Bi-directional communication between SSH client and server.
// This handler implements the handler of the SSH server.
type ServerHandler struct {
- done *internal.Done
- lines chan line.Line
- regex string
- aggregate *server.Aggregate
- aggregatedMessages chan string
- serverMessages chan string
- payload []byte
- hostname string
- user *user.User
- catLimiter chan struct{}
- tailLimiter chan struct{}
- globalServerWaitFor chan struct{}
- ackCloseReceived chan struct{}
- activeCommands int32
- activeReaders int32
+ done *internal.Done
+ lines chan line.Line
+ regex string
+ aggregate *server.Aggregate
+ aggregatedMessages chan string
+ serverMessages chan string
+ payload []byte
+ hostname string
+ user *user.User
+ catLimiter chan struct{}
+ tailLimiter chan struct{}
+ ackCloseReceived chan struct{}
+ activeCommands int32
+ activeReaders int32
}
// NewServerHandler returns the server handler.
-func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler {
+func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
h := ServerHandler{
- done: internal.NewDone(),
- lines: make(chan line.Line, 100),
- serverMessages: make(chan string, 10),
- aggregatedMessages: make(chan string, 10),
- ackCloseReceived: make(chan struct{}),
- catLimiter: catLimiter,
- tailLimiter: tailLimiter,
- globalServerWaitFor: globalServerWaitFor,
- regex: ".",
- user: user,
+ done: internal.NewDone(),
+ lines: make(chan line.Line, 100),
+ serverMessages: make(chan string, 10),
+ aggregatedMessages: make(chan string, 10),
+ ackCloseReceived: make(chan struct{}),
+ catLimiter: catLimiter,
+ tailLimiter: tailLimiter,
+ regex: ".",
+ user: user,
}
fqdn, err := os.Hostname()
diff --git a/internal/server/server.go b/internal/server/server.go
index 31fa85d..a20737e 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -7,7 +7,6 @@ import (
"io"
"net"
"strings"
- "time"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
@@ -33,9 +32,6 @@ type Server struct {
sched *scheduler
// Mointor log files for pattern (if configured)
cont *continuous
- // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed.
- // TODO: Remove this counter.
- shutdownWaitFor chan struct{}
}
// New returns a new server.
@@ -46,7 +42,6 @@ func New() *Server {
sshServerConfig: &gossh.ServerConfig{},
catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats),
tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
- shutdownWaitFor: make(chan struct{}, 1000),
sched: newScheduler(),
cont: newContinuous(),
}
@@ -80,27 +75,12 @@ func (s *Server) Start(ctx context.Context) int {
go s.cont.start(ctx)
go s.listenerLoop(ctx, listener)
- select {
- case <-ctx.Done():
- // Wait until all commands/jobs/children are no more!
- s.wait()
- }
+ <-ctx.Done()
// For future use.
return 0
}
-func (s *Server) wait() {
- for {
- num := len(s.shutdownWaitFor)
- logger.Debug("Waiting for stuff to finish", num)
- if num <= 0 {
- return
- }
- time.Sleep(time.Second)
- }
-}
-
func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
logger.Debug("Starting listener loop")
@@ -180,7 +160,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
case config.ControlUser:
handler = handlers.NewControlHandler(user)
default:
- handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor)
+ handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
}
terminate := func() {