diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-09 16:44:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 13:36:32 +0300 |
| commit | 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (patch) | |
| tree | 2c1aa056285b3e5d4febefd114a4b95f62071386 /internal | |
| parent | 2d7ddbeae8286373ac19787dc7dde598a7cb0598 (diff) | |
add dtail health check unit test.
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/config/initializer.go | 29 | ||||
| -rw-r--r-- | internal/io/dlog/dlog.go | 43 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/file.go | 10 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 6 |
4 files changed, 45 insertions, 43 deletions
diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 5247699..e4cbeaf 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -93,6 +93,9 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA if args.Logger != "" { i.Common.Logger = args.Logger } + if args.ConnectionsPerCPU == 0 { + args.ConnectionsPerCPU = DefaultConnectionsPerCPU + } // Setup log directory. if strings.Contains(i.Common.LogDir, "~/") { @@ -103,14 +106,6 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir)) } - // Serverless mode. - if args.Discovery == "" && (args.ServersStr == "" || - strings.ToLower(args.ServersStr) == "serverless") { - // We are not connecting to any servers. - args.Serverless = true - i.Common.LogLevel = "WARN" - } - // Source type specific transormations. sourceCb(i, args, additionalArgs) @@ -141,6 +136,14 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA } func transformClient(i *initializer, args *Args, additionalArgs []string) error { + // Serverless mode. + if args.Discovery == "" && (args.ServersStr == "" || + strings.ToLower(args.ServersStr) == "serverless") { + // We are not connecting to any servers. + args.Serverless = true + i.Common.LogLevel = "warn" + } + return nil } @@ -149,9 +152,13 @@ func transformServer(i *initializer, args *Args, additionalArgs []string) error } func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error { - args.TrustAllHosts = true - if !args.Serverless && args.ServersStr == "" { - args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort) + // Serverless mode. + if args.Discovery == "" && (args.ServersStr == "" || + strings.ToLower(args.ServersStr) == "serverless") { + // We are not connecting to any servers. + args.Serverless = true + i.Common.LogLevel = "warn" } + args.TrustAllHosts = true return nil } diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index f3774ba..28e6882 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -41,32 +41,25 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) Common.FatalPanic("Logger already started") } - switch sourceProcess { - case source.Client: - Client = New(source.Client, source.Client) - Server = New(source.Client, source.Server) - Common = Client - case source.Server: - Client = New(source.Server, source.Client) - Server = New(source.Server, source.Server) + Client = new(sourceProcess, source.Client) + Server = new(sourceProcess, source.Server) + Common = Client + if sourceProcess == source.Server { Common = Server - case source.HealthCheck: - Client = New(source.HealthCheck, source.Client) - Server = New(source.HealthCheck, source.Server) - Common = Client } var wg2 sync.WaitGroup wg2.Add(2) - Client.start(ctx, &wg2) - Server.start(ctx, &wg2) - started = true + go Client.start(ctx, &wg2) + go Server.start(ctx, &wg2) go rotation(ctx) go func() { wg2.Wait() wg.Done() }() + + started = true } // DLog is the DTail logger. @@ -83,8 +76,8 @@ type DLog struct { hostname string } -// New creates a new DTail logger. -func New(sourceProcess, sourcePackage source.Source) *DLog { +// new creates a new DTail logger. +func new(sourceProcess, sourcePackage source.Source) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) @@ -103,14 +96,12 @@ func New(sourceProcess, sourcePackage source.Source) *DLog { } func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { - go func() { - defer wg.Done() - var wg2 sync.WaitGroup - wg2.Add(1) - d.logger.Start(ctx, &wg2) - <-ctx.Done() - wg2.Wait() - }() + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() } func (d *DLog) log(level level, args []interface{}) string { @@ -202,6 +193,8 @@ func (d *DLog) Trace(args ...interface{}) string { } func (d *DLog) Devel(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Devel, args) } diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 6e692a3..87280fd 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer { if f.lastFileName == name { return f.writer } - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { panic(err) @@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer { f.writer.Flush() f.fd.Close() } - + // Set new writer. f.fd = newFd f.writer = bufio.NewWriterSize(f.fd, 1) f.lastFileName = name @@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer { } func (f *file) flush() { - defer f.writer.Flush() - + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() for { select { case m := <-f.bufferCh: diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 767aada..1f5d1c3 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -8,9 +8,8 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" "github.com/mimecast/dtail/internal/protocol" @@ -148,7 +147,8 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - pool.RecycleBytesBuffer(line.Content) + // Can not recycle here for some rason. + //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? |
