diff options
| author | Paul Buetow <paul@dionysus> | 2020-09-19 17:52:45 +0100 |
|---|---|---|
| committer | Paul Buetow <paul@dionysus> | 2020-09-19 17:52:45 +0100 |
| commit | 813d2d00ec581c801d64091c7774988b559c3e93 (patch) | |
| tree | d491355684c3244860cb7d2c9ae2450f709db3e0 /internal | |
| parent | ec67d9833095dfbe620dd3c99ea0caba391c4b87 (diff) | |
refactor to have no context.Context in client handler structs
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/baseclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 22 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/done.go | 37 | ||||
| -rw-r--r-- | internal/clients/handlers/handler.go | 3 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 15 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/withcancel.go | 24 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/remote/connection.go | 7 |
10 files changed, 73 insertions, 47 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 008a01e..d8d4fde 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -99,7 +99,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con defer func() { <-active }() for { - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) defer cancel() conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh) diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 65bbfd7..54b80ae 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -13,7 +13,7 @@ import ( ) type baseHandler struct { - withCancel + done *Done server string shellStarted bool commands chan string @@ -29,6 +29,14 @@ func (h *baseHandler) Status() int { return h.status } +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -38,7 +46,8 @@ func (h *baseHandler) SendMessage(command string) error { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) - case <-h.ctx.Done(): + case <-h.Done(): + return nil } return nil @@ -65,7 +74,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { select { case command := <-h.commands: n = copy(p, []byte(command)) - case <-h.ctx.Done(): + case <-h.Done(): return 0, io.EOF } return @@ -95,10 +104,11 @@ func (h *baseHandler) handleHiddenMessage(message string) { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") select { - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): logger.Debug("Shutting down client after timeout and sending ack to server") - h.withCancel.shutdown() - case <-h.ctx.Done(): + h.Shutdown() + case <-h.Done(): + return } case strings.HasPrefix(message, ".run exitstatus"): diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index fcd8052..2908f7c 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -19,9 +19,7 @@ func NewClientHandler(server string) *ClientHandler { shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: NewDone(), }, } } diff --git a/internal/clients/handlers/done.go b/internal/clients/handlers/done.go new file mode 100644 index 0000000..5b1335e --- /dev/null +++ b/internal/clients/handlers/done.go @@ -0,0 +1,37 @@ +package handlers + +import ( + "sync" + + "github.com/mimecast/dtail/internal/io/logger" +) + +type Done struct { + ch chan struct{} + mutex sync.Mutex +} + +func NewDone() *Done { + return &Done{ + ch: make(chan struct{}), + } +} + +func (d *Done) Done() <-chan struct{} { + return d.ch +} + +func (d *Done) Shutdown() { + d.mutex.Lock() + defer d.mutex.Unlock() + + logger.Debug("Done.Shutdown()") + + select { + case <-d.ch: + return + default: + logger.Debug("Done.Shutdown() -> close") + close(d.ch) + } +} diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go index c53ca34..afa87e2 100644 --- a/internal/clients/handlers/handler.go +++ b/internal/clients/handlers/handler.go @@ -1,7 +1,6 @@ package handlers import ( - "context" "io" ) @@ -11,6 +10,6 @@ type Handler interface { SendMessage(command string) error Server() string Status() int - WithCancel(ctx context.Context) (context.Context, context.CancelFunc) + Shutdown() Done() <-chan struct{} } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 9051015..9fc2671 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,7 +8,7 @@ import ( // HealthHandler implements the handler required for health checks. type HealthHandler struct { - withCancel + done *Done // Buffer of incoming data from server. receiveBuf []byte // To send commands to the server. @@ -27,9 +27,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler { receive: receive, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: NewDone(), } return &h @@ -45,12 +43,21 @@ func (h *HealthHandler) Status() int { return h.status } +func (h *HealthHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *HealthHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage sends a DTail command to the server. func (h *HealthHandler) SendMessage(command string) error { select { case h.commands <- fmt.Sprintf("%s;", command): case <-time.NewTimer(time.Second * 10).C: return errors.New("Timed out sending command " + command) + case <-h.Done(): } return nil diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index b908f3b..5d98690 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -24,9 +24,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: NewDone(), }, query: query, aggregate: client.NewAggregate(server, query, globalGroup), diff --git a/internal/clients/handlers/withcancel.go b/internal/clients/handlers/withcancel.go deleted file mode 100644 index 7c9cf4e..0000000 --- a/internal/clients/handlers/withcancel.go +++ /dev/null @@ -1,24 +0,0 @@ -package handlers - -import "context" - -type withCancel struct { - ctx context.Context - done chan struct{} -} - -// WithCancel sets and returns the context used. -func (w *withCancel) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) { - cancelCtx, cancel := context.WithCancel(ctx) - w.ctx = cancelCtx - - return cancelCtx, cancel -} - -func (w *withCancel) Done() <-chan struct{} { - return w.done -} - -func (w *withCancel) shutdown() { - close(w.done) -} diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 7313583..e93f6be 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -50,7 +50,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) { conn.Handler = handlers.NewHealthHandler(c.server, receive) conn.Commands = []string{c.mode.String()} - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) go conn.Start(connCtx, cancel, throttleCh, statsCh) for { diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go index 2d97d14..b29ffed 100644 --- a/internal/clients/remote/connection.go +++ b/internal/clients/remote/connection.go @@ -177,21 +177,21 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } go func() { - defer cancel() io.Copy(stdinPipe, c.Handler) + cancel() }() go func() { - defer cancel() io.Copy(c.Handler, stdoutPipe) + cancel() }() go func() { - defer cancel() select { case <-c.Handler.Done(): case <-ctx.Done(): } + cancel() }() // Send all commands to client. @@ -207,5 +207,6 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } <-ctx.Done() + c.Handler.Shutdown() return nil } |
