diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-09-10 14:57:52 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-09-10 14:57:52 +0100 |
| commit | 1c7c0dbb5174b5255912183b9ec5870ccdef3426 (patch) | |
| tree | 50a5e492ac42221178320fb06a16c34e1b0a4bba | |
| parent | 40cbef0c243042521bdf589b3c4549ff32508592 (diff) | |
printing client stats every other second only if the connection count has changed or when SIGUSR1 or SIGINFO recieved
| -rw-r--r-- | cmd/dcat/main.go | 3 | ||||
| -rw-r--r-- | cmd/dgrep/main.go | 3 | ||||
| -rw-r--r-- | cmd/dmap/main.go | 3 | ||||
| -rw-r--r-- | cmd/drun/main.go | 3 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 8 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 6 | ||||
| -rw-r--r-- | internal/clients/client.go | 2 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/stats.go | 20 | ||||
| -rw-r--r-- | internal/io/logger/logger.go | 3 | ||||
| -rw-r--r-- | internal/regex/flag.go | 42 | ||||
| -rw-r--r-- | internal/server/continuous.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 5 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 2 |
14 files changed, 79 insertions, 27 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index e0931d8..f0ea946 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/user" "github.com/mimecast/dtail/internal/version" ) @@ -54,7 +55,7 @@ func main() { panic(err) } - status := client.Start(ctx) + status := client.Start(ctx, signal.StatsCh(ctx)) logger.Flush() os.Exit(status) } diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index e58ea8f..d1fdc21 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/user" "github.com/mimecast/dtail/internal/version" ) @@ -62,7 +63,7 @@ func main() { panic(err) } - status := client.Start(ctx) + status := client.Start(ctx, signal.StatsCh(ctx)) logger.Flush() os.Exit(status) } diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index 470ce76..279b343 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/user" "github.com/mimecast/dtail/internal/version" @@ -61,7 +62,7 @@ func main() { panic(err) } - status := client.Start(ctx) + status := client.Start(ctx, signal.StatsCh(ctx)) logger.Flush() os.Exit(status) } diff --git a/cmd/drun/main.go b/cmd/drun/main.go index c948c0f..ffdf7bf 100644 --- a/cmd/drun/main.go +++ b/cmd/drun/main.go @@ -11,6 +11,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/user" "github.com/mimecast/dtail/internal/version" ) @@ -63,7 +64,7 @@ func main() { panic(err) } - status := client.Start(ctx) + status := client.Start(ctx, signal.StatsCh(ctx)) logger.Flush() os.Exit(status) } diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index 1be2a29..ff9028b 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -14,6 +14,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/user" "github.com/mimecast/dtail/internal/version" @@ -105,12 +106,7 @@ func main() { } } - /* - sigCh := make(chan os.Signal) - signal.Notify(sigCh, os.Interrupt, syscall.SIGINFO) - */ - - status := client.Start(ctx) + status := client.Start(ctx, signal.StatsCh(ctx)) logger.Flush() os.Exit(status) } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index ba18f95..008a01e 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -66,11 +66,11 @@ func (c *baseClient) makeConnections(maker maker) { c.stats = newTailStats(len(c.connections)) } -func (c *baseClient) Start(ctx context.Context) (status int) { +func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) - // Periodically print out connection stats to the client. - go c.stats.periodicLogStats(ctx, c.throttleCh) + // Print client stats every time something on statsCh is recieved. + go c.stats.Start(ctx, c.throttleCh, statsCh) // Keep count of active connections active := make(chan struct{}, len(c.connections)) diff --git a/internal/clients/client.go b/internal/clients/client.go index 1fc5e23..eb8452d 100644 --- a/internal/clients/client.go +++ b/internal/clients/client.go @@ -4,5 +4,5 @@ import "context" // Client is the interface for the end user command line client. type Client interface { - Start(ctx context.Context) int + Start(ctx context.Context, statsCh <-chan struct{}) int } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index d154c9d..581db44 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -94,10 +94,10 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* } // Start starts the mapreduce client. -func (c *MaprClient) Start(ctx context.Context) (status int) { +func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { go c.periodicReportResults(ctx) - status = c.baseClient.Start(ctx) + status = c.baseClient.Start(ctx, statsCh) if c.cumulative { logger.Info("Received final mapreduce result") c.reportResults() diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 481d157..a6ac0c5 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -5,6 +5,7 @@ import ( "fmt" "runtime" "sync" + "time" "github.com/mimecast/dtail/internal/io/logger" ) @@ -29,12 +30,18 @@ func newTailStats(connectionsTotal int) *stats { } } -func (s *stats) logStatsOnSignal(ctx context.Context, throttleCh chan struct{}, sigCh chan struct{}) { +// Start starts printing client connection stats every time a signal is recieved or +// connection count has changed. +func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) { var connectedLast int for { + var force bool + select { - case <-sigCh: + case <-statsCh: + force = true + case <-time.After(time.Second * 2): case <-ctx.Done(): return } @@ -43,13 +50,16 @@ func (s *stats) logStatsOnSignal(ctx context.Context, throttleCh chan struct{}, throttle := len(throttleCh) newConnections := connected - connectedLast - s.log(connected, newConnections, throttle) - s.mutex.Lock() - defer s.mutex.Unlock() + if connected == connectedLast && !force { + continue + } + s.log(connected, newConnections, throttle) connectedLast = connected + s.mutex.Lock() s.connected = connected + s.mutex.Unlock() } } diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 8bfb94f..d059cbb 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -223,9 +223,6 @@ func log(what string, severity string, args []interface{}) string { if Mode.Nothing { return "" } - if severity != errorStr && severity != fatalStr { - return "" - } messages := []string{severity} diff --git a/internal/regex/flag.go b/internal/regex/flag.go new file mode 100644 index 0000000..d3ff712 --- /dev/null +++ b/internal/regex/flag.go @@ -0,0 +1,42 @@ +package regex + +import "fmt" + +type Flag int + +const ( + // Undefined flag set + Undefined Flag = iota + // Default is the default regex mode (positive matching) + Default Flag = iota + // Invert inverts the regex + Invert Flag = iota + // Noop means no regex matching enabled, all defaults to true + Noop Flag = iota +) + +func NewFlag(str string) (Flag, error) { + switch str { + case "default": + return Default, nil + case "invert": + return Invert, nil + case "noop": + return Noop, nil + default: + return Undefined, fmt.Errorf("unknown regex flag '%s', setting to 'undefined'", str) + } +} + +func (f Flag) String() string { + switch f { + case Default: + return "default" + case Invert: + return "invert" + case Noop: + return "noop" + default: + return "undefined" + } +} diff --git a/internal/server/continuous.go b/internal/server/continuous.go index f3993a1..583d136 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -92,7 +92,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { } logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx) + status := client.Start(jobCtx, make(chan struct{})) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 34d2e30..7017f3e 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -85,8 +85,11 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca func (h *ServerHandler) Read(p []byte) (n int, err error) { for { select { - case message := <-h.serverMessages: + if len(message) == 0 { + logger.Warn(h.user, "Empty message recieved") + return + } if message[0] == '.' { // Handle hidden message (don't display to the user, interpreted by dtail client) wholePayload := []byte(fmt.Sprintf("%s\n", message)) diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 3345d69..9d76a3b 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -93,7 +93,7 @@ func (s *scheduler) runJobs(ctx context.Context) { defer cancel() logger.Info(fmt.Sprintf("Starting job %s", job.Name)) - status := client.Start(jobCtx) + status := client.Start(jobCtx, make(chan struct{})) logMessage := fmt.Sprintf("Job exited with status %d", status) if status != 0 { |
