diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-09 21:10:29 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 13:36:41 +0300 |
| commit | 97747ea0f3178f7f5890512d483fdccaa82846b0 (patch) | |
| tree | 9ff1335ca26afc90e55fd6de416457e252d75a35 | |
| parent | 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff) | |
vetting and linting and some code restyling
93 files changed, 670 insertions, 581 deletions
@@ -1,5 +1,5 @@ GO ?= go -all: build test +all: build build: dserver dcat dgrep dmap dtail dtailhealthcheck dserver: ifndef USE_ACL @@ -43,7 +43,7 @@ lint: find . -type d | while read dir; do \ echo golint $$dir; \ golint $$dir; \ - done + done | grep -F .go: test: ${GO} clean -testcache ifndef USE_ACL @@ -15,5 +15,5 @@ This is a loose list of what to do. Maybe for the next releae or maybe for a lat [ ] Manual test/adjust dtail colors [ ] More integration test colors (via dcat?) [ ] Integration test for dtail in serverless mode -[ ] Integration test for health check serverless mode -[ ] Rewrite + test health client (copy catclient) +[ ] Go through the whole source and change indentation (try not to exceed 80char line lengths by too much) +[ ] Fix the sync.Pools (they aren't concurrent as it seems and can cause a panic) diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index 662a50d..87ece9d 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -33,7 +33,8 @@ func main() { flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") - flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, + "How many connections established per CPU core concurrently") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index 529331d..576e22b 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -20,7 +20,6 @@ func main() { var args config.Args var displayVersion bool var grep string - userName := user.Name() flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors") @@ -29,7 +28,8 @@ func main() { flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") - flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, + "How many connections established per CPU core concurrently") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index acc1dc6..1f44076 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -19,11 +19,9 @@ import ( // The evil begins here. func main() { var displayVersion bool - args := config.Args{ Mode: omode.MapClient, } - userName := user.Name() flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors") @@ -31,7 +29,8 @@ func main() { flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode") flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys") flag.BoolVar(&displayVersion, "version", false, "Display version") - flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, + "How many connections established per CPU core concurrently") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go index b4fc873..cf726cf 100644 --- a/cmd/dserver/main.go +++ b/cmd/dserver/main.go @@ -32,7 +32,8 @@ func main() { user.NoRootCheck() flag.BoolVar(&color, "color", false, "Enable ANSII terminal colors") - flag.BoolVar(&config.ServerRelaxedAuthEnable, "relaxedAuth", false, "Enable relaxced SSH auth mode (don't use in production!)") + flag.BoolVar(&config.ServerRelaxedAuthEnable, "relaxedAuth", false, + "Enable relaxced SSH auth mode (don't use in production!)") flag.BoolVar(&displayVersion, "version", false, "Display version") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index adfeaa5..2863370 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -44,7 +44,8 @@ func main() { flag.BoolVar(&displayColorTable, "colorTable", false, "Show color table") flag.BoolVar(&displayWideColorTable, "wideColorTable", false, "Show a large color table") flag.BoolVar(&displayVersion, "version", false, "Display version") - flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, + "How many connections established per CPU core concurrently") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") @@ -93,7 +94,8 @@ func main() { dlog.Start(ctx, &wg, source.Client) if checkHealth { - fmt.Println("WARN: DTail health check has moved to separate binary dtailhealtcheck - please adjust the monitoring scripts!") + fmt.Println("WARN: DTail health check has moved to separate binary dtailhealtcheck" + + " - please adjust the monitoring scripts!") cancel() os.Exit(1) } diff --git a/cmd/dtailhealthcheck/main.go b/cmd/dtailhealthcheck/main.go index 0f37f8a..b0ba4cd 100644 --- a/cmd/dtailhealthcheck/main.go +++ b/cmd/dtailhealthcheck/main.go @@ -35,7 +35,6 @@ func main() { defer cancel() var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.HealthCheck) if pprof > -1 { diff --git a/integrationtests/commons.go b/integrationtests/commons.go index f96b532..2fdbfc3 100644 --- a/integrationtests/commons.go +++ b/integrationtests/commons.go @@ -15,10 +15,12 @@ import ( ) func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) (int, error) { - return runCommandContext(t, context.TODO(), cmd, args, stdoutFile) + return runCommandContext(context.TODO(), t, cmd, args, stdoutFile) } -func runCommandContext(t *testing.T, ctx context.Context, cmd string, args []string, stdoutFile string) (int, error) { +func runCommandContext(ctx context.Context, t *testing.T, cmd string, args []string, + stdoutFile string) (int, error) { + if _, err := os.Stat(cmd); err != nil { return -1, fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err) } @@ -76,7 +78,8 @@ func compareFilesContents(t *testing.T, fileA, fileB string) error { return fmt.Errorf("Files differ, line '%s' is missing in one of them", line) } if countA != countB { - return fmt.Errorf("Files differ, count of line '%s' is %d in one but %d in another", line, countA, countB) + return fmt.Errorf("Files differ, count of line '%s' is %d in one but %d in another", + line, countA, countB) } } return nil @@ -92,11 +95,13 @@ func compareFilesContents(t *testing.T, fileA, fileB string) error { } // The mapreduce result can be in a different order each time (Golang maps are not sorted). - t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", fileA, fileB)) + t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", + fileA, fileB)) if err := compareMaps(a, b); err != nil { return err } - t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", fileB, fileA)) + t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", + fileB, fileA)) if err := compareMaps(b, a); err != nil { return err } diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go index a164960..342ebd0 100644 --- a/integrationtests/dcat_test.go +++ b/integrationtests/dcat_test.go @@ -8,12 +8,12 @@ import ( func TestDCat(t *testing.T) { testdataFile := "dcat.txt.expected" stdoutFile := "dcat.out" + args := []string{"-spartan", testdataFile} - if _, err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dcat", args, stdoutFile); err != nil { t.Error(err) return } - if err := compareFiles(t, stdoutFile, testdataFile); err != nil { t.Error(err) return diff --git a/integrationtests/dgrep_test.go b/integrationtests/dgrep_test.go index 6a15ebd..4d54a2d 100644 --- a/integrationtests/dgrep_test.go +++ b/integrationtests/dgrep_test.go @@ -9,12 +9,12 @@ func TestDGrep(t *testing.T) { inFile := "mapr_testdata.log" stdoutFile := "dgrep.stdout.tmp" expectedStdoutFile := "dgrep.txt.expected" + args := []string{"-spartan", "--grep", "20211002-071947", inFile} - if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dgrep", args, stdoutFile); err != nil { t.Error(err) return } - if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { t.Error(err) return @@ -26,12 +26,12 @@ func TestDGrep2(t *testing.T) { inFile := "mapr_testdata.log" stdoutFile := "dgrep2.stdout.tmp" expectedStdoutFile := "dgrep2.txt.expected" + args := []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile} - if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dgrep", args, stdoutFile); err != nil { t.Error(err) return } - if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { t.Error(err) return diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index b512985..f5c78e0 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -14,9 +14,12 @@ func TestDMap(t *testing.T) { queryFile := fmt.Sprintf("%s.query", csvFile) expectedQueryFile := "dmap.csv.query.expected" - query := fmt.Sprintf("from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile %s", csvFile) + query := fmt.Sprintf("from STATS select count($line),last($time),"+ + "avg($goroutines),min(concurrentConnections),max(lifetimeConnections) "+ + "group by $hostname outfile %s", csvFile) + args := []string{"-query", query, inFile} - if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { t.Error(err) return } @@ -42,9 +45,48 @@ func TestDMap2(t *testing.T) { queryFile := fmt.Sprintf("%s.query", csvFile) expectedQueryFile := "dmap2.csv.query.expected" - query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile %s", csvFile) + query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ + "avg($goroutines),min($goroutines) group by $time order by count($time) "+ + "outfile %s", csvFile) - if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { + args := []string{"-query", query, inFile} + if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { + t.Error(err) + return + } + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(stdoutFile) + os.Remove(csvFile) + os.Remove(queryFile) +} + +func TestDMap3(t *testing.T) { + inFile := "mapr_testdata.log" + stdoutFile := "dmap3.stdout.tmp" + csvFile := "dmap3.csv.tmp" + expectedCsvFile := "dmap3.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap3.csv.query.expected" + + query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ + "avg($goroutines),min($goroutines) group by $time order by count($time) "+ + "outfile %s", csvFile) + + // Read many input files at once. + args := []string{"-query", query} + for i := 0; i < 100; i++ { + args = append(args, inFile) + } + + if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { t.Error(err) return } diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go index 9971f1a..36eadc0 100644 --- a/integrationtests/dtail_test.go +++ b/integrationtests/dtail_test.go @@ -8,8 +8,9 @@ import ( func TestDTailColorTable(t *testing.T) { stdoutFile := "dtailcolortable.stdout.tmp" expectedStdoutFile := "dtailcolortable.expected" + args := []string{"-colorTable"} - if _, err := runCommand(t, "../dtail", []string{"-colorTable"}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dtail", args, stdoutFile); err != nil { t.Error(err) return } @@ -17,6 +18,5 @@ func TestDTailColorTable(t *testing.T) { t.Error(err) return } - os.Remove(stdoutFile) } diff --git a/integrationtests/dtailhealthcheck_test.go b/integrationtests/dtailhealthcheck_test.go index 97fa5f2..d562239 100644 --- a/integrationtests/dtailhealthcheck_test.go +++ b/integrationtests/dtailhealthcheck_test.go @@ -30,9 +30,10 @@ func TestDTailHealthCheck(t *testing.T) { func TestDTailHealthCheck2(t *testing.T) { stdoutFile := "dtailhealthcheck2.stdout.tmp" expectedStdoutFile := "dtailhealthcheck2.expected" + args := []string{"--server", "example:1"} t.Log("Negative test, is supposed to exit with a critical state.") - exitCode, err := runCommand(t, "../dtailhealthcheck", []string{"--server", "example:1"}, stdoutFile) + exitCode, err := runCommand(t, "../dtailhealthcheck", args, stdoutFile) if exitCode != 2 { t.Error(fmt.Sprintf("Expected exit code '2' but got '%d': %v", exitCode, err)) return @@ -55,16 +56,17 @@ func TestDTailHealthCheck3(t *testing.T) { defer cancel() go func() { - serverArgs := []string{"--logger", "stdout", "--logLevel", "trace", "--port", "4242"} - runCommandContext(t, ctx, "../dserver", serverArgs, serverStdoutFile) + args := []string{"--logger", "stdout", "--logLevel", "trace", "--port", "4242"} + runCommandContext(ctx, t, "../dserver", args, serverStdoutFile) }() var err error + args := []string{"--server", "localhost:4242"} for i := 0; i < 30; i++ { t.Log("Waiting for dserver to start", i) time.Sleep(time.Second) var exitCode int - if exitCode, err = runCommand(t, "../dtailhealthcheck", []string{"--server", "localhost:4242"}, stdoutFile); exitCode == 0 { + if exitCode, err = runCommand(t, "../dtailhealthcheck", args, stdoutFile); exitCode == 0 { break } } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d5d7c2c..4a7bd84 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -64,7 +64,8 @@ func (c *baseClient) makeConnections(maker maker) { discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle) for _, server := range discoveryService.ServerList() { - c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) + c.connections = append(c.connections, c.makeConnection(server, + c.sshAuthMethods, c.hostKeyCallback)) } c.stats = newTailStats(len(c.connections)) @@ -100,7 +101,9 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { +func (c *baseClient) startConnection(ctx context.Context, i int, + conn connectors.Connector) (status int) { + for { connCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 2726e7e..bd65560 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -22,7 +22,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } - args.Mode = omode.CatClient c := CatClient{ @@ -35,7 +34,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { c.init() c.makeConnections(c) - return &c, nil } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 1666a79..2d7b45a 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -16,7 +16,8 @@ import ( "golang.org/x/crypto/ssh" ) -// ServerConnection represents a connection to a single remote dtail server via SSH protocol. +// ServerConnection represents a connection to a single remote dtail server via +// SSH protocol. type ServerConnection struct { server string port int @@ -28,9 +29,11 @@ type ServerConnection struct { } // NewServerConnection returns a new DTail SSH server connection. -func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - dlog.Client.Debug(server, "Creating new connection", server, handler, commands) +func NewServerConnection(server string, userName string, + authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, + handler handlers.Handler, commands []string) *ServerConnection { + dlog.Client.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, @@ -48,10 +51,12 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM return &c } +// Server returns the server hostname connected to. func (c *ServerConnection) Server() string { return c.server } +// Handler returns the handler used for the connection. func (c *ServerConnection) Handler() handlers.Handler { return c.handler } @@ -72,23 +77,29 @@ func (c *ServerConnection) initServerPort() { } } -func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the connection to the server. +func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + // Throttle how many connections can be established concurrently (based on ch length) dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) select { case throttleCh <- struct{}{}: case <-ctx.Done(): - dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Not establishing connection as context is done", + len(throttleCh), cap(throttleCh)) return } - dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Throttling says that the connection can be established", + len(throttleCh), cap(throttleCh)) go func() { defer func() { if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (1)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } @@ -107,7 +118,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, } // Dail into a new SSH connection. Close connection in case of an error. -func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { +func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) error { + dlog.Client.Debug(c.server, "Incrementing connection stats") statsCh <- struct{}{} defer func() { @@ -128,31 +141,30 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, } // Create the SSH session. Close the session in case of an error. -func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating SSH session") +func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, + client *ssh.Client, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { return err } defer session.Close() - return c.handle(ctx, cancel, session, throttleCh) } -func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating handler for SSH session") +func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, + session *ssh.Session, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { return err } - stdoutPipe, err := session.StdoutPipe() if err != nil { return err } - if err := session.Shell(); err != nil { return err } @@ -161,12 +173,10 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc io.Copy(stdinPipe, c.handler) cancel() }() - go func() { io.Copy(c.handler, stdoutPipe) cancel() }() - go func() { select { case <-c.handler.Done(): @@ -182,13 +192,13 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc } if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (2)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } <-ctx.Done() c.handler.Shutdown() - return nil } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 768a5ce..2ff490a 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -18,8 +18,10 @@ type Serverless struct { userName string } -// NewServerConnection returns a new connection. -func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { +// NewServerless starts a new serverless session. +func NewServerless(userName string, handler handlers.Handler, + commands []string) *Serverless { + dlog.Client.Debug("Creating new serverless connector", handler, commands) return &Serverless{ userName: userName, @@ -28,15 +30,20 @@ func NewServerless(userName string, handler handlers.Handler, commands []string) } } +// Server returns serverless server indicator. func (s *Serverless) Server() string { return "local(serverless)" } +// Handler returns the handler used for the serverless connection. func (s *Serverless) Handler() handlers.Handler { return s.handler } -func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the serverless connection. +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -81,13 +88,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() - go func() { io.Copy(s.handler, serverHandler) dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() - go func() { select { case <-s.handler.Done(): @@ -107,6 +112,5 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro <-ctx.Done() dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() - return nil } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index ae21ff2..7521c67 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -12,7 +12,8 @@ import ( "github.com/mimecast/dtail/internal/omode" ) -// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. +// GrepClient searches a remote file for all lines matching a regular +// expression. Only the matching lines are displayed. type GrepClient struct { baseClient } @@ -34,7 +35,6 @@ func NewGrepClient(args config.Args) (*GrepClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -51,6 +51,5 @@ func (c GrepClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - return } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 10ba1f7..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,7 +8,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler is the handler used on the client side for running mapreduce aggregations. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { baseHandler } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d1acfbd..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -10,7 +10,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate @@ -18,7 +19,9 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index ac1dc20..1a02827 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -32,7 +32,6 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { c.init() c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) - return &c, nil } @@ -45,28 +44,34 @@ func (c HealthClient) makeCommands() (commands []string) { return } +// Start the health client. func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int { status := c.baseClient.Start(ctx, statsCh) switch status { case 0: if c.Serverless { - fmt.Printf("WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port\n") + fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" + + ", please specify a remote server via --server hostname:port\n") return 1 } fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr) case 2: if c.Serverless { - fmt.Printf("CRITICAL: DTail server not operating properly (using serverless connction)!\n") + fmt.Printf("CRITICAL: DTail server not operating properly (using " + + "serverless connction)!\n") return 2 } - fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", c.ServersStr) + fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", + c.ServersStr) default: if c.Serverless { - fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless connection)\n", status) + fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+ + "connection)\n", status) return status } - fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", status, c.ServersStr) + fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", + status, c.ServersStr) } return status diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 412a219..04f258d 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -107,15 +107,14 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -// NEXT: Make this a callback function rather trying to use polymorphism to call this. -// This applies to all clients. +// NEXT: Make this a callback function rather trying to use polymorphism to call +// this. This applies to all clients. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" @@ -134,7 +133,6 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, c.Args.SerializeOptions(), file, regex)) } - return } @@ -155,7 +153,6 @@ func (c *MaprClient) reportResults() { c.writeResultsToOutfile() return } - c.printResults() } @@ -176,7 +173,6 @@ func (c *MaprClient) printResults() { } else { result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } - if err != nil { dlog.Client.FatalPanic(err) } @@ -202,8 +198,8 @@ func (c *MaprClient) printResults() { dlog.Client.Raw(rawQuery) if rowsLimit > 0 && numRows > rowsLimit { - dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output to %d rows! Use 'limit' clause to override!", - numRows, rowsLimit)) + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+ + "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } dlog.Client.Raw(result) } @@ -215,7 +211,6 @@ func (c *MaprClient) writeResultsToOutfile() { } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { dlog.Client.FatalPanic(err) } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index fbef572..1315aea 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -36,9 +36,10 @@ func newTailStats(servers int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { - var connectedLast int +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, + statsCh <-chan string, quiet bool) { + var connectedLast int for { var force bool var messages []string @@ -94,7 +95,9 @@ func (s *stats) printStatsDueInterrupt(messages []string) { dlog.Client.Resume() } -func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} { +func (s *stats) statsData(connected, newConnections int, + throttle int) map[string]interface{} { + percConnected := percentOf(float64(s.servers), float64(connected)) data := make(map[string]interface{}) @@ -112,7 +115,6 @@ func (s *stats) statsData(connected, newConnections int, throttle int) map[strin func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb := strings.Builder{} - i := 0 for k, v := range s.statsData(connected, newConnections, throttle) { if i > 0 { @@ -123,7 +125,6 @@ func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb.WriteString(fmt.Sprintf("%v", v)) i++ } - return sb.String() } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index d42a0e4..35c01d4 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -19,7 +19,6 @@ type TailClient struct { // NewTailClient returns a new TailClient. func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient - c := TailClient{ baseClient: baseClient{ Args: args, @@ -30,7 +29,6 @@ func NewTailClient(args config.Args) (*TailClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -48,6 +46,5 @@ func (c TailClient) makeCommands() (commands []string) { c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } dlog.Client.Debug(commands) - return } diff --git a/internal/color/brush/brush.go b/internal/color/brush/brush.go index 82fa410..63d63d8 100644 --- a/internal/color/brush/brush.go +++ b/internal/color/brush/brush.go @@ -32,7 +32,6 @@ func paintSeverity(sb *strings.Builder, text string) bool { default: return false } - return true } @@ -87,9 +86,9 @@ func paintRemote(sb *strings.Builder, line string) { config.Client.TermColors.Remote.DelimiterAttr) color.PaintWithAttr(sb, splitted[4], - config.Client.TermColors.Remote.IdFg, - config.Client.TermColors.Remote.IdBg, - config.Client.TermColors.Remote.IdAttr) + config.Client.TermColors.Remote.IDFg, + config.Client.TermColors.Remote.IDBg, + config.Client.TermColors.Remote.IDAttr) color.PaintWithAttr(sb, protocol.FieldDelimiter, config.Client.TermColors.Remote.DelimiterFg, config.Client.TermColors.Remote.DelimiterBg, @@ -191,6 +190,5 @@ func Colorfy(line string) string { color.BgDefault, color.AttrNone) } - return sb.String() } diff --git a/internal/color/color.go b/internal/color/color.go index 5c25855..9d0bc2e 100644 --- a/internal/color/color.go +++ b/internal/color/color.go @@ -52,15 +52,19 @@ const ( AttrHidden Attribute = escape + "[8m" ) +// ColorNames is the list of all supported terminal colors. var ColorNames = []string{ "Black", "Red", "Green", "Yellow", "Blue", "Magenta", "Cyan", "White", "Default", } +// AttributeNames is the list of all supported terminal text attributes. var AttributeNames = []string{ - "Bold", "Dim", "Italic", "Underline", "Blink", "SlowBlink", "RapidBlink", "Reverse", "Hidden", "None", + "Bold", "Dim", "Italic", "Underline", "Blink", "SlowBlink", "RapidBlink", + "Reverse", "Hidden", "None", } -// ToFgColor converts a given string (e.g. from a config file) into a foreground color code. +// ToFgColor converts a given string (e.g. from a config file) into a foreground +// color code. func ToFgColor(s string) (FgColor, error) { switch strings.ToLower(s) { case "black": @@ -86,7 +90,8 @@ func ToFgColor(s string) (FgColor, error) { } } -// ToBgColor converts a given string (e.g. from a config file) into a background color code. +// ToBgColor converts a given string (e.g. from a config file) into a background +// color code. func ToBgColor(s string) (BgColor, error) { switch strings.ToLower(s) { case "black": diff --git a/internal/color/paint.go b/internal/color/paint.go index 53c9abb..7735d87 100644 --- a/internal/color/paint.go +++ b/internal/color/paint.go @@ -10,12 +10,14 @@ func PaintStr(text string, fg FgColor, bg BgColor) string { return fmt.Sprintf("%s%s%s%s%s", fg, bg, text, BgDefault, FgDefault) } -// PaintStrWithAttr paints a given text in a given foreground/background/attribute combination +// PaintStrWithAttr paints a given text in a given foreground/background/attribute +// combination func PaintStrWithAttr(text string, fg FgColor, bg BgColor, attr Attribute) string { if attr == AttrNone { return PaintStr(text, fg, bg) } - return fmt.Sprintf("%s%s%s%s%s%s%s", fg, bg, attr, text, AttrReset, BgDefault, FgDefault) + return fmt.Sprintf("%s%s%s%s%s%s%s", fg, bg, attr, text, AttrReset, + BgDefault, FgDefault) } // PaintStrFg paints a given text in a given foreground color. @@ -48,8 +50,11 @@ func Reset(sb *strings.Builder) { sb.WriteString(string(FgDefault)) } -// PaintWithAttr starts painting a given text in a given foreground/background/attribute combination. -func PaintWithAttr(sb *strings.Builder, text string, fg FgColor, bg BgColor, attr Attribute) { +// PaintWithAttr starts painting a given text in a given foreground/background/ +// attribute combination. +func PaintWithAttr(sb *strings.Builder, text string, fg FgColor, bg BgColor, + attr Attribute) { + if attr == AttrNone { Paint(sb, text, fg, bg) return @@ -63,8 +68,10 @@ func PaintWithAttr(sb *strings.Builder, text string, fg FgColor, bg BgColor, att sb.WriteString(string(FgDefault)) } -// PaintWithAttrs is similar to PaintWithAttr, but it takes multiple text attributes. -func PaintWithAttrs(sb *strings.Builder, text string, fg FgColor, bg BgColor, attrs []Attribute) { +// PaintWithAttrs is similar to PaintWithAttr, but it takes multiple attributes. +func PaintWithAttrs(sb *strings.Builder, text string, fg FgColor, bg BgColor, + attrs []Attribute) { + sb.WriteString(string(fg)) sb.WriteString(string(bg)) for _, attr := range attrs { diff --git a/internal/color/table.go b/internal/color/table.go index 2115edf..e0e4946 100644 --- a/internal/color/table.go +++ b/internal/color/table.go @@ -5,8 +5,19 @@ import ( "os" ) -const sampleParagraph string = "Mimecast is Making Email Safer for Business. We believe that securely operating a business in the cloud requires new levels of IT preparedness, centered around cyber resilience. This is why we unify the delivery and management of security, continuity and data protection for email via one, simple-to-use cloud platform. Thousands of organizations trust us to increase their cyber resilience preparedness, streamline compliance, reduce IT complexity and keep their business running. We give employees fast and secure access to sensitive business information, and ensure email keeps flowing in the event of an outage. Mimecast will remain committed to protecting your IT assets through constant innovation and focus on your success." +const sampleParagraph string = "Mimecast is Making Email Safer for Business. " + + "We believe that securely operating a business in the cloud requires new " + + "levels of IT preparedness, centered around cyber resilience. This is why " + + "we unify the delivery and management of security, continuity and data " + + "protection for email via one, simple-to-use cloud platform. Thousands of " + + "organizations trust us to increase their cyber resilience preparedness, " + + "streamline compliance, reduce IT complexity and keep their business running. " + + "We give employees fast and secure access to sensitive business information, " + + "and ensure email keeps flowing in the event of an outage. Mimecast will " + + "remain committed to protecting your IT assets through constant innovation " + + "and focus on your success." +// TablePrintAndExit prints the color table and then exits the process. func TablePrintAndExit(displaySampleParagraph bool) { for _, attr := range AttributeNames { if attr == "Hidden" || attr == "SlowBlink" { @@ -24,11 +35,13 @@ func printColorTable(attr string, displaySampleParagraph bool) { if fg == bg { continue } + bgColor, _ := ToBgColor(bg) attribute, _ := ToAttribute(attr) - - text := fmt.Sprintf(" Foreground:%10s | Background:%10s | Attribute:%10s ", fg, bg, attr) + text := fmt.Sprintf(" Foreground:%10s | Background:%10s | Attribute:%10s ", + fg, bg, attr) fmt.Print(PaintStrWithAttr(text, fgColor, bgColor, attribute)) + if displaySampleParagraph { fmt.Print("\n") fmt.Print(PaintStrWithAttr(sampleParagraph, fgColor, bgColor, attribute)) diff --git a/internal/config/args.go b/internal/config/args.go index a671ae3..f721390 100644 --- a/internal/config/args.go +++ b/internal/config/args.go @@ -74,13 +74,13 @@ func (a *Args) String() string { // SerializeOptions returns a string ready to be sent over the wire to the server. func (a *Args) SerializeOptions() string { - return fmt.Sprintf("quiet=%v:spartan=%v:serverless=%v", a.Quiet, a.Spartan, a.Serverless) + return fmt.Sprintf("quiet=%v:spartan=%v:serverless=%v", a.Quiet, a.Spartan, + a.Serverless) } // DeserializeOptions deserializes the options, but into a map. func DeserializeOptions(opts []string) (map[string]string, error) { options := make(map[string]string, len(opts)) - for _, o := range opts { kv := strings.SplitN(o, "=", 2) if len(kv) != 2 { @@ -97,9 +97,7 @@ func DeserializeOptions(opts []string) (map[string]string, error) { } val = string(decoded) } - options[key] = val } - return options, nil } diff --git a/internal/config/client.go b/internal/config/client.go index ecd05c5..8227c68 100644 --- a/internal/config/client.go +++ b/internal/config/client.go @@ -15,9 +15,9 @@ type remoteTermColors struct { HostnameAttr color.Attribute HostnameBg color.BgColor HostnameFg color.FgColor - IdAttr color.Attribute - IdBg color.BgColor - IdFg color.FgColor + IDAttr color.Attribute + IDBg color.BgColor + IDFg color.FgColor StatsOkAttr color.Attribute StatsOkBg color.BgColor StatsOkFg color.FgColor @@ -124,9 +124,9 @@ func newDefaultClientConfig() *ClientConfig { HostnameAttr: color.AttrBold, HostnameBg: color.BgBlue, HostnameFg: color.FgWhite, - IdAttr: color.AttrDim, - IdBg: color.BgBlue, - IdFg: color.FgWhite, + IDAttr: color.AttrDim, + IDBg: color.BgBlue, + IDFg: color.FgWhite, StatsOkAttr: color.AttrNone, StatsOkBg: color.BgGreen, StatsOkFg: color.FgBlack, diff --git a/internal/config/config.go b/internal/config/config.go index 077a658..b99b22b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,11 +9,11 @@ const ( ScheduleUser string = "DTAIL-SCHEDULE" // ContinuousUser is used for non-interactive continuous mapreduce queries. ContinuousUser string = "DTAIL-CONTINUOUS" - // InterruptTimeoutS is used to terminate DTail when Ctrl+C was pressed twice within a given interval. + // InterruptTimeoutS specifies the Ctrl+C log pause interval. InterruptTimeoutS int = 3 - // ConnectionsPerCPU controls how many connections are established concurrently as a start (slow start) + // DefaultConnectionsPerCPU controls how many connections are established concurrently. DefaultConnectionsPerCPU int = 10 - // DTailSSHServerDefaultPort is the default DServer port. + // DefaultSSHPort is the default DServer port. DefaultSSHPort int = 2222 // DefaultLogLevel specifies the default log level (obviously) DefaultLogLevel string = "INFO" diff --git a/internal/config/initializer.go b/internal/config/initializer.go index e4cbeaf..0a913db 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -20,13 +20,13 @@ type initializer struct { type transformCb func(*initializer, *Args, []string) error -func (c *initializer) parseConfig(args *Args) error { +func (in *initializer) parseConfig(args *Args) error { if strings.ToUpper(args.ConfigFile) == "NONE" { return nil } if args.ConfigFile != "" { - return c.parseSpecificConfig(args.ConfigFile) + return in.parseSpecificConfig(args.ConfigFile) } if homeDir, err := os.UserHomeDir(); err != nil { @@ -35,7 +35,7 @@ func (c *initializer) parseConfig(args *Args) error { paths = append(paths, fmt.Sprintf("%s/.dtail.conf", homeDir)) for _, configPath := range paths { if _, err := os.Stat(configPath); !os.IsNotExist(err) { - c.parseSpecificConfig(configPath) + in.parseSpecificConfig(configPath) } } } @@ -43,7 +43,7 @@ func (c *initializer) parseConfig(args *Args) error { return nil } -func (c *initializer) parseSpecificConfig(configFile string) error { +func (in *initializer) parseSpecificConfig(configFile string) error { fd, err := os.Open(configFile) if err != nil { return fmt.Errorf("Unable to read config file: %v", err) @@ -55,68 +55,74 @@ func (c *initializer) parseSpecificConfig(configFile string) error { return fmt.Errorf("Unable to read config file %s: %v", configFile, err) } - if err := json.Unmarshal([]byte(cfgBytes), c); err != nil { + if err := json.Unmarshal([]byte(cfgBytes), in); err != nil { return fmt.Errorf("Unable to parse config file %s: %v", configFile, err) } return nil } -func (i *initializer) transformConfig(sourceProcess source.Source, args *Args, additionalArgs []string) error { +func (in *initializer) transformConfig(sourceProcess source.Source, args *Args, + additionalArgs []string) error { switch sourceProcess { case source.Server: - return i.optimusPrime(transformServer, args, additionalArgs) + return in.optimusPrime(transformServer, args, additionalArgs) case source.Client: - return i.optimusPrime(transformClient, args, additionalArgs) + return in.optimusPrime(transformClient, args, additionalArgs) case source.HealthCheck: - return i.optimusPrime(transformHealthCheck, args, additionalArgs) + return in.optimusPrime(transformHealthCheck, args, additionalArgs) default: - return fmt.Errorf("Unable to transform config, unknown source '%s'", sourceProcess) + return fmt.Errorf("Unable to transform config, unknown source '%s'", + sourceProcess) } } -func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalArgs []string) error { +func (in *initializer) optimusPrime(sourceCb transformCb, args *Args, + additionalArgs []string) error { + // Copy args to config objects. + // NEXT: Maybe unify args and config structs? if args.SSHPort != DefaultSSHPort { - i.Common.SSHPort = args.SSHPort + in.Common.SSHPort = args.SSHPort } if args.LogLevel != DefaultLogLevel { - i.Common.LogLevel = args.LogLevel + in.Common.LogLevel = args.LogLevel } if args.NoColor { - i.Client.TermColorsEnable = false + in.Client.TermColorsEnable = false } if args.LogDir != "" { - i.Common.LogDir = args.LogDir + in.Common.LogDir = args.LogDir } if args.Logger != "" { - i.Common.Logger = args.Logger + in.Common.Logger = args.Logger } if args.ConnectionsPerCPU == 0 { args.ConnectionsPerCPU = DefaultConnectionsPerCPU } // Setup log directory. - if strings.Contains(i.Common.LogDir, "~/") { + if strings.Contains(in.Common.LogDir, "~/") { homeDir, err := os.UserHomeDir() if err != nil { panic(err) } - i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir)) + in.Common.LogDir = strings.ReplaceAll(in.Common.LogDir, "~/", + fmt.Sprintf("%s/", homeDir)) } // Source type specific transormations. - sourceCb(i, args, additionalArgs) + sourceCb(in, args, additionalArgs) // Spartan mode. if args.Spartan { args.Quiet = true args.NoColor = true - i.Client.TermColorsEnable = false + in.Client.TermColorsEnable = false if args.LogLevel == "" { args.LogLevel = "ERROR" - i.Common.LogLevel = "ERROR" + in.Common.LogLevel = "ERROR" } } // Interpret additional args as file list or as query. @@ -135,29 +141,28 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA return nil } -func transformClient(i *initializer, args *Args, additionalArgs []string) error { +func transformClient(in *initializer, args *Args, additionalArgs []string) error { // Serverless mode. if args.Discovery == "" && (args.ServersStr == "" || strings.ToLower(args.ServersStr) == "serverless") { // We are not connecting to any servers. args.Serverless = true - i.Common.LogLevel = "warn" + in.Common.LogLevel = "warn" } - return nil } -func transformServer(i *initializer, args *Args, additionalArgs []string) error { +func transformServer(in *initializer, args *Args, additionalArgs []string) error { return nil } -func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error { +func transformHealthCheck(in *initializer, args *Args, additionalArgs []string) error { // Serverless mode. if args.Discovery == "" && (args.ServersStr == "" || strings.ToLower(args.ServersStr) == "serverless") { // We are not connecting to any servers. args.Serverless = true - i.Common.LogLevel = "warn" + in.Common.LogLevel = "warn" } args.TrustAllHosts = true return nil diff --git a/internal/config/server.go b/internal/config/server.go index 8bbb394..677f5ac 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -4,8 +4,8 @@ import ( "errors" ) -// Permissions map. Each SSH user has a list of permissions which -// log files it is allowed to follow and which ones not. +// Permissions map. Each SSH user has a list of permissions which log files it +// is allowed to follow and which ones not. type Permissions struct { // The default user permissions. Default []string @@ -68,7 +68,6 @@ var ServerRelaxedAuthEnable bool func newDefaultServerConfig() *ServerConfig { defaultPermissions := []string{"^/.*"} defaultBindAddress := "0.0.0.0" - return &ServerConfig{ SSHBindAddress: defaultBindAddress, MaxConnections: 10, @@ -89,10 +88,8 @@ func ServerUserPermissions(userName string) (permissions []string, err error) { if p, ok := Server.Permissions.Users[userName]; ok { permissions = p } - if len(permissions) == 0 { err = errors.New("Empty set of permission, user won't be able to open any files") } - return } diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 83ee95e..8bb1e85 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -73,7 +73,6 @@ func (d *Discovery) initRegex() { regexStr := string(runes) dlog.Common.Debug("Using filter regex", regexStr) - regex, err := regexp.Compile(regexStr) if err != nil { dlog.Common.FatalPanic("Could not compile regex", regexStr, err) @@ -90,9 +89,7 @@ func (d *Discovery) ServerList() []string { if d.regex != nil { servers = d.filterList(servers) } - servers = d.dedupList(servers) - if d.order == Shuffle { servers = d.shuffleList(servers) } @@ -105,12 +102,10 @@ func (d *Discovery) serverListFromModule() []string { if d.module != "" { return d.serverListFromReflectedModule() } - if _, err := os.Stat(d.server); err == nil { // Appears to be a file name, now try to read from that file. return d.ServerListFromFILE() } - // Appears to be a list of FQDNs (or a single FQDN) return d.ServerListFromCOMMA() } @@ -120,18 +115,16 @@ func (d *Discovery) serverListFromModule() []string { // Discovery. Whereas MODULENAME must be a upeprcase string. func (d *Discovery) serverListFromReflectedModule() []string { methodName := fmt.Sprintf("ServerListFrom%s", d.module) - + // Now we are reflecting the serve discovery function by it's name. rt := reflect.TypeOf(d) reflectedMethod, ok := rt.MethodByName(methodName) if !ok { dlog.Common.FatalPanic("No such server discovery module", d.module, methodName) } - inputValues := make([]reflect.Value, 1) // Thist input value is method receiver. inputValues[0] = reflect.ValueOf(d) returnValues := reflectedMethod.Func.Call(inputValues) - // First return value is server list. return returnValues[0].Interface().([]string) } @@ -139,27 +132,23 @@ func (d *Discovery) serverListFromReflectedModule() []string { // Filter server list based on a regexp. func (d *Discovery) filterList(servers []string) (filtered []string) { dlog.Common.Debug("Filtering server list") - for _, server := range servers { if d.regex.MatchString(server) { filtered = append(filtered, server) } } - return } // Deduplicate the server list. func (d *Discovery) dedupList(servers []string) (deduped []string) { serverMap := make(map[string]struct{}, len(servers)) - for _, server := range servers { if _, ok := serverMap[server]; !ok { serverMap[server] = struct{}{} deduped = append(deduped, server) } } - dlog.Common.Debug("Deduped server list", len(servers), len(deduped)) return } diff --git a/internal/done.go b/internal/done.go index 5ea22a0..94f9289 100644 --- a/internal/done.go +++ b/internal/done.go @@ -35,7 +35,6 @@ func (d *Done) Done() <-chan struct{} { func (d *Done) Shutdown() { d.mutex.Lock() defer d.mutex.Unlock() - select { case <-d.ch: return diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 28e6882..da67585 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -82,7 +82,7 @@ func new(sourceProcess, sourcePackage source.Source) *DLog { if err != nil { panic(err) } - strategy := loggers.GetStrategy(config.Common.LogStrategy) + strategy := loggers.NewStrategy(config.Common.LogStrategy) loggerName := config.Common.Logger level := newLevel(config.Common.LogLevel) @@ -153,6 +153,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { } } +// FatalPanic terminates the process with a fatal error. func (d *DLog) FatalPanic(args ...interface{}) { d.log(Fatal, args) d.Flush() @@ -162,42 +163,51 @@ func (d *DLog) FatalPanic(args ...interface{}) { panic(sb.String()) } +// Fatal logs a fatal error. func (d *DLog) Fatal(args ...interface{}) string { return d.log(Fatal, args) } +// Error logging. func (d *DLog) Error(args ...interface{}) string { return d.log(Error, args) } +// Warn logs a warning message. func (d *DLog) Warn(args ...interface{}) string { return d.log(Warn, args) } +// Info logging. func (d *DLog) Info(args ...interface{}) string { return d.log(Info, args) } +// Verbose logging. func (d *DLog) Verbose(args ...interface{}) string { return d.log(Verbose, args) } +// Debug logging. func (d *DLog) Debug(args ...interface{}) string { return d.log(Debug, args) } +// Trace logging. func (d *DLog) Trace(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Trace, args) } +// Devel used for development purpose only logging (e.g. "print" debugging). func (d *DLog) Devel(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Devel, args) } +// Raw message logging. func (d *DLog) Raw(message string) string { if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { d.logger.Log(time.Now(), message) @@ -207,6 +217,7 @@ func (d *DLog) Raw(message string) string { return message } +// Mapreduce logging. func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args := make([]interface{}, len(data)+1) @@ -251,6 +262,11 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { return d.log(Info, args) } -func (d *DLog) Flush() { d.logger.Flush() } -func (d *DLog) Pause() { d.logger.Pause() } +// Flush the log buffers. +func (d *DLog) Flush() { d.logger.Flush() } + +// Pause the logging. +func (d *DLog) Pause() { d.logger.Pause() } + +// Resume the logging. func (d *DLog) Resume() { d.logger.Resume() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 248ad83..0971094 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -7,6 +7,7 @@ import ( type level int +// Available log levels. const ( Fatal level = iota Error level = iota @@ -20,7 +21,8 @@ const ( All level = iota ) -var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All} +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, + Devel, Trace, All} func newLevel(l string) level { switch strings.ToLower(l) { @@ -73,6 +75,5 @@ func (l level) String() string { case All: return "ALL" } - panic("Unknown log level " + fmt.Sprintf("%d", l)) } diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index dda3ee6..415d7fb 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -9,11 +9,13 @@ import ( var factoryMap map[string]Logger var factoryMutex sync.Mutex +// Factory is there to retrieve a logger based on various settings. func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() - id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName) + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, + rotationStrategy.FileBase, loggerName) if factoryMap == nil { factoryMap = make(map[string]Logger) } @@ -36,10 +38,10 @@ func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) } } - return singleton } +// FactoryRotate invokes a log rotation of all loggers. func FactoryRotate() { factoryMutex.Lock() defer factoryMutex.Unlock() diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 87280fd..94824fe 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -12,8 +12,7 @@ import ( "github.com/mimecast/dtail/internal/config" ) -type fileWriter struct { -} +type fileWriter struct{} type fileMessageBuf struct { now time.Time @@ -35,7 +34,7 @@ type file struct { } func newFile(strategy Strategy) *file { - f := file{ + return &file{ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), @@ -43,16 +42,17 @@ func newFile(strategy Strategy) *file { flushCh: make(chan struct{}), strategy: strategy, } - - return &f } func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { f.mutex.Lock() - defer f.mutex.Unlock() + defer func() { + f.started = true + f.mutex.Unlock() + }() - // Logger already started from another Goroutine. if f.started { + // Logger already started from another Goroutine. wg.Done() return } @@ -68,7 +68,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { go func() { defer wg.Done() - for { select { case m := <-f.bufferCh: @@ -84,8 +83,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { } } }() - - f.started = true } func (f *file) Log(now time.Time, message string) { diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go index c88900d..d4e85de 100644 --- a/internal/io/dlog/loggers/logger.go +++ b/internal/io/dlog/loggers/logger.go @@ -6,6 +6,7 @@ import ( "time" ) +// Logger is there to plug in your own log implementation. type Logger interface { Log(now time.Time, message string) LogWithColors(now time.Time, message, messageWithColors string) diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go index a1b9355..25d10f0 100644 --- a/internal/io/dlog/loggers/strategy.go +++ b/internal/io/dlog/loggers/strategy.go @@ -5,19 +5,26 @@ import ( "path/filepath" ) +// Rotation is the actual strategy used for log rotation.. type Rotation int const ( - DailyRotation Rotation = iota + // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP. + DailyRotation Rotation = iota + // SignalRotation tells DTail to rotate its logs only on SIGHUP. SignalRotation Rotation = iota ) +// Strategy is a pair of the rotation and the file base. type Strategy struct { + // Rotation is the actual rotation strategy used. Rotation Rotation + // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used. FileBase string } -func GetStrategy(name string) Strategy { +// NewStrategy returns the stratey based on its name. +func NewStrategy(name string) Strategy { switch name { case "daily": return Strategy{DailyRotation, ""} diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index 7f387bc..01c15ba 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 0774837..7773142 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -7,7 +7,8 @@ import ( "github.com/mimecast/dtail/internal/regex" ) -// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. +// FileReader is the interface used on the dtail server to read/cat/grep/mapr... +// a file. type FileReader interface { Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error FilePath() string diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go index 7d2d7ca..904b90f 100644 --- a/internal/io/fs/permissions/permission_linuxacl.go +++ b/internal/io/fs/permissions/permission_linuxacl.go @@ -13,7 +13,7 @@ import ( "unsafe" ) -// ToRead checks whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index f128c07..92f85b6 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -42,7 +42,8 @@ type readFile struct { // String returns the string representation of the readFile func (f readFile) String() string { - return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", + return fmt.Sprintf( + "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", f.filePath, f.globID, f.retry, @@ -61,7 +62,9 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lines chan<- line.Line, + re regex.Regex) error { + dlog.Common.Debug("readFile", f) defer func() { select { @@ -74,7 +77,8 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re case f.limiter <- struct{}{}: default: select { - case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, + "Server limit reached. Queuing file..."): case <-ctx.Done(): return nil } @@ -139,13 +143,11 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { default: reader = bufio.NewReader(fd) } - return } func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 - reader, err := f.makeReader(fd) if err != nil { return err @@ -193,7 +195,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu default: if message.Len() >= lineLengthThreshold { if !warnedAboutLongLine { - f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") warnedAboutLongLine = true } message.WriteString("\n") @@ -210,9 +213,10 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() +func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + defer wg.Done() for { select { case line, ok := <-rawLines: @@ -231,9 +235,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha } } -func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { - var read line.Line +func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, + re regex.Regex) (line.Line, bool) { + var read line.Line if !re.Match(lineBytes.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() @@ -254,7 +259,6 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, r Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), } - return read, true } @@ -267,7 +271,6 @@ func (f readFile) truncated(fd *os.File) (bool, error) { if err != nil { return true, err } - // Can not open file at original path. pathFd, err := os.Open(f.filePath) if err != nil { @@ -280,10 +283,8 @@ func (f readFile) truncated(fd *os.File) (bool, error) { if err != nil { return true, err } - if curPos > pathPos { return true, errors.New("File got truncated") } - return false, nil } diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 14994e5..b03b45d 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go index c9dc221..89fcf81 100644 --- a/internal/io/pool/builder.go +++ b/internal/io/pool/builder.go @@ -5,6 +5,8 @@ import ( "sync" ) +// BuilderBuffer is there to optimize memory allocations (DTail allocates a lot +// of memory while reading log data otherwise) var BuilderBuffer = sync.Pool{ New: func() interface{} { sb := strings.Builder{} @@ -12,6 +14,7 @@ var BuilderBuffer = sync.Pool{ }, } +// RecycleBuilderBuffer recycles the buffer again. func RecycleBuilderBuffer(sb *strings.Builder) { sb.Reset() BuilderBuffer.Put(sb) diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go index 0a159f5..3d48f2c 100644 --- a/internal/io/pool/bytesbuffer.go +++ b/internal/io/pool/bytesbuffer.go @@ -5,6 +5,8 @@ import ( "sync" ) +// BytesBuffer is there to optimize memory allocations. DTail otherwise allocates +// a lot of memory while reading logs. var BytesBuffer = sync.Pool{ New: func() interface{} { b := bytes.Buffer{} @@ -13,6 +15,7 @@ var BytesBuffer = sync.Pool{ }, } +// RecycleBytesBuffer recycles the buffer again. func RecycleBytesBuffer(b *bytes.Buffer) { b.Reset() BytesBuffer.Put(b) diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index 7c3cdb5..e82132d 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -19,7 +19,8 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - AskAgain bool + // AskAgain can be used to not to ask again about the question. + AskAgain bool } // Prompt used for interactive user input. @@ -30,7 +31,6 @@ type Prompt struct { func (p *Prompt) askString() string { var sb strings.Builder - sb.WriteString(p.question) sb.WriteString("? (") @@ -41,7 +41,6 @@ func (p *Prompt) askString() string { sb.WriteString(strings.Join(ax, ",")) sb.WriteString("): ") - return sb.String() } @@ -68,7 +67,6 @@ func (p *Prompt) Ask() { if a.Callback != nil { a.Callback() } - if !a.AskAgain { dlog.Common.Resume() if a.EndCallback != nil { @@ -90,6 +88,5 @@ func (p *Prompt) answer(answerStr string) (*Answer, bool) { default: } } - return nil, false } diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index 14056c4..584b59c 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -14,10 +14,8 @@ import ( func InterruptCh(ctx context.Context) <-chan string { sigIntCh := make(chan os.Signal) gosignal.Notify(sigIntCh, os.Interrupt) - sigOtherCh := make(chan os.Signal) gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) - statsCh := make(chan string) go func() { @@ -41,7 +39,6 @@ func InterruptCh(ctx context.Context) <-chan string { } } }() - return statsCh } diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 14e6943..c50c7a1 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -38,7 +38,6 @@ func (s *AggregateSet) String() string { func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { s.Samples += set.Samples //dlog.Common.Trace("Merge", set) - for _, sc := range query.Select { storage := sc.FieldStorage switch sc.Operation { @@ -115,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) { s.FValues[key] = value return } - if f > value { s.FValues[key] = value } @@ -128,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) { s.FValues[key] = value return } - if f < value { s.FValues[key] = value } @@ -147,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) { // Aggregate data to the aggregate set. func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { var f float64 - // First check if we can aggregate anything without converting value to float. switch agg { case Count: diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index d0c1d70..02a6a5a 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -23,7 +23,9 @@ type Aggregate struct { } // NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { +func NewAggregate(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ query: query, group: mapr.NewGroupSet(), @@ -47,8 +49,8 @@ func (a *Aggregate) Aggregate(message string) error { fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) - var addedSamples bool + for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { @@ -71,14 +73,12 @@ func (a *Aggregate) Aggregate(message string) error { // Re-init local group (make it empty again). a.group.InitSet() } - return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) - for _, part := range parts { kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) if len(kv) != 2 { @@ -86,6 +86,5 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { } fields[kv[0]] = kv[1] } - return fields } diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 0433b9a..418d86f 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -19,13 +19,12 @@ type Function struct { // FunctionStack is a list of functions stacked each other type FunctionStack []Function -// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack. +// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns +// a corresponding function stack. func NewFunctionStack(in string) (FunctionStack, string, error) { var fs FunctionStack - getCallback := func(name string) (CallbackFunc, error) { var cb CallbackFunc - switch name { case "md5sum": return Md5Sum, nil @@ -51,7 +50,6 @@ func NewFunctionStack(in string) (FunctionStack, string, error) { fs = append(fs, Function{name, call}) aux = aux[index+1 : len(aux)-1] } - return fs, aux, nil } @@ -62,6 +60,5 @@ func (fs FunctionStack) Call(str string) string { str = fs[i].call(str) //dlog.Common.Debug("Call.result", fs[i].Name, str) } - return str } diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go index 415683c..8b5d8b7 100644 --- a/internal/mapr/funcs/function_test.go +++ b/internal/mapr/funcs/function_test.go @@ -6,16 +6,19 @@ func TestFunction(t *testing.T) { input := "md5sum($line)" fs, arg, err := NewFunctionStack(input) if err != nil { - t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) + t.Errorf("error parsing function input '%s': %s (%v)\n", + input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result := fs.Call(input) if result != "b38699013d79e50d9d122433753959c1" { - t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) } input = "maskdigits(md5sum(maskdigits($line)))" @@ -24,22 +27,26 @@ func TestFunction(t *testing.T) { t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result = fs.Call(input) if result != ".fac.bbe..bb.........d...a.c..b." { - t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) } input = "md5sum$line)" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } input = "md5sum(makedigits$line))" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } } diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go index d51f3d8..925ec4d 100644 --- a/internal/mapr/funcs/maskdigits.go +++ b/internal/mapr/funcs/maskdigits.go @@ -3,12 +3,10 @@ package funcs // MaskDigits masks all digits (replaces them with .) func MaskDigits(input string) string { s := []byte(input) - for i, b := range s { if '0' <= b && b <= '9' { s[i] = '.' } } - return string(s) } diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index 50bac37..2d7f10b 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -17,7 +17,6 @@ func NewGlobalGroupSet() *GlobalGroupSet { semaphore: make(chan struct{}, 1), } g.InitSet() - return &g } @@ -30,7 +29,6 @@ func (g *GlobalGroupSet) String() string { func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.merge(query, group) } @@ -48,14 +46,12 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro // Merge a group set into the global group set. func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { - for groupKey, set := range group.sets { s := g.GetSet(groupKey) if err := s.Merge(query, set); err != nil { return err } } - return nil } @@ -68,7 +64,6 @@ func (g *GlobalGroupSet) IsEmpty() bool { func (g *GlobalGroupSet) NumSets() int { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return len(g.sets) } @@ -80,7 +75,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { set := &GroupSet{sets: g.sets} g.InitSet() - return set } @@ -88,7 +82,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { func (g *GlobalGroupSet) WriteResult(query *Query) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.WriteResult(query) } @@ -96,6 +89,5 @@ func (g *GlobalGroupSet) WriteResult(query *Query) error { func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.Result(query, rowsLimit) } diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index ce7630d..6ffc8b9 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -73,7 +73,6 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { if err != nil { return "", 0, err } - if query.Limit != -1 { rowsLimit = query.Limit } @@ -91,12 +90,14 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { if sc.FieldStorage == query.OrderBy { attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr) } + for _, groupBy := range query.GroupBy { if sc.FieldStorage == groupBy { attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr) break } } + color.PaintWithAttrs(sb, str, config.Client.TermColors.MaprTable.HeaderFg, config.Client.TermColors.MaprTable.HeaderBg, @@ -191,7 +192,6 @@ func (*GroupSet) writeQueryFile(query *Query) error { fd.WriteString(query.RawQuery) os.Rename(tmpQueryFile, queryFile) - return nil } @@ -256,7 +256,6 @@ func (g *GroupSet) WriteResult(query *Query) error { func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) { var rows []result widths := make([]int, len(query.Select)) - var valueStr string var value float64 @@ -284,7 +283,8 @@ func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, err value = set.FValues[sc.FieldStorage] / float64(set.Samples) valueStr = fmt.Sprintf("%f", value) default: - return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", + sc.Operation) } if sc.FieldStorage == query.OrderBy { @@ -302,7 +302,6 @@ func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, err widths[i] = len(valueStr) } } - rows = append(rows, r) } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 8016667..9b6c855 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -11,9 +11,10 @@ import ( func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) - if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { + if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || + !strings.HasPrefix(splitted[0], "INFO") { // Not a DTail mapreduce log line. - return nil, IgnoreFieldsErr + return nil, ErrIgnoreFields } fields := make(map[string]string, len(splitted)+8) diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index a777156..28e1acc 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -32,49 +32,57 @@ func TestDefaultLogFormat(t *testing.T) { if val, ok := fields["$severity"]; !ok { t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) } else if val != "INFO" { - t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", + val, input) } if val, ok := fields["$time"]; !ok { t.Errorf("Expected field '$time', but no such field there in '%s'\n", input) } else if val != time { - t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", time, val, input) + t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", + time, val, input) } if val, ok := fields["$date"]; !ok { t.Errorf("Expected field '$date', but no such field there in '%s'\n", input) } else if val != date { - t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", date, val, input) + t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", + date, val, input) } if val, ok := fields["$hour"]; !ok { t.Errorf("Expected field '$hour', but no such field there in '%s'\n", input) } else if val != hour { - t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", hour, val, input) + t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", + hour, val, input) } if val, ok := fields["$minute"]; !ok { t.Errorf("Expected field '$minute', but no such field there in '%s'\n", input) } else if val != minute { - t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", minute, val, input) + t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", + minute, val, input) } if val, ok := fields["$second"]; !ok { t.Errorf("Expected field '$second', but no such field there in '%s'\n", input) } else if val != second { - t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", second, val, input) + t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", + second, val, input) } if val, ok := fields["foo"]; !ok { t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) } else if val != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", + val, input) } if val, ok := fields["bar"]; !ok { t.Errorf("Expected field 'bar', but no such field there in '%s'\n", input) } else if val != "foo" { - t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", + val, input) } } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go index 3769c22..433eb5f 100644 --- a/internal/mapr/logformat/generickv.go +++ b/internal/mapr/logformat/generickv.go @@ -6,7 +6,7 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MakeFieldsGENERICKV is the generic key-value logfile parser. +// MakeFieldsGENERIGKV is the generic key-value logfile parser. func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) fields := make(map[string]string, len(splitted)) diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index a352580..129081d 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -11,7 +11,8 @@ import ( "github.com/mimecast/dtail/internal/mapr" ) -var IgnoreFieldsErr error = errors.New("Ignore this field set") +// ErrIgnoreFields indicates that the fields should be ignored. +var ErrIgnoreFields error = errors.New("Ignore this field set") // Parser is used to parse the mapreduce information from the server log files. type Parser struct { @@ -26,11 +27,9 @@ type Parser struct { // NewParser returns a new log parser. func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { hostname, err := os.Hostname() - if err != nil { return nil, err } - now := time.Now() zone, offset := now.Zone() @@ -44,7 +43,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { if err != nil { return nil, err } - return &p, nil } @@ -53,7 +51,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { // Parser. Whereas MODULENAME must be a upeprcase string. func (p *Parser) reflectLogFormat(logFormatName string) error { methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName)) - rt := reflect.TypeOf(p) method, ok := rt.MethodByName(methodName) if !ok { @@ -62,7 +59,6 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { p.makeFieldsFunc = method.Func p.makeFieldsReceiver = reflect.ValueOf(p) - return nil } @@ -70,15 +66,11 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) { inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)} returnValues := p.makeFieldsFunc.Call(inputValues) - errInterface := returnValues[1].Interface() - if errInterface == nil { fields, err = returnValues[0].Interface().(map[string]string), nil return } - fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) - return } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 6c1d849..d7c32bd 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -32,7 +32,9 @@ type Query struct { } func (q Query) String() string { - return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v,LogFormat:%s)", + return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,"+ + "GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,"+ + "RawQuery:%s,tokens:%v,LogFormat:%s)", q.Select, q.Table, q.Where, @@ -54,18 +56,14 @@ func NewQuery(queryStr string) (*Query, error) { if queryStr == "" { return nil, nil } - tokens := tokenize(queryStr) - q := Query{ RawQuery: queryStr, tokens: tokens, Interval: time.Second * 5, Limit: -1, } - - err := q.parse(tokens) - return &q, err + return &q, q.parse(tokens) } // HasOutfile returns true if query result will be written to a CVS output file. @@ -174,13 +172,13 @@ func (q *Query) parse(tokens []token) error { } if len(q.Select) < 1 { - return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") + return errors.New(invalidQuery + "Expected at least one field in 'select' " + + "clause but got none") } if len(q.GroupBy) == 0 { field := q.Select[0].Field q.GroupBy = append(q.GroupBy, field) } - if q.OrderBy != "" { var orderFieldIsValid bool for _, sc := range q.Select { @@ -190,7 +188,8 @@ func (q *Query) parse(tokens []token) error { } } if !orderFieldIsValid { - return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy)) + return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s',"+ + "must be present in 'select' clause", q.OrderBy)) } } diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go index b0b6c3a..88f7387 100644 --- a/internal/mapr/query_test.go +++ b/internal/mapr/query_test.go @@ -13,18 +13,25 @@ func TestParseQuerySimple(t *testing.T) { "select foo from bar where baz <", "select foo from bar where baz < 100 bay eq 12 group", "select foo from bar where baz < 100 bay eq 12 group by foo order by", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit set foo = bar;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit set foo = bar;", } okQueries := []string{"select foo from bar", "select foo from bar where", "select foo from bar where baz < 100 bay eq 12", "select foo from bar where baz < 100, bay eq 12", "select foo from bar where baz < 100 and bay eq 12", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\" set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\"", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\" " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", } for _, queryStr := range errorQueries { @@ -46,8 +53,13 @@ func TestParseQuerySimple(t *testing.T) { func TestParseQueryDeep(t *testing.T) { dialects := []string{ - "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", - "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq " + + "\"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + + "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq " + + "\"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 " + + "SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", } for _, queryStr := range dialects { @@ -55,119 +67,144 @@ func TestParseQueryDeep(t *testing.T) { if err != nil { t.Errorf("%s: %s", err.Error(), queryStr) } - t.Log(q) // 'select' clause if len(q.Select) != 3 { - t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q) + t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", + q.Select, queryStr, q) } - if q.Select[0].Field != "s1" { - t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q) + t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", + q.Select[0].Field, queryStr, q) } if q.Select[0].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of first element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) } - if q.Select[1].Field != "from" { - t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q) + t.Errorf("Expected 'from' as second element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[1].Field, queryStr, q) } if q.Select[1].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of second element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) } - if q.Select[2].Field != "s3" { - t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q) + t.Errorf("Expected 's3' as third element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[2].Field, queryStr, q) } if q.Select[2].Operation != Count { - t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) + t.Errorf("Expected 'count' as aggregation function of third element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) } if q.Select[2].FieldStorage != "count(s3)" { - t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) + t.Errorf("Expected 'count(s3)' as third element's storage in 'select' "+ + "clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) } // 'from' clause if q.Table != "TABLE" { - t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q) + t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", + q.Table, queryStr, q) } // 'where' clause if len(q.Where) != 2 { - t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q) + t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", + q.Where, queryStr, q) } if q.Where[0].lString != "w1" { - t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q) + t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", + q.Where[0].lString, queryStr, q) } if q.Where[0].Operation != FloatEq { - t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected FloatEq operation in first 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[0].rFloat != 2 { - t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) + t.Errorf("Expected '2' as float argument in first 'where' condition but "+ + "got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) } if q.Where[1].lString != "w2" { - t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q) + t.Errorf("Expected w2 as second element in 'where' clause but got '%v': "+ + "%s\n%v", q.Where[1].lString, queryStr, q) } if q.Where[1].Operation != StringEq { - t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected StringEq operation in second 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[1].rString != "free beer" { - t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) + t.Errorf("Expected 'free beer' as string argument in second 'where' "+ + "condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) } // 'group by' clause if len(q.GroupBy) != 2 { - t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q) + t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", + q.GroupBy, queryStr, q) } if q.GroupBy[0] != "g1" { - t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q) + t.Errorf("Expected 'g1' as first element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[0], queryStr, q) } if q.GroupBy[1] != "g2" { - t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q) + t.Errorf("Expected 'g2' as second element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[1], queryStr, q) } if q.GroupKey != "g1,g2" { - t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q) + t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupKey, queryStr, q) } // 'order by' clause if q.OrderBy != "count(s3)" { - t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q) + t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got "+ + "'%v': %s\n%v", q.OrderBy, queryStr, q) } // 'interval' clause if q.Interval != time.Second*time.Duration(10) { - t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q) + t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", + q.Interval, queryStr, q) } // 'limit' clause if q.Limit != 23 { - t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q) + t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", + q.Limit, queryStr, q) } // 'set' clause if q.Set[0].lString != "$foo" { - t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].lString, queryStr, q) + t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].lString, queryStr, q) } if q.Set[0].rString != "bar" { - t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].rString, queryStr, q) + t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].rString, queryStr, q) } - if q.Set[1].lString != "$baz" { - t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].lString, queryStr, q) + t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].lString, queryStr, q) } if q.Set[1].rString != "12" { - t.Errorf("Expected '12' rvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].rString, queryStr, q) + t.Errorf("Expected '12' rvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].rString, queryStr, q) } - if q.Set[2].lString != "$bay" { - t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].lString, queryStr, q) + t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].lString, queryStr, q) } if q.Set[2].rString != "$foo" { - t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].rString, queryStr, q) + t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].rString, queryStr, q) } + // 'logformat' clause if q.LogFormat != "generic" { - t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", q.LogFormat, queryStr, q) + t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", + q.LogFormat, queryStr, q) } } } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index d6aa0d4..5cfb8c7 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -37,7 +37,6 @@ func (sc selectCondition) String() string { func makeSelectConditions(tokens []token) ([]selectCondition, error) { var sel []selectCondition - // Parse select aggregation, e.g. sum(foo) parse := func(token token) (selectCondition, error) { var sc selectCondition @@ -52,13 +51,15 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { a := strings.Split(tokenStr, "(") if len(a) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + + token.str) } agg := a[0] // Aggregation, e.g. 'sum' b := strings.Split(a[1], ")") if len(b) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' field name " + + "from aggregation: " + token.str) } sc.Field = b[0] // Field name, e.g. 'foo' sc.FieldStorage = tokenStr // e.g. 'sum(foo)' @@ -79,9 +80,9 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { case "len": sc.Operation = Len default: - return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) + return sc, errors.New(invalidQuery + + "Unknown aggregation in 'select' clause: " + agg) } - return sc, nil } @@ -92,6 +93,5 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } - return sel, nil } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 1f5d1c3..97fee11 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -63,16 +63,14 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } } - a := Aggregate{ + return &Aggregate{ done: internal.NewDone(), NextLinesCh: make(chan chan line.Line, 10), serialize: make(chan struct{}), hostname: s[0], query: query, parser: logParser, - } - - return &a, nil + }, nil } // Shutdown the aggregation engine. @@ -95,12 +93,10 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { }() fieldsCh := a.fieldsFromLines(myCtx) - // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } - // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) @@ -147,17 +143,18 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - // Can not recycle here for some rason. + // Can't recycle it here yet, as field slices are still + // TODO: Add unit test reading from multiple mapreduce files lines. + // TODO: Add capability to recycle this bytes buffer. //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? - if err != logformat.IgnoreFieldsErr { + if err != logformat.ErrIgnoreFields { dlog.Common.Error(fields, err) } continue } - if !a.query.WhereClause(fields) { continue } @@ -175,12 +172,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin return fieldsCh } -func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - newFieldsCh := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, + fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { defer close(newFieldsCh) - for { fields, ok := <-fieldsCh if !ok { @@ -196,19 +193,18 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map } } }() - return newFieldsCh } -func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, + fieldsCh <-chan map[string]string, maprMessages chan<- string) { + group := mapr.NewGroupSet() serialize := func() { dlog.Common.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() } - for { select { case fields, ok := <-fieldsCh: @@ -227,7 +223,6 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var sb strings.Builder - for i, field := range a.query.GroupBy { if i > 0 { sb.WriteString(protocol.AggregateGroupKeyCombinator) @@ -254,7 +249,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { set.Samples++ return } - dlog.Common.Trace("Aggregated data locally without adding new samples") } diff --git a/internal/mapr/setclause.go b/internal/mapr/setclause.go index b4c2f73..1843d31 100644 --- a/internal/mapr/setclause.go +++ b/internal/mapr/setclause.go @@ -7,7 +7,6 @@ func (q *Query) SetClause(fields map[string]string) error { if !ok { continue } - switch sc.rType { case FunctionStack: fields[sc.lString] = sc.functionStack.Call(value) @@ -15,6 +14,5 @@ func (q *Query) SetClause(fields map[string]string) error { fields[sc.lString] = value } } - return nil } diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go index 8c5cfc9..92b21f4 100644 --- a/internal/mapr/setcondition.go +++ b/internal/mapr/setcondition.go @@ -39,20 +39,22 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { switch setOp { case "=": default: - return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' clause: " + setOp) + return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' " + + "clause: " + setOp) } if !tokens[0].isBareword { - return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' " + + "clause's lValue: " + tokens[0].str) } - sc.lString = tokens[0].str if !strings.HasPrefix(sc.lString, "$") { - return sc, nil, errors.New(invalidQuery + "Expected field variable name (starting with $) at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected field variable name " + + "(starting with $) at 'set' clause's lValue: " + tokens[0].str) } sc.rType = Field - rString := tokens[2].str + // Seems like a function call? if strings.HasSuffix(rString, ")") { functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str) @@ -72,7 +74,6 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { } else { sc.rType = Field } - return sc, tokens[3:], nil } @@ -84,10 +85,8 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { if err != nil { return nil, err } - set = append(set, sc) tokens = tokensConsumeOptional(tokens, ",") } - return } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index 7c6578b..6ac7631 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -4,7 +4,8 @@ import ( "strings" ) -var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", "order", "interval", "limit", "outfile", "logformat"} +var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", + "order", "interval", "limit", "outfile", "logformat"} // Represents a parsed token, used to parse the mapr query. type token struct { @@ -16,13 +17,11 @@ func (t token) isKeyword() bool { if !t.isBareword { return false } - for _, keyword := range keywords { if strings.ToLower(t.str) == keyword { return true } } - return false } @@ -32,7 +31,6 @@ func (t token) String() string { func tokenize(queryStr string) []token { var tokens []token - for i, part := range strings.Split(queryStr, "\"") { // Even i, means that it is not a quoted string if i%2 == 0 { @@ -53,14 +51,12 @@ func tokenize(queryStr string) []token { } tokens = append(tokens, token) } - return tokens } func tokensConsume(tokens []token) ([]token, []token) { //dlog.Common.Trace("=====================") var consumed []token - for i, t := range tokens { if t.isKeyword() { //dlog.Common.Trace("keyword", t) @@ -84,7 +80,6 @@ func tokensConsume(tokens []token) ([]token, []token) { //dlog.Common.Trace("bare", token) consumed = append(consumed, t) } - //dlog.Common.Trace("result", consumed) return nil, consumed } @@ -95,7 +90,6 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } - return tokens, strings } @@ -106,6 +100,5 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } - return tokens } diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go index 6356d94..d9f32eb 100644 --- a/internal/mapr/whereclause.go +++ b/internal/mapr/whereclause.go @@ -10,7 +10,6 @@ import ( func (q *Query) WhereClause(fields map[string]string) bool { for _, wc := range q.Where { var ok bool - if wc.Operation > FloatOperation { var lValue, rValue float64 if lValue, ok = whereClauseFloatValue(fields, wc.lString, wc.lFloat, wc.lType); !ok { @@ -36,11 +35,12 @@ func (q *Query) WhereClause(fields map[string]string) bool { return false } } - return true } -func whereClauseFloatValue(fields map[string]string, str string, float float64, t fieldType) (float64, bool) { +func whereClauseFloatValue(fields map[string]string, str string, float float64, + t fieldType) (float64, bool) { + switch t { case Float: return float, true @@ -60,7 +60,9 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64, } } -func whereClauseStringValue(fields map[string]string, str string, t fieldType) (string, bool) { +func whereClauseStringValue(fields map[string]string, str string, + t fieldType) (string, bool) { + switch t { case Field: value, ok := fields[str] diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index c60c0a5..280dcfb 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -46,15 +46,18 @@ type whereCondition struct { } func (wc *whereCondition) String() string { - return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)", - wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String()) + return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,"+ + "lType:%s,rString:%s,rFloat:%v,rType:%s)", + wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, + wc.rFloat, wc.rType.String()) } func makeWhereConditions(tokens []token) (where []whereCondition, err error) { parse := func(tokens []token) (whereCondition, []token, error) { var wc whereCondition if len(tokens) < 3 { - return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause") + err := errors.New(invalidQuery + "Not enough arguments in 'where' clause") + return wc, nil, err } whereOp := strings.ToLower(tokens[1].str) @@ -94,7 +97,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { case "nhassuffix": wc.Operation = StringNotHasSuffix default: - return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) + return wc, nil, errors.New(invalidQuery + + "Unknown operation in 'where' clause: " + whereOp) } wc.lString = tokens[0].str @@ -102,7 +106,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { if wc.Operation > FloatOperation { if !tokens[0].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) } if f, err := strconv.ParseFloat(wc.lString, 64); err == nil { wc.lFloat = f @@ -112,7 +117,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } if !tokens[2].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) } if f, err := strconv.ParseFloat(wc.rString, 64); err == nil { wc.rFloat = f @@ -133,23 +139,19 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } else { wc.rType = String } - return wc, tokens[3:], nil } for len(tokens) > 0 { var wc whereCondition var err error - wc, tokens, err = parse(tokens) if err != nil { return nil, err } - where = append(where, wc) tokens = tokensConsumeOptional(tokens, "and") } - return } @@ -170,7 +172,6 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { default: dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue) } - return false } @@ -195,6 +196,5 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { default: dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue) } - return false } diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 8c9e861..d29706c 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -13,7 +13,6 @@ const ( AggregateKVDelimiter string = "≔" // AggregateDelimiter delimits parts of an aggregation message. AggregateDelimiter string = "∥" - // AggregateDelimiter string = "⦀" // AggregateGroupKeyCombinator combines the group set keys. AggregateGroupKeyCombinator string = "," ) diff --git a/internal/regex/regex.go b/internal/regex/regex.go index 352ffd6..eb6e1b3 100644 --- a/internal/regex/regex.go +++ b/internal/regex/regex.go @@ -48,9 +48,7 @@ func new(regexStr string, flags []Flag) (Regex, error) { regexStr: regexStr, flags: flags, } - re, err := regexp.Compile(regexStr) - if err != nil { return r, err } @@ -94,11 +92,9 @@ func (r Regex) Serialize() (string, error) { for _, flag := range r.flags { flags = append(flags, flag.String()) } - if !r.initialized { return "", fmt.Errorf("Unable to serialize regex as not initialized properly: %v", r) } - return fmt.Sprintf("regex:%s %s", strings.Join(flags, ","), r.regexStr), nil } @@ -109,12 +105,12 @@ func Deserialize(str string) (Regex, error) { if len(s) < 2 { return NewNoop(), nil } - flagsStr := s[0] regexStr := s[1] if !strings.HasPrefix(flagsStr, "regex") { - return Regex{}, fmt.Errorf("unable to deserialize regex '%s': should start with string 'regex'", str) + return Regex{}, fmt.Errorf("unable to deserialize regex '%s': should start "+ + "with string 'regex'", str) } // Parse regex flags, e.g. "regex:flag1,flag2,flag3..." @@ -129,6 +125,5 @@ func Deserialize(str string) (Regex, error) { flags = append(flags, flag) } } - return new(regexStr, flags) } diff --git a/internal/regex/regex_test.go b/internal/regex/regex_test.go index 2ce49ac..033a286 100644 --- a/internal/regex/regex_test.go +++ b/internal/regex/regex_test.go @@ -9,7 +9,8 @@ func TestRegex(t *testing.T) { r := NewNoop() if !r.MatchString(input) { - t.Errorf("expected to match string '%s' with noop regex '%v' but didn't\n", input, r) + t.Errorf("expected to match string '%s' with noop regex '%v' but didn't\n", + input, r) } r, err := New(".hello", Default) @@ -17,7 +18,8 @@ func TestRegex(t *testing.T) { t.Errorf("unable to create regex: %v\n", err) } if r.MatchString(input) { - t.Errorf("expected to match string '%s' with regex '%v' but didn't\n", input, r) + t.Errorf("expected to match string '%s' with regex '%v' but didn't\n", + input, r) } serialized, err := r.Serialize() @@ -29,8 +31,8 @@ func TestRegex(t *testing.T) { t.Errorf("unable to serialize deserialized regex: %v: %v\n", serialized, err) } if r.String() != r2.String() { - t.Errorf("regex should be the same after deserialize(serialize(..)), got '%s' but expected '%s'.\n", - r2.String(), r.String()) + t.Errorf("regex should be the same after deserialize(serialize(..)), got "+ + "'%s' but expected '%s'.\n", r2.String(), r.String()) } r, err = New(".hello", Invert) @@ -38,7 +40,8 @@ func TestRegex(t *testing.T) { t.Errorf("unable to create regex: %v\n", err) } if !r.MatchString(input) { - t.Errorf("expected to not match string '%s' with regex '%v' but matched\n", input, r) + t.Errorf("expected to not match string '%s' with regex '%v' but matched\n", + input, r) } serialized, err = r.Serialize() @@ -50,7 +53,7 @@ func TestRegex(t *testing.T) { t.Errorf("unable to serialize deserialized regex: %v: %v\n", serialized, err) } if r.String() != r2.String() { - t.Errorf("regex should be the same after deserialize(serialize(..)), got '%s' but expected '%s'.\n", - r2.String(), r.String()) + t.Errorf("regex should be the same after deserialize(serialize(..)), got "+ + "'%s' but expected '%s'.\n", r2.String(), r.String()) } } diff --git a/internal/server/continuous.go b/internal/server/continuous.go index 5f84afc..93b3fcb 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -13,8 +13,7 @@ import ( gossh "golang.org/x/crypto/ssh" ) -type continuous struct { -} +type continuous struct{} func newContinuous() *continuous { return &continuous{} @@ -23,7 +22,6 @@ func newContinuous() *continuous { func (c *continuous) start(ctx context.Context) { dlog.Server.Info("Starting continuous job runner after 10s") time.Sleep(time.Second * 10) - c.runJobs(ctx) } @@ -33,7 +31,6 @@ func (c *continuous) runJobs(ctx context.Context) { dlog.Server.Debug(job.Name, "Not running job as not enabled") continue } - go func(job config.Continuous) { c.runJob(ctx, job) for { @@ -54,7 +51,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { files := fillDates(job.Files) outfile := fillDates(job.Outfile) - servers := strings.Join(job.Servers, ",") if servers == "" { servers = config.Server.SSHBindAddress @@ -70,7 +66,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { } args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile) client, err := clients.NewMaprClient(args, clients.NonCumulativeMode) if err != nil { @@ -80,7 +75,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { jobCtx, cancel := context.WithCancel(ctx) defer cancel() - if job.RestartOnDayChange { go func() { if c.waitForDayChange(ctx) { @@ -93,7 +87,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { dlog.Server.Info(fmt.Sprintf("Starting job %s", job.Name)) status := client.Start(jobCtx, make(chan string)) logMessage := fmt.Sprintf("Job exited with status %d", status) - if status != 0 { dlog.Server.Warn(logMessage) return diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index f73f82e..847e8f9 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -37,7 +37,7 @@ type baseHandler struct { activeCommands int32 quiet bool spartan bool - serverless bool + serverless int32 readBuf bytes.Buffer writeBuf bytes.Buffer } @@ -59,16 +59,14 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { select { case message := <-h.serverMessages: if message[0] == '.' { - // Handle hidden message (don't display to the user, interpreted by dtail client) + // Handle hidden message (don't display to the user) h.readBuf.WriteString(message) h.readBuf.WriteByte(protocol.MessageDelimiter) n = copy(p, h.readBuf.Bytes()) return } - if h.serverless { - // In serverless mode we have logged the server message already via the - // dlog logger, no need to send the message again to the client part. + if h.serverless > 0 { return } @@ -132,7 +130,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { h.writeBuf.WriteByte(b) } } - n = len(p) return } @@ -145,13 +142,11 @@ func (h *baseHandler) handleCommand(commandStr string) { h.send(h.serverMessages, dlog.Server.Error(h.user, err)+add) return } - args, argc, err = h.handleBase64(args, argc) if err != nil { h.send(h.serverMessages, dlog.Server.Error(h.user, err)) return } - ctx, cancel := context.WithCancel(context.Background()) go func() { <-h.done.Done() @@ -160,7 +155,6 @@ func (h *baseHandler) handleCommand(commandStr string) { splitted := strings.Split(args[0], ":") commandName := splitted[0] - options, err := config.DeserializeOptions(splitted[1:]) if err != nil { h.send(h.serverMessages, dlog.Server.Error(h.user, err)) @@ -191,8 +185,8 @@ func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, strin if clientCompat > serverCompat { toUpdate = "server" } - - err := fmt.Errorf("DTail server protocol version '%s' does not match client protocol version '%s', please update DTail %s!", + err := fmt.Errorf("the DTail server protocol version '%s' does not match "+ + "client protocol version '%s', please update DTail %s", protocol.ProtocolCompat, args[1], toUpdate) return args, argc, add, err } @@ -201,8 +195,8 @@ func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, strin } func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) { - err := errors.New("Unable to decode client message, DTail server and client versions may not be compatible") - + err := errors.New("unable to decode client message, DTail server and client " + + "versions may not be compatible") if argc != 2 || args[0] != "base64" { return args, argc, err } @@ -215,7 +209,8 @@ func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, erro args = strings.Split(decodedStr, " ") argc = len(decodedStr) - dlog.Server.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args) + dlog.Server.Trace(h.user, "Base64 decoded received command", + decodedStr, argc, args) return args, argc, nil } @@ -223,7 +218,8 @@ func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, erro func (h *baseHandler) handleAckCommand(argc int, args []string) { if argc < 3 { if !h.quiet { - h.send(h.serverMessages, dlog.Server.Warn(h.user, "Unable to parse command", args, argc)) + h.send(h.serverMessages, dlog.Server.Warn(h.user, + "Unable to parse command", args, argc)) } return } @@ -245,11 +241,9 @@ func (h *baseHandler) send(ch chan<- string, message string) { func (h *baseHandler) flush() { dlog.Server.Trace(h.user, "flush()") - numUnsentMessages := func() int { return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } - for i := 0; i < 10; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) @@ -258,7 +252,6 @@ func (h *baseHandler) flush() { dlog.Server.Debug(h.user, "Still lines to be sent") time.Sleep(time.Millisecond * 10) } - dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) } @@ -279,7 +272,6 @@ func (h *baseHandler) shutdown() { dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") case <-h.done.Done(): } - h.done.Shutdown() } diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index 347ff66..8d6c400 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -35,24 +35,23 @@ func NewHealthHandler(user *user.User) *HealthHandler { if err != nil { dlog.Server.FatalPanic(err) } - s := strings.Split(fqdn, ".") h.hostname = s[0] - return &h } -func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, args []string, - commandName string, options map[string]string) { - dlog.Server.Debug(h.user, "Handling health command", argc, args) +func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, + args []string, commandName string, options map[string]string) { + dlog.Server.Debug(h.user, "Handling health command", argc, args) switch commandName { case "health": h.send(h.serverMessages, "OK") case ".ack": h.handleAckCommand(argc, args) default: - h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown health command", commandName, argc, args)) + h.send(h.serverMessages, dlog.Server.Error(h.user, + "Received unknown health command", commandName, argc, args)) } h.shutdown() } diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index c3e600e..65e0ed8 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -14,18 +14,17 @@ type mapCommand struct { } // NewMapCommand returns a new server side mapreduce command. -func newMapCommand(serverHandler *ServerHandler, argc int, args []string) (mapCommand, *server.Aggregate, error) { - m := mapCommand{server: serverHandler} +func newMapCommand(serverHandler *ServerHandler, argc int, + args []string) (mapCommand, *server.Aggregate, error) { + m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") aggregate, err := server.NewAggregate(queryStr) if err != nil { return m, nil, err } - m.aggregate = aggregate return m, aggregate, nil - } func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index abc44c7..384e966 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -26,25 +26,30 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { - re := regex.NewNoop() +func (r *readCommand) Start(ctx context.Context, argc int, args []string, + retries int) { + re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, "Unable to parse command", err)) + r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, + "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to parse command", args, argc)) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to parse command", args, argc)) return } r.readGlob(ctx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { +func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, + retries int) { + retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -58,7 +63,8 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, if numPaths := len(paths); numPaths == 0 { dlog.Server.Error(r.server.user, "No such file(s) to read", glob) - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -72,31 +78,33 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Giving up to read file(s)")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Giving up to read file(s)")) return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, + re regex.Regex, retryInterval time.Duration) { + var wg sync.WaitGroup wg.Add(len(paths)) - for _, path := range paths { go r.readFileIfPermissions(ctx, &wg, path, glob, re) } - wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, + wg *sync.WaitGroup, path, glob string, re regex.Regex) { + defer wg.Done() globID := r.makeGlobID(path, glob) - if !r.server.user.HasFilePermission(path, "readfiles") { dlog.Server.Error(r.server.user, "No permission to read file", path, globID) - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) return } - r.readFile(ctx, path, globID, re) } @@ -137,7 +145,6 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege return } } - time.Sleep(time.Second * 2) dlog.Server.Info(path, globID, "Reading file again") } @@ -156,11 +163,11 @@ func (r *readCommand) makeGlobID(path, glob string) string { if len(idParts) > 0 { return strings.Join(idParts, "/") } - if len(pathParts) > 0 { return pathParts[len(pathParts)-1] } - r.server.send(r.server.serverMessages, dlog.Server.Warn("Empty file path given?", path, glob)) + r.server.send(r.server.serverMessages, + dlog.Server.Warn("Empty file path given?", path, glob)) return "" } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index aed8956..f12d590 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,6 +4,7 @@ import ( "context" "os" "strings" + "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" @@ -23,7 +24,9 @@ type ServerHandler struct { } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, + tailLimiter chan struct{}) *ServerHandler { + dlog.Server.Debug(user, "Creating new server handler") h := ServerHandler{ baseHandler: baseHandler{ @@ -51,11 +54,10 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, - commandName string, options map[string]string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, + args []string, commandName string, options map[string]string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) - h.incrementActiveCommands() commandFinished := func() { if h.decrementActiveCommands() == 0 { @@ -73,7 +75,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] } if serverless, _ := options["serverless"]; serverless == "true" { dlog.Server.Debug(h.user, "Enabling serverless mode") - h.serverless = true + atomic.AddInt32(&h.serverless, 1) } switch commandName { @@ -83,14 +85,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command.Start(ctx, argc, args, 1) commandFinished() }() - case "tail": command := newReadCommand(h, omode.TailClient) go func() { command.Start(ctx, argc, args, 10) commandFinished() }() - case "map": command, aggregate, err := newMapCommand(h, argc, args) if err != nil { @@ -99,19 +99,17 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() return } - h.aggregate = aggregate go func() { command.Start(ctx, h.maprMessages) commandFinished() }() - case ".ack": h.handleAckCommand(argc, args) commandFinished() - default: - h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options)) + h.send(h.serverMessages, dlog.Server.Error(h.user, + "Received unknown user command", commandName, argc, args, options)) commandFinished() } } diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index ccb2225..0ba65f7 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -16,8 +16,7 @@ import ( gossh "golang.org/x/crypto/ssh" ) -type scheduler struct { -} +type scheduler struct{} func newScheduler() *scheduler { return &scheduler{} @@ -28,7 +27,6 @@ func (s *scheduler) start(ctx context.Context) { // First run after just 10s! time.Sleep(time.Second * 10) s.runJobs(ctx) - for { select { case <-time.After(time.Minute): @@ -45,13 +43,11 @@ func (s *scheduler) runJobs(ctx context.Context) { dlog.Server.Debug(job.Name, "Not running job as not enabled") continue } - hour, err := strconv.Atoi(time.Now().Format("15")) if err != nil { dlog.Server.Error(job.Name, "Unable to create job", err) continue } - if hour < job.TimeRange[0] || hour >= job.TimeRange[1] { dlog.Server.Debug(job.Name, "Not running job out of time range") continue @@ -59,7 +55,6 @@ func (s *scheduler) runJobs(ctx context.Context) { files := fillDates(job.Files) outfile := fillDates(job.Outfile) - _, err = os.Stat(outfile) if !os.IsNotExist(err) { dlog.Server.Debug(job.Name, "Not running job as outfile already exists", outfile) @@ -70,7 +65,6 @@ func (s *scheduler) runJobs(ctx context.Context) { if servers == "" { servers = config.Server.SSHBindAddress } - args := config.Args{ ConnectionsPerCPU: config.DefaultConnectionsPerCPU, Discovery: job.Discovery, @@ -81,7 +75,6 @@ func (s *scheduler) runJobs(ctx context.Context) { } args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile) client, err := clients.NewMaprClient(args, clients.CumulativeMode) if err != nil { diff --git a/internal/server/server.go b/internal/server/server.go index b3d4bff..0cb5e27 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -24,9 +24,9 @@ type Server struct { stats stats // SSH server configuration. sshServerConfig *gossh.ServerConfig - // To control the max amount of concurrent cats (which can cause a lot of I/O on the server) + // To control the max amount of concurrent cats. catLimiter chan struct{} - // To control the max amount of concurrent tails + // To control the max amount of concurrent tails. tailLimiter chan struct{} // To run scheduled tasks (if configured) sched *scheduler @@ -61,7 +61,6 @@ func New() *Server { // Start the server. func (s *Server) Start(ctx context.Context) int { dlog.Server.Info("Starting server") - bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort) dlog.Server.Info("Binding server", bindAt) @@ -76,14 +75,12 @@ func (s *Server) Start(ctx context.Context) int { go s.listenerLoop(ctx, listener) <-ctx.Done() - // For future use. return 0 } func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { dlog.Server.Debug("Starting listener loop") - for { conn, err := listener.Accept() // Blocking if err != nil { @@ -101,7 +98,6 @@ func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { conn.Close() continue } - go s.handleConnection(ctx, conn) } } @@ -116,22 +112,23 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) { } s.stats.incrementConnections() - go gossh.DiscardRequests(reqs) for newChannel := range chans { go s.handleChannel(ctx, sshConn, newChannel) } } -func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) { +func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, + newChannel gossh.NewChannel) { + user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String()) if err != nil { dlog.Server.Error(user, err) newChannel.Reject(gossh.Prohibited, err.Error()) return } - dlog.Server.Info(user, "Invoking channel handler") + dlog.Server.Info(user, "Invoking channel handler") if newChannel.ChannelType() != "session" { err := errors.New("Don'w allow other channel types than session") dlog.Server.Error(user, err) @@ -151,9 +148,10 @@ func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChann } } -func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error { - dlog.Server.Info(user, "Invoking request handler") +func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, + in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error { + dlog.Server.Info(user, "Invoking request handler") for req := range in { var payload = struct{ Value string }{} gossh.Unmarshal(req.Payload, &payload) @@ -167,7 +165,6 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch default: handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } - terminate := func() { handler.Shutdown() sshConn.Close() @@ -178,13 +175,11 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch io.Copy(channel, handler) terminate() }() - go func() { // Broken pipe, cancel io.Copy(handler, channel) terminate() }() - go func() { select { case <-ctx.Done(): @@ -192,7 +187,6 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch } terminate() }() - go func() { if err := sshConn.Wait(); err != nil && err != io.EOF { dlog.Server.Error(user, err) @@ -204,20 +198,19 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch // Only serving shell type req.Reply(true, nil) - default: req.Reply(false, nil) - return fmt.Errorf("Closing SSH connection as unknown request recieved|%s|%v", req.Type, payload.Value) } } - return nil } // Callback for SSH authentication. -func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { +func (s *Server) Callback(c gossh.ConnMetadata, + authPayload []byte) (*gossh.Permissions, error) { + user, err := user.New(c.User(), c.RemoteAddr().String()) if err != nil { return nil, err @@ -229,7 +222,6 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm } authInfo := string(authPayload) - splitted := strings.Split(c.RemoteAddr().String(), ":") remoteIP := splitted[0] @@ -259,23 +251,26 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm return nil, fmt.Errorf("user %s not authorized", user) } -func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool { - dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom) +func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, + allowedJobName string, allowFrom []string) bool { + dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom) if jobName != allowedJobName { - dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName) + dlog.Server.Debug(user, jobName, "backgroundCanSSH", + "Job name does not match, skipping to next one...", allowedJobName) return false } for _, myAddr := range allowFrom { ips, err := net.LookupIP(myAddr) if err != nil { - dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP address for allowed hosts lookup, skipping to next one...", myAddr, err) + dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP "+ + "address for allowed hosts lookup, skipping to next one...", myAddr, err) continue } - for _, ip := range ips { - dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String()) + dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", + remoteIP, ip.String()) if remoteIP == ip.String() { return true } diff --git a/internal/server/stats.go b/internal/server/stats.go index c07634d..99a644a 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -19,7 +19,6 @@ type stats struct { func (s *stats) incrementConnections() { defer s.logServerStats() - s.mutex.Lock() s.currentConnections++ s.lifetimeConnections++ @@ -28,7 +27,6 @@ func (s *stats) incrementConnections() { func (s *stats) decrementConnections() { defer s.logServerStats() - s.mutex.Lock() s.currentConnections-- s.mutex.Unlock() @@ -40,8 +38,8 @@ func (s *stats) hasConnections() bool { s.mutex.Unlock() has := currentConnections > 0 - dlog.Server.Info("stats", "Server with open connections?", has, currentConnections) - + dlog.Server.Info("stats", "Server with open connections?", + has, currentConnections) return has } @@ -52,7 +50,6 @@ func (s *stats) logServerStats() { data := make(map[string]interface{}) data["currentConnections"] = s.currentConnections data["lifetimeConnections"] = s.lifetimeConnections - dlog.Server.Mapreduce("STATS", data) } @@ -61,9 +58,9 @@ func (s *stats) serverLimitExceeded() error { defer s.mutex.Unlock() if s.currentConnections >= config.Server.MaxConnections { - return fmt.Errorf("Exceeded max allowed concurrent connections of %d", config.Server.MaxConnections) + return fmt.Errorf("Exceeded max allowed concurrent connections of %d", + config.Server.MaxConnections) } - return nil } diff --git a/internal/source/source.go b/internal/source/source.go index be7aecd..4bb0784 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -1,10 +1,19 @@ package source +// Source specifies the origin of either the current process (dtail is a client +// process, dserver is a server process) or the source code package (e.g. +// dserver server side code or dtail client side code). Notice that dtail client +// may also executes server code directly (e.g. via serverless mode) and that +// the dserver may also executes client code (e.g. via scheduled server side +// mapreduce queries). type Source int const ( - Client Source = iota - Server Source = iota + // Client process or source code package. + Client Source = iota + // Server process or source code package. + Server Source = iota + // HealthCheck process or client source code package. HealthCheck Source = iota ) @@ -17,6 +26,5 @@ func (s Source) String() string { case HealthCheck: return "HEALTHCHECK" } - panic("Unknown source type") } diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go index 4508319..ced1fb9 100644 --- a/internal/ssh/client/authmethods.go +++ b/internal/ssh/client/authmethods.go @@ -11,7 +11,10 @@ import ( ) // InitSSHAuthMethods initialises all known SSH auth methods on the client side. -func InitSSHAuthMethods(sshAuthMethods []gossh.AuthMethod, hostKeyCallback gossh.HostKeyCallback, trustAllHosts bool, throttleCh chan struct{}, privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) { +func InitSSHAuthMethods(sshAuthMethods []gossh.AuthMethod, + hostKeyCallback gossh.HostKeyCallback, trustAllHosts bool, throttleCh chan struct{}, + privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) { + if len(sshAuthMethods) > 0 { simpleCallback, err := NewSimpleCallback() if err != nil { @@ -19,20 +22,21 @@ func InitSSHAuthMethods(sshAuthMethods []gossh.AuthMethod, hostKeyCallback gossh } return sshAuthMethods, simpleCallback } - return initKnownHostsAuthMethods(trustAllHosts, throttleCh, privateKeyPath) } -func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) { - var sshAuthMethods []gossh.AuthMethod +func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, + privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) { + var sshAuthMethods []gossh.AuthMethod knownHostsPath := os.Getenv("HOME") + "/.ssh/known_hosts" - knownHostsCallback, err := NewKnownHostsCallback(knownHostsPath, trustAllHosts, throttleCh) + knownHostsCallback, err := NewKnownHostsCallback(knownHostsPath, trustAllHosts, + throttleCh) if err != nil { dlog.Common.FatalPanic(knownHostsPath, err) } - dlog.Common.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsPath) - + dlog.Common.Debug("initKnownHostsAuthMethods", "Added known hosts file path", + knownHostsPath) if config.Common.ExperimentalFeaturesEnable { sshAuthMethods = append(sshAuthMethods, gossh.Password("experimental feature test")) dlog.Common.Debug("initKnownHostsAuthMethods", "Added experimental method to list of auth methods") @@ -43,7 +47,9 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri authMethod, err := ssh.PrivateKey(privateKeyPath) if err == nil { sshAuthMethods = append(sshAuthMethods, authMethod) - dlog.Common.Debug("initKnownHostsAuthMethods", "Added path to list of auth methods, not adding further methods", privateKeyPath) + dlog.Common.Debug("initKnownHostsAuthMethods", + "Added path to list of auth methods, not adding further methods", + privateKeyPath) return sshAuthMethods, knownHostsCallback } dlog.Common.FatalPanic("Unable to use private SSH key", privateKeyPath, err) @@ -53,30 +59,35 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri authMethod, err := ssh.Agent() if err == nil { sshAuthMethods = append(sshAuthMethods, authMethod) - dlog.Common.Debug("initKnownHostsAuthMethods", "Added SSH Agent (SSH_AUTH_SOCK) to list of auth methods, not adding further methods") + dlog.Common.Debug("initKnownHostsAuthMethods", "Added SSH Agent (SSH_AUTH_SOCK)"+ + "to list of auth methods, not adding further methods") return sshAuthMethods, knownHostsCallback } - dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to init SSH Agent auth method", err) + dlog.Common.Debug("initKnownHostsAuthMethods", + "Unable to init SSH Agent auth method", err) // Third, try Linux/UNIX default key paths privateKeyPath = os.Getenv("HOME") + "/.ssh/id_rsa" authMethod, err = ssh.PrivateKey(privateKeyPath) if err == nil { sshAuthMethods = append(sshAuthMethods, authMethod) - dlog.Common.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath) + dlog.Common.Debug("initKnownHostsAuthmethods", + "Added path to list of auth methods, not adding further methods", privateKeyPath) return sshAuthMethods, knownHostsCallback } - dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err) + dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", + privateKeyPath, err) privateKeyPath = os.Getenv("HOME") + "/.ssh/id_dsa" authMethod, err = ssh.PrivateKey(privateKeyPath) if err == nil { sshAuthMethods = append(sshAuthMethods, authMethod) - dlog.Common.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath) + dlog.Common.Debug("initKnownHostsAuthmethods", + "Added path to list of auth methods, not adding further methods", privateKeyPath) return sshAuthMethods, knownHostsCallback } - dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err) - + dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", + privateKeyPath, err) dlog.Common.FatalPanic("Unable to find private SSH key information") // Never reach this point. diff --git a/internal/ssh/client/customkeycallback.go b/internal/ssh/client/customkeycallback.go index 73e5289..53b8e3c 100644 --- a/internal/ssh/client/customkeycallback.go +++ b/internal/ssh/client/customkeycallback.go @@ -7,8 +7,7 @@ import ( ) // CustomCallback is a custom host key callback wrapper. -type CustomCallback struct { -} +type CustomCallback struct{} // NewCustomCallback returns a new wrapper. func NewCustomCallback() (*CustomCallback, error) { diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go index a73d612..65a590a 100644 --- a/internal/ssh/client/knownhostscallback.go +++ b/internal/ssh/client/knownhostscallback.go @@ -46,8 +46,9 @@ type KnownHostsCallback struct { } // NewKnownHostsCallback returns a new wrapper. -func NewKnownHostsCallback(knownHostsPath string, trustAllHosts bool, throttleCh chan struct{}) (HostKeyCallback, error) { - // Ensure file exists +func NewKnownHostsCallback(knownHostsPath string, trustAllHosts bool, + throttleCh chan struct{}) (HostKeyCallback, error) { + os.OpenFile(knownHostsPath, os.O_RDONLY|os.O_CREATE, 0666) untrustedHosts := make(map[string]bool) @@ -59,11 +60,9 @@ func NewKnownHostsCallback(knownHostsPath string, trustAllHosts bool, throttleCh untrustedHosts: untrustedHosts, mutex: &sync.Mutex{}, } - if trustAllHosts { close(c.trustAllHostsCh) } - return c, nil } @@ -75,14 +74,12 @@ func (c KnownHostsCallback) Wrap() ssh.HostKeyCallback { if err != nil { return err } - // Check for valid entry in known_hosts file err = knownHostsCb(server, remote, key) if err == nil { // OK return nil } - // Make sure that interactive user callback does not interfere with // SSH connection throttler. <-c.throttleCh @@ -96,11 +93,9 @@ func (c KnownHostsCallback) Wrap() ssh.HostKeyCallback { ipLine: knownhosts.Line([]string{remote.String()}, key), responseCh: make(chan response), } - dlog.Common.Warn("Encountered unknown host", unknown) // Notify user that there is an unknown host c.unknownCh <- unknown - // Wait for user input. switch <-unknown.responseCh { case trustHost: @@ -112,7 +107,6 @@ func (c KnownHostsCallback) Wrap() ssh.HostKeyCallback { c.mutex.Lock() defer c.mutex.Unlock() c.untrustedHosts[server] = true - return err } } @@ -121,7 +115,6 @@ func (c KnownHostsCallback) Wrap() ssh.HostKeyCallback { // be added to the known hosts or not. func (c KnownHostsCallback) PromptAddHosts(ctx context.Context) { var hosts []unknownHost - for { // Check whether there is a unknown host select { @@ -147,7 +140,6 @@ func (c KnownHostsCallback) PromptAddHosts(ctx context.Context) { func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) { var servers []string - for _, host := range hosts { servers = append(servers, host.server) } @@ -165,7 +157,6 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) { strings.Join(servers, ","), "Do you want to trust these hosts?", ) - p := prompt.New(question) a := prompt.Answer{ @@ -223,7 +214,6 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) { func (c KnownHostsCallback) trustHosts(hosts []unknownHost) { tmpKnownHostsPath := fmt.Sprintf("%s.tmp", c.knownHostsPath) - newFd, err := os.OpenFile(tmpKnownHostsPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) if err != nil { panic(fmt.Sprintf("%s: %s", tmpKnownHostsPath, err.Error())) @@ -232,7 +222,6 @@ func (c KnownHostsCallback) trustHosts(hosts []unknownHost) { // Newly trusted hosts in normalized form addresses := make(map[string]struct{}) - // First write to new known hosts file, and keep track of addresses for _, unknown := range hosts { unknown.responseCh <- trustHost @@ -255,7 +244,6 @@ func (c KnownHostsCallback) trustHosts(hosts []unknownHost) { defer oldFd.Close() scanner := bufio.NewScanner(oldFd) - // Now, append all still valid old entries to the new host file for scanner.Scan() { line := scanner.Text() @@ -283,6 +271,5 @@ func (c KnownHostsCallback) Untrusted(server string) bool { c.mutex.Lock() defer c.mutex.Unlock() _, ok := c.untrustedHosts[server] - return ok } diff --git a/internal/ssh/server/hostkey.go b/internal/ssh/server/hostkey.go index 20de1f0..33bd4e8 100644 --- a/internal/ssh/server/hostkey.go +++ b/internal/ssh/server/hostkey.go @@ -24,7 +24,8 @@ func PrivateHostKey() []byte { pem := ssh.EncodePrivateKeyToPEM(privateKey) if err := ioutil.WriteFile(hostKeyFile, pem, 0600); err != nil { - dlog.Common.Error("Unable to write private server RSA host key to file", hostKeyFile, err) + dlog.Common.Error("Unable to write private server RSA host key to file", + hostKeyFile, err) } return pem } diff --git a/internal/ssh/server/publickeycallback.go b/internal/ssh/server/publickeycallback.go index 59d1f31..ebc428a 100644 --- a/internal/ssh/server/publickeycallback.go +++ b/internal/ssh/server/publickeycallback.go @@ -13,25 +13,28 @@ import ( gossh "golang.org/x/crypto/ssh" ) -// PublicKeyCallback is for the server to check whether a public SSH key is authorized ot not. -func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*gossh.Permissions, error) { +// PublicKeyCallback is for the server to check whether a public SSH key is +// authorized ot not. +func PublicKeyCallback(c gossh.ConnMetadata, + offeredPubKey gossh.PublicKey) (*gossh.Permissions, error) { + user, err := user.New(c.User(), c.RemoteAddr().String()) if err != nil { return nil, err } - dlog.Common.Info(user, "Incoming authorization") + dlog.Common.Info(user, "Incoming authorization") cwd, err := os.Getwd() if err != nil { return nil, fmt.Errorf("Unable to get current working directory|%s|", err.Error()) } - if config.ServerRelaxedAuthEnable { dlog.Common.Fatal(user, "Granting permissions via relaxed-auth") return nil, nil } - authorizedKeysFile := fmt.Sprintf("%s/%s/%s.authorized_keys", cwd, config.Common.CacheDir, user.Name) + authorizedKeysFile := fmt.Sprintf("%s/%s/%s.authorized_keys", cwd, + config.Common.CacheDir, user.Name) if _, err := os.Stat(authorizedKeysFile); os.IsNotExist(err) { user, err := osUser.Lookup(user.Name) if err != nil { @@ -44,23 +47,25 @@ func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*go dlog.Common.Info(user, "Reading", authorizedKeysFile) authorizedKeysBytes, err := ioutil.ReadFile(authorizedKeysFile) if err != nil { - return nil, fmt.Errorf("Unable to read authorized keys file|%s|%s|%s", authorizedKeysFile, user, err.Error()) + return nil, fmt.Errorf("Unable to read authorized keys file|%s|%s|%s", + authorizedKeysFile, user, err.Error()) } authorizedKeysMap := map[string]bool{} for len(authorizedKeysBytes) > 0 { authorizedPubKey, _, _, restBytes, err := gossh.ParseAuthorizedKey(authorizedKeysBytes) if err != nil { - return nil, fmt.Errorf("Unable to parse authorized keys bytes|%s|%s", user, err.Error()) + return nil, fmt.Errorf("Unable to parse authorized keys bytes|%s|%s", + user, err.Error()) } authorizedKeysMap[string(authorizedPubKey.Marshal())] = true authorizedKeysBytes = restBytes - - dlog.Common.Debug(user, "Authorized public key fingerprint", gossh.FingerprintSHA256(authorizedPubKey)) + dlog.Common.Debug(user, "Authorized public key fingerprint", + gossh.FingerprintSHA256(authorizedPubKey)) } - dlog.Common.Debug(user, "Offered public key fingerprint", gossh.FingerprintSHA256(offeredPubKey)) - + dlog.Common.Debug(user, "Offered public key fingerprint", + gossh.FingerprintSHA256(offeredPubKey)) if authorizedKeysMap[string(offeredPubKey.Marshal())] { return &gossh.Permissions{ Extensions: map[string]string{ diff --git a/internal/ssh/ssh.go b/internal/ssh/ssh.go index 56494a7..db5aaf1 100644 --- a/internal/ssh/ssh.go +++ b/internal/ssh/ssh.go @@ -24,12 +24,10 @@ func GeneratePrivateRSAKey(size int) (*rsa.PrivateKey, error) { if err != nil { return nil, err } - err = privateKey.Validate() if err != nil { return nil, err } - return privateKey, nil } @@ -42,7 +40,6 @@ func EncodePrivateKeyToPEM(privateKey *rsa.PrivateKey) []byte { Headers: nil, Bytes: derFormat, } - return pem.EncodeToMemory(&block) } @@ -80,7 +77,6 @@ func KeyFile(keyFile string) (gossh.AuthMethod, error) { if err != nil { return nil, err } - key, err := gossh.ParsePrivateKey(buffer) if err != nil { return nil, err diff --git a/internal/user/name.go b/internal/user/name.go index 28ab0a4..cd11907 100644 --- a/internal/user/name.go +++ b/internal/user/name.go @@ -10,11 +10,9 @@ func NoRootCheck() { if err != nil { panic(err) } - if user.Uid == "0" { panic("Not allowed to run as UID 0") } - if user.Gid == "0" { panic("Not allowed to run as GID 0") } @@ -26,6 +24,5 @@ func Name() string { if err != nil { panic(err) } - return user.Username } diff --git a/internal/user/server/user.go b/internal/user/server/user.go index 70ead1c..aa7f8b1 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -49,7 +49,6 @@ func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission dlog.Server.Fatal(u, filePath, permissionType, "Server releaxed auth enabled") return true } - if u.Name == config.ScheduleUser || u.Name == config.ContinuousUser { // Background user has same permissions as dtail process itself. return true @@ -57,27 +56,29 @@ func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission cleanPath, err := filepath.EvalSymlinks(filePath) if err != nil { - dlog.Server.Error(u, filePath, permissionType, "Unable to evaluate symlinks", err) + dlog.Server.Error(u, filePath, permissionType, + "Unable to evaluate symlinks", err) hasPermission = false return } cleanPath, err = filepath.Abs(cleanPath) if err != nil { - dlog.Server.Error(u, cleanPath, permissionType, "Unable to make file path absolute", err) + dlog.Server.Error(u, cleanPath, permissionType, + "Unable to make file path absolute", err) hasPermission = false return } if cleanPath != filePath { - dlog.Server.Info(u, filePath, cleanPath, permissionType, "Calculated new clean path from original file path (possibly symlink)") + dlog.Server.Info(u, filePath, cleanPath, permissionType, + "Calculated new clean path from original file path (possibly symlink)") } hasPermission, err = u.hasFilePermission(cleanPath, permissionType) if err != nil { dlog.Server.Warn(u, cleanPath, err) } - return } @@ -86,18 +87,17 @@ func (u *User) hasFilePermission(cleanPath, permissionType string) (bool, error) if _, err := permissions.ToRead(u.Name, cleanPath); err != nil { return false, fmt.Errorf("User without OS file system permissions to read path: '%v'", err) } - dlog.Server.Info(u, cleanPath, permissionType, "User with OS file system permissions to path") + dlog.Server.Info(u, cleanPath, permissionType, + "User with OS file system permissions to path") // Only allow to follow regular files or symlinks. info, err := os.Lstat(cleanPath) if err != nil { return false, fmt.Errorf("Unable to determine file type: '%v'", err) } - if !info.Mode().IsRegular() { return false, fmt.Errorf("Can only open regular files or follow symlinks") } - hasPermission, err := u.iteratePaths(cleanPath, permissionType) if err != nil { return false, err @@ -109,10 +109,8 @@ func (u *User) hasFilePermission(cleanPath, permissionType string) (bool, error) func (u *User) iteratePaths(cleanPath, permissionType string) (bool, error) { // By default assume no permissions hasPermission := false - for _, permission := range u.permissions { typeStr := "readfiles" // Assume ReadFiles by default. - var regexStr string var negate bool @@ -123,7 +121,6 @@ func (u *User) iteratePaths(cleanPath, permissionType string) (bool, error) { } dlog.Server.Debug(u, cleanPath, typeStr, permission) - if typeStr != permissionType { continue } @@ -136,16 +133,17 @@ func (u *User) iteratePaths(cleanPath, permissionType string) (bool, error) { re, err := regexp.Compile(regexStr) if err != nil { - return false, fmt.Errorf("Permission test failed, can't compile regex '%s': '%v'", regexStr, err) + return false, fmt.Errorf("Permission test failed, can't compile regex "+ + "'%s': '%v'", regexStr, err) } - if negate && re.MatchString(cleanPath) { - dlog.Server.Info(u, cleanPath, "Permission test failed partially, matching negative pattern '%s'", permission) + dlog.Server.Info(u, cleanPath, "Permission test failed partially, "+ + "matching negative pattern '%s'", permission) hasPermission = false } - if !negate && re.MatchString(cleanPath) { - dlog.Server.Info(u, cleanPath, "Permission test passed partially, matching positive pattern", permission) + dlog.Server.Info(u, cleanPath, "Permission test passed partially, "+ + "matching positive pattern", permission) hasPermission = true } } diff --git a/internal/version/version.go b/internal/version/version.go index 4ff6eae..68b9e6e 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -20,7 +20,8 @@ const ( // String representation of the DTail version. func String() string { - return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, protocol.ProtocolCompat, Additional) + return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, + protocol.ProtocolCompat, Additional) } // PaintedString is a prettier string representation of the DTail version. @@ -31,13 +32,10 @@ func PaintedString() string { name := color.PaintStrWithAttr(fmt.Sprintf(" %s ", Name), color.FgYellow, color.BgBlue, color.AttrBold) - version := color.PaintStrWithAttr(fmt.Sprintf(" %s ", Version), color.FgBlue, color.BgYellow, color.AttrBold) - protocol := color.PaintStr(fmt.Sprintf(" Protocol %s ", protocol.ProtocolCompat), color.FgBlack, color.BgGreen) - additional := color.PaintStrWithAttr(fmt.Sprintf(" %s ", Additional), color.FgWhite, color.BgMagenta, color.AttrUnderline) |
