summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-19 13:22:59 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commitfe3e68afd99d8ea246be52893730f987e138ec24 (patch)
tree726e0914730912e0a3b223f7b37facc05ba31140 /internal/clients
parentabeac87aec44249bf67f1b0eca471a31086265ca (diff)
move args to config package
logger package rewrite as dlog
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/args.go63
-rw-r--r--internal/clients/baseclient.go28
-rw-r--r--internal/clients/catclient.go12
-rw-r--r--internal/clients/connectors/serverconnection.go34
-rw-r--r--internal/clients/connectors/serverless.go21
-rw-r--r--internal/clients/grepclient.go13
-rw-r--r--internal/clients/handlers/basehandler.go6
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/maprhandler.go8
-rw-r--r--internal/clients/maprclient.go40
-rw-r--r--internal/clients/stats.go8
-rw-r--r--internal/clients/tailclient.go16
12 files changed, 107 insertions, 146 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go
deleted file mode 100644
index dc71e83..0000000
--- a/internal/clients/args.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package clients
-
-import (
- "flag"
- "fmt"
- "strings"
-
- "github.com/mimecast/dtail/internal/omode"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// Args is a helper struct to summarize common client arguments.
-type Args struct {
- Mode omode.Mode
- ServersStr string
- UserName string
- What string
- Arguments []string
- RegexStr string
- RegexInvert bool
- TrustAllHosts bool
- Discovery string
- ConnectionsPerCPU int
- Timeout int
- SSHAuthMethods []gossh.AuthMethod
- SSHHostKeyCallback gossh.HostKeyCallback
- PrivateKeyPathFile string
- Quiet bool
- Spartan bool
- NoColor bool
- Serverless bool
-}
-
-// Transform the arguments based on certain conditions.
-func (a *Args) Transform(args []string) {
- // Interpret additional args as file list.
- if a.What == "" {
- var files []string
- for _, file := range flag.Args() {
- files = append(files, file)
- }
- a.What = strings.Join(files, ",")
- }
-
- if a.Spartan {
- a.Quiet = true
- a.NoColor = true
- }
-
- if a.Discovery == "" && a.ServersStr == "" {
- a.Serverless = true
- }
-}
-
-// TransformAfterConfigFile same as Transform, but after the config file has been read.
-func (a *Args) TransformAfterConfigFile() {
- // TODO: Remove this method. It's not used.
-}
-
-func (a *Args) SerializeOptions() string {
- return fmt.Sprintf("quiet=%v:spartan=%v", a.Quiet, a.Spartan)
-}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 5523052..fc01955 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -6,8 +6,9 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/connectors"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/discovery"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
"github.com/mimecast/dtail/internal/ssh/client"
@@ -17,7 +18,7 @@ import (
// This is the main client data structure.
type baseClient struct {
- Args
+ config.Args
// To display client side stats
stats *stats
// List of remote servers to connect to.
@@ -39,7 +40,8 @@ type baseClient struct {
}
func (c *baseClient) init() {
- logger.Debug("Initiating base client")
+ dlog.Client.Debug("Initiating base client")
+ dlog.Client.Debug(c.Args.String())
flag := regex.Default
if c.Args.RegexInvert {
@@ -47,11 +49,14 @@ func (c *baseClient) init() {
}
regex, err := regex.New(c.Args.RegexStr, flag)
if err != nil {
- logger.FatalExit(c.Regex, "invalid regex!", err, regex)
+ dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex)
}
c.Regex = regex
- logger.Debug("Regex", c.Regex)
+ dlog.Client.Debug("Regex", c.Regex)
+ if c.Args.Serverless {
+ return
+ }
c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile)
}
@@ -67,8 +72,11 @@ func (c *baseClient) makeConnections(maker maker) {
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
- // Periodically check for unknown hosts, and ask the user whether to trust them or not.
- go c.hostKeyCallback.PromptAddHosts(ctx)
+ // Can be nil when serverless.
+ if c.hostKeyCallback != nil {
+ // Periodically check for unknown hosts, and ask the user whether to trust them or not.
+ go c.hostKeyCallback.PromptAddHosts(ctx)
+ }
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
@@ -112,7 +120,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
}
time.Sleep(time.Second * 2)
- logger.Debug(conn.Server(), "Reconnecting")
+ dlog.Client.Debug(conn.Server(), "Reconnecting")
conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback)
c.connections[i] = conn
}
@@ -127,7 +135,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe
}
func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer logger.Debug("Terminated connection")
+ defer dlog.Client.Debug("Terminated connection")
// We want to have at least one active connection
<-active
@@ -143,7 +151,7 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
if numActive == 0 {
return
}
- logger.Debug("Active connections", numActive)
+ dlog.Client.Debug("Active connections", numActive)
time.Sleep(time.Second)
}
}
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index d14cdcc..2726e7e 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -7,6 +7,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +18,7 @@ type CatClient struct {
}
// NewCatClient returns a new cat client.
-func NewCatClient(args Args) (*CatClient, error) {
+func NewCatClient(args config.Args) (*CatClient, error) {
if args.RegexStr != "" {
return nil, errors.New("Can't use regex with 'cat' operating mode")
}
@@ -42,11 +44,13 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
}
func (c CatClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file, c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
return
}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index 0904ba1..5bc63ee 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -10,7 +10,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/ssh/client"
"golang.org/x/crypto/ssh"
@@ -30,7 +30,7 @@ type ServerConnection struct {
// NewServerConnection returns a new DTail SSH server connection.
func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection {
- logger.Debug(server, "Creating new connection", server, handler, commands)
+ dlog.Client.Debug(server, "Creating new connection", server, handler, commands)
c := ServerConnection{
hostKeyCallback: hostKeyCallback,
@@ -81,10 +81,10 @@ func (c *ServerConnection) initServerPort() {
parts := strings.Split(c.server, ":")
if len(parts) == 2 {
- logger.Debug("Parsing port from hostname", parts)
+ dlog.Client.Debug("Parsing port from hostname", parts)
port, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse client port", c.server, parts, err)
+ dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err)
}
c.server = parts[0]
c.port = port
@@ -93,21 +93,21 @@ func (c *ServerConnection) initServerPort() {
func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
// Throttle how many connections can be established concurrently (based on ch length)
- logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
select {
case throttleCh <- struct{}{}:
case <-ctx.Done():
- logger.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
return
}
- logger.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
go func() {
defer func() {
if !c.throttlingDone {
- logger.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
@@ -115,9 +115,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
}()
if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil {
- logger.Warn(c.server, c.port, err)
+ dlog.Client.Warn(c.server, c.port, err)
if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.server, c.port)) {
- logger.Debug(c.server, "Not trusting host")
+ dlog.Client.Debug(c.server, "Not trusting host")
}
}
}()
@@ -127,14 +127,14 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
// Dail into a new SSH connection. Close connection in case of an error.
func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error {
- logger.Debug(c.server, "Incrementing connection stats")
+ dlog.Client.Debug(c.server, "Incrementing connection stats")
statsCh <- struct{}{}
defer func() {
- logger.Debug(c.server, "Decrementing connection stats")
+ dlog.Client.Debug(c.server, "Decrementing connection stats")
<-statsCh
}()
- logger.Debug(c.server, "Dialing into the connection")
+ dlog.Client.Debug(c.server, "Dialing into the connection")
address := fmt.Sprintf("%s:%d", c.server, c.port)
client, err := ssh.Dial("tcp", address, c.config)
@@ -148,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
// Create the SSH session. Close the session in case of an error.
func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error {
- logger.Debug(c.server, "Creating SSH session")
+ dlog.Client.Debug(c.server, "Creating SSH session")
session, err := client.NewSession()
if err != nil {
@@ -160,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun
}
func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error {
- logger.Debug(c.server, "Creating handler for SSH session")
+ dlog.Client.Debug(c.server, "Creating handler for SSH session")
stdinPipe, err := session.StdinPipe()
if err != nil {
@@ -196,12 +196,12 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc
// Send all commands to client.
for _, command := range c.commands {
- logger.Debug(command)
+ dlog.Client.Debug(command)
c.handler.SendMessage(command)
}
if !c.throttlingDone {
- logger.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 0500645..c7b5f62 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -6,7 +6,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
serverHandlers "github.com/mimecast/dtail/internal/server/handlers"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -26,7 +26,7 @@ func NewServerless(userName string, handler handlers.Handler, commands []string)
commands: commands,
}
- logger.Debug("Creating new serverless connector", handler, commands)
+ dlog.Client.Debug("Creating new serverless connector", handler, commands)
return &s
}
@@ -43,7 +43,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
defer cancel()
if err := s.handle(ctx, cancel); err != nil {
- logger.Warn(err)
+ dlog.Client.Warn(err)
}
}()
@@ -51,7 +51,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
}
func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error {
- logger.Debug("Creating server handler for a serverless session")
+ dlog.Client.Debug("Creating server handler for a serverless session")
serverHandler := serverHandlers.NewServerHandler(
user.New(s.userName, s.Server()),
@@ -59,14 +59,19 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
make(chan struct{}, config.Server.MaxConcurrentTails),
)
+ terminate := func() {
+ serverHandler.Shutdown()
+ cancel()
+ }
+
go func() {
io.Copy(serverHandler, s.handler)
- cancel()
+ terminate()
}()
go func() {
io.Copy(s.handler, serverHandler)
- cancel()
+ terminate()
}()
go func() {
@@ -74,12 +79,12 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
case <-s.handler.Done():
case <-ctx.Done():
}
- cancel()
+ terminate()
}()
// Send all commands to client.
for _, command := range s.commands {
- logger.Debug(command)
+ dlog.Client.Debug(command)
s.handler.SendMessage(command)
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index ea5022b..ae21ff2 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -7,6 +7,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +18,7 @@ type GrepClient struct {
}
// NewGrepClient creates a new grep client.
-func NewGrepClient(args Args) (*GrepClient, error) {
+func NewGrepClient(args config.Args) (*GrepClient, error) {
if args.RegexStr == "" {
return nil, errors.New("No regex specified, use '-regex' flag")
}
@@ -41,12 +43,13 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
}
func (c GrepClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
return
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index af1ad62..3291b43 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -51,7 +51,7 @@ func (h *baseHandler) Shutdown() {
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
- logger.Debug("Sending command", h.server, command, encoded)
+ dlog.Client.Debug("Sending command", h.server, command, encoded)
select {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
@@ -112,7 +112,7 @@ func (h *baseHandler) handleMessageType(message string) {
return
}
- logger.Raw(message)
+ dlog.Client.Raw(message)
}
// Handle messages received from server which are not meant to be displayed
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 2bcb038..27ac85e 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -2,7 +2,7 @@ package handlers
import (
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ClientHandler is the basic client handler interface.
@@ -12,7 +12,7 @@ type ClientHandler struct {
// NewClientHandler creates a new client handler.
func NewClientHandler(server string) *ClientHandler {
- logger.Debug(server, "Creating new client handler")
+ dlog.Client.Debug(server, "Creating new client handler")
return &ClientHandler{
baseHandler{
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 65b1454..848e7f0 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -4,7 +4,7 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
"github.com/mimecast/dtail/internal/protocol"
@@ -40,7 +40,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
continue
case protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
- logger.Debug(message)
+ dlog.Client.Debug(message)
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
@@ -60,10 +60,10 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
func (h *MaprHandler) handleAggregateMessage(message string) {
parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
if len(parts) != 3 {
- logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
return
}
if err := h.aggregate.Aggregate(parts[2]); err != nil {
- logger.Error("Unable to aggregate data", h.server, message, err)
+ dlog.Client.Error("Unable to aggregate data", h.server, message, err)
}
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index e6ab96b..f23aa08 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -11,7 +11,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
)
@@ -44,14 +44,14 @@ type MaprClient struct {
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
+func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
if queryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
query, err := mapr.NewQuery(queryStr)
if err != nil {
- logger.FatalExit(queryStr, "Can't parse mapr query", err)
+ dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err)
}
// Don't retry connection if in tail mode and no outfile specified.
@@ -68,7 +68,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
cumulative = args.Mode == omode.MapClient || query.HasOutfile()
}
- logger.Debug("Cumulative mapreduce mode?", cumulative)
+ dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)
c := MaprClient{
baseClient: baseClient{
@@ -103,7 +103,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Debug("Received final mapreduce result")
+ dlog.Client.Debug("Received final mapreduce result")
c.reportResults()
}
@@ -123,15 +123,17 @@ func (c MaprClient) makeCommands() (commands []string) {
}
for _, file := range strings.Split(c.What, ",") {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout,
+ modeStr, file, regex))
continue
}
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- modeStr,
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ modeStr, c.Args.SerializeOptions(), file, regex))
}
return
@@ -141,7 +143,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
- logger.Debug("Gathering interim mapreduce result")
+ dlog.Client.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -177,17 +179,17 @@ func (c *MaprClient) printResults() {
}
if err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
if result == c.lastResult {
- logger.Debug("Result hasn't changed compared to last time...")
+ dlog.Client.Debug("Result hasn't changed compared to last time...")
return
}
c.lastResult = result
if numRows == 0 {
- logger.Debug("Empty result set this time...")
+ dlog.Client.Debug("Empty result set this time...")
return
}
@@ -198,24 +200,24 @@ func (c *MaprClient) printResults() {
config.Client.TermColors.MaprTable.RawQueryBg,
config.Client.TermColors.MaprTable.RawQueryAttr)
}
- logger.Raw(rawQuery)
+ dlog.Client.Raw(rawQuery)
if rowsLimit > 0 && numRows > rowsLimit {
- logger.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!",
+ dlog.Client.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!",
numRows, rowsLimit))
}
- logger.Raw(result)
+ dlog.Client.Raw(result)
}
func (c *MaprClient) writeResultsToOutfile() {
if c.cumulative {
if err := c.globalGroup.WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
return
}
if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 6da443c..fbef572 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -10,7 +10,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -67,7 +67,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
s.printStatsDueInterrupt(messages)
default:
data := s.statsData(connected, newConnections, throttle)
- logger.Mapreduce("STATS", data)
+ dlog.Client.Mapreduce("STATS", data)
}
connectedLast = connected
@@ -78,7 +78,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
}
func (s *stats) printStatsDueInterrupt(messages []string) {
- logger.Pause()
+ dlog.Client.Pause()
for i, message := range messages {
if i > 0 && config.Client.TermColorsEnable {
fmt.Println(color.PaintStrWithAttr(message,
@@ -91,7 +91,7 @@ func (s *stats) printStatsDueInterrupt(messages []string) {
fmt.Println(fmt.Sprintf(" %s", message))
}
time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
- logger.Resume()
+ dlog.Client.Resume()
}
func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} {
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 360354b..d42a0e4 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -6,7 +6,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,7 +17,7 @@ type TailClient struct {
}
// NewTailClient returns a new TailClient.
-func NewTailClient(args Args) (*TailClient, error) {
+func NewTailClient(args config.Args) (*TailClient, error) {
args.Mode = omode.TailClient
c := TailClient{
@@ -38,14 +39,15 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
}
func (c TailClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(),
- c.Args.SerializeOptions(),
- file,
- c.Regex.Serialize()))
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
- logger.Debug(commands)
+ dlog.Client.Debug(commands)
return
}