summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 16:44:28 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:32 +0300
commit7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (patch)
tree2c1aa056285b3e5d4febefd114a4b95f62071386
parent2d7ddbeae8286373ac19787dc7dde598a7cb0598 (diff)
add dtail health check unit test.
-rw-r--r--Makefile1
-rw-r--r--cmd/dserver/main.go2
-rw-r--r--cmd/dtailhealthcheck/main.go3
-rw-r--r--docker/dtail.json9
-rw-r--r--integrationtests/commons.go33
-rw-r--r--integrationtests/dcat_test.go2
-rw-r--r--integrationtests/dgrep_test.go4
-rw-r--r--integrationtests/dmap_test.go4
-rw-r--r--integrationtests/dtail_test.go2
-rw-r--r--integrationtests/dtailhealthcheck.expected1
-rw-r--r--integrationtests/dtailhealthcheck2.expected1
-rw-r--r--integrationtests/dtailhealthcheck3.expected1
-rw-r--r--integrationtests/dtailhealthcheck_test.go83
-rw-r--r--internal/config/initializer.go29
-rw-r--r--internal/io/dlog/dlog.go43
-rw-r--r--internal/io/dlog/loggers/file.go10
-rw-r--r--internal/mapr/server/aggregate.go6
-rw-r--r--samples/dtail.json.sample5
18 files changed, 172 insertions, 67 deletions
diff --git a/Makefile b/Makefile
index 4b50733..6a0f828 100644
--- a/Makefile
+++ b/Makefile
@@ -45,6 +45,7 @@ lint:
golint $$dir; \
done
test:
+ ${GO} clean -testcache
ifndef USE_ACL
${GO} test -race ./... -v
else
diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go
index 780c6d5..b4fc873 100644
--- a/cmd/dserver/main.go
+++ b/cmd/dserver/main.go
@@ -40,7 +40,7 @@ func main() {
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.LogDir, "logDir", "", "Log dir")
flag.StringVar(&args.Logger, "logger", config.DefaultServerLogger, "Logger name")
- flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
+ flag.StringVar(&args.LogLevel, "logLevel", config.DefaultLogLevel, "Log level")
flag.Parse()
args.NoColor = !color
diff --git a/cmd/dtailhealthcheck/main.go b/cmd/dtailhealthcheck/main.go
index 71c162e..0f37f8a 100644
--- a/cmd/dtailhealthcheck/main.go
+++ b/cmd/dtailhealthcheck/main.go
@@ -36,6 +36,8 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
+ dlog.Start(ctx, &wg, source.HealthCheck)
+
if pprof > -1 {
// For debugging purposes only
pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof)
@@ -43,7 +45,6 @@ func main() {
dlog.Client.Info("Started PProf", pprofArgs)
}
- dlog.Start(ctx, &wg, source.HealthCheck)
healthClient, _ := clients.NewHealthClient(args)
os.Exit(healthClient.Start(ctx, signal.NoCh(ctx)))
}
diff --git a/docker/dtail.json b/docker/dtail.json
index badd42f..d86da20 100644
--- a/docker/dtail.json
+++ b/docker/dtail.json
@@ -28,12 +28,13 @@
}
},
"Common": {
- "LogDir" : "/var/log/dserver",
- "CacheDir" : "cache",
- "TmpDir" : "tmp",
+ "LogDir": "/var/log/dserver",
+ "Logger": "fout",
+ "CacheDir": "cache",
+ "TmpDir": "tmp",
"LogStrategy": "daily",
"SSHPort": 2222,
- "LogLevel": "DEVEL",
+ "LogLevel": "trace",
"ExperimentalFeaturesEnable": false
}
}
diff --git a/integrationtests/commons.go b/integrationtests/commons.go
index f789322..f96b532 100644
--- a/integrationtests/commons.go
+++ b/integrationtests/commons.go
@@ -2,6 +2,7 @@ package integrationtests
import (
"bufio"
+ "context"
"crypto/sha256"
"encoding/base64"
"fmt"
@@ -9,29 +10,41 @@ import (
"os"
"os/exec"
"strings"
+ "syscall"
"testing"
)
-func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) error {
+func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) (int, error) {
+ return runCommandContext(t, context.TODO(), cmd, args, stdoutFile)
+}
+
+func runCommandContext(t *testing.T, ctx context.Context, cmd string, args []string, stdoutFile string) (int, error) {
if _, err := os.Stat(cmd); err != nil {
- return fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err)
+ return -1, fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err)
}
- t.Log("Executing command:", cmd, strings.Join(args, " "))
- bytes, err := exec.Command(cmd, args...).Output()
- if err != nil {
- return err
- }
+ t.Log("Running command:", cmd, strings.Join(args, " "))
+ bytes, cmdErr := exec.CommandContext(ctx, cmd, args...).Output()
t.Log("Writing stdout to file", stdoutFile)
fd, err := os.Create(stdoutFile)
if err != nil {
- return err
+ return -1, err
}
+ defer fd.Close()
fd.Write(bytes)
- fd.Close()
- return nil
+ return exitCodeFromError(cmdErr), err
+}
+
+func exitCodeFromError(err error) int {
+ if err != nil {
+ if exitError, ok := err.(*exec.ExitError); ok {
+ ws := exitError.Sys().(syscall.WaitStatus)
+ return ws.ExitStatus()
+ }
+ }
+ return 0
}
// Checks whether both files have the same lines (order doesn't matter)
diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go
index 4394552..a164960 100644
--- a/integrationtests/dcat_test.go
+++ b/integrationtests/dcat_test.go
@@ -9,7 +9,7 @@ func TestDCat(t *testing.T) {
testdataFile := "dcat.txt.expected"
stdoutFile := "dcat.out"
- if err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil {
t.Error(err)
return
}
diff --git a/integrationtests/dgrep_test.go b/integrationtests/dgrep_test.go
index 32c0ace..6a15ebd 100644
--- a/integrationtests/dgrep_test.go
+++ b/integrationtests/dgrep_test.go
@@ -10,7 +10,7 @@ func TestDGrep(t *testing.T) {
stdoutFile := "dgrep.stdout.tmp"
expectedStdoutFile := "dgrep.txt.expected"
- if err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", inFile}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", inFile}, stdoutFile); err != nil {
t.Error(err)
return
}
@@ -27,7 +27,7 @@ func TestDGrep2(t *testing.T) {
stdoutFile := "dgrep2.stdout.tmp"
expectedStdoutFile := "dgrep2.txt.expected"
- if err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dgrep", []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile}, stdoutFile); err != nil {
t.Error(err)
return
}
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index dc508e2..b512985 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -16,7 +16,7 @@ func TestDMap(t *testing.T) {
query := fmt.Sprintf("from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile %s", csvFile)
- if err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil {
t.Error(err)
return
}
@@ -44,7 +44,7 @@ func TestDMap2(t *testing.T) {
query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile %s", csvFile)
- if err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dmap", []string{"-query", query, inFile}, stdoutFile); err != nil {
t.Error(err)
return
}
diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go
index 8d73174..9971f1a 100644
--- a/integrationtests/dtail_test.go
+++ b/integrationtests/dtail_test.go
@@ -9,7 +9,7 @@ func TestDTailColorTable(t *testing.T) {
stdoutFile := "dtailcolortable.stdout.tmp"
expectedStdoutFile := "dtailcolortable.expected"
- if err := runCommand(t, "../dtail", []string{"-colorTable"}, stdoutFile); err != nil {
+ if _, err := runCommand(t, "../dtail", []string{"-colorTable"}, stdoutFile); err != nil {
t.Error(err)
return
}
diff --git a/integrationtests/dtailhealthcheck.expected b/integrationtests/dtailhealthcheck.expected
new file mode 100644
index 0000000..7bf393c
--- /dev/null
+++ b/integrationtests/dtailhealthcheck.expected
@@ -0,0 +1 @@
+WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port
diff --git a/integrationtests/dtailhealthcheck2.expected b/integrationtests/dtailhealthcheck2.expected
new file mode 100644
index 0000000..3dd84d8
--- /dev/null
+++ b/integrationtests/dtailhealthcheck2.expected
@@ -0,0 +1 @@
+CRITICAL: DTail server not operating properly at example:1!
diff --git a/integrationtests/dtailhealthcheck3.expected b/integrationtests/dtailhealthcheck3.expected
new file mode 100644
index 0000000..8e6dd57
--- /dev/null
+++ b/integrationtests/dtailhealthcheck3.expected
@@ -0,0 +1 @@
+OK: All fine at localhost:4242 :-)
diff --git a/integrationtests/dtailhealthcheck_test.go b/integrationtests/dtailhealthcheck_test.go
new file mode 100644
index 0000000..97fa5f2
--- /dev/null
+++ b/integrationtests/dtailhealthcheck_test.go
@@ -0,0 +1,83 @@
+package integrationtests
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestDTailHealthCheck(t *testing.T) {
+ stdoutFile := "dtailhealthcheck.stdout.tmp"
+ expectedStdoutFile := "dtailhealthcheck.expected"
+
+ t.Log("Serverless check, is supposed to exit with warning state.")
+ exitCode, err := runCommand(t, "../dtailhealthcheck", []string{}, stdoutFile)
+ if exitCode != 1 {
+ t.Error(fmt.Sprintf("Expected exit code '1' but got '%d': %v", exitCode, err))
+ return
+ }
+
+ if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(stdoutFile)
+}
+
+func TestDTailHealthCheck2(t *testing.T) {
+ stdoutFile := "dtailhealthcheck2.stdout.tmp"
+ expectedStdoutFile := "dtailhealthcheck2.expected"
+
+ t.Log("Negative test, is supposed to exit with a critical state.")
+ exitCode, err := runCommand(t, "../dtailhealthcheck", []string{"--server", "example:1"}, stdoutFile)
+ if exitCode != 2 {
+ t.Error(fmt.Sprintf("Expected exit code '2' but got '%d': %v", exitCode, err))
+ return
+ }
+
+ if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(stdoutFile)
+}
+
+func TestDTailHealthCheck3(t *testing.T) {
+ stdoutFile := "dtailhealthcheck3.stdout.tmp"
+ serverStdoutFile := "dtailhealthcheck3.dserver.stdout.tmp"
+ expectedStdoutFile := "dtailhealthcheck3.expected"
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ serverArgs := []string{"--logger", "stdout", "--logLevel", "trace", "--port", "4242"}
+ runCommandContext(t, ctx, "../dserver", serverArgs, serverStdoutFile)
+ }()
+
+ var err error
+ for i := 0; i < 30; i++ {
+ t.Log("Waiting for dserver to start", i)
+ time.Sleep(time.Second)
+ var exitCode int
+ if exitCode, err = runCommand(t, "../dtailhealthcheck", []string{"--server", "localhost:4242"}, stdoutFile); exitCode == 0 {
+ break
+ }
+ }
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(serverStdoutFile)
+ os.Remove(stdoutFile)
+}
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index 5247699..e4cbeaf 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -93,6 +93,9 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
if args.Logger != "" {
i.Common.Logger = args.Logger
}
+ if args.ConnectionsPerCPU == 0 {
+ args.ConnectionsPerCPU = DefaultConnectionsPerCPU
+ }
// Setup log directory.
if strings.Contains(i.Common.LogDir, "~/") {
@@ -103,14 +106,6 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir))
}
- // Serverless mode.
- if args.Discovery == "" && (args.ServersStr == "" ||
- strings.ToLower(args.ServersStr) == "serverless") {
- // We are not connecting to any servers.
- args.Serverless = true
- i.Common.LogLevel = "WARN"
- }
-
// Source type specific transormations.
sourceCb(i, args, additionalArgs)
@@ -141,6 +136,14 @@ func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalA
}
func transformClient(i *initializer, args *Args, additionalArgs []string) error {
+ // Serverless mode.
+ if args.Discovery == "" && (args.ServersStr == "" ||
+ strings.ToLower(args.ServersStr) == "serverless") {
+ // We are not connecting to any servers.
+ args.Serverless = true
+ i.Common.LogLevel = "warn"
+ }
+
return nil
}
@@ -149,9 +152,13 @@ func transformServer(i *initializer, args *Args, additionalArgs []string) error
}
func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error {
- args.TrustAllHosts = true
- if !args.Serverless && args.ServersStr == "" {
- args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort)
+ // Serverless mode.
+ if args.Discovery == "" && (args.ServersStr == "" ||
+ strings.ToLower(args.ServersStr) == "serverless") {
+ // We are not connecting to any servers.
+ args.Serverless = true
+ i.Common.LogLevel = "warn"
}
+ args.TrustAllHosts = true
return nil
}
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index f3774ba..28e6882 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -41,32 +41,25 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source)
Common.FatalPanic("Logger already started")
}
- switch sourceProcess {
- case source.Client:
- Client = New(source.Client, source.Client)
- Server = New(source.Client, source.Server)
- Common = Client
- case source.Server:
- Client = New(source.Server, source.Client)
- Server = New(source.Server, source.Server)
+ Client = new(sourceProcess, source.Client)
+ Server = new(sourceProcess, source.Server)
+ Common = Client
+ if sourceProcess == source.Server {
Common = Server
- case source.HealthCheck:
- Client = New(source.HealthCheck, source.Client)
- Server = New(source.HealthCheck, source.Server)
- Common = Client
}
var wg2 sync.WaitGroup
wg2.Add(2)
- Client.start(ctx, &wg2)
- Server.start(ctx, &wg2)
- started = true
+ go Client.start(ctx, &wg2)
+ go Server.start(ctx, &wg2)
go rotation(ctx)
go func() {
wg2.Wait()
wg.Done()
}()
+
+ started = true
}
// DLog is the DTail logger.
@@ -83,8 +76,8 @@ type DLog struct {
hostname string
}
-// New creates a new DTail logger.
-func New(sourceProcess, sourcePackage source.Source) *DLog {
+// new creates a new DTail logger.
+func new(sourceProcess, sourcePackage source.Source) *DLog {
hostname, err := os.Hostname()
if err != nil {
panic(err)
@@ -103,14 +96,12 @@ func New(sourceProcess, sourcePackage source.Source) *DLog {
}
func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) {
- go func() {
- defer wg.Done()
- var wg2 sync.WaitGroup
- wg2.Add(1)
- d.logger.Start(ctx, &wg2)
- <-ctx.Done()
- wg2.Wait()
- }()
+ defer wg.Done()
+ var wg2 sync.WaitGroup
+ wg2.Add(1)
+ d.logger.Start(ctx, &wg2)
+ <-ctx.Done()
+ wg2.Wait()
}
func (d *DLog) log(level level, args []interface{}) string {
@@ -202,6 +193,8 @@ func (d *DLog) Trace(args ...interface{}) string {
}
func (d *DLog) Devel(args ...interface{}) string {
+ _, file, line, _ := runtime.Caller(1)
+ args = append(args, fmt.Sprintf("at %s:%d", file, line))
return d.log(Devel, args)
}
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
index 6e692a3..87280fd 100644
--- a/internal/io/dlog/loggers/file.go
+++ b/internal/io/dlog/loggers/file.go
@@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer {
if f.lastFileName == name {
return f.writer
}
-
if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
panic(err)
@@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer {
f.writer.Flush()
f.fd.Close()
}
-
+ // Set new writer.
f.fd = newFd
f.writer = bufio.NewWriterSize(f.fd, 1)
f.lastFileName = name
@@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer {
}
func (f *file) flush() {
- defer f.writer.Flush()
-
+ defer func() {
+ if f.writer != nil {
+ f.writer.Flush()
+ }
+ }()
for {
select {
case m := <-f.bufferCh:
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 767aada..1f5d1c3 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -8,9 +8,8 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
"github.com/mimecast/dtail/internal/protocol"
@@ -148,7 +147,8 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
maprLine := strings.TrimSpace(line.Content.String())
fields, err := a.parser.MakeFields(maprLine)
- pool.RecycleBytesBuffer(line.Content)
+ // Can not recycle here for some rason.
+ //pool.RecycleBytesBuffer(line.Content)
if err != nil {
// Should fields be ignored anyway?
diff --git a/samples/dtail.json.sample b/samples/dtail.json.sample
index 4f9b9ab..100e488 100644
--- a/samples/dtail.json.sample
+++ b/samples/dtail.json.sample
@@ -68,9 +68,10 @@
"LogDir": "log",
"CacheDir": "cache",
"TmpDir": "tmp",
- "LogStrategy": "stdout",
+ "Logger": "fout",
+ "LogStrategy": "daily",
"SSHPort": 2222,
- "LogLevel": "INFO",
+ "LogLevel": "info",
"ExperimentalFeaturesEnable": false
}
}