summaryrefslogtreecommitdiff
path: root/internal/clients/handlers
diff options
context:
space:
mode:
authorPaul Buetow <paul@dionysus>2020-09-19 17:52:45 +0100
committerPaul Buetow <paul@dionysus>2020-09-19 17:52:45 +0100
commit813d2d00ec581c801d64091c7774988b559c3e93 (patch)
treed491355684c3244860cb7d2c9ae2450f709db3e0 /internal/clients/handlers
parentec67d9833095dfbe620dd3c99ea0caba391c4b87 (diff)
refactor to have no context.Context in client handler structs
Diffstat (limited to 'internal/clients/handlers')
-rw-r--r--internal/clients/handlers/basehandler.go22
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/done.go37
-rw-r--r--internal/clients/handlers/handler.go3
-rw-r--r--internal/clients/handlers/healthhandler.go15
-rw-r--r--internal/clients/handlers/maprhandler.go4
-rw-r--r--internal/clients/handlers/withcancel.go24
7 files changed, 67 insertions, 42 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 65bbfd7..54b80ae 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -13,7 +13,7 @@ import (
)
type baseHandler struct {
- withCancel
+ done *Done
server string
shellStarted bool
commands chan string
@@ -29,6 +29,14 @@ func (h *baseHandler) Status() int {
return h.status
}
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -38,7 +46,8 @@ func (h *baseHandler) SendMessage(command string) error {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
- case <-h.ctx.Done():
+ case <-h.Done():
+ return nil
}
return nil
@@ -65,7 +74,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
select {
case command := <-h.commands:
n = copy(p, []byte(command))
- case <-h.ctx.Done():
+ case <-h.Done():
return 0, io.EOF
}
return
@@ -95,10 +104,11 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case strings.HasPrefix(message, ".syn close connection"):
h.SendMessage(".ack close connection")
select {
- case <-time.After(time.Second * 1):
+ case <-time.After(time.Second * 5):
logger.Debug("Shutting down client after timeout and sending ack to server")
- h.withCancel.shutdown()
- case <-h.ctx.Done():
+ h.Shutdown()
+ case <-h.Done():
+ return
}
case strings.HasPrefix(message, ".run exitstatus"):
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index fcd8052..2908f7c 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -19,9 +19,7 @@ func NewClientHandler(server string) *ClientHandler {
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
},
}
}
diff --git a/internal/clients/handlers/done.go b/internal/clients/handlers/done.go
new file mode 100644
index 0000000..5b1335e
--- /dev/null
+++ b/internal/clients/handlers/done.go
@@ -0,0 +1,37 @@
+package handlers
+
+import (
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+type Done struct {
+ ch chan struct{}
+ mutex sync.Mutex
+}
+
+func NewDone() *Done {
+ return &Done{
+ ch: make(chan struct{}),
+ }
+}
+
+func (d *Done) Done() <-chan struct{} {
+ return d.ch
+}
+
+func (d *Done) Shutdown() {
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+
+ logger.Debug("Done.Shutdown()")
+
+ select {
+ case <-d.ch:
+ return
+ default:
+ logger.Debug("Done.Shutdown() -> close")
+ close(d.ch)
+ }
+}
diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go
index c53ca34..afa87e2 100644
--- a/internal/clients/handlers/handler.go
+++ b/internal/clients/handlers/handler.go
@@ -1,7 +1,6 @@
package handlers
import (
- "context"
"io"
)
@@ -11,6 +10,6 @@ type Handler interface {
SendMessage(command string) error
Server() string
Status() int
- WithCancel(ctx context.Context) (context.Context, context.CancelFunc)
+ Shutdown()
Done() <-chan struct{}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 9051015..9fc2671 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -8,7 +8,7 @@ import (
// HealthHandler implements the handler required for health checks.
type HealthHandler struct {
- withCancel
+ done *Done
// Buffer of incoming data from server.
receiveBuf []byte
// To send commands to the server.
@@ -27,9 +27,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
receive: receive,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
}
return &h
@@ -45,12 +43,21 @@ func (h *HealthHandler) Status() int {
return h.status
}
+func (h *HealthHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *HealthHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage sends a DTail command to the server.
func (h *HealthHandler) SendMessage(command string) error {
select {
case h.commands <- fmt.Sprintf("%s;", command):
case <-time.NewTimer(time.Second * 10).C:
return errors.New("Timed out sending command " + command)
+ case <-h.Done():
}
return nil
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index b908f3b..5d98690 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -24,9 +24,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: NewDone(),
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
diff --git a/internal/clients/handlers/withcancel.go b/internal/clients/handlers/withcancel.go
deleted file mode 100644
index 7c9cf4e..0000000
--- a/internal/clients/handlers/withcancel.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package handlers
-
-import "context"
-
-type withCancel struct {
- ctx context.Context
- done chan struct{}
-}
-
-// WithCancel sets and returns the context used.
-func (w *withCancel) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) {
- cancelCtx, cancel := context.WithCancel(ctx)
- w.ctx = cancelCtx
-
- return cancelCtx, cancel
-}
-
-func (w *withCancel) Done() <-chan struct{} {
- return w.done
-}
-
-func (w *withCancel) shutdown() {
- close(w.done)
-}