summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 16:44:28 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:32 +0300
commit7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (patch)
tree2c1aa056285b3e5d4febefd114a4b95f62071386 /internal
parent2d7ddbeae8286373ac19787dc7dde598a7cb0598 (diff)
add dtail health check unit test.
Diffstat (limited to 'internal')
-rw-r--r--internal/config/initializer.go29
-rw-r--r--internal/io/dlog/dlog.go43
-rw-r--r--internal/io/dlog/loggers/file.go10
-rw-r--r--internal/mapr/server/aggregate.go6
4 files changed, 45 insertions, 43 deletions
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index 5247699..e4cbeaf 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -93,6 +93,9 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
if args.Logger != "" {
i.Common.Logger = args.Logger
}
+ if args.ConnectionsPerCPU == 0 {
+ args.ConnectionsPerCPU = DefaultConnectionsPerCPU
+ }
// Setup log directory.
if strings.Contains(i.Common.LogDir, "~/") {
@@ -103,14 +106,6 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir))
}
- // Serverless mode.
- if args.Discovery == "" && (args.ServersStr == "" ||
- strings.ToLower(args.ServersStr) == "serverless") {
- // We are not connecting to any servers.
- args.Serverless = true
- i.Common.LogLevel = "WARN"
- }
-
// Source type specific transormations.
sourceCb(i, args, additionalArgs)
@@ -141,6 +136,14 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
}
func transformClient(i *initializer, args *Args, additionalArgs []string) error {
+ // Serverless mode.
+ if args.Discovery == "" && (args.ServersStr == "" ||
+ strings.ToLower(args.ServersStr) == "serverless") {
+ // We are not connecting to any servers.
+ args.Serverless = true
+ i.Common.LogLevel = "warn"
+ }
+
return nil
}
@@ -149,9 +152,13 @@ func transformServer(i *initializer, args *Args, additionalArgs []string) error
}
func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error {
- args.TrustAllHosts = true
- if !args.Serverless && args.ServersStr == "" {
- args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort)
+ // Serverless mode.
+ if args.Discovery == "" && (args.ServersStr == "" ||
+ strings.ToLower(args.ServersStr) == "serverless") {
+ // We are not connecting to any servers.
+ args.Serverless = true
+ i.Common.LogLevel = "warn"
}
+ args.TrustAllHosts = true
return nil
}
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index f3774ba..28e6882 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -41,32 +41,25 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source)
Common.FatalPanic("Logger already started")
}
- switch sourceProcess {
- case source.Client:
- Client = New(source.Client, source.Client)
- Server = New(source.Client, source.Server)
- Common = Client
- case source.Server:
- Client = New(source.Server, source.Client)
- Server = New(source.Server, source.Server)
+ Client = new(sourceProcess, source.Client)
+ Server = new(sourceProcess, source.Server)
+ Common = Client
+ if sourceProcess == source.Server {
Common = Server
- case source.HealthCheck:
- Client = New(source.HealthCheck, source.Client)
- Server = New(source.HealthCheck, source.Server)
- Common = Client
}
var wg2 sync.WaitGroup
wg2.Add(2)
- Client.start(ctx, &wg2)
- Server.start(ctx, &wg2)
- started = true
+ go Client.start(ctx, &wg2)
+ go Server.start(ctx, &wg2)
go rotation(ctx)
go func() {
wg2.Wait()
wg.Done()
}()
+
+ started = true
}
// DLog is the DTail logger.
@@ -83,8 +76,8 @@ type DLog struct {
hostname string
}
-// New creates a new DTail logger.
-func New(sourceProcess, sourcePackage source.Source) *DLog {
+// new creates a new DTail logger.
+func new(sourceProcess, sourcePackage source.Source) *DLog {
hostname, err := os.Hostname()
if err != nil {
panic(err)
@@ -103,14 +96,12 @@ func New(sourceProcess, sourcePackage source.Source) *DLog {
}
func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) {
- go func() {
- defer wg.Done()
- var wg2 sync.WaitGroup
- wg2.Add(1)
- d.logger.Start(ctx, &wg2)
- <-ctx.Done()
- wg2.Wait()
- }()
+ defer wg.Done()
+ var wg2 sync.WaitGroup
+ wg2.Add(1)
+ d.logger.Start(ctx, &wg2)
+ <-ctx.Done()
+ wg2.Wait()
}
func (d *DLog) log(level level, args []interface{}) string {
@@ -202,6 +193,8 @@ func (d *DLog) Trace(args ...interface{}) string {
}
func (d *DLog) Devel(args ...interface{}) string {
+ _, file, line, _ := runtime.Caller(1)
+ args = append(args, fmt.Sprintf("at %s:%d", file, line))
return d.log(Devel, args)
}
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
index 6e692a3..87280fd 100644
--- a/internal/io/dlog/loggers/file.go
+++ b/internal/io/dlog/loggers/file.go
@@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer {
if f.lastFileName == name {
return f.writer
}
-
if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
panic(err)
@@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer {
f.writer.Flush()
f.fd.Close()
}
-
+ // Set new writer.
f.fd = newFd
f.writer = bufio.NewWriterSize(f.fd, 1)
f.lastFileName = name
@@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer {
}
func (f *file) flush() {
- defer f.writer.Flush()
-
+ defer func() {
+ if f.writer != nil {
+ f.writer.Flush()
+ }
+ }()
for {
select {
case m := <-f.bufferCh:
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 767aada..1f5d1c3 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -8,9 +8,8 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
"github.com/mimecast/dtail/internal/protocol"
@@ -148,7 +147,8 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
maprLine := strings.TrimSpace(line.Content.String())
fields, err := a.parser.MakeFields(maprLine)
- pool.RecycleBytesBuffer(line.Content)
+ // Can not recycle here for some rason.
+ //pool.RecycleBytesBuffer(line.Content)
if err != nil {
// Should fields be ignored anyway?