summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-19 13:22:59 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commitfe3e68afd99d8ea246be52893730f987e138ec24 (patch)
tree726e0914730912e0a3b223f7b37facc05ba31140
parentabeac87aec44249bf67f1b0eca471a31086265ca (diff)
move args to config package
logger package rewrite as dlog
-rw-r--r--.gitignore5
-rw-r--r--Makefile12
-rw-r--r--TODO.md13
-rw-r--r--cmd/dcat/main.go35
-rw-r--r--cmd/dgrep/main.go35
-rw-r--r--cmd/dmap/main.go39
-rw-r--r--cmd/dserver/main.go37
-rw-r--r--cmd/dtail/main.go47
-rw-r--r--docker/.gitignore5
-rw-r--r--docker/Makefile5
-rw-r--r--docker/dtail.json2
-rw-r--r--internal/clients/args.go63
-rw-r--r--internal/clients/baseclient.go28
-rw-r--r--internal/clients/catclient.go12
-rw-r--r--internal/clients/connectors/serverconnection.go34
-rw-r--r--internal/clients/connectors/serverless.go21
-rw-r--r--internal/clients/grepclient.go13
-rw-r--r--internal/clients/handlers/basehandler.go6
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/maprhandler.go8
-rw-r--r--internal/clients/maprclient.go40
-rw-r--r--internal/clients/stats.go8
-rw-r--r--internal/clients/tailclient.go16
-rw-r--r--internal/color/table.go8
-rw-r--r--internal/config/args.go105
-rw-r--r--internal/config/common.go8
-rw-r--r--internal/config/config.go3
-rw-r--r--internal/config/read.go40
-rw-r--r--internal/config/server.go1
-rw-r--r--internal/config/setup.go35
-rw-r--r--internal/discovery/comma.go4
-rw-r--r--internal/discovery/discovery.go18
-rw-r--r--internal/discovery/file.go8
-rw-r--r--internal/io/dlog/dlog.go206
-rw-r--r--internal/io/dlog/level.go89
-rw-r--r--internal/io/dlog/loggers/factory.go60
-rw-r--r--internal/io/dlog/loggers/file.go156
-rw-r--r--internal/io/dlog/loggers/fout.go46
-rw-r--r--internal/io/dlog/loggers/logger.go18
-rw-r--r--internal/io/dlog/loggers/none.go21
-rw-r--r--internal/io/dlog/loggers/stdout.go73
-rw-r--r--internal/io/dlog/rotation.go27
-rw-r--r--internal/io/dlog/source.go19
-rw-r--r--internal/io/dlog/strategy.go22
-rw-r--r--internal/io/fs/permissions/permission.go4
-rw-r--r--internal/io/fs/readfile.go16
-rw-r--r--internal/io/logger/logger.go23
-rw-r--r--internal/io/logger/modes.go9
-rw-r--r--internal/io/prompt/prompt.go6
-rw-r--r--internal/mapr/aggregateset.go6
-rw-r--r--internal/mapr/client/aggregate.go4
-rw-r--r--internal/mapr/funcs/function.go4
-rw-r--r--internal/mapr/groupset.go4
-rw-r--r--internal/mapr/logformat/default.go1
-rw-r--r--internal/mapr/logformat/generickv.go2
-rw-r--r--internal/mapr/logformat/parser.go3
-rw-r--r--internal/mapr/query.go4
-rw-r--r--internal/mapr/server/aggregate.go22
-rw-r--r--internal/mapr/token.go10
-rw-r--r--internal/mapr/whereclause.go6
-rw-r--r--internal/mapr/wherecondition.go6
-rw-r--r--internal/regex/regex.go11
-rw-r--r--internal/regex/regex_test.go16
-rw-r--r--internal/server/continuous.go21
-rw-r--r--internal/server/handlers/controlhandler.go12
-rw-r--r--internal/server/handlers/readcommand.go26
-rw-r--r--internal/server/handlers/serverhandler.go46
-rw-r--r--internal/server/scheduler.go22
-rw-r--r--internal/server/server.go52
-rw-r--r--internal/server/stats.go6
-rw-r--r--internal/ssh/client/authmethods.go28
-rw-r--r--internal/ssh/client/knownhostscallback.go14
-rw-r--r--internal/ssh/server/hostkey.go17
-rw-r--r--internal/ssh/server/publickeycallback.go12
-rw-r--r--internal/ssh/ssh.go6
-rw-r--r--internal/user/server/user.go22
-rw-r--r--samples/dtail.json.sample2
77 files changed, 1350 insertions, 548 deletions
diff --git a/.gitignore b/.gitignore
index c9e8333..1f158b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,9 +1,9 @@
*_proprietary.go
-<<<<<<< HEAD
+*.csv
+*.tmp
cache/
log/
tags
-=======
/cache/
/log/
/dtail
@@ -11,4 +11,3 @@ tags
/dcat
/dmap
/dserver
->>>>>>> 7ee0121afed3e7cab6457142f70e411020ab2b21
diff --git a/Makefile b/Makefile
index 2109e56..b544388 100644
--- a/Makefile
+++ b/Makefile
@@ -3,18 +3,18 @@ all: test build
build: dserver dcat dgrep dmap dtail
dserver:
ifndef USE_ACL
- ${GO} build -o dserver ./cmd/dserver/main.go
+ ${GO} build ${GO_FLAGS} -o dserver ./cmd/dserver/main.go
else
- ${GO} build -tags linuxacl -o dserver ./cmd/dserver/main.go
+ ${GO} build ${GO_FLAGS} -tags linuxacl -o dserver ./cmd/dserver/main.go
endif
dcat:
- ${GO} build -o dcat ./cmd/dcat/main.go
+ ${GO} build ${GO_FLAGS} -o dcat ./cmd/dcat/main.go
dgrep:
- ${GO} build -o dgrep ./cmd/dgrep/main.go
+ ${GO} build ${GO_FLAGS} -o dgrep ./cmd/dgrep/main.go
dmap:
- ${GO} build -o dmap ./cmd/dmap/main.go
+ ${GO} build ${GO_FLAGS} -o dmap ./cmd/dmap/main.go
dtail:
- ${GO} build -o dtail ./cmd/dtail/main.go
+ ${GO} build ${GO_FLAGS} -o dtail ./cmd/dtail/main.go
install:
ifndef USE_ACL
${GO} install ./cmd/dserver/main.go
diff --git a/TODO.md b/TODO.md
index 156c745..49fc9b9 100644
--- a/TODO.md
+++ b/TODO.md
@@ -15,20 +15,17 @@ This is a loose list of what to do. Maybe for the next releae or maybe for a lat
[?] Client 4.x should print a warning when trying to connect to a 3.x server.
[ ] Update docs for color configuration
[ ] Update animated gifs
-[x] Fix dmap so that it always reads to the end of file
[ ] Add more default fields to the default MAPREDUCE format.
[x] By default connect to localhost
[x] Can use additional args as file lists
[ ] Document the two things above
[x] Implement spartan mode
-[x] Make sure that diff is the same (plain file fs dcatted-file) in spartan mode
-[ ] Document servless mode
+[ ] Document serverless mode
[x] Implement serverless mode
-[ ] Fix serverless mode (e.g. dmap doesn't aggregate all lines)
[ ] document spartan mode
[ ] Default client log dir is ~/log not ./log
- [ ] Make sure dmap results aren't in color in local log file
-[ ] Unit test for dcat in serverless mode
-[ ] Unit test for dgrep in serverless mode
-[ ] Unit test for dmap in serverless mode
+[ ] Integration test for dcat in serverless mode
+[ ] Integration test for dgrep in serverless mode
+[ ] Integration test for dmap in serverless mode
[ ] Separate logger into server logger and client logger for serverless operation (e.g. server info logs are all Debug)
+[ ] In serverless, use prefix LOCAL and not REMOTE. And also use another color schema (magenta?)
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index 63b4b61..d5dfba4 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"os"
+ "sync"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
@@ -15,33 +16,28 @@ import (
// The evil begins here.
func main() {
- var args clients.Args
- var cfgFile string
- var debugEnable bool
+ var args config.Args
var displayVersion bool
- var sshPort int
userName := user.Name()
+ flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode")
- flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys")
- flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages")
+ flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
- flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently")
- flag.IntVar(&sshPort, "port", 2222, "SSH server port")
+ flag.IntVar(&args.SSHPort, "port", 2222, "SSH server port")
+ flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
- flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.Parse()
- args.Transform(flag.Args())
- config.Read(cfgFile, sshPort, args.NoColor)
- args.TransformAfterConfigFile()
+ config.Setup(&args, flag.Args())
if displayVersion {
version.PrintAndExit()
@@ -50,11 +46,10 @@ func main() {
version.Print()
}
- ctx := context.TODO()
- logger.Start(ctx, logger.Modes{
- Debug: debugEnable || config.Common.DebugEnable,
- Quiet: args.Quiet,
- })
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ wg.Add(1)
+ dlog.Start(ctx, &wg, dlog.CLIENT, config.Common.LogLevel)
client, err := clients.NewCatClient(args)
if err != nil {
@@ -62,6 +57,8 @@ func main() {
}
status := client.Start(ctx, signal.InterruptCh(ctx))
- logger.Flush()
+ cancel()
+
+ wg.Wait()
os.Exit(status)
}
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index 7b96472..c6bece0 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"os"
+ "sync"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
@@ -15,37 +16,32 @@ import (
// The evil begins here.
func main() {
- var args clients.Args
- var cfgFile string
- var debugEnable bool
+ var args config.Args
var displayVersion bool
var grep string
- var sshPort int
userName := user.Name()
+ flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex")
flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode")
- flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys")
- flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages")
+ flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
- flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently")
- flag.IntVar(&sshPort, "port", 2222, "SSH server port")
+ flag.IntVar(&args.SSHPort, "port", 2222, "SSH server port")
+ flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
- flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.Parse()
- args.Transform(flag.Args())
- config.Read(cfgFile, sshPort, args.NoColor)
- args.TransformAfterConfigFile()
+ config.Setup(&args, flag.Args())
if displayVersion {
version.PrintAndExit()
@@ -54,11 +50,10 @@ func main() {
version.Print()
}
- ctx := context.TODO()
- logger.Start(ctx, logger.Modes{
- Debug: debugEnable || config.Common.DebugEnable,
- Quiet: args.Quiet,
- })
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ wg.Add(1)
+ dlog.Start(ctx, &wg, dlog.CLIENT, args.LogLevel)
if grep != "" {
args.RegexStr = grep
@@ -70,6 +65,8 @@ func main() {
}
status := client.Start(ctx, signal.InterruptCh(ctx))
- logger.Flush()
+ cancel()
+
+ wg.Wait()
os.Exit(status)
}
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index 4525503..061d859 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"os"
+ "sync"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/user"
@@ -16,39 +17,36 @@ import (
// The evil begins here.
func main() {
- var cfgFile string
- var debugEnable bool
var displayVersion bool
var queryStr string
- var sshPort int
- args := clients.Args{
+ args := config.Args{
Mode: omode.MapClient,
}
userName := user.Name()
+ flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode")
- flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys")
- flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages")
+ flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
flag.BoolVar(&displayVersion, "version", false, "Display version")
- flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
+ // TODO: Make ConnectionsPerCPU default value as a constant in config package.
flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently")
+ // TODO: make default ssh port a constant in the config package.
+ flag.IntVar(&args.SSHPort, "port", 2222, "SSH server port")
flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection")
- flag.IntVar(&sshPort, "port", 2222, "SSH server port")
+ flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
- flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.StringVar(&queryStr, "query", "", "Map reduce query")
flag.Parse()
- args.Transform(flag.Args())
- config.Read(cfgFile, sshPort, args.NoColor)
- args.TransformAfterConfigFile()
+ config.Setup(&args, flag.Args())
if displayVersion {
version.PrintAndExit()
@@ -57,18 +55,19 @@ func main() {
version.Print()
}
- ctx := context.TODO()
- logger.Start(ctx, logger.Modes{
- Debug: debugEnable || config.Common.DebugEnable,
- Quiet: args.Quiet,
- })
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ wg.Add(1)
+ dlog.Start(ctx, &wg, dlog.CLIENT, config.Common.LogLevel)
client, err := clients.NewMaprClient(args, queryStr, clients.DefaultMode)
if err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
status := client.Start(ctx, signal.InterruptCh(ctx))
- logger.Flush()
+ cancel()
+
+ wg.Wait()
os.Exit(status)
}
diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go
index bc3cb91..5788a87 100644
--- a/cmd/dserver/main.go
+++ b/cmd/dserver/main.go
@@ -9,11 +9,12 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
+ "sync"
"syscall"
"time"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/server"
"github.com/mimecast/dtail/internal/user"
"github.com/mimecast/dtail/internal/version"
@@ -21,31 +22,31 @@ import (
// The evil begins here.
func main() {
- var cfgFile string
+ var args config.Args
var color bool
- var debugEnable bool
var displayVersion bool
var logDir string
var pprof int
var shutdownAfter int
- var sshPort int
user.NoRootCheck()
flag.BoolVar(&color, "color", false, "Enable ANSII terminal colors")
flag.BoolVar(&config.ServerRelaxedAuthEnable, "relaxedAuth", false, "Enable relaxced SSH auth mode (don't use in production!)")
- flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages")
flag.BoolVar(&displayVersion, "version", false, "Display version")
+ flag.IntVar(&args.SSHPort, "port", 2222, "SSH server port")
flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port")
- flag.IntVar(&shutdownAfter, "shutdownAfter", 0, "Automatically shutdown after so many seconds")
- flag.IntVar(&sshPort, "port", 2222, "SSH server port")
- flag.StringVar(&cfgFile, "cfg", "", "Config file path")
+ flag.IntVar(&shutdownAfter, "shutdownAfter", 0, "Shutdown after so many seconds")
+ flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&logDir, "logDir", "", "Log dir path")
flag.Parse()
- config.Read(cfgFile, sshPort, !color)
+ args.NoColor = !color
+ config.Setup(&args, flag.Args())
if logDir != "" {
+ // TODO: Re-Implement log strategy support.
config.Common.LogDir = logDir
if config.Common.LogStrategy == "" {
config.Common.LogStrategy = "daily"
@@ -73,25 +74,25 @@ func main() {
}
}()
- if debugEnable {
- config.Common.DebugEnable = true
- }
-
- logger.Start(ctx, logger.Modes{Server: true, Debug: debugEnable || config.Common.DebugEnable})
+ var wg sync.WaitGroup
+ wg.Add(1)
+ dlog.Start(ctx, &wg, dlog.SERVER, config.Common.LogLevel)
if config.ServerRelaxedAuthEnable {
- logger.Fatal("SSH relaxed-auth mode enabled")
+ dlog.Server.Fatal("SSH relaxed-auth mode enabled")
}
if pprof > -1 {
- // For debugging purposes only
+ // Start of pprof server for development purposes only.
pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof)
- logger.Info("Starting PProf", pprofArgs)
+ dlog.Server.Info("Starting PProf", pprofArgs)
go http.ListenAndServe(pprofArgs, nil)
}
serv := server.New()
status := serv.Start(ctx)
- logger.Flush()
+ cancel()
+
+ wg.Wait()
os.Exit(status)
}
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 5c2d393..dcf1fab 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -8,12 +8,13 @@ import (
_ "net/http"
_ "net/http/pprof"
"os"
+ "sync"
"time"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/user"
@@ -22,41 +23,41 @@ import (
// The evil begins here.
func main() {
- var args clients.Args
- var cfgFile string
+ var args config.Args
var checkHealth bool
- var debugEnable bool
var displayColorTable bool
+ var displayWideColorTable bool
var displayVersion bool
var grep string
var pprof int
var queryStr string
var shutdownAfter int
- var sshPort int
userName := user.Name()
+ flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&args.Quiet, "quiet", false, "Quiet output mode")
flag.BoolVar(&args.RegexInvert, "invert", false, "Invert regex")
flag.BoolVar(&args.Spartan, "spartan", false, "Spartan output mode")
- flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys")
+ flag.BoolVar(&args.TrustAllHosts, "trustAllHosts", false, "Trust all unknown host keys")
+ // TODO: Check whether the health check still works.
flag.BoolVar(&checkHealth, "checkHealth", false, "Only check for server health")
- flag.BoolVar(&debugEnable, "debug", false, "Activate debug messages")
flag.BoolVar(&displayColorTable, "colorTable", false, "Show color table")
+ flag.BoolVar(&displayWideColorTable, "wideColorTable", false, "Show a large color table")
flag.BoolVar(&displayVersion, "version", false, "Display version")
- flag.BoolVar(&args.NoColor, "noColor", false, "Disable ANSII terminal colors")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently")
+ flag.IntVar(&args.SSHPort, "port", 2222, "SSH server port")
flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection")
flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port")
- flag.IntVar(&shutdownAfter, "shutdownAfter", 3600*24, "Automatically shutdown after so many seconds")
- flag.IntVar(&sshPort, "port", 2222, "SSH server port")
+ flag.IntVar(&shutdownAfter, "shutdownAfter", 3600*24, "Shutdown after so many seconds")
+ flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
- flag.StringVar(&cfgFile, "cfg", "", "Config file path")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.StringVar(&queryStr, "query", "", "Map reduce query")
@@ -64,22 +65,22 @@ func main() {
if grep != "" {
args.RegexStr = grep
}
- args.Transform(flag.Args())
- config.Read(cfgFile, sshPort, args.NoColor)
- args.TransformAfterConfigFile()
+ config.Setup(&args, flag.Args())
if displayVersion {
version.PrintAndExit()
}
if !args.Spartan {
version.Print()
+ if displayWideColorTable {
+ color.TablePrintAndExit(true)
+ }
if displayColorTable {
- color.TablePrintAndExit(debugEnable)
+ color.TablePrintAndExit(false)
}
}
ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
if shutdownAfter > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second)
@@ -88,19 +89,19 @@ func main() {
if checkHealth {
healthClient, _ := clients.NewHealthClient(omode.HealthClient)
+ cancel()
os.Exit(healthClient.Start(ctx))
}
- logger.Start(ctx, logger.Modes{
- Debug: debugEnable || config.Common.DebugEnable,
- Quiet: args.Quiet,
- })
+ var wg sync.WaitGroup
+ wg.Add(1)
+ dlog.Start(ctx, &wg, dlog.CLIENT, config.Common.LogLevel)
if pprof > -1 {
// For debugging purposes only
pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof)
go http.ListenAndServe(pprofArgs, nil)
- logger.Info("Started PProf", pprofArgs)
+ dlog.Client.Info("Started PProf", pprofArgs)
}
var client clients.Client
@@ -119,6 +120,8 @@ func main() {
}
status := client.Start(ctx, signal.InterruptCh(ctx))
- logger.Flush()
+ cancel()
+
+ wg.Wait()
os.Exit(status)
}
diff --git a/docker/.gitignore b/docker/.gitignore
index ef60a31..e69de29 100644
--- a/docker/.gitignore
+++ b/docker/.gitignore
@@ -1,5 +0,0 @@
-dserver
-mapr_testdata.log
-log
-*.csv
-*.out
diff --git a/docker/Makefile b/docker/Makefile
index 71fd249..c89467c 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -18,7 +18,10 @@ dgrep:
../dgrep --servers serverlist.txt --files '/var/log/dserver/*' --regex MAPREDUCE --trustAllHosts
dcat:
../dcat --servers serverlist.txt --files '/etc/passwd' --trustAllHosts
+dcat_notrust:
+ ../dcat --servers serverlist.txt --files '/etc/passwd'
dcat2:
+ # TODO: All serverless tests in this Makefile have to move to actual unit tests
../dcat /etc/passwd
dmap:
../dmap --servers serverlist.txt --files '/var/log/dserver/*' --trustAllHosts --query 'from stats select avg(goroutines),max(goroutines),min(goroutines),last(goroutines),count($$hostname),$$hostname group by $$hostname order by avg(goroutines)'
@@ -30,7 +33,7 @@ dmap2:
diff -u <(sort dmap2-A.csv) <(sort dmap2-B.csv)
diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv)
dmap3:
- ../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
+ ../dmap --servers <(head -n 1 serverlist.txt) --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log
@echo Expecting zero diff!
diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv)
diff --git a/docker/dtail.json b/docker/dtail.json
index d923101..badd42f 100644
--- a/docker/dtail.json
+++ b/docker/dtail.json
@@ -33,7 +33,7 @@
"TmpDir" : "tmp",
"LogStrategy": "daily",
"SSHPort": 2222,
- "DebugEnable": true,
+ "LogLevel": "DEVEL",
"ExperimentalFeaturesEnable": false
}
}
diff --git a/internal/clients/args.go b/internal/clients/args.go
deleted file mode 100644
index dc71e83..0000000
--- a/internal/clients/args.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package clients
-
-import (
- "flag"
- "fmt"
- "strings"
-
- "github.com/mimecast/dtail/internal/omode"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// Args is a helper struct to summarize common client arguments.
-type Args struct {
- Mode omode.Mode
- ServersStr string
- UserName string
- What string
- Arguments []string
- RegexStr string
- RegexInvert bool
- TrustAllHosts bool
- Discovery string
- ConnectionsPerCPU int
- Timeout int
- SSHAuthMethods []gossh.AuthMethod
- SSHHostKeyCallback gossh.HostKeyCallback
- PrivateKeyPathFile string
- Quiet bool
- Spartan bool
- NoColor bool
- Serverless bool
-}
-
-// Transform the arguments based on certain conditions.
-func (a *Args) Transform(args []string) {
- // Interpret additional args as file list.
- if a.What == "" {
- var files []string
- for _, file := range flag.Args() {
- files = append(files, file)
- }
- a.What = strings.Join(files, ",")
- }
-
- if a.Spartan {
- a.Quiet = true
- a.NoColor = true
- }
-
- if a.Discovery == "" && a.ServersStr == "" {
- a.Serverless = true
- }
-}
-
-// TransformAfterConfigFile same as Transform, but after the config file has been read.
-func (a *Args) TransformAfterConfigFile() {
- // TODO: Remove this method. It's not used.
-}
-
-func (a *Args) SerializeOptions() string {
- return fmt.Sprintf("quiet=%v:spartan=%v", a.Quiet, a.Spartan)
-}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 5523052..fc01955 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -6,8 +6,9 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/connectors"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/discovery"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
"github.com/mimecast/dtail/internal/ssh/client"
@@ -17,7 +18,7 @@ import (
// This is the main client data structure.
type baseClient struct {
- Args
+ config.Args
// To display client side stats
stats *stats
// List of remote servers to connect to.
@@ -39,7 +40,8 @@ type baseClient struct {
}
func (c *baseClient) init() {
- logger.Debug("Initiating base client")
+ dlog.Client.Debug("Initiating base client")
+ dlog.Client.Debug(c.Args.String())
flag := regex.Default
if c.Args.RegexInvert {
@@ -47,11 +49,14 @@ func (c *baseClient) init() {
}
regex, err := regex.New(c.Args.RegexStr, flag)
if err != nil {
- logger.FatalExit(c.Regex, "invalid regex!", err, regex)
+ dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex)
}
c.Regex = regex
- logger.Debug("Regex", c.Regex)
+ dlog.Client.Debug("Regex", c.Regex)
+ if c.Args.Serverless {
+ return
+ }
c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile)
}
@@ -67,8 +72,11 @@ func (c *baseClient) makeConnections(maker maker) {
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
- // Periodically check for unknown hosts, and ask the user whether to trust them or not.
- go c.hostKeyCallback.PromptAddHosts(ctx)
+ // Can be nil when serverless.
+ if c.hostKeyCallback != nil {
+ // Periodically check for unknown hosts, and ask the user whether to trust them or not.
+ go c.hostKeyCallback.PromptAddHosts(ctx)
+ }
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
@@ -112,7 +120,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
}
time.Sleep(time.Second * 2)
- logger.Debug(conn.Server(), "Reconnecting")
+ dlog.Client.Debug(conn.Server(), "Reconnecting")
conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback)
c.connections[i] = conn
}
@@ -127,7 +135,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe
}
func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer logger.Debug("Terminated connection")
+ defer dlog.Client.Debug("Terminated connection")
// We want to have at least one active connection
<-active
@@ -143,7 +151,7 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
if numActive == 0 {
return
}
- logger.Debug("Active connections", numActive)
+ dlog.Client.Debug("Active connections", numActive)
time.Sleep(time.Second)
}
}
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index d14cdcc..2726e7e 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -7,6 +7,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +18,7 @@ type CatClient struct {
}
// NewCatClient returns a new cat client.
-func NewCatClient(args Args) (*CatClient, error) {
+func NewCatClient(args config.Args) (*CatClient, error) {
if args.RegexStr != "" {
return nil, errors.New("Can't use regex with 'cat' operating mode")
}
@@ -42,11 +44,13 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
}
func (c CatClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file, c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
return
}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index 0904ba1..5bc63ee 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -10,7 +10,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/ssh/client"
"golang.org/x/crypto/ssh"
@@ -30,7 +30,7 @@ type ServerConnection struct {
// NewServerConnection returns a new DTail SSH server connection.
func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection {
- logger.Debug(server, "Creating new connection", server, handler, commands)
+ dlog.Client.Debug(server, "Creating new connection", server, handler, commands)
c := ServerConnection{
hostKeyCallback: hostKeyCallback,
@@ -81,10 +81,10 @@ func (c *ServerConnection) initServerPort() {
parts := strings.Split(c.server, ":")
if len(parts) == 2 {
- logger.Debug("Parsing port from hostname", parts)
+ dlog.Client.Debug("Parsing port from hostname", parts)
port, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse client port", c.server, parts, err)
+ dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err)
}
c.server = parts[0]
c.port = port
@@ -93,21 +93,21 @@ func (c *ServerConnection) initServerPort() {
func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
// Throttle how many connections can be established concurrently (based on ch length)
- logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
select {
case throttleCh <- struct{}{}:
case <-ctx.Done():
- logger.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
return
}
- logger.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
go func() {
defer func() {
if !c.throttlingDone {
- logger.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
@@ -115,9 +115,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
}()
if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil {
- logger.Warn(c.server, c.port, err)
+ dlog.Client.Warn(c.server, c.port, err)
if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.server, c.port)) {
- logger.Debug(c.server, "Not trusting host")
+ dlog.Client.Debug(c.server, "Not trusting host")
}
}
}()
@@ -127,14 +127,14 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
// Dail into a new SSH connection. Close connection in case of an error.
func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error {
- logger.Debug(c.server, "Incrementing connection stats")
+ dlog.Client.Debug(c.server, "Incrementing connection stats")
statsCh <- struct{}{}
defer func() {
- logger.Debug(c.server, "Decrementing connection stats")
+ dlog.Client.Debug(c.server, "Decrementing connection stats")
<-statsCh
}()
- logger.Debug(c.server, "Dialing into the connection")
+ dlog.Client.Debug(c.server, "Dialing into the connection")
address := fmt.Sprintf("%s:%d", c.server, c.port)
client, err := ssh.Dial("tcp", address, c.config)
@@ -148,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
// Create the SSH session. Close the session in case of an error.
func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error {
- logger.Debug(c.server, "Creating SSH session")
+ dlog.Client.Debug(c.server, "Creating SSH session")
session, err := client.NewSession()
if err != nil {
@@ -160,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun
}
func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error {
- logger.Debug(c.server, "Creating handler for SSH session")
+ dlog.Client.Debug(c.server, "Creating handler for SSH session")
stdinPipe, err := session.StdinPipe()
if err != nil {
@@ -196,12 +196,12 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc
// Send all commands to client.
for _, command := range c.commands {
- logger.Debug(command)
+ dlog.Client.Debug(command)
c.handler.SendMessage(command)
}
if !c.throttlingDone {
- logger.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 0500645..c7b5f62 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -6,7 +6,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
serverHandlers "github.com/mimecast/dtail/internal/server/handlers"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -26,7 +26,7 @@ func NewServerless(userName string, handler handlers.Handler, commands []string)
commands: commands,
}
- logger.Debug("Creating new serverless connector", handler, commands)
+ dlog.Client.Debug("Creating new serverless connector", handler, commands)
return &s
}
@@ -43,7 +43,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
defer cancel()
if err := s.handle(ctx, cancel); err != nil {
- logger.Warn(err)
+ dlog.Client.Warn(err)
}
}()
@@ -51,7 +51,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
}
func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error {
- logger.Debug("Creating server handler for a serverless session")
+ dlog.Client.Debug("Creating server handler for a serverless session")
serverHandler := serverHandlers.NewServerHandler(
user.New(s.userName, s.Server()),
@@ -59,14 +59,19 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
make(chan struct{}, config.Server.MaxConcurrentTails),
)
+ terminate := func() {
+ serverHandler.Shutdown()
+ cancel()
+ }
+
go func() {
io.Copy(serverHandler, s.handler)
- cancel()
+ terminate()
}()
go func() {
io.Copy(s.handler, serverHandler)
- cancel()
+ terminate()
}()
go func() {
@@ -74,12 +79,12 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
case <-s.handler.Done():
case <-ctx.Done():
}
- cancel()
+ terminate()
}()
// Send all commands to client.
for _, command := range s.commands {
- logger.Debug(command)
+ dlog.Client.Debug(command)
s.handler.SendMessage(command)
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index ea5022b..ae21ff2 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -7,6 +7,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +18,7 @@ type GrepClient struct {
}
// NewGrepClient creates a new grep client.
-func NewGrepClient(args Args) (*GrepClient, error) {
+func NewGrepClient(args config.Args) (*GrepClient, error) {
if args.RegexStr == "" {
return nil, errors.New("No regex specified, use '-regex' flag")
}
@@ -41,12 +43,13 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
}
func (c GrepClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
return
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index af1ad62..3291b43 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -51,7 +51,7 @@ func (h *baseHandler) Shutdown() {
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
- logger.Debug("Sending command", h.server, command, encoded)
+ dlog.Client.Debug("Sending command", h.server, command, encoded)
select {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
@@ -112,7 +112,7 @@ func (h *baseHandler) handleMessageType(message string) {
return
}
- logger.Raw(message)
+ dlog.Client.Raw(message)
}
// Handle messages received from server which are not meant to be displayed
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 2bcb038..27ac85e 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -2,7 +2,7 @@ package handlers
import (
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ClientHandler is the basic client handler interface.
@@ -12,7 +12,7 @@ type ClientHandler struct {
// NewClientHandler creates a new client handler.
func NewClientHandler(server string) *ClientHandler {
- logger.Debug(server, "Creating new client handler")
+ dlog.Client.Debug(server, "Creating new client handler")
return &ClientHandler{
baseHandler{
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 65b1454..848e7f0 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -4,7 +4,7 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
"github.com/mimecast/dtail/internal/protocol"
@@ -40,7 +40,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
continue
case protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
- logger.Debug(message)
+ dlog.Client.Debug(message)
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
@@ -60,10 +60,10 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
func (h *MaprHandler) handleAggregateMessage(message string) {
parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
if len(parts) != 3 {
- logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
return
}
if err := h.aggregate.Aggregate(parts[2]); err != nil {
- logger.Error("Unable to aggregate data", h.server, message, err)
+ dlog.Client.Error("Unable to aggregate data", h.server, message, err)
}
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index e6ab96b..f23aa08 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -11,7 +11,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
)
@@ -44,14 +44,14 @@ type MaprClient struct {
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
+func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
if queryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
query, err := mapr.NewQuery(queryStr)
if err != nil {
- logger.FatalExit(queryStr, "Can't parse mapr query", err)
+ dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err)
}
// Don't retry connection if in tail mode and no outfile specified.
@@ -68,7 +68,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
cumulative = args.Mode == omode.MapClient || query.HasOutfile()
}
- logger.Debug("Cumulative mapreduce mode?", cumulative)
+ dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)
c := MaprClient{
baseClient: baseClient{
@@ -103,7 +103,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Debug("Received final mapreduce result")
+ dlog.Client.Debug("Received final mapreduce result")
c.reportResults()
}
@@ -123,15 +123,17 @@ func (c MaprClient) makeCommands() (commands []string) {
}
for _, file := range strings.Split(c.What, ",") {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout,
+ modeStr, file, regex))
continue
}
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- modeStr,
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ modeStr, c.Args.SerializeOptions(), file, regex))
}
return
@@ -141,7 +143,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
- logger.Debug("Gathering interim mapreduce result")
+ dlog.Client.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -177,17 +179,17 @@ func (c *MaprClient) printResults() {
}
if err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
if result == c.lastResult {
- logger.Debug("Result hasn't changed compared to last time...")
+ dlog.Client.Debug("Result hasn't changed compared to last time...")
return
}
c.lastResult = result
if numRows == 0 {
- logger.Debug("Empty result set this time...")
+ dlog.Client.Debug("Empty result set this time...")
return
}
@@ -198,24 +200,24 @@ func (c *MaprClient) printResults() {
config.Client.TermColors.MaprTable.RawQueryBg,
config.Client.TermColors.MaprTable.RawQueryAttr)
}
- logger.Raw(rawQuery)
+ dlog.Client.Raw(rawQuery)
if rowsLimit > 0 && numRows > rowsLimit {
- logger.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!",
+ dlog.Client.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!",
numRows, rowsLimit))
}
- logger.Raw(result)
+ dlog.Client.Raw(result)
}
func (c *MaprClient) writeResultsToOutfile() {
if c.cumulative {
if err := c.globalGroup.WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
return
}
if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 6da443c..fbef572 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -10,7 +10,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -67,7 +67,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
s.printStatsDueInterrupt(messages)
default:
data := s.statsData(connected, newConnections, throttle)
- logger.Mapreduce("STATS", data)
+ dlog.Client.Mapreduce("STATS", data)
}
connectedLast = connected
@@ -78,7 +78,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
}
func (s *stats) printStatsDueInterrupt(messages []string) {
- logger.Pause()
+ dlog.Client.Pause()
for i, message := range messages {
if i > 0 && config.Client.TermColorsEnable {
fmt.Println(color.PaintStrWithAttr(message,
@@ -91,7 +91,7 @@ func (s *stats) printStatsDueInterrupt(messages []string) {
fmt.Println(fmt.Sprintf(" %s", message))
}
time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
- logger.Resume()
+ dlog.Client.Resume()
}
func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} {
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 360354b..d42a0e4 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -6,7 +6,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +17,7 @@ type TailClient struct {
}
// NewTailClient returns a new TailClient.
-func NewTailClient(args Args) (*TailClient, error) {
+func NewTailClient(args config.Args) (*TailClient, error) {
args.Mode = omode.TailClient
c := TailClient{
@@ -38,14 +39,15 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
}
func (c TailClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
- logger.Debug(commands)
+ dlog.Client.Debug(commands)
return
}
diff --git a/internal/color/table.go b/internal/color/table.go
index 7ecfbca..2115edf 100644
--- a/internal/color/table.go
+++ b/internal/color/table.go
@@ -7,17 +7,17 @@ import (
const sampleParagraph string = "Mimecast is Making Email Safer for Business. We believe that securely operating a business in the cloud requires new levels of IT preparedness, centered around cyber resilience. This is why we unify the delivery and management of security, continuity and data protection for email via one, simple-to-use cloud platform. Thousands of organizations trust us to increase their cyber resilience preparedness, streamline compliance, reduce IT complexity and keep their business running. We give employees fast and secure access to sensitive business information, and ensure email keeps flowing in the event of an outage. Mimecast will remain committed to protecting your IT assets through constant innovation and focus on your success."
-func TablePrintAndExit(useSampleParagraph bool) {
+func TablePrintAndExit(displaySampleParagraph bool) {
for _, attr := range AttributeNames {
if attr == "Hidden" || attr == "SlowBlink" {
continue
}
- printColorTable(attr, useSampleParagraph)
+ printColorTable(attr, displaySampleParagraph)
}
os.Exit(0)
}
-func printColorTable(attr string, useSampleParagraph bool) {
+func printColorTable(attr string, displaySampleParagraph bool) {
for _, fg := range ColorNames {
fgColor, _ := ToFgColor(fg)
for _, bg := range ColorNames {
@@ -29,7 +29,7 @@ func printColorTable(attr string, useSampleParagraph bool) {
text := fmt.Sprintf(" Foreground:%10s | Background:%10s | Attribute:%10s ", fg, bg, attr)
fmt.Print(PaintStrWithAttr(text, fgColor, bgColor, attribute))
- if useSampleParagraph {
+ if displaySampleParagraph {
fmt.Print("\n")
fmt.Print(PaintStrWithAttr(sampleParagraph, fgColor, bgColor, attribute))
fmt.Print("\n")
diff --git a/internal/config/args.go b/internal/config/args.go
new file mode 100644
index 0000000..89e4bc9
--- /dev/null
+++ b/internal/config/args.go
@@ -0,0 +1,105 @@
+package config
+
+import (
+ "flag"
+ "fmt"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/omode"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// Args is a helper struct to summarize common client arguments.
+type Args struct {
+ Arguments []string
+ ConfigFile string
+ ConnectionsPerCPU int
+ Discovery string
+ LogLevel string
+ Mode omode.Mode
+ NoColor bool
+ PrivateKeyPathFile string
+ Quiet bool
+ RegexInvert bool
+ RegexStr string
+ Serverless bool
+ ServersStr string
+ Spartan bool
+ SSHAuthMethods []gossh.AuthMethod
+ SSHHostKeyCallback gossh.HostKeyCallback
+ SSHPort int
+ Timeout int
+ TrustAllHosts bool
+ UserName string
+ What string
+}
+
+func (a *Args) String() string {
+ var sb strings.Builder
+
+ sb.WriteString("Args(")
+ sb.WriteString(fmt.Sprintf("%s:%s,", "LogLevel", a.LogLevel))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Arguments", a.Arguments))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "ConfigFile", a.ConfigFile))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "ConnectionsPerCPU", a.ConnectionsPerCPU))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Discovery", a.Discovery))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Mode", a.Mode))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "NoColor", a.NoColor))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "PrivateKeyPathFile", a.PrivateKeyPathFile))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Quiet", a.Quiet))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "RegexInvert", a.RegexInvert))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "RegexStr", a.RegexStr))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Serverless", a.Serverless))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "ServersStr", a.ServersStr))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Spartan", a.Spartan))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "SSHAuthMethods", a.SSHAuthMethods))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "SSHHostKeyCallback", a.SSHHostKeyCallback))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "SSHPort", a.SSHPort))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "Timeout", a.Timeout))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "TrustAllHosts", a.TrustAllHosts))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "UserName", a.UserName))
+ sb.WriteString(fmt.Sprintf("%s:%v", "What", a.What))
+ sb.WriteString(")")
+
+ return sb.String()
+}
+
+// Based on the argument list, transform/manipulate some of the arguments.
+func (a *Args) transform(args []string) {
+ if a.LogLevel != "" {
+ Common.LogLevel = a.LogLevel
+ }
+
+ if a.SSHPort != 2222 {
+ Common.SSHPort = a.SSHPort
+ }
+ if a.NoColor {
+ Client.TermColorsEnable = false
+ }
+
+ if a.Spartan {
+ a.Quiet = true
+ a.NoColor = true
+ }
+
+ if a.Discovery == "" && a.ServersStr == "" {
+ a.Serverless = true
+ }
+
+ // Interpret additional args as file list.
+ if a.What == "" {
+ var files []string
+ for _, file := range flag.Args() {
+ files = append(files, file)
+ }
+ a.What = strings.Join(files, ",")
+ }
+}
+
+// SerializeOptions returns a string ready to be sent over the wire to the server.
+func (a *Args) SerializeOptions() string {
+ return fmt.Sprintf("quiet=%v:spartan=%v", a.Quiet, a.Spartan)
+}
+
+// TODO: Put the DeseializeOptions function here (move it away from the internal/server package)
diff --git a/internal/config/common.go b/internal/config/common.go
index c3e203e..7d45261 100644
--- a/internal/config/common.go
+++ b/internal/config/common.go
@@ -6,10 +6,8 @@ type CommonConfig struct {
SSHPort int
// Enable experimental features (mainly for dev purposes)
ExperimentalFeaturesEnable bool `json:",omitempty"`
- // Enable debug logging. Don't enable in production.
- DebugEnable bool `json:",omitempty"`
- // Enable trace logging. Don't enable in production.
- TraceEnable bool `json:",omitempty"`
+ // LogLevel defines how much is logged. TODO: Adjust JSONschema
+ LogLevel string `json:",omitempty"`
// The log strategy to use, one of
// stdout: only log to stdout (useful when used with systemd)
// daily: create a log file for every day
@@ -26,8 +24,6 @@ type CommonConfig struct {
func newDefaultCommonConfig() *CommonConfig {
return &CommonConfig{
SSHPort: 2222,
- DebugEnable: false,
- TraceEnable: false,
ExperimentalFeaturesEnable: false,
LogDir: "log",
CacheDir: "cache",
diff --git a/internal/config/config.go b/internal/config/config.go
index 276ddcf..2d77041 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -15,6 +15,9 @@ const ScheduleUser string = "DTAIL-SCHEDULE"
// ContinuousUser is used for non-interactive continuous mapreduce queries.
const ContinuousUser string = "DTAIL-CONTINUOUS"
+// TestUser is used for unit tests and potentially also for integration tests.
+const TestUser string = "DTAIL-TEST"
+
// InterruptTimeoutS is used to terminate DTail when Ctrl+C was pressed twice within a given interval.
const InterruptTimeoutS int = 3
diff --git a/internal/config/read.go b/internal/config/read.go
deleted file mode 100644
index ea358f8..0000000
--- a/internal/config/read.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package config
-
-import (
- "os"
-)
-
-// Read the DTail configuration.
-func Read(configFile string, sshPort int, noColor bool) {
- initializer := configInitializer{
- Common: newDefaultCommonConfig(),
- Server: newDefaultServerConfig(),
- Client: newDefaultClientConfig(),
- }
-
- if configFile == "" {
- configFile = "./cfg/dtail.json"
- }
-
- if _, err := os.Stat(configFile); !os.IsNotExist(err) {
- initializer.parseConfig(configFile)
- }
-
- // Assign pointers to global variables, so that we can access the
- // configuration from any place of the program.
- Common = initializer.Common
- Server = initializer.Server
- Client = initializer.Client
-
- if Server.MapreduceLogFormat == "" {
- Server.MapreduceLogFormat = "default"
- }
-
- // If non-standard port specified, overwrite config
- if sshPort != 2222 {
- Common.SSHPort = sshPort
- }
- if noColor {
- Client.TermColorsEnable = false
- }
-}
diff --git a/internal/config/server.go b/internal/config/server.go
index dc0d587..8bbb394 100644
--- a/internal/config/server.go
+++ b/internal/config/server.go
@@ -76,6 +76,7 @@ func newDefaultServerConfig() *ServerConfig {
MaxConcurrentTails: 50,
HostKeyFile: "./cache/ssh_host_key",
HostKeyBits: 4096,
+ MapreduceLogFormat: "default",
Permissions: Permissions{
Default: defaultPermissions,
},
diff --git a/internal/config/setup.go b/internal/config/setup.go
new file mode 100644
index 0000000..3c4bcc4
--- /dev/null
+++ b/internal/config/setup.go
@@ -0,0 +1,35 @@
+package config
+
+import (
+ "os"
+)
+
+const NoConfigFile string = "Don't read a config file - use defaults only"
+
+// Setup the DTail configuration.
+func Setup(args *Args, additionalArgs []string) {
+ initializer := configInitializer{
+ Common: newDefaultCommonConfig(),
+ Server: newDefaultServerConfig(),
+ Client: newDefaultClientConfig(),
+ }
+
+ if args.ConfigFile == "" {
+ // TODO: Search more paths for config file (e.g. in /etc and in ~/.config/...
+ args.ConfigFile = "./cfg/dtail.json"
+ }
+
+ if args.ConfigFile != NoConfigFile {
+ if _, err := os.Stat(args.ConfigFile); !os.IsNotExist(err) {
+ initializer.parseConfig(args.ConfigFile)
+ }
+ }
+
+ // Assign pointers to global variables, so that we can access the
+ // configuration from any place of the program.
+ Common = initializer.Common
+ Server = initializer.Server
+ Client = initializer.Client
+
+ args.transform(additionalArgs)
+}
diff --git a/internal/discovery/comma.go b/internal/discovery/comma.go
index 4344240..9bea89c 100644
--- a/internal/discovery/comma.go
+++ b/internal/discovery/comma.go
@@ -3,11 +3,11 @@ package discovery
import (
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ServerListFromCOMMA retrieves a list of servers from comma separated input list.
func (d *Discovery) ServerListFromCOMMA() []string {
- logger.Debug("Retrieving server list from comma separated list", d.server)
+ dlog.Common.Debug("Retrieving server list from comma separated list", d.server)
return strings.Split(d.server, ",")
}
diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go
index 3608ce7..83ee95e 100644
--- a/internal/discovery/discovery.go
+++ b/internal/discovery/discovery.go
@@ -9,7 +9,7 @@ import (
"strings"
"time"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ServerOrder to specify how to sort the server list.
@@ -42,7 +42,7 @@ func New(method, server string, order ServerOrder) *Discovery {
if strings.Contains(module, ":") {
s := strings.Split(module, ":")
if len(s) != 2 {
- logger.FatalExit("Unable to parse discovery module", module)
+ dlog.Common.FatalPanic("Unable to parse discovery module", module)
}
module = s[0]
options = s[1]
@@ -72,11 +72,11 @@ func (d *Discovery) initRegex() {
}
regexStr := string(runes)
- logger.Debug("Using filter regex", regexStr)
+ dlog.Common.Debug("Using filter regex", regexStr)
regex, err := regexp.Compile(regexStr)
if err != nil {
- logger.FatalExit("Could not compile regex", regexStr, err)
+ dlog.Common.FatalPanic("Could not compile regex", regexStr, err)
}
d.regex = regex
@@ -97,7 +97,7 @@ func (d *Discovery) ServerList() []string {
servers = d.shuffleList(servers)
}
- logger.Debug("Discovered servers", len(servers), servers)
+ dlog.Common.Debug("Discovered servers", len(servers), servers)
return servers
}
@@ -124,7 +124,7 @@ func (d *Discovery) serverListFromReflectedModule() []string {
rt := reflect.TypeOf(d)
reflectedMethod, ok := rt.MethodByName(methodName)
if !ok {
- logger.FatalExit("No such server discovery module", d.module, methodName)
+ dlog.Common.FatalPanic("No such server discovery module", d.module, methodName)
}
inputValues := make([]reflect.Value, 1)
@@ -138,7 +138,7 @@ func (d *Discovery) serverListFromReflectedModule() []string {
// Filter server list based on a regexp.
func (d *Discovery) filterList(servers []string) (filtered []string) {
- logger.Debug("Filtering server list")
+ dlog.Common.Debug("Filtering server list")
for _, server := range servers {
if d.regex.MatchString(server) {
@@ -160,13 +160,13 @@ func (d *Discovery) dedupList(servers []string) (deduped []string) {
}
}
- logger.Debug("Deduped server list", len(servers), len(deduped))
+ dlog.Common.Debug("Deduped server list", len(servers), len(deduped))
return
}
// Randomly shuffle the server list.
func (d *Discovery) shuffleList(servers []string) []string {
- logger.Debug("Shuffling server list")
+ dlog.Common.Debug("Shuffling server list")
r := rand.New(rand.NewSource(time.Now().Unix()))
shuffled := make([]string, len(servers))
diff --git a/internal/discovery/file.go b/internal/discovery/file.go
index 1250755..fb46eeb 100644
--- a/internal/discovery/file.go
+++ b/internal/discovery/file.go
@@ -4,16 +4,16 @@ import (
"bufio"
"os"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ServerListFromFILE retrieves a list of servers from a file.
func (d *Discovery) ServerListFromFILE() (servers []string) {
- logger.Debug("Retrieving server list from file", d.server)
+ dlog.Common.Debug("Retrieving server list from file", d.server)
file, err := os.Open(d.server)
if err != nil {
- logger.FatalExit(d.server, err)
+ dlog.Common.FatalPanic(d.server, err)
}
defer file.Close()
@@ -22,7 +22,7 @@ func (d *Discovery) ServerListFromFILE() (servers []string) {
servers = append(servers, scanner.Text())
}
if err := scanner.Err(); err != nil {
- logger.FatalExit(d.server, err)
+ dlog.Common.FatalPanic(d.server, err)
}
return
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
new file mode 100644
index 0000000..7282741
--- /dev/null
+++ b/internal/io/dlog/dlog.go
@@ -0,0 +1,206 @@
+package dlog
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color/brush"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// Client is the log handler for the client packages.
+var Client *DLog
+
+// Server is the log handler for the server packages.
+var Server *DLog
+
+// Common is the log handler for all other packages.
+// TODO: Rename Common to Common
+var Common *DLog
+
+var mutex sync.Mutex
+var started bool
+
+// Start logger(s).
+func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLevel string) {
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ if started {
+ Common.FatalPanic("Logger already started")
+ }
+
+ level := newLevel(logLevel)
+ switch sourceProcess {
+ case CLIENT:
+ // This is a DTail client process running.
+ impl := loggers.FOUT
+ Client = New(CLIENT, CLIENT, impl, level)
+ Server = New(CLIENT, SERVER, impl, level)
+ Common = Client
+ case SERVER:
+ // This is a DTail server process running.
+ impl := loggers.FILE
+ Client = New(SERVER, CLIENT, impl, level)
+ Server = New(SERVER, SERVER, impl, level)
+ Common = Server
+ }
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ Client.start(ctx, &wg2)
+ Server.start(ctx, &wg2)
+ started = true
+
+ go rotation(ctx)
+ go func() {
+ wg2.Wait()
+ wg.Done()
+ }()
+}
+
+// DLog is the DTail logger.
+type DLog struct {
+ logger loggers.Logger
+ // Is this a DTail server or client process logging?
+ sourceProcess source
+ // Is this a DTail server or client package logging? In serverless mode
+ // the client can also execute code from the server package.
+ sourcePackage source
+ // Max log level to log.
+ maxLevel level
+}
+
+// New creates a new DTail logger.
+func New(sourceProcess, sourcePackage source, impl loggers.Impl, maxLevel level) *DLog {
+ return &DLog{
+ logger: loggers.Factory(sourceProcess.String(), impl),
+ sourceProcess: sourceProcess,
+ sourcePackage: sourcePackage,
+ maxLevel: maxLevel,
+ }
+}
+
+func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) {
+ go func() {
+ defer wg.Done()
+ var wg2 sync.WaitGroup
+ wg2.Add(1)
+ d.logger.Start(ctx, &wg2)
+ <-ctx.Done()
+ wg2.Wait()
+ }()
+}
+
+func (d *DLog) log(level level, args []interface{}) string {
+ if d.maxLevel < level {
+ return ""
+ }
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+ now := time.Now()
+
+ sb.WriteString(d.sourcePackage.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(now.Format("20060102-150405"))
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(level.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ d.writeArgStrings(sb, args)
+
+ message := sb.String()
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(now, message)
+ return message
+ }
+
+ d.logger.LogWithColors(now, message, brush.Colorfy(message))
+ return message
+}
+
+func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) {
+ for i, arg := range args {
+ if i > 0 {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ switch v := arg.(type) {
+ case string:
+ sb.WriteString(v)
+ case error:
+ sb.WriteString(v.Error())
+ default:
+ sb.WriteString(fmt.Sprintf("%v", v))
+ }
+ }
+}
+
+func (d *DLog) FatalPanic(args ...interface{}) {
+ d.log(FATAL, args)
+ d.logger.Flush()
+ panic("Not recovering from this fatal error...")
+}
+
+func (d *DLog) Fatal(args ...interface{}) string {
+ return d.log(FATAL, args)
+}
+
+func (d *DLog) Error(args ...interface{}) string {
+ return d.log(ERROR, args)
+}
+
+func (d *DLog) Warn(args ...interface{}) string {
+ return d.log(WARN, args)
+}
+
+func (d *DLog) Info(args ...interface{}) string {
+ return d.log(INFO, args)
+}
+
+func (d *DLog) Verbose(args ...interface{}) string {
+ return d.log(VERBOSE, args)
+}
+
+func (d *DLog) Debug(args ...interface{}) string {
+ return d.log(DEBUG, args)
+}
+
+func (d *DLog) Trace(args ...interface{}) string {
+ return d.log(TRACE, args)
+}
+
+func (d *DLog) Devel(args ...interface{}) string {
+ return d.log(DEVEL, args)
+}
+
+func (d *DLog) Raw(message string) string {
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(time.Now(), message)
+ return message
+ }
+
+ d.logger.Log(time.Now(), brush.Colorfy(message))
+ return message
+}
+
+func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
+ args := make([]interface{}, len(data)+1)
+ args[0] = fmt.Sprintf("%s:%s", "MAPREDUCE", strings.ToUpper(table))
+
+ i := 1
+ for k, v := range data {
+ args[i] = fmt.Sprintf("%s=%v", k, v)
+ i++
+ }
+
+ return d.log(INFO, args)
+}
+
+func (d *DLog) Flush() { d.logger.Flush() }
+func (d *DLog) Pause() { d.logger.Pause() }
+func (d *DLog) Resume() { d.logger.Resume() }
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
new file mode 100644
index 0000000..84550f0
--- /dev/null
+++ b/internal/io/dlog/level.go
@@ -0,0 +1,89 @@
+package dlog
+
+import (
+ "fmt"
+ "strings"
+)
+
+type level int
+
+const (
+ FATAL level = iota
+ ERROR level = iota
+ WARN level = iota
+ INFO level = iota
+ DEFAULT level = iota
+ VERBOSE level = iota
+ DEBUG level = iota
+ DEVEL level = iota
+ TRACE level = iota
+ ALL level = iota
+)
+
+var allLevels = []level{
+ FATAL,
+ ERROR,
+ WARN,
+ INFO,
+ DEFAULT,
+ VERBOSE,
+ DEBUG,
+ DEVEL,
+ TRACE,
+ ALL,
+}
+
+func newLevel(l string) level {
+ switch strings.ToUpper(l) {
+ case "FATAL":
+ return FATAL
+ case "ERROR":
+ return ERROR
+ case "WARN":
+ return WARN
+ case "INFO":
+ return INFO
+ case "":
+ fallthrough
+ case "DEFAULT":
+ return DEFAULT
+ case "VERBOSE":
+ return VERBOSE
+ case "DEBUG":
+ return DEBUG
+ case "DEVEL":
+ return DEVEL
+ case "TRACE":
+ return TRACE
+ case "ALL":
+ return ALL
+ }
+ panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels))
+}
+
+func (l level) String() string {
+ switch l {
+ case FATAL:
+ return "FATAL"
+ case ERROR:
+ return "ERROR"
+ case WARN:
+ return "WARN"
+ case INFO:
+ return "INFO"
+ case DEFAULT:
+ return "DEFAULT"
+ case VERBOSE:
+ return "VERBOSE"
+ case DEBUG:
+ return "DEBUG"
+ case DEVEL:
+ return "DEVEL"
+ case TRACE:
+ return "TRACE"
+ case ALL:
+ return "ALL"
+ }
+
+ panic("Unknown log level " + fmt.Sprintf("%d", l))
+}
diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go
new file mode 100644
index 0000000..3eb29c5
--- /dev/null
+++ b/internal/io/dlog/loggers/factory.go
@@ -0,0 +1,60 @@
+package loggers
+
+import (
+ "fmt"
+ "sync"
+)
+
+type Impl int
+
+const (
+ NONE Impl = iota
+ STDOUT Impl = iota
+ FILE Impl = iota
+ FOUT Impl = iota
+)
+
+var factoryMap map[string]Logger
+var factoryMutex sync.Mutex
+
+func Factory(name string, impl Impl) Logger {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+
+ id := fmt.Sprintf("name:%s,impl:%v", name, impl)
+
+ if factoryMap == nil {
+ factoryMap = make(map[string]Logger)
+ }
+
+ singleton, ok := factoryMap[id]
+ if !ok {
+ switch impl {
+ case NONE:
+ singleton = none{}
+ case STDOUT:
+ singleton = newStdout()
+ factoryMap[id] = singleton
+ case FILE:
+ singleton = newFile()
+ factoryMap[id] = singleton
+ case FOUT:
+ singleton = newFout()
+ factoryMap[id] = singleton
+ }
+ }
+
+ return singleton
+}
+
+func FactoryRotate() {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+ if factoryMap == nil {
+ return
+ }
+
+ for _, impl := range factoryMap {
+ impl.Rotate()
+ }
+}
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
new file mode 100644
index 0000000..1c525c9
--- /dev/null
+++ b/internal/io/dlog/loggers/file.go
@@ -0,0 +1,156 @@
+package loggers
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+)
+
+type fileMessageBuf struct {
+ now time.Time
+ message string
+}
+
+type file struct {
+ bufferCh chan *fileMessageBuf
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+ rotateCh chan struct{}
+ flushCh chan struct{}
+ lastDateStr string
+ fd *os.File
+ writer *bufio.Writer
+ mutex sync.Mutex
+ started bool
+}
+
+func newFile() *file {
+ f := file{
+ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100),
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ rotateCh: make(chan struct{}),
+ flushCh: make(chan struct{}),
+ }
+ f.getWriter(time.Now().Format("20060102"))
+ return &f
+}
+
+func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ // Logger already started from another Goroutine.
+ if s.started {
+ wg.Done()
+ return
+ }
+
+ pause := func(ctx context.Context) {
+ select {
+ case <-s.resumeCh:
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ go func() {
+ defer wg.Done()
+
+ for {
+ select {
+ case m := <-s.bufferCh:
+ s.write(m)
+ case <-s.pauseCh:
+ pause(ctx)
+ case <-s.flushCh:
+ s.flush()
+ case <-ctx.Done():
+ s.flush()
+ s.fd.Close()
+ return
+ }
+ }
+ }()
+
+ s.started = true
+}
+
+func (s *file) Log(now time.Time, message string) {
+ s.bufferCh <- &fileMessageBuf{now, message}
+}
+
+func (s *file) LogWithColors(now time.Time, message, coloredMessage string) {
+ panic("Colors not supported in file logger")
+}
+
+func (s *file) Pause() { s.pauseCh <- struct{}{} }
+func (s *file) Resume() { s.resumeCh <- struct{}{} }
+func (s *file) Flush() { s.flushCh <- struct{}{} }
+
+// TODO: Test that Rotate() actually works.
+func (s *file) Rotate() { s.rotateCh <- struct{}{} }
+func (file) SupportsColors() bool { return false }
+
+func (s *file) write(m *fileMessageBuf) {
+ select {
+ case <-s.rotateCh:
+ // Force re-opening the outfile.
+ s.lastDateStr = ""
+ default:
+ }
+
+ writer := s.getWriter(m.now.Format("20060102"))
+ writer.WriteString(m.message)
+ writer.WriteByte('\n')
+}
+
+func (s *file) getWriter(dateStr string) *bufio.Writer {
+ if s.lastDateStr == dateStr {
+ return s.writer
+ }
+
+ if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
+ if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
+ panic(err)
+ }
+ }
+
+ logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr)
+ newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ panic(err)
+ }
+
+ // Close old writer.
+ if s.fd != nil {
+ s.writer.Flush()
+ s.fd.Close()
+ }
+
+ s.fd = newFd
+ s.writer = bufio.NewWriterSize(s.fd, 1)
+ s.lastDateStr = dateStr
+
+ return s.writer
+}
+
+func (s *file) flush() {
+ defer s.writer.Flush()
+
+ for {
+ select {
+ case m := <-s.bufferCh:
+ s.write(m)
+ default:
+ return
+ }
+ }
+}
diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go
new file mode 100644
index 0000000..603dbe9
--- /dev/null
+++ b/internal/io/dlog/loggers/fout.go
@@ -0,0 +1,46 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+type fout struct {
+ file *file
+ stdout *stdout
+}
+
+// Logs to both, a file and stdout
+func newFout() *fout {
+ return &fout{file: newFile(), stdout: newStdout()}
+}
+
+func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ go func() {
+ defer wg.Done()
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ f.file.Start(ctx, &wg2)
+ f.stdout.Start(ctx, &wg2)
+ wg2.Wait()
+ }()
+}
+
+func (f *fout) Log(now time.Time, message string) {
+ f.stdout.Log(now, message)
+ f.file.Log(now, message)
+}
+
+func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) {
+ f.stdout.LogWithColors(now, "", coloredMessage)
+ f.file.Log(now, message)
+}
+
+func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() }
+func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() }
+func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() }
+func (f *fout) Rotate() { f.file.Rotate() }
+
+func (fout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go
new file mode 100644
index 0000000..c88900d
--- /dev/null
+++ b/internal/io/dlog/loggers/logger.go
@@ -0,0 +1,18 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+type Logger interface {
+ Log(now time.Time, message string)
+ LogWithColors(now time.Time, message, messageWithColors string)
+ Start(ctx context.Context, wg *sync.WaitGroup)
+ Flush()
+ Pause()
+ Resume()
+ Rotate()
+ SupportsColors() bool
+}
diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go
new file mode 100644
index 0000000..270027f
--- /dev/null
+++ b/internal/io/dlog/loggers/none.go
@@ -0,0 +1,21 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+// don't log anything
+type none struct{}
+
+func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }
+func (none) Log(now time.Time, message string) {}
+
+func (none) LogWithColors(now time.Time, message, coloredMessage string) {}
+
+func (none) Flush() {}
+func (none) Pause() {}
+func (none) Resume() {}
+func (none) Rotate() {}
+func (none) SupportsColors() bool { return false }
diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go
new file mode 100644
index 0000000..9738323
--- /dev/null
+++ b/internal/io/dlog/loggers/stdout.go
@@ -0,0 +1,73 @@
+package loggers
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+)
+
+type stdout struct {
+ bufferCh chan string
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+}
+
+func newStdout() *stdout {
+ return &stdout{
+ bufferCh: make(chan string, 100),
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ }
+}
+
+func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ pause := func(ctx context.Context) {
+ select {
+ case <-s.resumeCh:
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ go func() {
+ defer wg.Done()
+
+ for {
+ select {
+ case message := <-s.bufferCh:
+ fmt.Println(message)
+ case <-s.pauseCh:
+ pause(ctx)
+ case <-ctx.Done():
+ s.Flush()
+ return
+ }
+ }
+ }()
+}
+
+func (s *stdout) Log(now time.Time, message string) {
+ s.bufferCh <- message
+}
+
+func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) {
+ s.bufferCh <- coloredMessage
+}
+
+func (s *stdout) Flush() {
+ for {
+ select {
+ case message := <-s.bufferCh:
+ fmt.Println(message)
+ default:
+ return
+ }
+ }
+}
+
+func (s *stdout) Pause() { s.pauseCh <- struct{}{} }
+func (s *stdout) Resume() { s.resumeCh <- struct{}{} }
+func (s *stdout) Rotate() {}
+func (stdout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go
new file mode 100644
index 0000000..15ce1fd
--- /dev/null
+++ b/internal/io/dlog/rotation.go
@@ -0,0 +1,27 @@
+package dlog
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+)
+
+func rotation(ctx context.Context) {
+ rotateCh := make(chan os.Signal, 1)
+ signal.Notify(rotateCh, syscall.SIGHUP)
+ go func() {
+ for {
+ select {
+ case <-rotateCh:
+ Common.Debug("Invoking log rotation")
+ loggers.FactoryRotate()
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
diff --git a/internal/io/dlog/source.go b/internal/io/dlog/source.go
new file mode 100644
index 0000000..265885e
--- /dev/null
+++ b/internal/io/dlog/source.go
@@ -0,0 +1,19 @@
+package dlog
+
+type source int
+
+const (
+ CLIENT source = iota
+ SERVER source = iota
+)
+
+func (s source) String() string {
+ switch s {
+ case CLIENT:
+ return "CLIENT"
+ case SERVER:
+ return "SERVER"
+ }
+
+ panic("Unknown log source type")
+}
diff --git a/internal/io/dlog/strategy.go b/internal/io/dlog/strategy.go
new file mode 100644
index 0000000..32d8298
--- /dev/null
+++ b/internal/io/dlog/strategy.go
@@ -0,0 +1,22 @@
+package dlog
+
+import "github.com/mimecast/dtail/internal/config"
+
+// Strategy allows to specify a log rotation strategy.
+type Strategy int
+
+// Possible log strategies.
+const (
+ NormalStrategy Strategy = iota
+ DailyStrategy Strategy = iota
+ StdoutStrategy Strategy = iota
+)
+
+func logStrategy() Strategy {
+ switch config.Common.LogStrategy {
+ case "daily":
+ return DailyStrategy
+ default:
+ }
+ return StdoutStrategy
+}
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
index cc5dd9b..bbcb74e 100644
--- a/internal/io/fs/permissions/permission.go
+++ b/internal/io/fs/permissions/permission.go
@@ -3,12 +3,12 @@
package permissions
import (
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ToRead is to check whether user has read permissions to a given file.
func ToRead(user, filePath string) (bool, error) {
// Only implemented for Linux, always expect true
- logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
+ dlog.Common.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
return true, nil
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index ec33c60..07486a1 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -14,7 +14,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/regex"
@@ -62,7 +62,7 @@ func (f readFile) Retry() bool {
// Start tailing a log file.
func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
- logger.Debug("readFile", f)
+ dlog.Common.Debug("readFile", f)
defer func() {
select {
case <-f.limiter:
@@ -74,7 +74,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
case f.limiter <- struct{}{}:
default:
select {
- case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
case <-ctx.Done():
return nil
}
@@ -126,7 +126,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
case strings.HasSuffix(f.FilePath(), ".gz"):
fallthrough
case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
+ dlog.Common.Info(f.FilePath(), "Detected gzip compression format")
var gzipReader *gzip.Reader
gzipReader, err = gzip.NewReader(fd)
if err != nil {
@@ -134,7 +134,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
}
reader = bufio.NewReader(gzipReader)
case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
+ dlog.Common.Info(f.FilePath(), "Detected zstd compression format")
reader = bufio.NewReader(zstd.NewReader(fd))
default:
reader = bufio.NewReader(fd)
@@ -172,7 +172,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
default:
}
if !f.seekEOF {
- logger.Info(f.FilePath(), "End of file reached")
+ dlog.Common.Info(f.FilePath(), "End of file reached")
return nil
}
time.Sleep(time.Millisecond * 100)
@@ -201,7 +201,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
default:
if message.Len() >= lineLengthThreshold {
if !warnedAboutLongLine {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
+ f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines")
warnedAboutLongLine = true
}
message.WriteString("\n")
@@ -268,7 +268,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, r
// Check wether log file is truncated. Returns nil if not.
func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
+ dlog.Common.Debug(f.filePath, "File truncation check")
// Can not seek currently open FD.
curPos, err := fd.Seek(0, os.SEEK_CUR)
diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go
index 6890201..6a6b5ec 100644
--- a/internal/io/logger/logger.go
+++ b/internal/io/logger/logger.go
@@ -1,5 +1,7 @@
package logger
+// TODO: Rewrite this logger
+
import (
"bufio"
"context"
@@ -64,12 +66,25 @@ var resumeCh chan struct{}
// Tell the logger about logrotation
var rotateCh chan os.Signal
+// Override the logger with a custom callack (e.g. for the t.Log for unit tests)
+type unitTestCallback func(message string)
+
+var unitTestOkCb unitTestCallback
+var unitTestErrorCb unitTestCallback
+
// Helper type to make logging non-blocking.
type buf struct {
time time.Time
message string
}
+// StartUnitTests enables to log all messages to the unit tests.
+func StartUnitTests(ctx context.Context, okCb, errCb unitTestCallback) {
+ unitTestOkCb = okCb
+ unitTestErrorCb = errCb
+ Start(ctx, Modes{UnitTest: true})
+}
+
// Start logging.
func Start(ctx context.Context, mode Modes) {
Mode = mode
@@ -91,12 +106,12 @@ func Start(ctx context.Context, mode Modes) {
switch strategy {
case DailyStrategy:
_, err := os.Stat(config.Common.LogDir)
- Mode.logToFile = !os.IsNotExist(err)
+ Mode.logToFile = !os.IsNotExist(err) && !Mode.UnitTest
Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet
case StdoutStrategy:
fallthrough
default:
- Mode.logToFile = !Mode.Server
+ Mode.logToFile = !Mode.Server && !Mode.UnitTest
Mode.logToStdout = true
}
@@ -182,8 +197,8 @@ func Fatal(args ...interface{}) string {
return log(clientStr, fatalStr, args)
}
-// FatalExit logs an error and exists the process.
-func FatalExit(args ...interface{}) {
+// FatalPanic logs an error and exists the process.
+func FatalPanic(args ...interface{}) {
what := clientStr
if Mode.Server {
what = serverStr
diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go
index 8864179..85f90a5 100644
--- a/internal/io/logger/modes.go
+++ b/internal/io/logger/modes.go
@@ -2,11 +2,12 @@ package logger
// Modes specifies the logging mode.
type Modes struct {
- Server bool
- Trace bool
Debug bool
+ logToFile bool
+ logToStdout bool
Nothing bool
Quiet bool
- logToStdout bool
- logToFile bool
+ Server bool
+ Trace bool
+ UnitTest bool
}
diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go
index 36ebdb5..7c3cdb5 100644
--- a/internal/io/prompt/prompt.go
+++ b/internal/io/prompt/prompt.go
@@ -6,7 +6,7 @@ import (
"os"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// Answer is a user input of a prompt question.
@@ -58,7 +58,7 @@ func (p *Prompt) Add(answer Answer) {
// Ask a question.
func (p *Prompt) Ask() {
reader := bufio.NewReader(os.Stdin)
- logger.Pause()
+ dlog.Common.Pause()
for {
fmt.Print(p.askString())
@@ -70,7 +70,7 @@ func (p *Prompt) Ask() {
}
if !a.AskAgain {
- logger.Resume()
+ dlog.Common.Resume()
if a.EndCallback != nil {
a.EndCallback()
}
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index 47f4925..14e6943 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -37,7 +37,7 @@ func (s *AggregateSet) String() string {
// Merge one aggregate set into this one.
func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
s.Samples += set.Samples
- //logger.Trace("Merge", set)
+ //dlog.Common.Trace("Merge", set)
for _, sc := range query.Select {
storage := sc.FieldStorage
@@ -70,7 +70,7 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- logger.Trace("Serialising mapr.AggregateSet", s)
+ dlog.Common.Trace("Serialising mapr.AggregateSet", s)
sb := pool.BuilderBuffer.Get().(*strings.Builder)
defer pool.RecycleBuilderBuffer(sb)
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 5cc09a1..d0c1d70 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -5,7 +5,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -52,7 +52,7 @@ func (a *Aggregate) Aggregate(message string) error {
for _, sc := range a.query.Select {
if val, ok := fields[sc.FieldStorage]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSamples = true
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go
index 1a89c3a..0433b9a 100644
--- a/internal/mapr/funcs/function.go
+++ b/internal/mapr/funcs/function.go
@@ -58,9 +58,9 @@ func NewFunctionStack(in string) (FunctionStack, string, error) {
// Call the function stack.
func (fs FunctionStack) Call(str string) string {
for i := len(fs) - 1; i >= 0; i-- {
- //logger.Debug("Call", fs[i].Name, str)
+ //dlog.Common.Debug("Call", fs[i].Name, str)
str = fs[i].call(str)
- //logger.Debug("Call.result", fs[i].Name, str)
+ //dlog.Common.Debug("Call.result", fs[i].Name, str)
}
return str
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 9bff790..df8c603 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -11,7 +11,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -189,7 +189,7 @@ func (g *GroupSet) WriteResult(query *Query) error {
return err
}
- logger.Info("Writing outfile", query.Outfile)
+ dlog.Common.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
file, err := os.Create(tmpOutfile)
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index c67137e..e0bbc30 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -26,6 +26,7 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
fields["$timeoffset"] = p.timeZoneOffset
fields["$severity"] = splitted[0]
+ fields["$loglevel"] = splitted[0]
// TODO: Parse time like we do at Mimecast
fields["$time"] = splitted[1]
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
index 23d75cb..3769c22 100644
--- a/internal/mapr/logformat/generickv.go
+++ b/internal/mapr/logformat/generickv.go
@@ -21,7 +21,7 @@ func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error)
for _, kv := range splitted[0:] {
keyAndValue := strings.SplitN(kv, "=", 2)
if len(keyAndValue) != 2 {
- //logger.Debug("Unable to parse key-value token, ignoring it", kv)
+ //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv)
continue
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index 6582b5f..a352580 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -8,7 +8,6 @@ import (
"strings"
"time"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
)
@@ -76,12 +75,10 @@ func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err erro
if errInterface == nil {
fields, err = returnValues[0].Interface().(map[string]string), nil
- logger.Trace("parser.MakeFields", fields, err)
return
}
fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error)
- logger.Trace("parser.MakeFields", fields, err)
return
}
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index 01852da..6c1d849 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -6,8 +6,6 @@ import (
"strconv"
"strings"
"time"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
const (
@@ -67,8 +65,6 @@ func NewQuery(queryStr string) (*Query, error) {
}
err := q.parse(tokens)
-
- logger.Debug(q)
return &q, err
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index d11ed7d..767aada 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
@@ -40,7 +40,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
fqdn, err := os.Hostname()
if err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
s := strings.Split(fqdn, ".")
@@ -55,12 +55,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
parserName = query.LogFormat
}
- logger.Info("Creating log format parser", parserName)
+ dlog.Common.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err)
if logParser, err = logformat.NewParser("generic", query); err != nil {
- logger.FatalExit("Could not create log format parser", err)
+ dlog.Common.FatalPanic("Could not create log format parser", err)
}
}
@@ -153,7 +153,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
if err != nil {
// Should fields be ignored anyway?
if err != logformat.IgnoreFieldsErr {
- logger.Error(fields, err)
+ dlog.Common.Error(fields, err)
}
continue
}
@@ -187,7 +187,7 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map
return
}
if err := a.query.SetClause(fields); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
select {
@@ -204,7 +204,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m
group := mapr.NewGroupSet()
serialize := func() {
- logger.Info("Serializing mapreduce result")
+ dlog.Common.Info("Serializing mapreduce result")
group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
}
@@ -243,7 +243,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSample = true
@@ -255,7 +255,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
return
}
- logger.Trace("Aggregated data locally without adding new samples")
+ dlog.Common.Trace("Aggregated data locally without adding new samples")
}
// Serialize all the aggregated data.
@@ -263,7 +263,7 @@ func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
case <-time.After(time.Minute):
- logger.Warn("Starting to serialize mapredice data takes over a minute")
+ dlog.Common.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
diff --git a/internal/mapr/token.go b/internal/mapr/token.go
index 8972188..7c6578b 100644
--- a/internal/mapr/token.go
+++ b/internal/mapr/token.go
@@ -58,12 +58,12 @@ func tokenize(queryStr string) []token {
}
func tokensConsume(tokens []token) ([]token, []token) {
- //logger.Trace("=====================")
+ //dlog.Common.Trace("=====================")
var consumed []token
for i, t := range tokens {
if t.isKeyword() {
- //logger.Trace("keyword", t)
+ //dlog.Common.Trace("keyword", t)
return tokens[i:], consumed
}
// strip escapes, such as ` from `foo`, this allows to use keywords as field names
@@ -73,7 +73,7 @@ func tokensConsume(tokens []token) ([]token, []token) {
}
if t.str[0] == '`' && t.str[length-1] == '`' {
stripped := t.str[1 : length-1]
- //logger.Trace("stripped", stripped)
+ //dlog.Common.Trace("stripped", stripped)
t := token{
str: stripped,
isBareword: t.isBareword,
@@ -81,11 +81,11 @@ func tokensConsume(tokens []token) ([]token, []token) {
consumed = append(consumed, t)
continue
}
- //logger.Trace("bare", token)
+ //dlog.Common.Trace("bare", token)
consumed = append(consumed, t)
}
- //logger.Trace("result", consumed)
+ //dlog.Common.Trace("result", consumed)
return nil, consumed
}
diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go
index cc1c164..6356d94 100644
--- a/internal/mapr/whereclause.go
+++ b/internal/mapr/whereclause.go
@@ -3,7 +3,7 @@ package mapr
import (
"strconv"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// WhereClause interprets the where clause of the mapreduce query.
@@ -55,7 +55,7 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64,
}
return f, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, float, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, float, t)
return 0, false
}
}
@@ -71,7 +71,7 @@ func whereClauseStringValue(fields map[string]string, str string, t fieldType) (
case String:
return str, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, t)
return str, false
}
}
diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go
index 7a60dba..c60c0a5 100644
--- a/internal/mapr/wherecondition.go
+++ b/internal/mapr/wherecondition.go
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// QueryOperation determines the mapreduce operation.
@@ -168,7 +168,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
case FloatGe:
return lValue >= rValue
default:
- logger.Error("Unknown float operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue)
}
return false
@@ -193,7 +193,7 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
case StringNotHasSuffix:
return !strings.HasSuffix(lValue, rValue)
default:
- logger.Error("Unknown string operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue)
}
return false
diff --git a/internal/regex/regex.go b/internal/regex/regex.go
index 2561659..352ffd6 100644
--- a/internal/regex/regex.go
+++ b/internal/regex/regex.go
@@ -4,8 +4,6 @@ import (
"fmt"
"regexp"
"strings"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
// Regex for filtering lines.
@@ -91,17 +89,17 @@ func (r Regex) MatchString(str string) bool {
}
// Serialize the regex.
-func (r Regex) Serialize() string {
+func (r Regex) Serialize() (string, error) {
var flags []string
for _, flag := range r.flags {
flags = append(flags, flag.String())
}
if !r.initialized {
- logger.FatalExit("Unable to serialize regex as not initialized properly", r)
+ return "", fmt.Errorf("Unable to serialize regex as not initialized properly: %v", r)
}
- return fmt.Sprintf("regex:%s %s", strings.Join(flags, ","), r.regexStr)
+ return fmt.Sprintf("regex:%s %s", strings.Join(flags, ","), r.regexStr), nil
}
// Deserialize the regex.
@@ -109,7 +107,6 @@ func Deserialize(str string) (Regex, error) {
// Get regex string
s := strings.SplitN(str, " ", 2)
if len(s) < 2 {
- logger.Debug("Using noop regex", str)
return NewNoop(), nil
}
@@ -127,10 +124,8 @@ func Deserialize(str string) (Regex, error) {
for _, flagStr := range strings.Split(s[1], ",") {
flag, err := NewFlag(flagStr)
if err != nil {
- logger.Error("ignoring flag", err)
continue
}
- logger.Debug("Adding regex flag", flag)
flags = append(flags, flag)
}
}
diff --git a/internal/regex/regex_test.go b/internal/regex/regex_test.go
index a5e7faf..2ce49ac 100644
--- a/internal/regex/regex_test.go
+++ b/internal/regex/regex_test.go
@@ -20,9 +20,13 @@ func TestRegex(t *testing.T) {
t.Errorf("expected to match string '%s' with regex '%v' but didn't\n", input, r)
}
- r2, err := Deserialize(r.Serialize())
+ serialized, err := r.Serialize()
if err != nil {
- t.Errorf("unable to serialize deserialized regex: %v: %v\n", r.Serialize(), err)
+ t.Errorf("unable to serialize regex: %v: %v\n", serialized, err)
+ }
+ r2, err := Deserialize(serialized)
+ if err != nil {
+ t.Errorf("unable to serialize deserialized regex: %v: %v\n", serialized, err)
}
if r.String() != r2.String() {
t.Errorf("regex should be the same after deserialize(serialize(..)), got '%s' but expected '%s'.\n",
@@ -37,9 +41,13 @@ func TestRegex(t *testing.T) {
t.Errorf("expected to not match string '%s' with regex '%v' but matched\n", input, r)
}
- r2, err = Deserialize(r.Serialize())
+ serialized, err = r.Serialize()
+ if err != nil {
+ t.Errorf("unable to serialize regex: %v: %v\n", serialized, err)
+ }
+ r2, err = Deserialize(serialized)
if err != nil {
- t.Errorf("unable to serialize deserialized regex: %v: %v\n", r.Serialize(), err)
+ t.Errorf("unable to serialize deserialized regex: %v: %v\n", serialized, err)
}
if r.String() != r2.String() {
t.Errorf("regex should be the same after deserialize(serialize(..)), got '%s' but expected '%s'.\n",
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index f75c732..5f4c454 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -8,9 +8,8 @@ import (
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
-
gossh "golang.org/x/crypto/ssh"
)
@@ -22,7 +21,7 @@ func newContinuous() *continuous {
}
func (c *continuous) start(ctx context.Context) {
- logger.Info("Starting continuous job runner after 10s")
+ dlog.Server.Info("Starting continuous job runner after 10s")
time.Sleep(time.Second * 10)
c.runJobs(ctx)
@@ -31,7 +30,7 @@ func (c *continuous) start(ctx context.Context) {
func (c *continuous) runJobs(ctx context.Context) {
for _, job := range config.Server.Continuous {
if !job.Enable {
- logger.Debug(job.Name, "Not running job as not enabled")
+ dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
}
@@ -51,7 +50,7 @@ func (c *continuous) runJobs(ctx context.Context) {
}
func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
- logger.Debug(job.Name, "Processing job")
+ dlog.Server.Debug(job.Name, "Processing job")
files := fillDates(job.Files)
outfile := fillDates(job.Outfile)
@@ -61,7 +60,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
servers = config.Server.SSHBindAddress
}
- args := clients.Args{
+ args := config.Args{
ConnectionsPerCPU: 10,
Discovery: job.Discovery,
ServersStr: servers,
@@ -75,7 +74,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
query := fmt.Sprintf("%s outfile %s", job.Query, outfile)
client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode)
if err != nil {
- logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
+ dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
return
}
@@ -85,21 +84,21 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
if job.RestartOnDayChange {
go func() {
if c.waitForDayChange(ctx) {
- logger.Info(fmt.Sprintf("Canceling job %s due to day change", job.Name))
+ dlog.Server.Info(fmt.Sprintf("Canceling job %s due to day change", job.Name))
cancel()
}
}()
}
- logger.Info(fmt.Sprintf("Starting job %s", job.Name))
+ dlog.Server.Info(fmt.Sprintf("Starting job %s", job.Name))
status := client.Start(jobCtx, make(chan string))
logMessage := fmt.Sprintf("Job exited with status %d", status)
if status != 0 {
- logger.Warn(logMessage)
+ dlog.Server.Warn(logMessage)
return
}
- logger.Info(logMessage)
+ dlog.Server.Info(logMessage)
}
func (c *continuous) waitForDayChange(ctx context.Context) bool {
diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go
index 1e17c78..ae70675 100644
--- a/internal/server/handlers/controlhandler.go
+++ b/internal/server/handlers/controlhandler.go
@@ -7,7 +7,7 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -22,7 +22,7 @@ type ControlHandler struct {
// NewControlHandler returns a new control handler.
func NewControlHandler(user *user.User) *ControlHandler {
- logger.Debug(user, "Creating control handler")
+ dlog.Server.Debug(user, "Creating control handler")
h := ControlHandler{
done: internal.NewDone(),
@@ -32,7 +32,7 @@ func NewControlHandler(user *user.User) *ControlHandler {
fqdn, err := os.Hostname()
if err != nil {
- logger.FatalExit(err)
+ dlog.Server.FatalPanic(err)
}
s := strings.Split(fqdn, ".")
@@ -84,15 +84,15 @@ func (h *ControlHandler) Write(p []byte) (n int, err error) {
}
func (h *ControlHandler) handleCommand(command string) {
- logger.Info(h.user, command)
+ dlog.Server.Info(h.user, command)
s := strings.Split(command, " ")
- logger.Debug(h.user, "Receiving command", command, s)
+ dlog.Server.Debug(h.user, "Receiving command", command, s)
switch s[0] {
case "health":
h.serverMessages <- "OK: DTail SSH Server seems fine"
h.serverMessages <- "done;"
default:
- h.serverMessages <- logger.Error(h.user, "Received unknown control command", command, s)
+ h.serverMessages <- dlog.Server.Error(h.user, "Received unknown control command", command, s)
}
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 69dd4a5..60ad2a0 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -32,13 +32,13 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err))
+ r.server.sendServerMessage(dlog.Server.Error(r.server.user, commandParseWarning, err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc))
+ r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, commandParseWarning, args, argc))
return
}
r.readGlob(ctx, args[1], re, retries)
@@ -51,14 +51,14 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
for retryCount := 0; retryCount < retries; retryCount++ {
paths, err := filepath.Glob(glob)
if err != nil {
- logger.Warn(r.server.user, glob, err)
+ dlog.Server.Warn(r.server.user, glob, err)
time.Sleep(retryInterval)
continue
}
if numPaths := len(paths); numPaths == 0 {
- logger.Error(r.server.user, "No such file(s) to read", glob)
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ dlog.Server.Error(r.server.user, "No such file(s) to read", glob)
+ r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
return
@@ -72,7 +72,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)"))
+ r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Giving up to read file(s)"))
return
}
@@ -92,8 +92,8 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr
globID := r.makeGlobID(path, glob)
if !r.server.user.HasFilePermission(path, "readfiles") {
- logger.Error(r.server.user, "No permission to read file", path, globID)
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
+ r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs"))
return
}
@@ -101,7 +101,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr
}
func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) {
- logger.Info(r.server.user, "Start reading file", path, globID)
+ dlog.Server.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
switch r.mode {
@@ -122,7 +122,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
aggregate.NextLinesCh <- lines
}
if err := reader.Start(ctx, lines, re); err != nil {
- logger.Error(r.server.user, path, globID, err)
+ dlog.Server.Error(r.server.user, path, globID, err)
}
if aggregate != nil {
// Also makes aggregate to Flush
@@ -139,7 +139,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
time.Sleep(time.Second * 2)
- logger.Info(path, globID, "Reading file again")
+ dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -161,6 +161,6 @@ func (r *readCommand) makeGlobID(path, glob string) string {
return pathParts[len(pathParts)-1]
}
- r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob))
+ r.server.sendServerWarnMessage(dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 4820476..b664566 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -16,7 +16,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
@@ -66,7 +66,7 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S
fqdn, err := os.Hostname()
if err != nil {
- logger.FatalExit(err)
+ dlog.Server.FatalPanic(err)
}
s := strings.Split(fqdn, ".")
@@ -165,18 +165,18 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) {
}
func (h *ServerHandler) handleCommand(commandStr string) {
- logger.Debug(h.user, commandStr)
+ dlog.Server.Debug(h.user, commandStr)
ctx := context.Background()
args, argc, add, err := h.handleProtocolVersion(strings.Split(commandStr, " "))
if err != nil {
- h.send(h.serverMessages, logger.Error(h.user, err)+add)
+ h.send(h.serverMessages, dlog.Server.Error(h.user, err)+add)
return
}
args, argc, err = h.handleBase64(args, argc)
if err != nil {
- h.send(h.serverMessages, logger.Error(h.user, err))
+ h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
@@ -239,7 +239,7 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er
args = strings.Split(decodedStr, " ")
argc = len(decodedStr)
- logger.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args)
+ dlog.Server.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args)
return args, argc, nil
}
@@ -247,14 +247,14 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er
func (h *ServerHandler) handleControlCommand(argc int, args []string) {
switch args[0] {
case "debug":
- h.send(h.serverMessages, logger.Debug(h.user, "Receiving debug command", argc, args))
+ h.send(h.serverMessages, dlog.Server.Debug(h.user, "Receiving debug command", argc, args))
default:
- logger.Warn(h.user, "Received unknown control command", argc, args)
+ dlog.Server.Warn(h.user, "Received unknown control command", argc, args)
}
}
func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) {
- logger.Debug(h.user, "handleUserCommand", argc, args)
+ dlog.Server.Debug(h.user, "handleUserCommand", argc, args)
h.incrementActiveCommands()
commandFinished := func() {
@@ -268,19 +268,19 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
options, err := readOptions(splitted[1:])
if err != nil {
- h.sendServerMessage(logger.Error(h.user, err))
+ h.sendServerMessage(dlog.Server.Error(h.user, err))
commandFinished()
return
}
if quiet, ok := options["quiet"]; ok {
if quiet == "true" {
- logger.Debug(h.user, "Enabling quiet mode")
+ dlog.Server.Debug(h.user, "Enabling quiet mode")
h.quiet = true
}
}
if spartan, ok := options["spartan"]; ok {
if spartan == "true" {
- logger.Debug(h.user, "Enabling spartan mode")
+ dlog.Server.Debug(h.user, "Enabling spartan mode")
h.spartan = true
}
}
@@ -304,7 +304,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command, aggregate, err := newMapCommand(h, argc, args)
if err != nil {
h.sendServerMessage(err.Error())
- logger.Error(h.user, err)
+ dlog.Server.Error(h.user, err)
commandFinished()
return
}
@@ -320,14 +320,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
commandFinished()
default:
- h.sendServerMessage(logger.Error(h.user, "Received unknown user command", commandName, argc, args, options))
+ h.sendServerMessage(dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options))
commandFinished()
}
}
func (h *ServerHandler) handleAckCommand(argc int, args []string) {
if argc < 3 {
- h.sendServerWarnMessage(logger.Warn(h.user, commandParseWarning, args, argc))
+ h.sendServerWarnMessage(dlog.Server.Warn(h.user, commandParseWarning, args, argc))
return
}
if args[1] == "close" && args[2] == "connection" {
@@ -362,25 +362,25 @@ func (h *ServerHandler) serverMessageC() chan<- string {
}
func (h *ServerHandler) flushMessages() {
- logger.Debug(h.user, "flushMessages()")
+ dlog.Server.Debug(h.user, "flushMessages()")
unsentMessages := func() int {
return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
for i := 0; i < 3; i++ {
if unsentMessages() == 0 {
- logger.Debug(h.user, "All lines sent")
+ dlog.Server.Debug(h.user, "All lines sent")
return
}
- logger.Debug(h.user, "Still lines to be sent")
+ dlog.Server.Debug(h.user, "Still lines to be sent")
time.Sleep(time.Second)
}
- logger.Warn(h.user, "Some lines remain unsent", unsentMessages())
+ dlog.Server.Warn(h.user, "Some lines remain unsent", unsentMessages())
}
func (h *ServerHandler) shutdown() {
- logger.Debug(h.user, "shutdown()")
+ dlog.Server.Debug(h.user, "shutdown()")
h.flushMessages()
go func() {
@@ -393,7 +393,7 @@ func (h *ServerHandler) shutdown() {
select {
case <-h.ackCloseReceived:
case <-time.After(time.Second * 5):
- logger.Debug(h.user, "Shutdown timeout reached, enforcing shutdown")
+ dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown")
case <-h.done.Done():
}
@@ -410,7 +410,7 @@ func (h *ServerHandler) decrementActiveCommands() int32 {
}
func readOptions(opts []string) (map[string]string, error) {
- logger.Debug("Parsing options", opts)
+ dlog.Server.Debug("Parsing options", opts)
options := make(map[string]string, len(opts))
for _, o := range opts {
@@ -430,7 +430,7 @@ func readOptions(opts []string) (map[string]string, error) {
val = string(decoded)
}
- logger.Debug("Setting option", key, val)
+ dlog.Server.Debug("Setting option", key, val)
options[key] = val
}
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index a1e9e36..f474cc8 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -10,7 +10,7 @@ import (
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -24,7 +24,7 @@ func newScheduler() *scheduler {
}
func (s *scheduler) start(ctx context.Context) {
- logger.Info("Starting scheduled job runner after 10s")
+ dlog.Server.Info("Starting scheduled job runner after 10s")
// First run after just 10s!
time.Sleep(time.Second * 10)
s.runJobs(ctx)
@@ -42,18 +42,18 @@ func (s *scheduler) start(ctx context.Context) {
func (s *scheduler) runJobs(ctx context.Context) {
for _, job := range config.Server.Schedule {
if !job.Enable {
- logger.Debug(job.Name, "Not running job as not enabled")
+ dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
}
hour, err := strconv.Atoi(time.Now().Format("15"))
if err != nil {
- logger.Error(job.Name, "Unable to create job", err)
+ dlog.Server.Error(job.Name, "Unable to create job", err)
continue
}
if hour < job.TimeRange[0] || hour >= job.TimeRange[1] {
- logger.Debug(job.Name, "Not running job out of time range")
+ dlog.Server.Debug(job.Name, "Not running job out of time range")
continue
}
@@ -62,7 +62,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
_, err = os.Stat(outfile)
if !os.IsNotExist(err) {
- logger.Debug(job.Name, "Not running job as outfile already exists", outfile)
+ dlog.Server.Debug(job.Name, "Not running job as outfile already exists", outfile)
continue
}
@@ -71,7 +71,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
servers = config.Server.SSHBindAddress
}
- args := clients.Args{
+ args := config.Args{
ConnectionsPerCPU: 10,
Discovery: job.Discovery,
ServersStr: servers,
@@ -85,21 +85,21 @@ func (s *scheduler) runJobs(ctx context.Context) {
query := fmt.Sprintf("%s outfile %s", job.Query, outfile)
client, err := clients.NewMaprClient(args, query, clients.CumulativeMode)
if err != nil {
- logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
+ dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
continue
}
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
- logger.Info(fmt.Sprintf("Starting job %s", job.Name))
+ dlog.Server.Info(fmt.Sprintf("Starting job %s", job.Name))
status := client.Start(jobCtx, make(chan string))
logMessage := fmt.Sprintf("Job exited with status %d", status)
if status != 0 {
- logger.Warn(logMessage)
+ dlog.Server.Warn(logMessage)
continue
}
- logger.Info(logMessage)
+ dlog.Server.Info(logMessage)
}
}
diff --git a/internal/server/server.go b/internal/server/server.go
index a20737e..a8f541b 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -9,7 +9,7 @@ import (
"strings"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/server/handlers"
"github.com/mimecast/dtail/internal/ssh/server"
user "github.com/mimecast/dtail/internal/user/server"
@@ -36,7 +36,7 @@ type Server struct {
// New returns a new server.
func New() *Server {
- logger.Info("Creating server", version.String())
+ dlog.Server.Info("Creating server", version.String())
s := Server{
sshServerConfig: &gossh.ServerConfig{},
@@ -51,7 +51,7 @@ func New() *Server {
private, err := gossh.ParsePrivateKey(server.PrivateHostKey())
if err != nil {
- logger.FatalExit(err)
+ dlog.Server.FatalPanic(err)
}
s.sshServerConfig.AddHostKey(private)
@@ -60,14 +60,14 @@ func New() *Server {
// Start the server.
func (s *Server) Start(ctx context.Context) int {
- logger.Info("Starting server")
+ dlog.Server.Info("Starting server")
bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort)
- logger.Info("Binding server", bindAt)
+ dlog.Server.Info("Binding server", bindAt)
listener, err := net.Listen("tcp", bindAt)
if err != nil {
- logger.FatalExit("Failed to open listening TCP socket", err)
+ dlog.Server.FatalPanic("Failed to open listening TCP socket", err)
}
go s.stats.start(ctx)
@@ -82,7 +82,7 @@ func (s *Server) Start(ctx context.Context) int {
}
func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
- logger.Debug("Starting listener loop")
+ dlog.Server.Debug("Starting listener loop")
for {
conn, err := listener.Accept() // Blocking
@@ -92,12 +92,12 @@ func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
return
default:
}
- logger.Error("Failed to accept incoming connection", err)
+ dlog.Server.Error("Failed to accept incoming connection", err)
continue
}
if err := s.stats.serverLimitExceeded(); err != nil {
- logger.Error(err)
+ dlog.Server.Error(err)
conn.Close()
continue
}
@@ -107,11 +107,11 @@ func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
}
func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
- logger.Info("Handling connection")
+ dlog.Server.Info("Handling connection")
sshConn, chans, reqs, err := gossh.NewServerConn(conn, s.sshServerConfig)
if err != nil {
- logger.Error("Something just happened", err)
+ dlog.Server.Error("Something just happened", err)
return
}
@@ -125,29 +125,29 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) {
user := user.New(sshConn.User(), sshConn.RemoteAddr().String())
- logger.Info(user, "Invoking channel handler")
+ dlog.Server.Info(user, "Invoking channel handler")
if newChannel.ChannelType() != "session" {
err := errors.New("Don'w allow other channel types than session")
- logger.Error(user, err)
+ dlog.Server.Error(user, err)
newChannel.Reject(gossh.Prohibited, err.Error())
return
}
channel, requests, err := newChannel.Accept()
if err != nil {
- logger.Error(user, "Could not accept channel", err)
+ dlog.Server.Error(user, "Could not accept channel", err)
return
}
if err := s.handleRequests(ctx, sshConn, requests, channel, user); err != nil {
- logger.Error(user, err)
+ dlog.Server.Error(user, err)
sshConn.Close()
}
}
func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error {
- logger.Info(user, "Invoking request handler")
+ dlog.Server.Info(user, "Invoking request handler")
for req := range in {
var payload = struct{ Value string }{}
@@ -190,10 +190,10 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
go func() {
if err := sshConn.Wait(); err != nil && err != io.EOF {
- logger.Error(user, err)
+ dlog.Server.Error(user, err)
}
s.stats.decrementConnections()
- logger.Info(user, "Good bye Mister!")
+ dlog.Server.Info(user, "Good bye Mister!")
terminate()
}()
@@ -216,7 +216,7 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
user := user.New(c.User(), c.RemoteAddr().String())
if config.ServerRelaxedAuthEnable {
- logger.Fatal(user, "Granting permissions via relaxed-auth")
+ dlog.Server.Fatal(user, "Granting permissions via relaxed-auth")
return nil, nil
}
@@ -228,20 +228,20 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
switch user.Name {
case config.ControlUser:
if authInfo == config.ControlUser {
- logger.Debug(user, "Granting permissions to control user")
+ dlog.Server.Debug(user, "Granting permissions to control user")
return nil, nil
}
case config.ScheduleUser:
for _, job := range config.Server.Schedule {
if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
- logger.Debug(user, "Granting SSH connection")
+ dlog.Server.Debug(user, "Granting SSH connection")
return nil, nil
}
}
case config.ContinuousUser:
for _, job := range config.Server.Continuous {
if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
- logger.Debug(user, "Granting SSH connection")
+ dlog.Server.Debug(user, "Granting SSH connection")
return nil, nil
}
}
@@ -252,22 +252,22 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
}
func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool {
- logger.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom)
+ dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom)
if jobName != allowedJobName {
- logger.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName)
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName)
return false
}
for _, myAddr := range allowFrom {
ips, err := net.LookupIP(myAddr)
if err != nil {
- logger.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP address for allowed hosts lookup, skipping to next one...", myAddr, err)
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP address for allowed hosts lookup, skipping to next one...", myAddr, err)
continue
}
for _, ip := range ips {
- logger.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String())
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String())
if remoteIP == ip.String() {
return true
}
diff --git a/internal/server/stats.go b/internal/server/stats.go
index 3e8c71d..8583318 100644
--- a/internal/server/stats.go
+++ b/internal/server/stats.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// Used to collect and display various server stats.
@@ -41,7 +41,7 @@ func (s *stats) hasConnections() bool {
s.mutex.Unlock()
has := currentConnections > 0
- logger.Info("stats", "Server with open connections?", has, currentConnections)
+ dlog.Server.Info("stats", "Server with open connections?", has, currentConnections)
return has
}
@@ -57,7 +57,7 @@ func (s *stats) logServerStats() {
data["cgocalls"] = runtime.NumCgoCall()
data["cpu"] = runtime.NumCPU()
- logger.Mapreduce("STATS", data)
+ dlog.Server.Mapreduce("STATS", data)
}
func (s *stats) serverLimitExceeded() error {
diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go
index 2ff80b2..4508319 100644
--- a/internal/ssh/client/authmethods.go
+++ b/internal/ssh/client/authmethods.go
@@ -4,7 +4,7 @@ import (
"os"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/ssh"
gossh "golang.org/x/crypto/ssh"
@@ -15,7 +15,7 @@ func InitSSHAuthMethods(sshAuthMethods []gossh.AuthMethod, hostKeyCallback gossh
if len(sshAuthMethods) > 0 {
simpleCallback, err := NewSimpleCallback()
if err != nil {
- logger.FatalExit(err)
+ dlog.Common.FatalPanic(err)
}
return sshAuthMethods, simpleCallback
}
@@ -29,13 +29,13 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri
knownHostsPath := os.Getenv("HOME") + "/.ssh/known_hosts"
knownHostsCallback, err := NewKnownHostsCallback(knownHostsPath, trustAllHosts, throttleCh)
if err != nil {
- logger.FatalExit(knownHostsPath, err)
+ dlog.Common.FatalPanic(knownHostsPath, err)
}
- logger.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsPath)
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsPath)
if config.Common.ExperimentalFeaturesEnable {
sshAuthMethods = append(sshAuthMethods, gossh.Password("experimental feature test"))
- logger.Debug("initKnownHostsAuthMethods", "Added experimental method to list of auth methods")
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Added experimental method to list of auth methods")
}
// First try to read custom private key path.
@@ -43,41 +43,41 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri
authMethod, err := ssh.PrivateKey(privateKeyPath)
if err == nil {
sshAuthMethods = append(sshAuthMethods, authMethod)
- logger.Debug("initKnownHostsAuthMethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
return sshAuthMethods, knownHostsCallback
}
- logger.FatalExit("Unable to use private SSH key", privateKeyPath, err)
+ dlog.Common.FatalPanic("Unable to use private SSH key", privateKeyPath, err)
}
// Second, try SSH Agent
authMethod, err := ssh.Agent()
if err == nil {
sshAuthMethods = append(sshAuthMethods, authMethod)
- logger.Debug("initKnownHostsAuthMethods", "Added SSH Agent (SSH_AUTH_SOCK) to list of auth methods, not adding further methods")
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Added SSH Agent (SSH_AUTH_SOCK) to list of auth methods, not adding further methods")
return sshAuthMethods, knownHostsCallback
}
- logger.Debug("initKnownHostsAuthMethods", "Unable to init SSH Agent auth method", err)
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to init SSH Agent auth method", err)
// Third, try Linux/UNIX default key paths
privateKeyPath = os.Getenv("HOME") + "/.ssh/id_rsa"
authMethod, err = ssh.PrivateKey(privateKeyPath)
if err == nil {
sshAuthMethods = append(sshAuthMethods, authMethod)
- logger.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
+ dlog.Common.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
return sshAuthMethods, knownHostsCallback
}
- logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
privateKeyPath = os.Getenv("HOME") + "/.ssh/id_dsa"
authMethod, err = ssh.PrivateKey(privateKeyPath)
if err == nil {
sshAuthMethods = append(sshAuthMethods, authMethod)
- logger.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
+ dlog.Common.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
return sshAuthMethods, knownHostsCallback
}
- logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
+ dlog.Common.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
- logger.FatalExit("Unable to find private SSH key information")
+ dlog.Common.FatalPanic("Unable to find private SSH key information")
// Never reach this point.
return sshAuthMethods, knownHostsCallback
diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go
index 1ccf6c6..a73d612 100644
--- a/internal/ssh/client/knownhostscallback.go
+++ b/internal/ssh/client/knownhostscallback.go
@@ -10,7 +10,7 @@ import (
"sync"
"time"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/prompt"
"golang.org/x/crypto/ssh"
@@ -97,7 +97,7 @@ func (c KnownHostsCallback) Wrap() ssh.HostKeyCallback {
responseCh: make(chan response),
}
- logger.Warn("Encountered unknown host", unknown)
+ dlog.Common.Warn("Encountered unknown host", unknown)
// Notify user that there is an unknown host
c.unknownCh <- unknown
@@ -139,7 +139,7 @@ func (c KnownHostsCallback) PromptAddHosts(ctx context.Context) {
hosts = []unknownHost{}
}
case <-ctx.Done():
- logger.Debug("Stopping goroutine prompting new hosts...")
+ dlog.Common.Debug("Stopping goroutine prompting new hosts...")
return
}
}
@@ -154,7 +154,7 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) {
select {
case <-c.trustAllHostsCh:
- logger.Warn("Trusting host keys of servers", servers)
+ dlog.Common.Warn("Trusting host keys of servers", servers)
c.trustHosts(hosts)
return
default:
@@ -175,7 +175,7 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) {
c.trustHosts(hosts)
},
EndCallback: func() {
- logger.Info("Added hosts to known hosts file", c.knownHostsPath)
+ dlog.Common.Info("Added hosts to known hosts file", c.knownHostsPath)
},
}
p.Add(a)
@@ -188,7 +188,7 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) {
c.trustHosts(hosts)
},
EndCallback: func() {
- logger.Info("Added hosts to known hosts file", c.knownHostsPath)
+ dlog.Common.Info("Added hosts to known hosts file", c.knownHostsPath)
},
}
p.Add(a)
@@ -200,7 +200,7 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) {
c.dontTrustHosts(hosts)
},
EndCallback: func() {
- logger.Info("Didn't add hosts to known hosts file", c.knownHostsPath)
+ dlog.Common.Info("Didn't add hosts to known hosts file", c.knownHostsPath)
},
}
p.Add(a)
diff --git a/internal/ssh/server/hostkey.go b/internal/ssh/server/hostkey.go
index 07790ad..20de1f0 100644
--- a/internal/ssh/server/hostkey.go
+++ b/internal/ssh/server/hostkey.go
@@ -1,11 +1,12 @@
package server
import (
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/ssh"
"io/ioutil"
"os"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/ssh"
)
// PrivateHostKey retrieves the private server RSA host key.
@@ -14,24 +15,24 @@ func PrivateHostKey() []byte {
_, err := os.Stat(hostKeyFile)
if os.IsNotExist(err) {
- logger.Info("Generating private server RSA host key")
+ dlog.Common.Info("Generating private server RSA host key")
privateKey, err := ssh.GeneratePrivateRSAKey(config.Server.HostKeyBits)
if err != nil {
- logger.FatalExit("Failed to generate private server RSA host key", err)
+ dlog.Common.FatalPanic("Failed to generate private server RSA host key", err)
}
pem := ssh.EncodePrivateKeyToPEM(privateKey)
if err := ioutil.WriteFile(hostKeyFile, pem, 0600); err != nil {
- logger.Error("Unable to write private server RSA host key to file", hostKeyFile, err)
+ dlog.Common.Error("Unable to write private server RSA host key to file", hostKeyFile, err)
}
return pem
}
- logger.Info("Reading private server RSA host key from file", hostKeyFile)
+ dlog.Common.Info("Reading private server RSA host key from file", hostKeyFile)
pem, err := ioutil.ReadFile(hostKeyFile)
if err != nil {
- logger.FatalExit("Failed to load private server RSA host key", err)
+ dlog.Common.FatalPanic("Failed to load private server RSA host key", err)
}
return pem
}
diff --git a/internal/ssh/server/publickeycallback.go b/internal/ssh/server/publickeycallback.go
index e81f019..65ecdd1 100644
--- a/internal/ssh/server/publickeycallback.go
+++ b/internal/ssh/server/publickeycallback.go
@@ -7,7 +7,7 @@ import (
osUser "os/user"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
user "github.com/mimecast/dtail/internal/user/server"
gossh "golang.org/x/crypto/ssh"
@@ -16,7 +16,7 @@ import (
// PublicKeyCallback is for the server to check whether a public SSH key is authorized ot not.
func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*gossh.Permissions, error) {
user := user.New(c.User(), c.RemoteAddr().String())
- logger.Info(user, "Incoming authorization")
+ dlog.Common.Info(user, "Incoming authorization")
cwd, err := os.Getwd()
if err != nil {
@@ -24,7 +24,7 @@ func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*go
}
if config.ServerRelaxedAuthEnable {
- logger.Fatal(user, "Granting permissions via relaxed-auth")
+ dlog.Common.Fatal(user, "Granting permissions via relaxed-auth")
return nil, nil
}
@@ -38,7 +38,7 @@ func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*go
authorizedKeysFile = user.HomeDir + "/.ssh/authorized_keys"
}
- logger.Info(user, "Reading", authorizedKeysFile)
+ dlog.Common.Info(user, "Reading", authorizedKeysFile)
authorizedKeysBytes, err := ioutil.ReadFile(authorizedKeysFile)
if err != nil {
return nil, fmt.Errorf("Unable to read authorized keys file|%s|%s|%s", authorizedKeysFile, user, err.Error())
@@ -53,10 +53,10 @@ func PublicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey) (*go
authorizedKeysMap[string(authorizedPubKey.Marshal())] = true
authorizedKeysBytes = restBytes
- logger.Debug(user, "Authorized public key fingerprint", gossh.FingerprintSHA256(authorizedPubKey))
+ dlog.Common.Debug(user, "Authorized public key fingerprint", gossh.FingerprintSHA256(authorizedPubKey))
}
- logger.Debug(user, "Offered public key fingerprint", gossh.FingerprintSHA256(offeredPubKey))
+ dlog.Common.Debug(user, "Offered public key fingerprint", gossh.FingerprintSHA256(offeredPubKey))
if authorizedKeysMap[string(offeredPubKey.Marshal())] {
return &gossh.Permissions{
diff --git a/internal/ssh/ssh.go b/internal/ssh/ssh.go
index 78bf99e..56494a7 100644
--- a/internal/ssh/ssh.go
+++ b/internal/ssh/ssh.go
@@ -11,7 +11,7 @@ import (
"os"
"syscall"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
gossh "golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
@@ -58,7 +58,7 @@ func Agent() (gossh.AuthMethod, error) {
return nil, err
}
for i, key := range keys {
- logger.Debug("Public key", i, key)
+ dlog.Common.Debug("Public key", i, key)
}
return gossh.PublicKeysCallback(agentClient.Signers), nil
}
@@ -106,7 +106,7 @@ func KeyFile(keyFile string) (gossh.AuthMethod, error) {
func PrivateKey(keyFile string) (gossh.AuthMethod, error) {
signer, err := KeyFile(keyFile)
if err != nil {
- logger.Debug(keyFile, err)
+ dlog.Common.Debug(keyFile, err)
return nil, err
}
return gossh.AuthMethod(signer), nil
diff --git a/internal/user/server/user.go b/internal/user/server/user.go
index 637945c..99cd211 100644
--- a/internal/user/server/user.go
+++ b/internal/user/server/user.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/fs/permissions"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
const maxLinkDepth int = 100
@@ -39,9 +39,9 @@ func (u *User) String() string {
// HasFilePermission is used to determine whether user is alowed to read a file.
func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission bool) {
- logger.Debug(u, filePath, permissionType, "Checking config permissions")
+ dlog.Common.Debug(u, filePath, permissionType, "Checking config permissions")
if config.ServerRelaxedAuthEnable {
- logger.Fatal(u, filePath, permissionType, "Server releaxed auth enabled")
+ dlog.Common.Fatal(u, filePath, permissionType, "Server releaxed auth enabled")
return true
}
@@ -52,25 +52,25 @@ func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission
cleanPath, err := filepath.EvalSymlinks(filePath)
if err != nil {
- logger.Error(u, filePath, permissionType, "Unable to evaluate symlinks", err)
+ dlog.Common.Error(u, filePath, permissionType, "Unable to evaluate symlinks", err)
hasPermission = false
return
}
cleanPath, err = filepath.Abs(cleanPath)
if err != nil {
- logger.Error(u, cleanPath, permissionType, "Unable to make file path absolute", err)
+ dlog.Common.Error(u, cleanPath, permissionType, "Unable to make file path absolute", err)
hasPermission = false
return
}
if cleanPath != filePath {
- logger.Info(u, filePath, cleanPath, permissionType, "Calculated new clean path from original file path (possibly symlink)")
+ dlog.Common.Info(u, filePath, cleanPath, permissionType, "Calculated new clean path from original file path (possibly symlink)")
}
hasPermission, err = u.hasFilePermission(cleanPath, permissionType)
if err != nil {
- logger.Warn(u, cleanPath, err)
+ dlog.Common.Warn(u, cleanPath, err)
}
return
@@ -81,7 +81,7 @@ func (u *User) hasFilePermission(cleanPath, permissionType string) (bool, error)
if _, err := permissions.ToRead(u.Name, cleanPath); err != nil {
return false, fmt.Errorf("User without OS file system permissions to read path: '%v'", err)
}
- logger.Info(u, cleanPath, permissionType, "User with OS file system permissions to path")
+ dlog.Common.Info(u, cleanPath, permissionType, "User with OS file system permissions to path")
// Only allow to follow regular files or symlinks.
info, err := os.Lstat(cleanPath)
@@ -123,7 +123,7 @@ func (u *User) iteratePaths(cleanPath, permissionType string) (bool, error) {
permission = strings.Join(splitted[1:], ":")
}
- logger.Debug(u, cleanPath, typeStr, permission)
+ dlog.Common.Debug(u, cleanPath, typeStr, permission)
if typeStr != permissionType {
continue
@@ -141,12 +141,12 @@ func (u *User) iteratePaths(cleanPath, permissionType string) (bool, error) {
}
if negate && re.MatchString(cleanPath) {
- logger.Info(u, cleanPath, "Permission test failed partially, matching negative pattern '%s'", permission)
+ dlog.Common.Info(u, cleanPath, "Permission test failed partially, matching negative pattern '%s'", permission)
hasPermission = false
}
if !negate && re.MatchString(cleanPath) {
- logger.Info(u, cleanPath, "Permission test passed partially, matching positive pattern", permission)
+ dlog.Common.Info(u, cleanPath, "Permission test passed partially, matching positive pattern", permission)
hasPermission = true
}
}
diff --git a/samples/dtail.json.sample b/samples/dtail.json.sample
index 91233f9..4f9b9ab 100644
--- a/samples/dtail.json.sample
+++ b/samples/dtail.json.sample
@@ -70,7 +70,7 @@
"TmpDir": "tmp",
"LogStrategy": "stdout",
"SSHPort": 2222,
- "DebugEnable": false,
+ "LogLevel": "INFO",
"ExperimentalFeaturesEnable": false
}
}