summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@dionysus>2020-09-19 17:52:45 +0100
committerPaul Buetow <paul@dionysus>2020-09-19 17:52:45 +0100
commit813d2d00ec581c801d64091c7774988b559c3e93 (patch)
treed491355684c3244860cb7d2c9ae2450f709db3e0 /internal
parentec67d9833095dfbe620dd3c99ea0caba391c4b87 (diff)
refactor to have no context.Context in client handler structs
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/baseclient.go2
-rw-r--r--internal/clients/handlers/basehandler.go22
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/done.go37
-rw-r--r--internal/clients/handlers/handler.go3
-rw-r--r--internal/clients/handlers/healthhandler.go15
-rw-r--r--internal/clients/handlers/maprhandler.go4
-rw-r--r--internal/clients/handlers/withcancel.go24
-rw-r--r--internal/clients/healthclient.go2
-rw-r--r--internal/clients/remote/connection.go7
10 files changed, 73 insertions, 47 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 008a01e..d8d4fde 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -99,7 +99,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
defer func() { <-active }()
for {
- connCtx, cancel := conn.Handler.WithCancel(ctx)
+ connCtx, cancel := context.WithCancel(ctx)
defer cancel()
conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh)
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 65bbfd7..54b80ae 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -13,7 +13,7 @@ import (
)
type baseHandler struct {
- withCancel
+ done *Done
server string
shellStarted bool
commands chan string
@@ -29,6 +29,14 @@ func (h *baseHandler) Status() int {
return h.status
}
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -38,7 +46,8 @@ func (h *baseHandler) SendMessage(command string) error {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
- case <-h.ctx.Done():
+ case <-h.Done():
+ return nil
}
return nil
@@ -65,7 +74,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
select {
case command := <-h.commands:
n = copy(p, []byte(command))
- case <-h.ctx.Done():
+ case <-h.Done():
return 0, io.EOF
}
return
@@ -95,10 +104,11 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case strings.HasPrefix(message, ".syn close connection"):
h.SendMessage(".ack close connection")
select {
- case <-time.After(time.Second * 1):
+ case <-time.After(time.Second * 5):
logger.Debug("Shutting down client after timeout and sending ack to server")
- h.withCancel.shutdown()
- case <-h.ctx.Done():
+ h.Shutdown()
+ case <-h.Done():
+ return
}
case strings.HasPrefix(message, ".run exitstatus"):
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index fcd8052..2908f7c 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -19,9 +19,7 @@ func NewClientHandler(server string) *ClientHandler {
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
},
}
}
diff --git a/internal/clients/handlers/done.go b/internal/clients/handlers/done.go
new file mode 100644
index 0000000..5b1335e
--- /dev/null
+++ b/internal/clients/handlers/done.go
@@ -0,0 +1,37 @@
+package handlers
+
+import (
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+type Done struct {
+ ch chan struct{}
+ mutex sync.Mutex
+}
+
+func NewDone() *Done {
+ return &Done{
+ ch: make(chan struct{}),
+ }
+}
+
+func (d *Done) Done() <-chan struct{} {
+ return d.ch
+}
+
+func (d *Done) Shutdown() {
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+
+ logger.Debug("Done.Shutdown()")
+
+ select {
+ case <-d.ch:
+ return
+ default:
+ logger.Debug("Done.Shutdown() -> close")
+ close(d.ch)
+ }
+}
diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go
index c53ca34..afa87e2 100644
--- a/internal/clients/handlers/handler.go
+++ b/internal/clients/handlers/handler.go
@@ -1,7 +1,6 @@
package handlers
import (
- "context"
"io"
)
@@ -11,6 +10,6 @@ type Handler interface {
SendMessage(command string) error
Server() string
Status() int
- WithCancel(ctx context.Context) (context.Context, context.CancelFunc)
+ Shutdown()
Done() <-chan struct{}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 9051015..9fc2671 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -8,7 +8,7 @@ import (
// HealthHandler implements the handler required for health checks.
type HealthHandler struct {
- withCancel
+ done *Done
// Buffer of incoming data from server.
receiveBuf []byte
// To send commands to the server.
@@ -27,9 +27,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
receive: receive,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
}
return &h
@@ -45,12 +43,21 @@ func (h *HealthHandler) Status() int {
return h.status
}
+func (h *HealthHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *HealthHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage sends a DTail command to the server.
func (h *HealthHandler) SendMessage(command string) error {
select {
case h.commands <- fmt.Sprintf("%s;", command):
case <-time.NewTimer(time.Second * 10).C:
return errors.New("Timed out sending command " + command)
+ case <-h.Done():
}
return nil
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index b908f3b..5d98690 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -24,9 +24,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
diff --git a/internal/clients/handlers/withcancel.go b/internal/clients/handlers/withcancel.go
deleted file mode 100644
index 7c9cf4e..0000000
--- a/internal/clients/handlers/withcancel.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package handlers
-
-import "context"
-
-type withCancel struct {
- ctx context.Context
- done chan struct{}
-}
-
-// WithCancel sets and returns the context used.
-func (w *withCancel) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) {
- cancelCtx, cancel := context.WithCancel(ctx)
- w.ctx = cancelCtx
-
- return cancelCtx, cancel
-}
-
-func (w *withCancel) Done() <-chan struct{} {
- return w.done
-}
-
-func (w *withCancel) shutdown() {
- close(w.done)
-}
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index 7313583..e93f6be 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -50,7 +50,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) {
conn.Handler = handlers.NewHealthHandler(c.server, receive)
conn.Commands = []string{c.mode.String()}
- connCtx, cancel := conn.Handler.WithCancel(ctx)
+ connCtx, cancel := context.WithCancel(ctx)
go conn.Start(connCtx, cancel, throttleCh, statsCh)
for {
diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go
index 2d97d14..b29ffed 100644
--- a/internal/clients/remote/connection.go
+++ b/internal/clients/remote/connection.go
@@ -177,21 +177,21 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess
}
go func() {
- defer cancel()
io.Copy(stdinPipe, c.Handler)
+ cancel()
}()
go func() {
- defer cancel()
io.Copy(c.Handler, stdoutPipe)
+ cancel()
}()
go func() {
- defer cancel()
select {
case <-c.Handler.Done():
case <-ctx.Done():
}
+ cancel()
}()
// Send all commands to client.
@@ -207,5 +207,6 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess
}
<-ctx.Done()
+ c.Handler.Shutdown()
return nil
}