summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-10 14:02:12 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 14:15:26 +0300
commitf44792c9102488774c9993b080f35c65287a64b1 (patch)
tree94b0be371a2135f4fea63bca87e14ce057fc172e /internal/server/handlers/basehandler.go
parent97747ea0f3178f7f5890512d483fdccaa82846b0 (diff)
add another dmap test - reading 100 source files at once
fix a data race when reading multiple files on one server from the same session at once
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go40
1 files changed, 34 insertions, 6 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 847e8f9..d814cc9 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -9,6 +9,7 @@ import (
"io"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -22,7 +23,7 @@ import (
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string, string, map[string]string)
+type handleCommandCb func(context.Context, int, []string, string)
type baseHandler struct {
done *internal.Done
@@ -35,11 +36,15 @@ type baseHandler struct {
user *user.User
ackCloseReceived chan struct{}
activeCommands int32
- quiet bool
- spartan bool
- serverless int32
readBuf bytes.Buffer
writeBuf bytes.Buffer
+
+ // Some global options + sync primitives required.
+ once sync.Once
+ mutex sync.Mutex
+ quiet bool
+ spartan bool
+ serverless bool
}
// Shutdown the handler.
@@ -66,7 +71,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
return
}
- if h.serverless > 0 {
+ if h.serverless {
return
}
@@ -160,8 +165,9 @@ func (h *baseHandler) handleCommand(commandStr string) {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
+ h.setOptions(options)
- h.handleCommandCb(ctx, argc, args, commandName, options)
+ h.handleCommandCb(ctx, argc, args, commandName)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -232,6 +238,28 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) {
}
}
+func (h *baseHandler) setOptions(options map[string]string) {
+ // We have to make sure that this block is executed only once.
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ // We can read the options only once, will cause a data race otherwise if
+ // changed multiple times for multiple incoming commands.
+ h.once.Do(func() {
+ if quiet, _ := options["quiet"]; quiet == "true" {
+ dlog.Server.Debug(h.user, "Enabling quiet mode")
+ h.quiet = true
+ }
+ if spartan, _ := options["spartan"]; spartan == "true" {
+ dlog.Server.Debug(h.user, "Enabling spartan mode")
+ h.spartan = true
+ }
+ if serverless, _ := options["serverless"]; serverless == "true" {
+ dlog.Server.Debug(h.user, "Enabling serverless mode")
+ h.serverless = true
+ }
+ })
+}
+
func (h *baseHandler) send(ch chan<- string, message string) {
select {
case ch <- message: