summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-09-10 14:57:52 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-09-10 14:57:52 +0100
commit1c7c0dbb5174b5255912183b9ec5870ccdef3426 (patch)
tree50a5e492ac42221178320fb06a16c34e1b0a4bba
parent40cbef0c243042521bdf589b3c4549ff32508592 (diff)
printing client stats every other second only if the connection count has changed or when SIGUSR1 or SIGINFO recieved
-rw-r--r--cmd/dcat/main.go3
-rw-r--r--cmd/dgrep/main.go3
-rw-r--r--cmd/dmap/main.go3
-rw-r--r--cmd/drun/main.go3
-rw-r--r--cmd/dtail/main.go8
-rw-r--r--internal/clients/baseclient.go6
-rw-r--r--internal/clients/client.go2
-rw-r--r--internal/clients/maprclient.go4
-rw-r--r--internal/clients/stats.go20
-rw-r--r--internal/io/logger/logger.go3
-rw-r--r--internal/regex/flag.go42
-rw-r--r--internal/server/continuous.go2
-rw-r--r--internal/server/handlers/serverhandler.go5
-rw-r--r--internal/server/scheduler.go2
14 files changed, 79 insertions, 27 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index e0931d8..f0ea946 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
)
@@ -54,7 +55,7 @@ func main() {
panic(err)
}
- status := client.Start(ctx)
+ status := client.Start(ctx, signal.StatsCh(ctx))
logger.Flush()
os.Exit(status)
}
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index e58ea8f..d1fdc21 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
)
@@ -62,7 +63,7 @@ func main() {
panic(err)
}
- status := client.Start(ctx)
+ status := client.Start(ctx, signal.StatsCh(ctx))
logger.Flush()
os.Exit(status)
}
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index 470ce76..279b343 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
@@ -61,7 +62,7 @@ func main() {
panic(err)
}
- status := client.Start(ctx)
+ status := client.Start(ctx, signal.StatsCh(ctx))
logger.Flush()
os.Exit(status)
}
diff --git a/cmd/drun/main.go b/cmd/drun/main.go
index c948c0f..ffdf7bf 100644
--- a/cmd/drun/main.go
+++ b/cmd/drun/main.go
@@ -11,6 +11,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
)
@@ -63,7 +64,7 @@ func main() {
panic(err)
}
- status := client.Start(ctx)
+ status := client.Start(ctx, signal.StatsCh(ctx))
logger.Flush()
os.Exit(status)
}
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 1be2a29..ff9028b 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -14,6 +14,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
@@ -105,12 +106,7 @@ func main() {
}
}
- /*
- sigCh := make(chan os.Signal)
- signal.Notify(sigCh, os.Interrupt, syscall.SIGINFO)
- */
-
- status := client.Start(ctx)
+ status := client.Start(ctx, signal.StatsCh(ctx))
logger.Flush()
os.Exit(status)
}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index ba18f95..008a01e 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -66,11 +66,11 @@ func (c *baseClient) makeConnections(maker maker) {
c.stats = newTailStats(len(c.connections))
}
-func (c *baseClient) Start(ctx context.Context) (status int) {
+func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
go c.hostKeyCallback.PromptAddHosts(ctx)
- // Periodically print out connection stats to the client.
- go c.stats.periodicLogStats(ctx, c.throttleCh)
+ // Print client stats every time something on statsCh is recieved.
+ go c.stats.Start(ctx, c.throttleCh, statsCh)
// Keep count of active connections
active := make(chan struct{}, len(c.connections))
diff --git a/internal/clients/client.go b/internal/clients/client.go
index 1fc5e23..eb8452d 100644
--- a/internal/clients/client.go
+++ b/internal/clients/client.go
@@ -4,5 +4,5 @@ import "context"
// Client is the interface for the end user command line client.
type Client interface {
- Start(ctx context.Context) int
+ Start(ctx context.Context, statsCh <-chan struct{}) int
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index d154c9d..581db44 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -94,10 +94,10 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
}
// Start starts the mapreduce client.
-func (c *MaprClient) Start(ctx context.Context) (status int) {
+func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
go c.periodicReportResults(ctx)
- status = c.baseClient.Start(ctx)
+ status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
logger.Info("Received final mapreduce result")
c.reportResults()
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 481d157..a6ac0c5 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -5,6 +5,7 @@ import (
"fmt"
"runtime"
"sync"
+ "time"
"github.com/mimecast/dtail/internal/io/logger"
)
@@ -29,12 +30,18 @@ func newTailStats(connectionsTotal int) *stats {
}
}
-func (s *stats) logStatsOnSignal(ctx context.Context, throttleCh chan struct{}, sigCh chan struct{}) {
+// Start starts printing client connection stats every time a signal is recieved or
+// connection count has changed.
+func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) {
var connectedLast int
for {
+ var force bool
+
select {
- case <-sigCh:
+ case <-statsCh:
+ force = true
+ case <-time.After(time.Second * 2):
case <-ctx.Done():
return
}
@@ -43,13 +50,16 @@ func (s *stats) logStatsOnSignal(ctx context.Context, throttleCh chan struct{},
throttle := len(throttleCh)
newConnections := connected - connectedLast
- s.log(connected, newConnections, throttle)
- s.mutex.Lock()
- defer s.mutex.Unlock()
+ if connected == connectedLast && !force {
+ continue
+ }
+ s.log(connected, newConnections, throttle)
connectedLast = connected
+ s.mutex.Lock()
s.connected = connected
+ s.mutex.Unlock()
}
}
diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go
index 8bfb94f..d059cbb 100644
--- a/internal/io/logger/logger.go
+++ b/internal/io/logger/logger.go
@@ -223,9 +223,6 @@ func log(what string, severity string, args []interface{}) string {
if Mode.Nothing {
return ""
}
- if severity != errorStr && severity != fatalStr {
- return ""
- }
messages := []string{severity}
diff --git a/internal/regex/flag.go b/internal/regex/flag.go
new file mode 100644
index 0000000..d3ff712
--- /dev/null
+++ b/internal/regex/flag.go
@@ -0,0 +1,42 @@
+package regex
+
+import "fmt"
+
+type Flag int
+
+const (
+ // Undefined flag set
+ Undefined Flag = iota
+ // Default is the default regex mode (positive matching)
+ Default Flag = iota
+ // Invert inverts the regex
+ Invert Flag = iota
+ // Noop means no regex matching enabled, all defaults to true
+ Noop Flag = iota
+)
+
+func NewFlag(str string) (Flag, error) {
+ switch str {
+ case "default":
+ return Default, nil
+ case "invert":
+ return Invert, nil
+ case "noop":
+ return Noop, nil
+ default:
+ return Undefined, fmt.Errorf("unknown regex flag '%s', setting to 'undefined'", str)
+ }
+}
+
+func (f Flag) String() string {
+ switch f {
+ case Default:
+ return "default"
+ case Invert:
+ return "invert"
+ case Noop:
+ return "noop"
+ default:
+ return "undefined"
+ }
+}
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index f3993a1..583d136 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -92,7 +92,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
}
logger.Info(fmt.Sprintf("Starting job %s", job.Name))
- status := client.Start(jobCtx)
+ status := client.Start(jobCtx, make(chan struct{}))
logMessage := fmt.Sprintf("Job exited with status %d", status)
if status != 0 {
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 34d2e30..7017f3e 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -85,8 +85,11 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca
func (h *ServerHandler) Read(p []byte) (n int, err error) {
for {
select {
-
case message := <-h.serverMessages:
+ if len(message) == 0 {
+ logger.Warn(h.user, "Empty message recieved")
+ return
+ }
if message[0] == '.' {
// Handle hidden message (don't display to the user, interpreted by dtail client)
wholePayload := []byte(fmt.Sprintf("%s\n", message))
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index 3345d69..9d76a3b 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -93,7 +93,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
defer cancel()
logger.Info(fmt.Sprintf("Starting job %s", job.Name))
- status := client.Start(jobCtx)
+ status := client.Start(jobCtx, make(chan struct{}))
logMessage := fmt.Sprintf("Job exited with status %d", status)
if status != 0 {