summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-02-26 11:11:07 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-02-26 11:11:07 +0000
commit3cdc86e20cbd311fb9c85cef63876a2f39e5e74d (patch)
tree9cb50347900ff1ba4dc6a7b6e4766ebd951c2c58 /internal/io
parent6e176034306026b922c1df4231a1807f36cbe460 (diff)
can list remote jobs and can also pass outer args to scripts
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/run/run.go57
1 files changed, 33 insertions, 24 deletions
diff --git a/internal/io/run/run.go b/internal/io/run/run.go
index 18e1eb9..186528d 100644
--- a/internal/io/run/run.go
+++ b/internal/io/run/run.go
@@ -31,11 +31,7 @@ func New(command string, args []string) Run {
}
// Start running the command.
-func (r Run) Start(ctx context.Context, lines chan<- line.Line) (pid int, ec int, err error) {
- done := make(chan struct{})
- defer close(done)
-
- ec = 255
+func (r Run) StartBackground(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, lines chan<- line.Line) (pid int, err error) {
pid = -1
if len(r.args) > 0 {
@@ -45,48 +41,66 @@ func (r Run) Start(ctx context.Context, lines chan<- line.Line) (pid int, ec int
logger.Debug(r.command)
r.cmd = exec.CommandContext(ctx, r.command)
}
+
// Create a new process group, so that kill() will only kill this command + pgroup.
r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdoutPipe, myErr := r.cmd.StdoutPipe()
if err != nil {
+ wg.Done()
err = myErr
return
}
stderrPipe, myErr := r.cmd.StderrPipe()
if myErr != nil {
+ wg.Done()
err = myErr
return
}
if myErr := r.cmd.Start(); err != nil {
+ wg.Done()
err = myErr
return
}
if r.cmd.Process != nil {
pid = r.cmd.Process.Pid
- ec = 0
}
- go r.killPgroup(ctx, done, pid)
- var wg sync.WaitGroup
- wg.Add(2)
+ commandExited := make(chan struct{})
- go r.pipeToLines(done, &wg, pid, stdoutPipe, "STDOUT", lines)
- go r.pipeToLines(done, &wg, pid, stderrPipe, "STDERR", lines)
+ var pipeWg sync.WaitGroup
+ pipeWg.Add(2)
- if err = r.cmd.Wait(); err != nil {
- if exitError, ok := err.(*exec.ExitError); ok {
- ec = exitError.ExitCode()
+ go r.killPgroup(ctx, commandExited, pid)
+ go r.pipeToLines(commandExited, &pipeWg, pid, stdoutPipe, "STDOUT", lines)
+ go r.pipeToLines(commandExited, &pipeWg, pid, stderrPipe, "STDERR", lines)
+
+ go func() {
+ exitCode := 255
+ if waitErr := r.cmd.Wait(); waitErr != nil {
+ if exitError, ok := waitErr.(*exec.ExitError); ok {
+ exitCode = exitError.ExitCode()
+ }
}
- }
+ ec <- exitCode
+
+ // Tell pipes we are done
+ close(commandExited)
+ // Wait for process group to be killed
+ <-r.pgroupKilled
+ // Wait for the pipes to flush the contents
+ pipeWg.Wait()
+ // Now the job is truly done
+ wg.Done()
+ }()
return
}
-func (r Run) pipeToLines(done chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) {
+func (r Run) pipeToLines(commandExited chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) {
defer wg.Done()
bufReader := bufio.NewReader(reader)
@@ -102,7 +116,7 @@ func (r Run) pipeToLines(done chan struct{}, wg *sync.WaitGroup, pid int, reader
lineStr, err = bufReader.ReadString('\n')
}
select {
- case <-done:
+ case <-commandExited:
return
default:
}
@@ -110,12 +124,7 @@ func (r Run) pipeToLines(done chan struct{}, wg *sync.WaitGroup, pid int, reader
}
}
-// PgroupKilled identifies whether all subprocesses are killed or not.
-func (r Run) PgroupKilled() <-chan struct{} {
- return r.pgroupKilled
-}
-
-func (r Run) killPgroup(ctx context.Context, done chan struct{}, pid int) {
+func (r Run) killPgroup(ctx context.Context, commandExited chan struct{}, pid int) {
if pid == -1 {
close(r.pgroupKilled)
return
@@ -125,7 +134,7 @@ func (r Run) killPgroup(ctx context.Context, done chan struct{}, pid int) {
// Kill process group when done
select {
case <-ctx.Done():
- case <-done:
+ case <-commandExited:
}
syscall.Kill(-pgid, syscall.SIGKILL)
close(r.pgroupKilled)