summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-10 14:02:12 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 14:15:26 +0300
commitf44792c9102488774c9993b080f35c65287a64b1 (patch)
tree94b0be371a2135f4fea63bca87e14ce057fc172e /internal/server
parent97747ea0f3178f7f5890512d483fdccaa82846b0 (diff)
add another dmap test - reading 100 source files at once
fix a data race when reading multiple files on one server from the same session at once
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go40
-rw-r--r--internal/server/handlers/healthhandler.go2
-rw-r--r--internal/server/handlers/serverhandler.go18
3 files changed, 37 insertions, 23 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 847e8f9..d814cc9 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -9,6 +9,7 @@ import (
"io"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -22,7 +23,7 @@ import (
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string, string, map[string]string)
+type handleCommandCb func(context.Context, int, []string, string)
type baseHandler struct {
done *internal.Done
@@ -35,11 +36,15 @@ type baseHandler struct {
user *user.User
ackCloseReceived chan struct{}
activeCommands int32
- quiet bool
- spartan bool
- serverless int32
readBuf bytes.Buffer
writeBuf bytes.Buffer
+
+ // Some global options + sync primitives required.
+ once sync.Once
+ mutex sync.Mutex
+ quiet bool
+ spartan bool
+ serverless bool
}
// Shutdown the handler.
@@ -66,7 +71,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
return
}
- if h.serverless > 0 {
+ if h.serverless {
return
}
@@ -160,8 +165,9 @@ func (h *baseHandler) handleCommand(commandStr string) {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
+ h.setOptions(options)
- h.handleCommandCb(ctx, argc, args, commandName, options)
+ h.handleCommandCb(ctx, argc, args, commandName)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -232,6 +238,28 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) {
}
}
+func (h *baseHandler) setOptions(options map[string]string) {
+ // We have to make sure that this block is executed only once.
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ // We can read the options only once, will cause a data race otherwise if
+ // changed multiple times for multiple incoming commands.
+ h.once.Do(func() {
+ if quiet, _ := options["quiet"]; quiet == "true" {
+ dlog.Server.Debug(h.user, "Enabling quiet mode")
+ h.quiet = true
+ }
+ if spartan, _ := options["spartan"]; spartan == "true" {
+ dlog.Server.Debug(h.user, "Enabling spartan mode")
+ h.spartan = true
+ }
+ if serverless, _ := options["serverless"]; serverless == "true" {
+ dlog.Server.Debug(h.user, "Enabling serverless mode")
+ h.serverless = true
+ }
+ })
+}
+
func (h *baseHandler) send(ch chan<- string, message string) {
select {
case ch <- message:
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 8d6c400..0425696 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -41,7 +41,7 @@ func NewHealthHandler(user *user.User) *HealthHandler {
}
func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int,
- args []string, commandName string, options map[string]string) {
+ args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling health command", argc, args)
switch commandName {
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index f12d590..52c4570 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -4,7 +4,6 @@ import (
"context"
"os"
"strings"
- "sync/atomic"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -55,7 +54,7 @@ func NewServerHandler(user *user.User, catLimiter,
}
func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
- args []string, commandName string, options map[string]string) {
+ args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
@@ -65,19 +64,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
}
}
- if quiet, _ := options["quiet"]; quiet == "true" {
- dlog.Server.Debug(h.user, "Enabling quiet mode")
- h.quiet = true
- }
- if spartan, _ := options["spartan"]; spartan == "true" {
- dlog.Server.Debug(h.user, "Enabling spartan mode")
- h.spartan = true
- }
- if serverless, _ := options["serverless"]; serverless == "true" {
- dlog.Server.Debug(h.user, "Enabling serverless mode")
- atomic.AddInt32(&h.serverless, 1)
- }
-
switch commandName {
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
@@ -109,7 +95,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
commandFinished()
default:
h.send(h.serverMessages, dlog.Server.Error(h.user,
- "Received unknown user command", commandName, argc, args, options))
+ "Received unknown user command", commandName, argc, args))
commandFinished()
}
}