diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-09 16:44:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 13:36:32 +0300 |
| commit | 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (patch) | |
| tree | 2c1aa056285b3e5d4febefd114a4b95f62071386 | |
| parent | 2d7ddbeae8286373ac19787dc7dde598a7cb0598 (diff) | |
add dtail health check unit test.
| -rw-r--r-- | Makefile | 1 | ||||
| -rw-r--r-- | cmd/dserver/main.go | 2 | ||||
| -rw-r--r-- | cmd/dtailhealthcheck/main.go | 3 | ||||
| -rw-r--r-- | docker/dtail.json | 9 | ||||
| -rw-r--r-- | integrationtests/commons.go | 33 | ||||
| -rw-r--r-- | integrationtests/dcat_test.go | 2 | ||||
| -rw-r--r-- | integrationtests/dgrep_test.go | 4 | ||||
| -rw-r--r-- | integrationtests/dmap_test.go | 4 | ||||
| -rw-r--r-- | integrationtests/dtail_test.go | 2 | ||||
| -rw-r--r-- | integrationtests/dtailhealthcheck.expected | 1 | ||||
| -rw-r--r-- | integrationtests/dtailhealthcheck2.expected | 1 | ||||
| -rw-r--r-- | integrationtests/dtailhealthcheck3.expected | 1 | ||||
| -rw-r--r-- | integrationtests/dtailhealthcheck_test.go | 83 | ||||
| -rw-r--r-- | internal/config/initializer.go | 29 | ||||
| -rw-r--r-- | internal/io/dlog/dlog.go | 43 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/file.go | 10 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 6 | ||||
| -rw-r--r-- | samples/dtail.json.sample | 5 |
18 files changed, 172 insertions, 67 deletions
@@ -45,6 +45,7 @@ lint: golint $$dir; \ done test: + ${GO} clean -testcache ifndef USE_ACL ${GO} test -race ./... -v else diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go index 780c6d5..b4fc873 100644 --- a/cmd/dserver/main.go +++ b/cmd/dserver/main.go @@ -40,7 +40,7 @@ func main() { flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.LogDir, "logDir", "", "Log dir") flag.StringVar(&args.Logger, "logger", config.DefaultServerLogger, "Logger name") - flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") + flag.StringVar(&args.LogLevel, "logLevel", config.DefaultLogLevel, "Log level") flag.Parse() args.NoColor = !color diff --git a/cmd/dtailhealthcheck/main.go b/cmd/dtailhealthcheck/main.go index 71c162e..0f37f8a 100644 --- a/cmd/dtailhealthcheck/main.go +++ b/cmd/dtailhealthcheck/main.go @@ -36,6 +36,8 @@ func main() { var wg sync.WaitGroup wg.Add(1) + dlog.Start(ctx, &wg, source.HealthCheck) + if pprof > -1 { // For debugging purposes only pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof) @@ -43,7 +45,6 @@ func main() { dlog.Client.Info("Started PProf", pprofArgs) } - dlog.Start(ctx, &wg, source.HealthCheck) healthClient, _ := clients.NewHealthClient(args) os.Exit(healthClient.Start(ctx, signal.NoCh(ctx))) } diff --git a/docker/dtail.json b/docker/dtail.json index badd42f..d86da20 100644 --- a/docker/dtail.json +++ b/docker/dtail.json @@ -28,12 +28,13 @@ } }, "Common": { - "LogDir" : "/var/log/dserver", - "CacheDir" : "cache", - "TmpDir" : "tmp", + "LogDir": "/var/log/dserver", + "Logger": "fout", + "CacheDir": "cache", + "TmpDir": "tmp", "LogStrategy": "daily", "SSHPort": 2222, - "LogLevel": "DEVEL", + "LogLevel": "trace", "ExperimentalFeaturesEnable": false } } diff --git a/integrationtests/commons.go b/integrationtests/commons.go index f789322..f96b532 100644 --- a/integrationtests/commons.go +++ b/integrationtests/commons.go @@ -2,6 +2,7 @@ package integrationtests import ( "bufio" + "context" "crypto/sha256" "encoding/base64" "fmt" @@ -9,29 +10,41 @@ import ( "os" "os/exec" "strings" + "syscall" "testing" ) -func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) error { +func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) (int, error) { + return runCommandContext(t, context.TODO(), cmd, args, stdoutFile) +} + +func runCommandContext(t *testing.T, ctx context.Context, cmd string, args []string, stdoutFile string) (int, error) { if _, err := os.Stat(cmd); err != nil { - return fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err) + return -1, fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err) } - t.Log("Executing command:", cmd, strings.Join(args, " ")) - bytes, err := exec.Command(cmd, args...).Output() - if err != nil { - return err - } + t.Log("Running command:", cmd, strings.Join(args, " ")) + bytes, cmdErr := exec.CommandContext(ctx, cmd, args...).Output() t.Log("Writing stdout to file", stdoutFile) fd, err := os.Create(stdoutFile) if err != nil { - return err + return -1, err } + defer fd.Close() fd.Write(bytes) - fd.Close() - return nil + return exitCodeFromError(cmdErr), err +} + +func exitCodeFromError(err error) int { + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + return ws.ExitStatus() + } + } + return 0 } // Checks whether both files have the same lines (order doesn't matter) diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go index 4394552..a164960 100644 --- a/integrationtests/dcat_test.go +++ b/integrationtests/dcat_test.go @@ -9,7 +9,7 @@ func TestDCat(t *testing.T) { testdataFile := "dcat.txt.expected" stdoutFile := "dcat.out" - if err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil { t.Error(err) return } diff --git a/integrationtests/dgrep_test.go b/integrationtests/dgrep_test.go index 32c0ace..6a15ebd 100644 --- a/integrationtests/dgrep_test.go +++ b/integrationtests/dgrep_test.go @@ -10,7 +10,7 @@ func TestDGrep(t *testing.T) { stdoutFile := "dgrep.stdout.tmp" expectedStdoutFile := "dgrep.txt.expected" - if err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", inFile}, stdoutFile); err != nil { t.Error(err) return } @@ -27,7 +27,7 @@ func TestDGrep2(t *testing.T) { stdoutFile := "dgrep2.stdout.tmp" expectedStdoutFile := "dgrep2.txt.expected" - if err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile}, stdoutFile); err != nil { t.Error(err) return } diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index dc508e2..b512985 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -16,7 +16,7 @@ func TestDMap(t *testing.T) { query := fmt.Sprintf("from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile %s", csvFile) - if err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { t.Error(err) return } @@ -44,7 +44,7 @@ func TestDMap2(t *testing.T) { query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile %s", csvFile) - if err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil { t.Error(err) return } diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go index 8d73174..9971f1a 100644 --- a/integrationtests/dtail_test.go +++ b/integrationtests/dtail_test.go @@ -9,7 +9,7 @@ func TestDTailColorTable(t *testing.T) { stdoutFile := "dtailcolortable.stdout.tmp" expectedStdoutFile := "dtailcolortable.expected" - if err := runCommand(t, "../dtail", []string{"-colorTable"}, stdoutFile); err != nil { + if _, err := runCommand(t, "../dtail", []string{"-colorTable"}, stdoutFile); err != nil { t.Error(err) return } diff --git a/integrationtests/dtailhealthcheck.expected b/integrationtests/dtailhealthcheck.expected new file mode 100644 index 0000000..7bf393c --- /dev/null +++ b/integrationtests/dtailhealthcheck.expected @@ -0,0 +1 @@ +WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port diff --git a/integrationtests/dtailhealthcheck2.expected b/integrationtests/dtailhealthcheck2.expected new file mode 100644 index 0000000..3dd84d8 --- /dev/null +++ b/integrationtests/dtailhealthcheck2.expected @@ -0,0 +1 @@ +CRITICAL: DTail server not operating properly at example:1! diff --git a/integrationtests/dtailhealthcheck3.expected b/integrationtests/dtailhealthcheck3.expected new file mode 100644 index 0000000..8e6dd57 --- /dev/null +++ b/integrationtests/dtailhealthcheck3.expected @@ -0,0 +1 @@ +OK: All fine at localhost:4242 :-) diff --git a/integrationtests/dtailhealthcheck_test.go b/integrationtests/dtailhealthcheck_test.go new file mode 100644 index 0000000..97fa5f2 --- /dev/null +++ b/integrationtests/dtailhealthcheck_test.go @@ -0,0 +1,83 @@ +package integrationtests + +import ( + "context" + "fmt" + "os" + "testing" + "time" +) + +func TestDTailHealthCheck(t *testing.T) { + stdoutFile := "dtailhealthcheck.stdout.tmp" + expectedStdoutFile := "dtailhealthcheck.expected" + + t.Log("Serverless check, is supposed to exit with warning state.") + exitCode, err := runCommand(t, "../dtailhealthcheck", []string{}, stdoutFile) + if exitCode != 1 { + t.Error(fmt.Sprintf("Expected exit code '1' but got '%d': %v", exitCode, err)) + return + } + + if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { + t.Error(err) + return + } + + os.Remove(stdoutFile) +} + +func TestDTailHealthCheck2(t *testing.T) { + stdoutFile := "dtailhealthcheck2.stdout.tmp" + expectedStdoutFile := "dtailhealthcheck2.expected" + + t.Log("Negative test, is supposed to exit with a critical state.") + exitCode, err := runCommand(t, "../dtailhealthcheck", []string{"--server", "example:1"}, stdoutFile) + if exitCode != 2 { + t.Error(fmt.Sprintf("Expected exit code '2' but got '%d': %v", exitCode, err)) + return + } + + if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { + t.Error(err) + return + } + + os.Remove(stdoutFile) +} + +func TestDTailHealthCheck3(t *testing.T) { + stdoutFile := "dtailhealthcheck3.stdout.tmp" + serverStdoutFile := "dtailhealthcheck3.dserver.stdout.tmp" + expectedStdoutFile := "dtailhealthcheck3.expected" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + serverArgs := []string{"--logger", "stdout", "--logLevel", "trace", "--port", "4242"} + runCommandContext(t, ctx, "../dserver", serverArgs, serverStdoutFile) + }() + + var err error + for i := 0; i < 30; i++ { + t.Log("Waiting for dserver to start", i) + time.Sleep(time.Second) + var exitCode int + if exitCode, err = runCommand(t, "../dtailhealthcheck", []string{"--server", "localhost:4242"}, stdoutFile); exitCode == 0 { + break + } + } + if err != nil { + t.Error(err) + return + } + + if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { + t.Error(err) + return + } + + os.Remove(serverStdoutFile) + os.Remove(stdoutFile) +} diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 5247699..e4cbeaf 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -93,6 +93,9 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA if args.Logger != "" { i.Common.Logger = args.Logger } + if args.ConnectionsPerCPU == 0 { + args.ConnectionsPerCPU = DefaultConnectionsPerCPU + } // Setup log directory. if strings.Contains(i.Common.LogDir, "~/") { @@ -103,14 +106,6 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir)) } - // Serverless mode. - if args.Discovery == "" && (args.ServersStr == "" || - strings.ToLower(args.ServersStr) == "serverless") { - // We are not connecting to any servers. - args.Serverless = true - i.Common.LogLevel = "WARN" - } - // Source type specific transormations. sourceCb(i, args, additionalArgs) @@ -141,6 +136,14 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA } func transformClient(i *initializer, args *Args, additionalArgs []string) error { + // Serverless mode. + if args.Discovery == "" && (args.ServersStr == "" || + strings.ToLower(args.ServersStr) == "serverless") { + // We are not connecting to any servers. + args.Serverless = true + i.Common.LogLevel = "warn" + } + return nil } @@ -149,9 +152,13 @@ func transformServer(i *initializer, args *Args, additionalArgs []string) error } func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error { - args.TrustAllHosts = true - if !args.Serverless && args.ServersStr == "" { - args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort) + // Serverless mode. + if args.Discovery == "" && (args.ServersStr == "" || + strings.ToLower(args.ServersStr) == "serverless") { + // We are not connecting to any servers. + args.Serverless = true + i.Common.LogLevel = "warn" } + args.TrustAllHosts = true return nil } diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index f3774ba..28e6882 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -41,32 +41,25 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) Common.FatalPanic("Logger already started") } - switch sourceProcess { - case source.Client: - Client = New(source.Client, source.Client) - Server = New(source.Client, source.Server) - Common = Client - case source.Server: - Client = New(source.Server, source.Client) - Server = New(source.Server, source.Server) + Client = new(sourceProcess, source.Client) + Server = new(sourceProcess, source.Server) + Common = Client + if sourceProcess == source.Server { Common = Server - case source.HealthCheck: - Client = New(source.HealthCheck, source.Client) - Server = New(source.HealthCheck, source.Server) - Common = Client } var wg2 sync.WaitGroup wg2.Add(2) - Client.start(ctx, &wg2) - Server.start(ctx, &wg2) - started = true + go Client.start(ctx, &wg2) + go Server.start(ctx, &wg2) go rotation(ctx) go func() { wg2.Wait() wg.Done() }() + + started = true } // DLog is the DTail logger. @@ -83,8 +76,8 @@ type DLog struct { hostname string } -// New creates a new DTail logger. -func New(sourceProcess, sourcePackage source.Source) *DLog { +// new creates a new DTail logger. +func new(sourceProcess, sourcePackage source.Source) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) @@ -103,14 +96,12 @@ func New(sourceProcess, sourcePackage source.Source) *DLog { } func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { - go func() { - defer wg.Done() - var wg2 sync.WaitGroup - wg2.Add(1) - d.logger.Start(ctx, &wg2) - <-ctx.Done() - wg2.Wait() - }() + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() } func (d *DLog) log(level level, args []interface{}) string { @@ -202,6 +193,8 @@ func (d *DLog) Trace(args ...interface{}) string { } func (d *DLog) Devel(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Devel, args) } diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 6e692a3..87280fd 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer { if f.lastFileName == name { return f.writer } - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { panic(err) @@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer { f.writer.Flush() f.fd.Close() } - + // Set new writer. f.fd = newFd f.writer = bufio.NewWriterSize(f.fd, 1) f.lastFileName = name @@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer { } func (f *file) flush() { - defer f.writer.Flush() - + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() for { select { case m := <-f.bufferCh: diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 767aada..1f5d1c3 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -8,9 +8,8 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" "github.com/mimecast/dtail/internal/protocol" @@ -148,7 +147,8 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - pool.RecycleBytesBuffer(line.Content) + // Can not recycle here for some rason. + //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? diff --git a/samples/dtail.json.sample b/samples/dtail.json.sample index 4f9b9ab..100e488 100644 --- a/samples/dtail.json.sample +++ b/samples/dtail.json.sample @@ -68,9 +68,10 @@ "LogDir": "log", "CacheDir": "cache", "TmpDir": "tmp", - "LogStrategy": "stdout", + "Logger": "fout", + "LogStrategy": "daily", "SSHPort": 2222, - "LogLevel": "INFO", + "LogLevel": "info", "ExperimentalFeaturesEnable": false } } |
