diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-10 14:02:12 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 14:15:26 +0300 |
| commit | f44792c9102488774c9993b080f35c65287a64b1 (patch) | |
| tree | 94b0be371a2135f4fea63bca87e14ce057fc172e /internal | |
| parent | 97747ea0f3178f7f5890512d483fdccaa82846b0 (diff) | |
add another dmap test - reading 100 source files at once
fix a data race when reading multiple files on one server from the same session at once
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/maprclient.go | 2 | ||||
| -rw-r--r-- | internal/config/config.go | 2 | ||||
| -rw-r--r-- | internal/config/initializer.go | 4 | ||||
| -rw-r--r-- | internal/io/dlog/level.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/permissions/permission.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 40 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 18 |
8 files changed, 48 insertions, 27 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 04f258d..074494c 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -108,7 +108,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i } // NEXT: Make this a callback function rather trying to use polymorphism to call -// this. This applies to all clients. +// this. This applies to all clients. It will make the code easier to read. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } diff --git a/internal/config/config.go b/internal/config/config.go index b99b22b..ee23829 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,7 +16,7 @@ const ( // DefaultSSHPort is the default DServer port. DefaultSSHPort int = 2222 // DefaultLogLevel specifies the default log level (obviously) - DefaultLogLevel string = "INFO" + DefaultLogLevel string = "info" // DefaultClientLogger specifies the default logger for the client commands. DefaultClientLogger string = "fout" // DefaultServerLogger specifies the default logger for dtail server. diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 0a913db..0c6dfdf 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -147,7 +147,9 @@ func transformClient(in *initializer, args *Args, additionalArgs []string) error strings.ToLower(args.ServersStr) == "serverless") { // We are not connecting to any servers. args.Serverless = true - in.Common.LogLevel = "warn" + if args.LogLevel == DefaultLogLevel { + in.Common.LogLevel = "warn" + } } return nil } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 0971094..05d9ed9 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -9,6 +9,7 @@ type level int // Available log levels. const ( + None level = iota Fatal level = iota Error level = iota Warn level = iota @@ -26,6 +27,8 @@ var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, func newLevel(l string) level { switch strings.ToLower(l) { + case "none": + return None case "fatal": return Fatal case "error": @@ -54,6 +57,8 @@ func newLevel(l string) level { func (l level) String() string { switch l { + case None: + return "NONE" case Fatal: return "FATAL" case Error: diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index e80dbb2..d621c09 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -9,6 +9,6 @@ import ( // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in") + dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in") return true, nil } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 847e8f9..d814cc9 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -9,6 +9,7 @@ import ( "io" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -22,7 +23,7 @@ import ( user "github.com/mimecast/dtail/internal/user/server" ) -type handleCommandCb func(context.Context, int, []string, string, map[string]string) +type handleCommandCb func(context.Context, int, []string, string) type baseHandler struct { done *internal.Done @@ -35,11 +36,15 @@ type baseHandler struct { user *user.User ackCloseReceived chan struct{} activeCommands int32 - quiet bool - spartan bool - serverless int32 readBuf bytes.Buffer writeBuf bytes.Buffer + + // Some global options + sync primitives required. + once sync.Once + mutex sync.Mutex + quiet bool + spartan bool + serverless bool } // Shutdown the handler. @@ -66,7 +71,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return } - if h.serverless > 0 { + if h.serverless { return } @@ -160,8 +165,9 @@ func (h *baseHandler) handleCommand(commandStr string) { h.send(h.serverMessages, dlog.Server.Error(h.user, err)) return } + h.setOptions(options) - h.handleCommandCb(ctx, argc, args, commandName, options) + h.handleCommandCb(ctx, argc, args, commandName) } func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { @@ -232,6 +238,28 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) { } } +func (h *baseHandler) setOptions(options map[string]string) { + // We have to make sure that this block is executed only once. + h.mutex.Lock() + defer h.mutex.Unlock() + // We can read the options only once, will cause a data race otherwise if + // changed multiple times for multiple incoming commands. + h.once.Do(func() { + if quiet, _ := options["quiet"]; quiet == "true" { + dlog.Server.Debug(h.user, "Enabling quiet mode") + h.quiet = true + } + if spartan, _ := options["spartan"]; spartan == "true" { + dlog.Server.Debug(h.user, "Enabling spartan mode") + h.spartan = true + } + if serverless, _ := options["serverless"]; serverless == "true" { + dlog.Server.Debug(h.user, "Enabling serverless mode") + h.serverless = true + } + }) +} + func (h *baseHandler) send(ch chan<- string, message string) { select { case ch <- message: diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index 8d6c400..0425696 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -41,7 +41,7 @@ func NewHealthHandler(user *user.User) *HealthHandler { } func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, - args []string, commandName string, options map[string]string) { + args []string, commandName string) { dlog.Server.Debug(h.user, "Handling health command", argc, args) switch commandName { diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index f12d590..52c4570 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,7 +4,6 @@ import ( "context" "os" "strings" - "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" @@ -55,7 +54,7 @@ func NewServerHandler(user *user.User, catLimiter, } func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, - args []string, commandName string, options map[string]string) { + args []string, commandName string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -65,19 +64,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, } } - if quiet, _ := options["quiet"]; quiet == "true" { - dlog.Server.Debug(h.user, "Enabling quiet mode") - h.quiet = true - } - if spartan, _ := options["spartan"]; spartan == "true" { - dlog.Server.Debug(h.user, "Enabling spartan mode") - h.spartan = true - } - if serverless, _ := options["serverless"]; serverless == "true" { - dlog.Server.Debug(h.user, "Enabling serverless mode") - atomic.AddInt32(&h.serverless, 1) - } - switch commandName { case "grep", "cat": command := newReadCommand(h, omode.CatClient) @@ -109,7 +95,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, commandFinished() default: h.send(h.serverMessages, dlog.Server.Error(h.user, - "Received unknown user command", commandName, argc, args, options)) + "Received unknown user command", commandName, argc, args)) commandFinished() } } |
