summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-10 14:02:12 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 14:15:26 +0300
commitf44792c9102488774c9993b080f35c65287a64b1 (patch)
tree94b0be371a2135f4fea63bca87e14ce057fc172e /internal
parent97747ea0f3178f7f5890512d483fdccaa82846b0 (diff)
add another dmap test - reading 100 source files at once
fix a data race when reading multiple files on one server from the same session at once
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/maprclient.go2
-rw-r--r--internal/config/config.go2
-rw-r--r--internal/config/initializer.go4
-rw-r--r--internal/io/dlog/level.go5
-rw-r--r--internal/io/fs/permissions/permission.go2
-rw-r--r--internal/server/handlers/basehandler.go40
-rw-r--r--internal/server/handlers/healthhandler.go2
-rw-r--r--internal/server/handlers/serverhandler.go18
8 files changed, 48 insertions, 27 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 04f258d..074494c 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -108,7 +108,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
}
// NEXT: Make this a callback function rather trying to use polymorphism to call
-// this. This applies to all clients.
+// this. This applies to all clients. It will make the code easier to read.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
diff --git a/internal/config/config.go b/internal/config/config.go
index b99b22b..ee23829 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -16,7 +16,7 @@ const (
// DefaultSSHPort is the default DServer port.
DefaultSSHPort int = 2222
// DefaultLogLevel specifies the default log level (obviously)
- DefaultLogLevel string = "INFO"
+ DefaultLogLevel string = "info"
// DefaultClientLogger specifies the default logger for the client commands.
DefaultClientLogger string = "fout"
// DefaultServerLogger specifies the default logger for dtail server.
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index 0a913db..0c6dfdf 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -147,7 +147,9 @@ func transformClient(in *initializer, args *Args, additionalArgs []string) error
strings.ToLower(args.ServersStr) == "serverless") {
// We are not connecting to any servers.
args.Serverless = true
- in.Common.LogLevel = "warn"
+ if args.LogLevel == DefaultLogLevel {
+ in.Common.LogLevel = "warn"
+ }
}
return nil
}
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
index 0971094..05d9ed9 100644
--- a/internal/io/dlog/level.go
+++ b/internal/io/dlog/level.go
@@ -9,6 +9,7 @@ type level int
// Available log levels.
const (
+ None level = iota
Fatal level = iota
Error level = iota
Warn level = iota
@@ -26,6 +27,8 @@ var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug,
func newLevel(l string) level {
switch strings.ToLower(l) {
+ case "none":
+ return None
case "fatal":
return Fatal
case "error":
@@ -54,6 +57,8 @@ func newLevel(l string) level {
func (l level) String() string {
switch l {
+ case None:
+ return "NONE"
case Fatal:
return "FATAL"
case Error:
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
index e80dbb2..d621c09 100644
--- a/internal/io/fs/permissions/permission.go
+++ b/internal/io/fs/permissions/permission.go
@@ -9,6 +9,6 @@ import (
// ToRead is to check whether user has read permissions to a given file.
func ToRead(user, filePath string) (bool, error) {
// Only implemented for Linux, always expect true
- dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in")
+ dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in")
return true, nil
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 847e8f9..d814cc9 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -9,6 +9,7 @@ import (
"io"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -22,7 +23,7 @@ import (
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string, string, map[string]string)
+type handleCommandCb func(context.Context, int, []string, string)
type baseHandler struct {
done *internal.Done
@@ -35,11 +36,15 @@ type baseHandler struct {
user *user.User
ackCloseReceived chan struct{}
activeCommands int32
- quiet bool
- spartan bool
- serverless int32
readBuf bytes.Buffer
writeBuf bytes.Buffer
+
+ // Some global options + sync primitives required.
+ once sync.Once
+ mutex sync.Mutex
+ quiet bool
+ spartan bool
+ serverless bool
}
// Shutdown the handler.
@@ -66,7 +71,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
return
}
- if h.serverless > 0 {
+ if h.serverless {
return
}
@@ -160,8 +165,9 @@ func (h *baseHandler) handleCommand(commandStr string) {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
+ h.setOptions(options)
- h.handleCommandCb(ctx, argc, args, commandName, options)
+ h.handleCommandCb(ctx, argc, args, commandName)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -232,6 +238,28 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) {
}
}
+func (h *baseHandler) setOptions(options map[string]string) {
+ // We have to make sure that this block is executed only once.
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ // We can read the options only once, will cause a data race otherwise if
+ // changed multiple times for multiple incoming commands.
+ h.once.Do(func() {
+ if quiet, _ := options["quiet"]; quiet == "true" {
+ dlog.Server.Debug(h.user, "Enabling quiet mode")
+ h.quiet = true
+ }
+ if spartan, _ := options["spartan"]; spartan == "true" {
+ dlog.Server.Debug(h.user, "Enabling spartan mode")
+ h.spartan = true
+ }
+ if serverless, _ := options["serverless"]; serverless == "true" {
+ dlog.Server.Debug(h.user, "Enabling serverless mode")
+ h.serverless = true
+ }
+ })
+}
+
func (h *baseHandler) send(ch chan<- string, message string) {
select {
case ch <- message:
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 8d6c400..0425696 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -41,7 +41,7 @@ func NewHealthHandler(user *user.User) *HealthHandler {
}
func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int,
- args []string, commandName string, options map[string]string) {
+ args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling health command", argc, args)
switch commandName {
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index f12d590..52c4570 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -4,7 +4,6 @@ import (
"context"
"os"
"strings"
- "sync/atomic"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -55,7 +54,7 @@ func NewServerHandler(user *user.User, catLimiter,
}
func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
- args []string, commandName string, options map[string]string) {
+ args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
@@ -65,19 +64,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
}
}
- if quiet, _ := options["quiet"]; quiet == "true" {
- dlog.Server.Debug(h.user, "Enabling quiet mode")
- h.quiet = true
- }
- if spartan, _ := options["spartan"]; spartan == "true" {
- dlog.Server.Debug(h.user, "Enabling spartan mode")
- h.spartan = true
- }
- if serverless, _ := options["serverless"]; serverless == "true" {
- dlog.Server.Debug(h.user, "Enabling serverless mode")
- atomic.AddInt32(&h.serverless, 1)
- }
-
switch commandName {
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
@@ -109,7 +95,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
commandFinished()
default:
h.send(h.serverMessages, dlog.Server.Error(h.user,
- "Received unknown user command", commandName, argc, args, options))
+ "Received unknown user command", commandName, argc, args))
commandFinished()
}
}