summaryrefslogtreecommitdiff
path: root/internal/server/background
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-02-26 11:11:07 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-02-26 11:11:07 +0000
commit3cdc86e20cbd311fb9c85cef63876a2f39e5e74d (patch)
tree9cb50347900ff1ba4dc6a7b6e4766ebd951c2c58 /internal/server/background
parent6e176034306026b922c1df4231a1807f36cbe460 (diff)
can list remote jobs and can also pass outer args to scripts
Diffstat (limited to 'internal/server/background')
-rw-r--r--internal/server/background/background.go66
1 files changed, 54 insertions, 12 deletions
diff --git a/internal/server/background/background.go b/internal/server/background/background.go
index 05a502f..51ef052 100644
--- a/internal/server/background/background.go
+++ b/internal/server/background/background.go
@@ -3,12 +3,16 @@ package background
import (
"context"
"errors"
+ "fmt"
+ "strings"
"sync"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
type job struct {
cancel context.CancelFunc
- done <-chan struct{}
+ wg *sync.WaitGroup
}
// Background specifies a job or command run in background on server side.
@@ -27,43 +31,81 @@ func New() Background {
}
// Add a background job.
-func (b Background) Add(name string, cancel context.CancelFunc, done <-chan struct{}) error {
+func (b Background) Add(userName, jobName string, cancel context.CancelFunc, wg *sync.WaitGroup) error {
+ key := b.key(userName, jobName)
+
b.mutex.Lock()
defer b.mutex.Unlock()
- if _, ok := b.jobs[name]; ok {
+ if _, ok := b.jobs[key]; ok {
return errors.New("job already exists")
}
- b.jobs[name] = job{cancel, done}
+ b.jobs[key] = job{cancel, wg}
+
+ // Clean up background job database.
+ go func() {
+ wg.Wait()
+ b.cancel(key)
+ }()
+
return nil
}
// Cancel a background job.
-func (b Background) Cancel(name string) error {
- job, ok := b.get(name)
+func (b Background) Cancel(userName, jobName string) error {
+ return b.cancel(b.key(userName, jobName))
+}
+
+func (b Background) cancel(key string) error {
+ job, ok := b.get(key)
if !ok {
return errors.New("no job to cancel")
}
job.cancel()
- <-job.done
- b.delete(name)
+ job.wg.Wait()
+ b.delete(key)
return nil
}
-func (b Background) get(name string) (job, bool) {
+// ListJobsC returns a channel listing all jobs of the given user.
+func (b Background) ListJobsC(userName string) <-chan string {
+ ch := make(chan string)
+
+ go func() {
+ defer close(ch)
+
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ for k, _ := range b.jobs {
+ logger.Debug("ListJobsC", k, userName)
+ if strings.HasPrefix(k, fmt.Sprintf("%s.", userName)) {
+ ch <- k
+ }
+ }
+ }()
+
+ return ch
+}
+
+func (b Background) get(key string) (job, bool) {
b.mutex.Lock()
defer b.mutex.Unlock()
- job, ok := b.jobs[name]
+ job, ok := b.jobs[key]
return job, ok
}
-func (b Background) delete(name string) {
+func (b Background) delete(key string) {
b.mutex.Lock()
defer b.mutex.Unlock()
- delete(b.jobs, name)
+ delete(b.jobs, key)
+}
+
+func (Background) key(userName, jobName string) string {
+ return fmt.Sprintf("%s.%s", userName, jobName)
}