diff options
| author | Paul Buetow <git@mx.buetow.org> | 2020-11-16 22:11:44 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2020-11-16 22:11:44 +0000 |
| commit | 7df612f527bd5dc2e785bf766d7d61124c260b94 (patch) | |
| tree | 9d1674b4fe3d7e492afeefc839009e5b11d5fe27 | |
| parent | 3c889d2eed4e12af505ea84d46d8e52d21057a1f (diff) | |
remove drun command for simplicity. only focus on interactive commands dealing with log streams
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | cmd/drun/main.go | 88 | ||||
| -rw-r--r-- | doc/drun.gif | bin | 1668728 -> 0 bytes | |||
| -rw-r--r-- | doc/examples.md | 11 | ||||
| -rw-r--r-- | doc/quickstart.md | 3 | ||||
| -rw-r--r-- | internal/clients/runclient.go | 87 | ||||
| -rw-r--r-- | internal/io/run/run.go | 150 | ||||
| -rw-r--r-- | internal/server/background/background.go | 126 | ||||
| -rw-r--r-- | internal/server/handlers/runcommand.go | 111 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 85 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
11 files changed, 3 insertions, 666 deletions
@@ -5,7 +5,6 @@ build: ${GO} build -o dcat ./cmd/dcat/main.go ${GO} build -o dgrep ./cmd/dgrep/main.go ${GO} build -o dmap ./cmd/dmap/main.go - ${GO} build -o drun ./cmd/drun/main.go ${GO} build -o dtail ./cmd/dtail/main.go clean: ls ./cmd/ | while read cmd; do \ @@ -16,7 +15,6 @@ install: build cp -pv dcat ${GOPATH}/bin/dcat cp -pv dgrep ${GOPATH}/bin/dgrep cp -pv dmap ${GOPATH}/bin/dmap - cp -pv drun ${GOPATH}/bin/drun cp -pv dtail ${GOPATH}/bin/dtail vet: find . -type d | while read dir; do \ diff --git a/cmd/drun/main.go b/cmd/drun/main.go deleted file mode 100644 index 3916817..0000000 --- a/cmd/drun/main.go +++ /dev/null @@ -1,88 +0,0 @@ -package main - -import ( - "context" - "flag" - "io/ioutil" - "os" - "strings" - - "github.com/mimecast/dtail/internal/clients" - "github.com/mimecast/dtail/internal/color" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/signal" - "github.com/mimecast/dtail/internal/user" - "github.com/mimecast/dtail/internal/version" -) - -// The evil begins here. -func main() { - var args clients.Args - var background string - var cfgFile string - var command string - var debugEnable bool - var displayVersion bool - var jobName string - var noColor bool - var sshPort int - - userName := user.Name() - - flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys") - flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages") - flag.BoolVar(&displayVersion, "version", false, "Display version") - flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") - flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") - flag.IntVar(&args.Timeout, "timeout", 0, "Command execution timeout") - flag.IntVar(&sshPort, "port", 2222, "SSH server port") - flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") - flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") - flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") - flag.StringVar(&args.UserName, "user", userName, "Your system user name") - flag.StringVar(&background, "background", "", "Can be one of 'start', 'cancel', 'list' or empty") - flag.StringVar(&cfgFile, "cfg", "", "Config file path") - flag.StringVar(&command, "command", "", "Command to run") - flag.StringVar(&jobName, "name", "", "The job name (if run in background)") - - flag.Parse() - - config.Read(cfgFile, sshPort) - color.Colored = !noColor - - if displayVersion { - version.PrintAndExit() - } - - ctx := context.TODO() - logger.Start(ctx, logger.Modes{Debug: debugEnable || config.Common.DebugEnable}) - - args.What, args.Arguments = readCommand(command) - client, err := clients.NewRunClient(args, background, jobName) - if err != nil { - panic(err) - } - - status := client.Start(ctx, signal.InterruptCh(ctx)) - logger.Flush() - os.Exit(status) -} - -func readCommand(command string) (string, []string) { - splitted := strings.Split(command, " ") - - script := splitted[0] - if _, err := os.Stat(script); os.IsNotExist(err) { - var commandArgs []string - return command, commandArgs - } - commandArgs := splitted[1:] - - bytes, err := ioutil.ReadFile(script) - if err != nil { - panic(err) - } - - return string(bytes), commandArgs -} diff --git a/doc/drun.gif b/doc/drun.gif Binary files differdeleted file mode 100644 index a3efeed..0000000 --- a/doc/drun.gif +++ /dev/null diff --git a/doc/examples.md b/doc/examples.md index 5a5d892..964660a 100644 --- a/doc/examples.md +++ b/doc/examples.md @@ -52,17 +52,6 @@ The following example demonstrates how to grep files (display only the lines whi  -# How to use ``drun`` - -The following example demonstrates how to execute a command on multiple machines remotely: - -```shell -% drun --servers <(head -n 30 serverlist.txt) \ - --command uptime -``` - - - # How to use ``dmap`` To run a mapreduce aggregation over logs written in the past the ``dmap`` command can be used. For example the following command aggregates all mapreduce fields of all the logs and calculates the average memory free grouped by day of the month, hour, minute and the server hostname. ``dmap`` will print interim results every few seconds. The final result however will be written to file ``mapreduce.csv``. diff --git a/doc/quickstart.md b/doc/quickstart.md index 733442f..6baedbb 100644 --- a/doc/quickstart.md +++ b/doc/quickstart.md @@ -16,7 +16,7 @@ On Linux you need to install the libacl development library for file system ACL To compile and install all DTail binaries directly from GitHub run: ```console -% for cmd in dcat dgrep dmap drun dtail dserver; do +% for cmd in dcat dgrep dmap dtail dserver; do go get github.com/mimecast/dtail/cmd/$cmd; done ``` @@ -26,7 +26,6 @@ It produces the following executables in ``$GOPATH/bin``: * ``dcat``: Client for displaying whole files remotely (distributed cat) * ``dgrep``: Client for searching whole files files remotely using a regex (distributed grep) * ``dmap``: Client for executing distributed mapreduce queries (may will consume a lot of RAM and CPU) -* ``drun``: Client for executing commands on remote servers. * ``dtail``: Client for tailing/following log files remotely (distributed tail) * ``dserver``: The DTail server diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go deleted file mode 100644 index 5464d54..0000000 --- a/internal/clients/runclient.go +++ /dev/null @@ -1,87 +0,0 @@ -package clients - -import ( - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "fmt" - "runtime" - "strings" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/omode" -) - -// RunClient is a client to run various commands on the server. -type RunClient struct { - baseClient - jobName string - background string -} - -// NewRunClient returns a new run client to execute commands on the remote server. -func NewRunClient(args Args, background, jobName string) (*RunClient, error) { - args.Mode = omode.RunClient - - if jobName == "" { - jobName = hash(strings.Join(args.Arguments, " ")) - } - - c := RunClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, - jobName: jobName, - background: background, - } - - c.init() - c.makeConnections(c) - - return &c, nil -} - -func (c RunClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c RunClient) makeCommands() (commands []string) { - if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What)) - return - } - - commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What)) - return -} - -func (c RunClient) options() string { - var sb strings.Builder - - logger.Debug("options", fmt.Sprintf(":background=%s", c.background)) - sb.WriteString(fmt.Sprintf(":background=%s", c.background)) - - logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName)) - sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName)) - - if len(c.Arguments) > 0 { - logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " "))) - sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " ")))) - } - - return sb.String() -} - -func encode64(str string) string { - return base64.StdEncoding.EncodeToString([]byte(str)) -} - -func hash(str string) string { - h := sha256.New() - h.Write([]byte(str)) - - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/io/run/run.go b/internal/io/run/run.go deleted file mode 100644 index 2bb3756..0000000 --- a/internal/io/run/run.go +++ /dev/null @@ -1,150 +0,0 @@ -package run - -import ( - "bufio" - "context" - "io" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" -) - -// Run is for execute a command. -type Run struct { - command string - args []string - cmd *exec.Cmd - pgroupKilled chan struct{} -} - -// New returns a new command runner. -func New(command string, args []string) Run { - return Run{ - command: command, - args: args, - pgroupKilled: make(chan struct{}), - } -} - -// StartBackground starts running the command in background. -func (r Run) StartBackground(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, lines chan<- line.Line) (pid int, err error) { - pid = -1 - - if len(r.args) > 0 { - logger.Debug(r.command, r.args, " ") - r.cmd = exec.CommandContext(ctx, r.command, r.args...) - } else { - logger.Debug(r.command) - r.cmd = exec.CommandContext(ctx, r.command) - } - - // Create a new process group, so that kill() will only kill this command + pgroup. - r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdoutPipe, myErr := r.cmd.StdoutPipe() - if err != nil { - wg.Done() - err = myErr - return - } - - stderrPipe, myErr := r.cmd.StderrPipe() - if myErr != nil { - wg.Done() - err = myErr - return - } - - if myErr := r.cmd.Start(); err != nil { - wg.Done() - err = myErr - return - } - - if r.cmd.Process != nil { - pid = r.cmd.Process.Pid - } - - commandExited := make(chan struct{}) - - var pipeWg sync.WaitGroup - pipeWg.Add(2) - - go r.killPgroup(ctx, commandExited, pid) - go r.pipeToLines(commandExited, &pipeWg, pid, stdoutPipe, "STDOUT", lines) - go r.pipeToLines(commandExited, &pipeWg, pid, stderrPipe, "STDERR", lines) - - go func() { - exitCode := 255 - if waitErr := r.cmd.Wait(); waitErr != nil { - if exitError, ok := waitErr.(*exec.ExitError); ok { - exitCode = exitError.ExitCode() - } - } - ec <- exitCode - - // Tell pipes we are done - close(commandExited) - // Wait for process group to be killed - <-r.pgroupKilled - // Wait for the pipes to flush the contents - pipeWg.Wait() - // Now the job is truly done - wg.Done() - }() - - return -} - -func (r Run) pipeToLines(commandExited chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) { - defer wg.Done() - bufReader := bufio.NewReader(reader) - - for { - time.Sleep(time.Millisecond * 10) - lineStr, err := bufReader.ReadString('\n') - - if err != nil { - select { - case <-commandExited: - return - default: - } - continue - } - - newLine := line.Line{ - Content: []byte(lineStr), - Count: uint64(pid), - TransmittedPerc: 100, - SourceID: what, - } - - select { - case lines <- newLine: - case <-commandExited: - return - } - } -} - -func (r Run) killPgroup(ctx context.Context, commandExited chan struct{}, pid int) { - if pid == -1 { - close(r.pgroupKilled) - return - } - - if pgid, err := syscall.Getpgid(pid); err == nil { - // Kill process group when done - select { - case <-ctx.Done(): - case <-commandExited: - } - syscall.Kill(-pgid, syscall.SIGKILL) - close(r.pgroupKilled) - } -} diff --git a/internal/server/background/background.go b/internal/server/background/background.go deleted file mode 100644 index 3907448..0000000 --- a/internal/server/background/background.go +++ /dev/null @@ -1,126 +0,0 @@ -package background - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - - "github.com/mimecast/dtail/internal/io/logger" -) - -type job struct { - cancel context.CancelFunc - wg *sync.WaitGroup -} - -// Background specifies a job or command run in background on server side. -// This does not require an active DTail client SSH connection/session. -type Background struct { - mutex *sync.Mutex - jobs map[string]job -} - -// New returns a new background manager. -func New() Background { - return Background{ - jobs: make(map[string]job), - mutex: &sync.Mutex{}, - } -} - -// Add a background job. -func (b Background) Add(userName, jobName string, cancel context.CancelFunc, wg *sync.WaitGroup) error { - key := b.key(userName, jobName) - logger.Debug("background", "Add", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - if _, ok := b.jobs[key]; ok { - return errors.New("job already exists") - } - - b.jobs[key] = job{cancel, wg} - - // Clean up background job database. - go func() { - wg.Wait() - b.cancel(key) - }() - - return nil -} - -// Cancel a background job. -func (b Background) Cancel(userName, jobName string) error { - key := b.key(userName, jobName) - logger.Debug("background", "Cancel", key) - - return b.cancel(key) -} - -func (b Background) cancel(key string) error { - job, ok := b.get(key) - logger.Debug("background", "cancel", key, job, ok) - - if !ok { - return errors.New("no job to cancel") - } - - logger.Debug("background", "cancel", "run job.cancel()") - job.cancel() - logger.Debug("background", "cancel", "run job.wg.Wait()") - job.wg.Wait() - logger.Debug("background", "cancel", "run b.delete(key)") - b.delete(key) - - return nil -} - -// ListJobsC returns a channel listing all jobs of the given user. -func (b Background) ListJobsC(userName string) <-chan string { - logger.Debug("background", "ListJobC", userName) - - ch := make(chan string) - - go func() { - defer close(ch) - - b.mutex.Lock() - defer b.mutex.Unlock() - - for k := range b.jobs { - logger.Debug("ListJobsC", k, userName) - if strings.HasPrefix(k, fmt.Sprintf("%s.", userName)) { - ch <- k - } - } - }() - - return ch -} - -func (b Background) get(key string) (job, bool) { - logger.Debug("background", "get", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - job, ok := b.jobs[key] - return job, ok -} - -func (b Background) delete(key string) { - logger.Debug("background", "delete", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - delete(b.jobs, key) -} - -func (Background) key(userName, jobName string) string { - return fmt.Sprintf("%s.%s", userName, jobName) -} diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go deleted file mode 100644 index 8e5895b..0000000 --- a/internal/server/handlers/runcommand.go +++ /dev/null @@ -1,111 +0,0 @@ -package handlers - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/run" -) - -type runCommand struct { - server *ServerHandler - run run.Run -} - -func newRunCommand(server *ServerHandler) runCommand { - return runCommand{ - server: server, - } -} - -func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error { - if argc < 2 { - return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc) - } - - ec := make(chan int, 1) - var pid int - var err error - - command := strings.Join(args[1:], " ") - if strings.Contains(command, ";") || strings.Contains(command, "\n") { - if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - return nil - } - - if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - - exitCode := <-ec - r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode)) - r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode))) - - return nil -} - -func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) { - if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) { - return -1, err - } - - timestamp := time.Now().UnixNano() - scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp) - - // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so) - logger.Debug(r.server.user, "Writing temp script", scriptPath) - - script = fmt.Sprintf("#!/bin/sh\n%s", script) - if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil { - return -1, err - } - - pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs) - go func() { - wg.Wait() - logger.Debug("Deleting script", scriptPath) - os.Remove(scriptPath) - }() - - return pid, err -} - -func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) { - if len(command) == 0 { - return -1, errors.New("Empty command provided") - } - - splitted := strings.Split(command, " ") - path := splitted[0] - args := splitted[1:] - args = append(args, outerArgs...) - - qualifiedPath, err := exec.LookPath(path) - if err != nil { - return -1, err - } - - if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") { - return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath) - } - - r.run = run.New(qualifiedPath, args) - pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines) - if err != nil { - return pid, err - } - return pid, nil -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 164a280..7ad1224 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -19,7 +18,6 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -47,11 +45,10 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 - background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { h := ServerHandler{ done: internal.NewDone(), lines: make(chan line.Line, 100), @@ -63,7 +60,6 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWait globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background, } fqdn, err := os.Hostname() @@ -314,85 +310,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() }() - case "run": - // TODO: Refactor this "run" case, move code to runcommand.go - command := newRunCommand(h) - - jobName, _ := options["jobName"] - logger.Debug(h.user, "run", options) - - if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") { - if err := h.background.Cancel(h.user.Name, jobName); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - } else { - h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) - } - commandFinished() - return - } - - if val, ok := options["background"]; ok && val == "list" { - h.sendServerMessage("Listing jobs") - count := 0 - for jobName := range h.background.ListJobsC(h.user.Name) { - h.sendServerMessage(jobName) - count++ - } - h.sendServerMessage(fmt.Sprintf("Found %d jobs", count)) - commandFinished() - return - } - - str, _ := options["outerArgs"] - outerArgs := strings.Split(str, " ") - - var background bool - if val, ok := options["background"]; ok && val == "start" { - background = true - } - - var wg sync.WaitGroup - wg.Add(1) - - if background { - if timeout == 0 { - // Set default background timeout. - timeout = time.Hour * 1 - } - - commandCtx, cancel := context.WithTimeout(ctx, timeout) - - if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - commandFinished() - return - } - ctx = commandCtx - } - - if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil { - h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err)) - commandFinished() - return - } - - // Make sure that server waits for all sub-processes to finish on shutdown - go func() { h.globalServerWaitFor <- struct{}{} }() - go func() { - wg.Wait() - <-h.globalServerWaitFor - }() - - if background { - h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) - commandFinished() - return - } - - // Command run in foreground, wait for it to complete before finishing the connection. - wg.Wait() - commandFinished() - case "ack", ".ack": h.handleAckCommand(argc, args) commandFinished() diff --git a/internal/server/server.go b/internal/server/server.go index 5e2a521..d4255a3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -36,8 +35,6 @@ type Server struct { cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} - // Background jobs - background background.Background } // New returns a new server. @@ -51,7 +48,6 @@ func New() *Server { shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), - background: background.New(), } s.sshServerConfig.PasswordCallback = s.Callback @@ -183,7 +179,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) } terminate := func() { |
