From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/clients/handlers/basehandler.go | 6 +++--- internal/clients/handlers/maprhandler.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index f07fd90..acafe0e 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error { logger.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error { func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..7ac5895 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // related data. func (h *MaprHandler) handleAggregateMessage(message string) { h.count++ - parts := strings.Split(message, "➔") + parts := strings.Split(message, protocol.AggregateDelimiter) // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. -- cgit v1.2.3 From 9883a190109623b64e6d311dc2b462a6eae68003 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 22 Aug 2021 10:07:00 +0300 Subject: introduces the protocol package --- internal/clients/handlers/basehandler.go | 3 ++- internal/clients/handlers/healthhandler.go | 3 ++- internal/clients/handlers/maprhandler.go | 2 +- internal/clients/maprclient.go | 3 ++- 4 files changed, 7 insertions(+), 4 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index acafe0e..fe83faa 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -56,13 +56,14 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } message := string(h.receiveBuf) h.handleMessageType(message) + } else { + h.receiveBuf = append(h.receiveBuf, b) } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 0440706..213748c 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -6,6 +6,7 @@ import ( "time" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/protocol" ) // HealthHandler implements the handler required for health checks. @@ -72,7 +73,7 @@ func (h *HealthHandler) SendMessage(command string) error { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { // '\n' { h.receive <- string(h.receiveBuf) h.receiveBuf = h.receiveBuf[:0] } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 7ac5895..afad507 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -37,7 +37,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { // '\n' { if len(h.baseHandler.receiveBuf) == 0 { continue } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 1c0c2cc..77b674b 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -170,7 +170,8 @@ func (c *MaprClient) printResults() { return } - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + //logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + logger.Raw(c.query.RawQuery) logger.Raw(result) } -- cgit v1.2.3 From 23982f331c2154a66b86d596226c24454fd06be5 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 20:26:32 +0100 Subject: 1. Major performance gain by not checking for file truncation aftter each bytes read. 2. Introduce field separator to the protocol package. --- internal/clients/healthclient.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'internal/clients') diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index e93f6be..692464c 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -11,6 +11,7 @@ import ( "github.com/mimecast/dtail/internal/clients/remote" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/protocol" gossh "golang.org/x/crypto/ssh" ) @@ -57,7 +58,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) { select { case data := <-receive: // Parse recieved data. - s := strings.Split(data, "|") + s := strings.Split(data, protocol.FieldDelimiter) message := s[len(s)-1] if strings.HasPrefix(message, "done;") { return -- cgit v1.2.3 From 2c1c70313bb03cf2b2d7e7afadb07a48ff6bb690 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 6 Sep 2021 09:22:21 +0300 Subject: REMOTE and CLIENT colors are brushed correctly too now --- internal/clients/stats.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/stats.go b/internal/clients/stats.go index d8163d4..3f76e0f 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" ) @@ -54,7 +55,6 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < throttle := len(throttleCh) newConnections := connected - connectedLast - if (connected == connectedLast || quiet) && !force { continue } @@ -77,7 +77,15 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < func (s *stats) printStatsDueInterrupt(messages []string) { logger.Pause() - for _, message := range messages { + for i, message := range messages { + if i > 0 && config.Client.TermColorsEnable { + fmt.Println(color.PaintStrWithAttr(message, + config.Client.TermColors.Client.TextFg, + config.Client.TermColors.Client.TextBg, + config.Client.TermColors.Client.TextAttr, + )) + continue + } fmt.Println(fmt.Sprintf(" %s", message)) } time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) @@ -99,7 +107,6 @@ func (s *stats) statsLine(connected, newConnections int, throttle int) string { func (s *stats) numConnected() int { s.mutex.Lock() defer s.mutex.Unlock() - return s.connected } @@ -107,6 +114,5 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } - return value / (total / 100.0) } -- cgit v1.2.3 From cc89d3fb8be2465af276d7ef03ea2a8affd87b2e Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 6 Sep 2021 13:48:55 +0300 Subject: Print out client/server update notice even from dtail server 4 to dtail client 3. --- internal/clients/handlers/basehandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index fe83faa..63ceaac 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -56,7 +56,7 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - if b == protocol.MessageDelimiter { + if b == protocol.MessageDelimiter || b == '\n' { if len(h.receiveBuf) == 0 { continue } -- cgit v1.2.3 From 6ae75e8f106d3eee18ea61e6c4d6925c6f514460 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 6 Sep 2021 14:07:07 +0300 Subject: fine tweak color schema --- internal/clients/stats.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 3f76e0f..faeb9fb 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -80,9 +80,9 @@ func (s *stats) printStatsDueInterrupt(messages []string) { for i, message := range messages { if i > 0 && config.Client.TermColorsEnable { fmt.Println(color.PaintStrWithAttr(message, - config.Client.TermColors.Client.TextFg, - config.Client.TermColors.Client.TextBg, - config.Client.TermColors.Client.TextAttr, + config.Client.TermColors.Client.ClientFg, + config.Client.TermColors.Client.ClientBg, + config.Client.TermColors.Client.ClientAttr, )) continue } -- cgit v1.2.3 From f74a9e4b35feb8c07d8a70b5a581088a0a59889d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 7 Sep 2021 10:01:32 +0300 Subject: Produce MAPREDUCE lines, can aggregate these via default log format --- internal/clients/stats.go | 49 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 13 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/stats.go b/internal/clients/stats.go index faeb9fb..6da443c 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -11,12 +11,13 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/protocol" ) // Used to collect and display various client stats. type stats struct { // Total amount servers to connect to. - connectionsTotal int + servers int // To keep track of what connected and disconnected connectionsEstCh chan struct{} // Amount of servers connections are established. @@ -25,10 +26,10 @@ type stats struct { mutex sync.Mutex } -func newTailStats(connectionsTotal int) *stats { +func newTailStats(servers int) *stats { return &stats{ - connectionsTotal: connectionsTotal, - connectionsEstCh: make(chan struct{}, connectionsTotal), + servers: servers, + connectionsEstCh: make(chan struct{}, servers), connected: 0, } } @@ -59,13 +60,14 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < continue } - stats := s.statsLine(connected, newConnections, throttle) switch force { case true: + stats := s.statsLine(connected, newConnections, throttle) messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) s.printStatsDueInterrupt(messages) default: - logger.Info(stats) + data := s.statsData(connected, newConnections, throttle) + logger.Mapreduce("STATS", data) } connectedLast = connected @@ -92,16 +94,37 @@ func (s *stats) printStatsDueInterrupt(messages []string) { logger.Resume() } +func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} { + percConnected := percentOf(float64(s.servers), float64(connected)) + + data := make(map[string]interface{}) + data["connected"] = connected + data["servers"] = s.servers + data["connected%"] = int(percConnected) + data["new"] = newConnections + data["throttle"] = throttle + data["goroutines"] = runtime.NumGoroutine() + data["cgocalls"] = runtime.NumCgoCall() + data["cpu"] = runtime.NumCPU() + + return data +} + func (s *stats) statsLine(connected, newConnections int, throttle int) string { - percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) + sb := strings.Builder{} - var stats []string - stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))) - stats = append(stats, fmt.Sprintf("new=%d", newConnections)) - stats = append(stats, fmt.Sprintf("throttle=%d", throttle)) - stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())) + i := 0 + for k, v := range s.statsData(connected, newConnections, throttle) { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } + sb.WriteString(k) + sb.WriteByte('=') + sb.WriteString(fmt.Sprintf("%v", v)) + i++ + } - return strings.Join(stats, "|") + return sb.String() } func (s *stats) numConnected() int { -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/clients/baseclient.go | 1 + internal/clients/handlers/basehandler.go | 29 +++++++++++++-------- internal/clients/handlers/healthhandler.go | 11 ++++---- internal/clients/handlers/maprhandler.go | 42 +++++++++++++++--------------- internal/clients/maprclient.go | 31 +++++++++++++++++----- 5 files changed, 70 insertions(+), 44 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index f20156f..de0c101 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -71,6 +71,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) + // Keep count of active connections active := make(chan struct{}, len(c.connections)) diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 63ceaac..51f33c1 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "encoding/base64" "fmt" "io" @@ -17,7 +18,7 @@ type baseHandler struct { server string shellStarted bool commands chan string - receiveBuf []byte + receiveBuf bytes.Buffer status int } @@ -56,14 +57,23 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - if b == protocol.MessageDelimiter || b == '\n' { - if len(h.receiveBuf) == 0 { + switch b { + /* + // TODO: Next DTail version make it so that '\n' gets ignored. For now + // leave it for compatibility with older DTail server + ability to display + // the protocol mismatch warn message. + case '\n' { + continue + */ + case '\n', protocol.MessageDelimiter: + message := h.receiveBuf.String() + if len(message) == 0 { continue } - message := string(h.receiveBuf) h.handleMessageType(message) - } else { - h.receiveBuf = append(h.receiveBuf, b) + h.receiveBuf.Reset() + default: + h.receiveBuf.WriteByte(b) } } @@ -78,25 +88,22 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } - return } // Handle various message types. func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { + if len(message) == 0 { return } // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { + if message[0] == '.' { h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] return } logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] } // Handle messages received from server which are not meant to be displayed diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 213748c..eca0348 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "errors" "fmt" "time" @@ -13,7 +14,7 @@ import ( type HealthHandler struct { done *internal.Done // Buffer of incoming data from server. - receiveBuf []byte + receiveBuf bytes.Buffer // To send commands to the server. commands chan string // To receive messages from the server. @@ -72,10 +73,10 @@ func (h *HealthHandler) SendMessage(command string) error { // Server writes byte stream to client. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] + h.receiveBuf.WriteByte(b) + if b == protocol.MessageDelimiter { + h.receive <- h.receiveBuf.String() + h.receiveBuf.Reset() } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index afad507..65b1454 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -15,7 +15,6 @@ type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. @@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + logger.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessageType(message) } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue - } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } @@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // Handle a message received from server including mapr aggregation // related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, protocol.AggregateDelimiter) - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + logger.Error("Unable to aggregate data", h.server, message, err) + } } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 77b674b..cab9a6c 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -9,6 +9,8 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" @@ -37,6 +39,8 @@ type MaprClient struct { query *mapr.Query // Additative result or new result every interval run? cumulative bool + // The last result string received + lastResult string } // NewMaprClient returns a new mapreduce client. @@ -154,24 +158,37 @@ func (c *MaprClient) reportResults() { func (c *MaprClient) printResults() { var result string var err error - var numLines int + var numRows int if c.cumulative { - result, numLines, err = c.globalGroup.Result(c.query) + result, numRows, err = c.globalGroup.Result(c.query) } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + result, numRows, err = c.globalGroup.SwapOut().Result(c.query) } + if err != nil { logger.FatalExit(err) } - if numLines == 0 { - logger.Warn("Empty result set this time...") + if result == c.lastResult { + logger.Debug("Result hasn't changed compared to last time...") return } + c.lastResult = result - //logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(c.query.RawQuery) + if numRows == 0 { + logger.Debug("Empty result set this time...") + return + } + + rawQuery := c.query.RawQuery + if config.Client.TermColorsEnable { + rawQuery = color.PaintStrWithAttr(rawQuery, + config.Client.TermColors.MaprTable.RawQueryFg, + config.Client.TermColors.MaprTable.RawQueryBg, + config.Client.TermColors.MaprTable.RawQueryAttr) + } + logger.Raw(rawQuery) logger.Raw(result) } -- cgit v1.2.3 From 842fd5800000bb68d6306a9ecad80a98ed762a2f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:01:34 +0300 Subject: limit mapreduce table output to 10 rows by default --- internal/clients/maprclient.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index cab9a6c..2cad15d 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -158,12 +158,18 @@ func (c *MaprClient) reportResults() { func (c *MaprClient) printResults() { var result string var err error - var numRows int + var numRows, rowsLimit int + + if c.query.Limit == -1 { + // Limit output to 10 rows when the result is printed to stdout. + // This can be overriden with the limit clause though. + rowsLimit = 10 + } if c.cumulative { - result, numRows, err = c.globalGroup.Result(c.query) + result, numRows, err = c.globalGroup.Result(c.query, rowsLimit) } else { - result, numRows, err = c.globalGroup.SwapOut().Result(c.query) + result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } if err != nil { @@ -189,6 +195,11 @@ func (c *MaprClient) printResults() { config.Client.TermColors.MaprTable.RawQueryAttr) } logger.Raw(rawQuery) + + if rowsLimit > 0 && numRows > rowsLimit { + logger.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!", + numRows, rowsLimit)) + } logger.Raw(result) } -- cgit v1.2.3 From 2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:04:42 +0300 Subject: bugfix: dmap skipped the last couple of mapreduce lines --- internal/clients/maprclient.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 2cad15d..68352ea 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -158,7 +158,8 @@ func (c *MaprClient) reportResults() { func (c *MaprClient) printResults() { var result string var err error - var numRows, rowsLimit int + var numRows int + rowsLimit := -1 if c.query.Limit == -1 { // Limit output to 10 rows when the result is printed to stdout. -- cgit v1.2.3 From 5e717af91e8012c72ec7dc0204420dea46f187db Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 11:57:09 +0300 Subject: new docker test cases - also change default FATAL bg color to magenta --- internal/clients/args.go | 46 +++++++++++++++++++++++++++++++- internal/clients/baseclient.go | 3 +++ internal/clients/handlers/basehandler.go | 7 ----- 3 files changed, 48 insertions(+), 8 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/args.go b/internal/clients/args.go index 7f782f1..7ce1634 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -1,6 +1,13 @@ package clients import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -22,5 +29,42 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string - Quiet bool + Quiet bool +} + +// When no servers are given, connect to localhost! +func (a *Args) handleEmptyServer() { + if a.Discovery != "" || a.ServersStr != "" { + return + } + + fqdn, err := os.Hostname() + if err != nil { + logger.FatalExit(err) + } + a.ServersStr = fmt.Sprintf("%s:%d", fqdn, config.Common.SSHPort) + // I am trusting my own hostname. + a.TrustAllHosts = true + logger.Debug("Will connect to local server", a.ServersStr) + + cleanPath := func(dirtyPath string) string { + cleanPath, err := filepath.EvalSymlinks(dirtyPath) + if err != nil { + logger.FatalExit("Unable to evaluate symlinks", dirtyPath, err) + } + cleanPath, err = filepath.Abs(cleanPath) + if err != nil { + logger.FatalExit("Unable to make file path absolute", dirtyPath, cleanPath, err) + } + return cleanPath + } + + logger.Debug("Dirty file paths", a.What) + var filePaths []string + for _, dirtyPath := range strings.Split(a.What, ",") { + filePaths = append(filePaths, cleanPath(dirtyPath)) + } + + a.What = strings.Join(filePaths, ",") + logger.Debug("Clean file paths", a.What) } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index de0c101..455174e 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -41,6 +41,9 @@ type baseClient struct { func (c *baseClient) init() { logger.Debug("Initiating base client") + // When empty server list provided, connect to localhost by default. + c.Args.handleEmptyServer() + flag := regex.Default if c.Args.RegexInvert { flag = regex.Invert diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 51f33c1..74559e9 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -112,12 +112,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") - select { - case <-time.After(time.Second * 5): - logger.Debug("Shutting down client after timeout and sending ack to server") - h.Shutdown() - case <-h.Done(): - return - } } } -- cgit v1.2.3 From 6506e20f6c80f4acb7434eb9dd14f784a67189cd Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 14:41:25 +0300 Subject: add spartan mode --- internal/clients/args.go | 34 ++++++++++++++++++++++++++++---- internal/clients/baseclient.go | 3 --- internal/clients/catclient.go | 6 ++++-- internal/clients/grepclient.go | 7 +++++-- internal/clients/handlers/basehandler.go | 15 +++++++------- internal/clients/maprclient.go | 7 +++++-- internal/clients/tailclient.go | 7 +++++-- 7 files changed, 56 insertions(+), 23 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/args.go b/internal/clients/args.go index 7ce1634..081b911 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -1,6 +1,7 @@ package clients import ( + "flag" "fmt" "os" "path/filepath" @@ -30,14 +31,35 @@ type Args struct { SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string Quiet bool + Spartan bool + NoColor bool } -// When no servers are given, connect to localhost! -func (a *Args) handleEmptyServer() { - if a.Discovery != "" || a.ServersStr != "" { - return +// Transform the arguments based on certain conditions. +func (a *Args) Transform(args []string) { + // Interpret additional args as file list. + if a.What == "" { + var files []string + for _, file := range flag.Args() { + files = append(files, file) + } + a.What = strings.Join(files, ",") + } + + if a.Spartan { + a.Quiet = true + a.NoColor = true } +} +// TransformAfterConfigFile same as Transform, but after the config file has been read. +func (a *Args) TransformAfterConfigFile() { + if a.Discovery == "" && a.ServersStr == "" { + a.handleEmptyServer() + } +} + +func (a *Args) handleEmptyServer() { fqdn, err := os.Hostname() if err != nil { logger.FatalExit(err) @@ -68,3 +90,7 @@ func (a *Args) handleEmptyServer() { a.What = strings.Join(filePaths, ",") logger.Debug("Clean file paths", a.What) } + +func (a *Args) SerializeOptions() string { + return fmt.Sprintf("quiet=%v:spartan=%v", a.Quiet, a.Spartan) +} diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 455174e..de0c101 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -41,9 +41,6 @@ type baseClient struct { func (c *baseClient) init() { logger.Debug("Initiating base client") - // When empty server list provided, connect to localhost by default. - c.Args.handleEmptyServer() - flag := regex.Default if c.Args.RegexInvert { flag = regex.Invert diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index b7b6131..d14cdcc 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,9 +42,11 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), + c.Args.SerializeOptions(), + file, c.Regex.Serialize())) } return } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 652c31b..ea5022b 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,9 +41,12 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), + c.Args.SerializeOptions(), + file, + c.Regex.Serialize())) } return diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 74559e9..0f2d1b5 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -67,9 +67,12 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - if len(message) == 0 { - continue - } + /* + // dcat/grep should actually display empty lines. + if len(message) == 0 { + continue + } + */ h.handleMessageType(message) h.receiveBuf.Reset() default: @@ -93,12 +96,8 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { // Handle various message types. func (h *baseHandler) handleMessageType(message string) { - if len(message) == 0 { - return - } - // Hidden server commands starti with a dot "." - if message[0] == '.' { + if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 68352ea..e6ab96b 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -116,7 +116,6 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { @@ -128,7 +127,11 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + modeStr, + c.Args.SerializeOptions(), + file, + c.Regex.Serialize())) } return diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index cefbaa7..360354b 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -38,9 +38,12 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { - options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), + c.Args.SerializeOptions(), + file, + c.Regex.Serialize())) } logger.Debug(commands) -- cgit v1.2.3 From 69b88a1cae0a61bd22530c384f40166b37b9f1ea Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 18:43:19 +0300 Subject: remote connector is now an interface --- internal/clients/baseclient.go | 24 +-- internal/clients/connectors/connector.go | 17 ++ internal/clients/connectors/serverconnection.go | 225 ++++++++++++++++++++++++ internal/clients/healthclient.go | 12 +- internal/clients/remote/connection.go | 212 ---------------------- 5 files changed, 259 insertions(+), 231 deletions(-) create mode 100644 internal/clients/connectors/connector.go create mode 100644 internal/clients/connectors/serverconnection.go delete mode 100644 internal/clients/remote/connection.go (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index de0c101..d0631fc 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/mimecast/dtail/internal/clients/remote" + "github.com/mimecast/dtail/internal/clients/connectors" "github.com/mimecast/dtail/internal/discovery" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" @@ -23,7 +23,7 @@ type baseClient struct { // List of remote servers to connect to. servers []string // We have one connection per remote server. - connections []*remote.Connection + connections []connectors.Connector // SSH auth methods to use to connect to the remote servers. sshAuthMethods []gossh.AuthMethod // To deal with SSH host keys @@ -77,7 +77,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var mutex sync.Mutex for i, conn := range c.connections { - go func(i int, conn *remote.Connection) { + go func(i int, conn connectors.Connector) { connStatus := c.start(ctx, active, i, conn) // Update global status. @@ -93,7 +93,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn *remote.Connection) (status int) { +func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { // Increment connection count active <- struct{}{} // Derement connection count @@ -105,26 +105,20 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh) // Retrieve status code from handler (dtail client will exit with that status) - status = conn.Handler.Status() + status = conn.Handler().Status() if !c.retry { return } time.Sleep(time.Second * 2) - logger.Debug(conn.Server, "Reconnecting") - - conn = c.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback) - c.connections[i] = conn + logger.Debug(conn.Server(), "Reconnecting") + c.connections[i] = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) } } -func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = c.maker.makeHandler(server) - conn.Commands = c.maker.makeCommands() - - return conn +func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { + return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { diff --git a/internal/clients/connectors/connector.go b/internal/clients/connectors/connector.go new file mode 100644 index 0000000..3ab6a08 --- /dev/null +++ b/internal/clients/connectors/connector.go @@ -0,0 +1,17 @@ +package connectors + +import ( + "context" + + "github.com/mimecast/dtail/internal/clients/handlers" +) + +// Connector interface. +type Connector interface { + // Start the connection. + Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) + // Server hostname. + Server() string + // Handler for the connection. + Handler() handlers.Handler +} diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go new file mode 100644 index 0000000..fab2f87 --- /dev/null +++ b/internal/clients/connectors/serverconnection.go @@ -0,0 +1,225 @@ +package connectors + +import ( + "context" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/ssh/client" + + "golang.org/x/crypto/ssh" +) + +// ServerConnection represents a client connection connection to a single server. +type ServerConnection struct { + // The remote server's hostname connected to. + server string + // The remote server's port connected to. + port int + // The SSH client configuration used. + config *ssh.ClientConfig + // The SSH client handler to use. + handler handlers.Handler + // DTail commands sent from client to server. When client loses + // connection to the server it re-connects automatically and sends the + // same commands again. + commands []string + // Is it a persistent connection or a one-off? + isOneOff bool + // To deal with SSH server host keys + hostKeyCallback client.HostKeyCallback + // To determine if connection throttling has finished or not + throttlingDone bool +} + +// NewServerConnection returns a new connection. +func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { + logger.Debug(server, "Creating new connection") + + c := ServerConnection{ + hostKeyCallback: hostKeyCallback, + server: server, + handler: handler, + commands: commands, + config: &ssh.ClientConfig{ + User: userName, + Auth: authMethods, + HostKeyCallback: hostKeyCallback.Wrap(), + Timeout: time.Second * 2, + }, + } + + c.initServerPort() + return &c +} + +// NewOneOffServerConnection creates new one-off connection (only for sending a series of commands and then quit). +func NewOneOffServerConnection(server string, userName string, authMethods []ssh.AuthMethod, handler handlers.Handler, commands []string) *ServerConnection { + c := ServerConnection{ + server: server, + handler: handler, + commands: commands, + config: &ssh.ClientConfig{ + User: userName, + Auth: authMethods, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + }, + isOneOff: true, + } + + c.initServerPort() + return &c +} + +// Server hostname +func (c *ServerConnection) Server() string { + return c.server +} + +// Handler for the connection +func (c *ServerConnection) Handler() handlers.Handler { + return c.handler +} + +// Attempt to parse the server port address from the provided server FQDN. +func (c *ServerConnection) initServerPort() { + c.port = config.Common.SSHPort + parts := strings.Split(c.server, ":") + + if len(parts) == 2 { + logger.Debug("Parsing port from hostname", parts) + port, err := strconv.Atoi(parts[1]) + if err != nil { + logger.FatalExit("Unable to parse client port", c.server, parts, err) + } + c.server = parts[0] + c.port = port + } +} + +// Start the server connection. Build up SSH session and send some DTail commands. +func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + // Throttle how many connections can be established concurrently (based on ch length) + logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) + + select { + case throttleCh <- struct{}{}: + case <-ctx.Done(): + logger.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) + return + } + + logger.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) + + go func() { + defer func() { + if !c.throttlingDone { + logger.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) + c.throttlingDone = true + <-throttleCh + } + cancel() + }() + + if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { + logger.Warn(c.server, c.port, err) + if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.server, c.port)) { + logger.Debug(c.server, "Not trusting host") + } + } + }() + + <-ctx.Done() +} + +// Dail into a new SSH connection. Close connection in case of an error. +func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { + logger.Debug(c.server, "Incrementing connection stats") + statsCh <- struct{}{} + defer func() { + logger.Debug(c.server, "Decrementing connection stats") + <-statsCh + }() + + logger.Debug(c.server, "Dialing into the connection") + address := fmt.Sprintf("%s:%d", c.server, c.port) + + client, err := ssh.Dial("tcp", address, c.config) + if err != nil { + return err + } + defer client.Close() + + return c.session(ctx, cancel, client, throttleCh) +} + +// Create the SSH session. Close the session in case of an error. +func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { + logger.Debug(c.server, "session") + + session, err := client.NewSession() + if err != nil { + return err + } + defer session.Close() + + return c.handle(ctx, cancel, session, throttleCh) +} + +func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { + logger.Debug(c.server, "handle") + + stdinPipe, err := session.StdinPipe() + if err != nil { + return err + } + + stdoutPipe, err := session.StdoutPipe() + if err != nil { + return err + } + + if err := session.Shell(); err != nil { + return err + } + + go func() { + io.Copy(stdinPipe, c.handler) + cancel() + }() + + go func() { + io.Copy(c.handler, stdoutPipe) + cancel() + }() + + go func() { + select { + case <-c.handler.Done(): + case <-ctx.Done(): + } + cancel() + }() + + // Send all commands to client. + for _, command := range c.commands { + logger.Debug(command) + c.handler.SendMessage(command) + } + + if !c.throttlingDone { + logger.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) + c.throttlingDone = true + <-throttleCh + } + + <-ctx.Done() + c.handler.Shutdown() + return nil +} diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 692464c..47007b6 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -7,8 +7,8 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal/clients/connectors" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/clients/remote" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/protocol" @@ -47,9 +47,13 @@ func (c *HealthClient) Start(ctx context.Context) (status int) { throttleCh := make(chan struct{}, runtime.NumCPU()) statsCh := make(chan struct{}, 1) - conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods) - conn.Handler = handlers.NewHealthHandler(c.server, receive) - conn.Commands = []string{c.mode.String()} + conn := connectors.NewOneOffServerConnection( + c.server, + c.userName, + c.sshAuthMethods, + handlers.NewHealthHandler(c.server, receive), + []string{c.mode.String()}, + ) connCtx, cancel := context.WithCancel(ctx) go conn.Start(connCtx, cancel, throttleCh, statsCh) diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go deleted file mode 100644 index b29ffed..0000000 --- a/internal/clients/remote/connection.go +++ /dev/null @@ -1,212 +0,0 @@ -package remote - -import ( - "context" - "fmt" - "io" - "strconv" - "strings" - "time" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/ssh/client" - - "golang.org/x/crypto/ssh" -) - -// Connection represents a client connection connection to a single server. -type Connection struct { - // The remote server's hostname connected to. - Server string - // The remote server's port connected to. - port int - // The SSH client configuration used. - config *ssh.ClientConfig - // The SSH client handler to use. - Handler handlers.Handler - // DTail commands sent from client to server. When client loses - // connection to the server it re-connects automatically and sends the - // same commands again. - Commands []string - // Is it a persistent connection or a one-off? - isOneOff bool - // To deal with SSH server host keys - hostKeyCallback client.HostKeyCallback - // To determine if connection throttling has finished or not - throttlingDone bool -} - -// NewConnection returns a new connection. -func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback) *Connection { - logger.Debug(server, "Creating new connection") - - c := Connection{ - hostKeyCallback: hostKeyCallback, - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: hostKeyCallback.Wrap(), - Timeout: time.Second * 3, - }, - } - - c.initServerPort(server) - - return &c -} - -// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit). -func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection { - c := Connection{ - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - }, - isOneOff: true, - } - - c.initServerPort(server) - - return &c -} - -// Attempt to parse the server port address from the provided server FQDN. -func (c *Connection) initServerPort(server string) { - c.Server = server - c.port = config.Common.SSHPort - parts := strings.Split(server, ":") - - if len(parts) == 2 { - logger.Debug("Parsing port from hostname", parts) - port, err := strconv.Atoi(parts[1]) - if err != nil { - logger.FatalExit("Unable to parse client port", server, parts, err) - } - c.Server = parts[0] - c.port = port - } -} - -// Start the server connection. Build up SSH session and send some DTail commands. -func (c *Connection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { - // Throttle how many connections can be established concurrently (based on ch length) - logger.Debug(c.Server, "Throttling connection", len(throttleCh), cap(throttleCh)) - - select { - case throttleCh <- struct{}{}: - case <-ctx.Done(): - logger.Debug(c.Server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) - return - } - - logger.Debug(c.Server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) - - go func() { - defer func() { - if !c.throttlingDone { - logger.Debug(c.Server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) - c.throttlingDone = true - <-throttleCh - } - cancel() - }() - - if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { - logger.Warn(c.Server, c.port, err) - if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) { - logger.Debug(c.Server, "Not trusting host") - } - } - }() - - <-ctx.Done() -} - -// Dail into a new SSH connection. Close connection in case of an error. -func (c *Connection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { - logger.Debug(c.Server, "Incrementing connection stats") - statsCh <- struct{}{} - defer func() { - logger.Debug(c.Server, "Decrementing connection stats") - <-statsCh - }() - - logger.Debug(c.Server, "Dialing into the connection") - address := fmt.Sprintf("%s:%d", c.Server, c.port) - - client, err := ssh.Dial("tcp", address, c.config) - if err != nil { - return err - } - defer client.Close() - - return c.session(ctx, cancel, client, throttleCh) -} - -// Create the SSH session. Close the session in case of an error. -func (c *Connection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - logger.Debug(c.Server, "session") - - session, err := client.NewSession() - if err != nil { - return err - } - defer session.Close() - - return c.handle(ctx, cancel, session, throttleCh) -} - -func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - logger.Debug(c.Server, "handle") - - stdinPipe, err := session.StdinPipe() - if err != nil { - return err - } - - stdoutPipe, err := session.StdoutPipe() - if err != nil { - return err - } - - if err := session.Shell(); err != nil { - return err - } - - go func() { - io.Copy(stdinPipe, c.Handler) - cancel() - }() - - go func() { - io.Copy(c.Handler, stdoutPipe) - cancel() - }() - - go func() { - select { - case <-c.Handler.Done(): - case <-ctx.Done(): - } - cancel() - }() - - // Send all commands to client. - for _, command := range c.Commands { - logger.Debug(command) - c.Handler.SendMessage(command) - } - - if !c.throttlingDone { - logger.Debug(c.Server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) - c.throttlingDone = true - <-throttleCh - } - - <-ctx.Done() - c.Handler.Shutdown() - return nil -} -- cgit v1.2.3 From abeac87aec44249bf67f1b0eca471a31086265ca Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 19:27:50 +0300 Subject: fix auto reconnect --- internal/clients/args.go | 43 ++---------- internal/clients/baseclient.go | 9 ++- internal/clients/connectors/serverconnection.go | 38 ++++------- internal/clients/connectors/serverless.go | 90 +++++++++++++++++++++++++ internal/clients/handlers/basehandler.go | 10 +++ 5 files changed, 125 insertions(+), 65 deletions(-) create mode 100644 internal/clients/connectors/serverless.go (limited to 'internal/clients') diff --git a/internal/clients/args.go b/internal/clients/args.go index 081b911..dc71e83 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -3,12 +3,8 @@ package clients import ( "flag" "fmt" - "os" - "path/filepath" "strings" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -33,6 +29,7 @@ type Args struct { Quiet bool Spartan bool NoColor bool + Serverless bool } // Transform the arguments based on certain conditions. @@ -50,45 +47,15 @@ func (a *Args) Transform(args []string) { a.Quiet = true a.NoColor = true } -} -// TransformAfterConfigFile same as Transform, but after the config file has been read. -func (a *Args) TransformAfterConfigFile() { if a.Discovery == "" && a.ServersStr == "" { - a.handleEmptyServer() + a.Serverless = true } } -func (a *Args) handleEmptyServer() { - fqdn, err := os.Hostname() - if err != nil { - logger.FatalExit(err) - } - a.ServersStr = fmt.Sprintf("%s:%d", fqdn, config.Common.SSHPort) - // I am trusting my own hostname. - a.TrustAllHosts = true - logger.Debug("Will connect to local server", a.ServersStr) - - cleanPath := func(dirtyPath string) string { - cleanPath, err := filepath.EvalSymlinks(dirtyPath) - if err != nil { - logger.FatalExit("Unable to evaluate symlinks", dirtyPath, err) - } - cleanPath, err = filepath.Abs(cleanPath) - if err != nil { - logger.FatalExit("Unable to make file path absolute", dirtyPath, cleanPath, err) - } - return cleanPath - } - - logger.Debug("Dirty file paths", a.What) - var filePaths []string - for _, dirtyPath := range strings.Split(a.What, ",") { - filePaths = append(filePaths, cleanPath(dirtyPath)) - } - - a.What = strings.Join(filePaths, ",") - logger.Debug("Clean file paths", a.What) +// TransformAfterConfigFile same as Transform, but after the config file has been read. +func (a *Args) TransformAfterConfigFile() { + // TODO: Remove this method. It's not used. } func (a *Args) SerializeOptions() string { diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d0631fc..5523052 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -113,12 +113,17 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con time.Sleep(time.Second * 2) logger.Debug(conn.Server(), "Reconnecting") - c.connections[i] = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) + conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) + c.connections[i] = conn } } func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { - return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) + if c.Args.Serverless { + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands()) + } + return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, + hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index fab2f87..0904ba1 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -16,31 +16,21 @@ import ( "golang.org/x/crypto/ssh" ) -// ServerConnection represents a client connection connection to a single server. +// ServerConnection represents a connection to a single remote dtail server via SSH protocol. type ServerConnection struct { - // The remote server's hostname connected to. - server string - // The remote server's port connected to. - port int - // The SSH client configuration used. - config *ssh.ClientConfig - // The SSH client handler to use. - handler handlers.Handler - // DTail commands sent from client to server. When client loses - // connection to the server it re-connects automatically and sends the - // same commands again. - commands []string - // Is it a persistent connection or a one-off? - isOneOff bool - // To deal with SSH server host keys + server string + port int + config *ssh.ClientConfig + handler handlers.Handler + commands []string + isOneOff bool hostKeyCallback client.HostKeyCallback - // To determine if connection throttling has finished or not - throttlingDone bool + throttlingDone bool } -// NewServerConnection returns a new connection. +// NewServerConnection returns a new DTail SSH server connection. func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - logger.Debug(server, "Creating new connection") + logger.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, @@ -77,12 +67,10 @@ func NewOneOffServerConnection(server string, userName string, authMethods []ssh return &c } -// Server hostname func (c *ServerConnection) Server() string { return c.server } -// Handler for the connection func (c *ServerConnection) Handler() handlers.Handler { return c.handler } @@ -103,7 +91,6 @@ func (c *ServerConnection) initServerPort() { } } -// Start the server connection. Build up SSH session and send some DTail commands. func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { // Throttle how many connections can be established concurrently (based on ch length) logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) @@ -161,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, // Create the SSH session. Close the session in case of an error. func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - logger.Debug(c.server, "session") + logger.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { @@ -173,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun } func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - logger.Debug(c.server, "handle") + logger.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { @@ -221,5 +208,6 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc <-ctx.Done() c.handler.Shutdown() + return nil } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go new file mode 100644 index 0000000..0500645 --- /dev/null +++ b/internal/clients/connectors/serverless.go @@ -0,0 +1,90 @@ +package connectors + +import ( + "context" + "io" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/logger" + serverHandlers "github.com/mimecast/dtail/internal/server/handlers" + user "github.com/mimecast/dtail/internal/user/server" +) + +// Serverless creates a server object directly without TCP. +type Serverless struct { + handler handlers.Handler + commands []string + userName string +} + +// NewServerConnection returns a new connection. +func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { + s := Serverless{ + userName: userName, + handler: handler, + commands: commands, + } + + logger.Debug("Creating new serverless connector", handler, commands) + return &s +} + +func (s *Serverless) Server() string { + return "local(serverless)" +} + +func (s *Serverless) Handler() handlers.Handler { + return s.handler +} + +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + go func() { + defer cancel() + + if err := s.handle(ctx, cancel); err != nil { + logger.Warn(err) + } + }() + + <-ctx.Done() +} + +func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { + logger.Debug("Creating server handler for a serverless session") + + serverHandler := serverHandlers.NewServerHandler( + user.New(s.userName, s.Server()), + make(chan struct{}, config.Server.MaxConcurrentCats), + make(chan struct{}, config.Server.MaxConcurrentTails), + ) + + go func() { + io.Copy(serverHandler, s.handler) + cancel() + }() + + go func() { + io.Copy(s.handler, serverHandler) + cancel() + }() + + go func() { + select { + case <-s.handler.Done(): + case <-ctx.Done(): + } + cancel() + }() + + // Send all commands to client. + for _, command := range s.commands { + logger.Debug(command) + s.handler.SendMessage(command) + } + + <-ctx.Done() + s.handler.Shutdown() + + return nil +} diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 0f2d1b5..af1ad62 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -22,6 +22,16 @@ type baseHandler struct { status int } +func (h *baseHandler) String() string { + return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p", + h.done, + h.server, + h.shellStarted, + h.status, + h, + ) +} + func (h *baseHandler) Server() string { return h.server } -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/clients/args.go | 63 ------------------------- internal/clients/baseclient.go | 28 +++++++---- internal/clients/catclient.go | 12 +++-- internal/clients/connectors/serverconnection.go | 34 ++++++------- internal/clients/connectors/serverless.go | 21 +++++---- internal/clients/grepclient.go | 13 +++-- internal/clients/handlers/basehandler.go | 6 +-- internal/clients/handlers/clienthandler.go | 4 +- internal/clients/handlers/maprhandler.go | 8 ++-- internal/clients/maprclient.go | 40 ++++++++-------- internal/clients/stats.go | 8 ++-- internal/clients/tailclient.go | 16 ++++--- 12 files changed, 107 insertions(+), 146 deletions(-) delete mode 100644 internal/clients/args.go (limited to 'internal/clients') diff --git a/internal/clients/args.go b/internal/clients/args.go deleted file mode 100644 index dc71e83..0000000 --- a/internal/clients/args.go +++ /dev/null @@ -1,63 +0,0 @@ -package clients - -import ( - "flag" - "fmt" - "strings" - - "github.com/mimecast/dtail/internal/omode" - - gossh "golang.org/x/crypto/ssh" -) - -// Args is a helper struct to summarize common client arguments. -type Args struct { - Mode omode.Mode - ServersStr string - UserName string - What string - Arguments []string - RegexStr string - RegexInvert bool - TrustAllHosts bool - Discovery string - ConnectionsPerCPU int - Timeout int - SSHAuthMethods []gossh.AuthMethod - SSHHostKeyCallback gossh.HostKeyCallback - PrivateKeyPathFile string - Quiet bool - Spartan bool - NoColor bool - Serverless bool -} - -// Transform the arguments based on certain conditions. -func (a *Args) Transform(args []string) { - // Interpret additional args as file list. - if a.What == "" { - var files []string - for _, file := range flag.Args() { - files = append(files, file) - } - a.What = strings.Join(files, ",") - } - - if a.Spartan { - a.Quiet = true - a.NoColor = true - } - - if a.Discovery == "" && a.ServersStr == "" { - a.Serverless = true - } -} - -// TransformAfterConfigFile same as Transform, but after the config file has been read. -func (a *Args) TransformAfterConfigFile() { - // TODO: Remove this method. It's not used. -} - -func (a *Args) SerializeOptions() string { - return fmt.Sprintf("quiet=%v:spartan=%v", a.Quiet, a.Spartan) -} diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 5523052..fc01955 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -6,8 +6,9 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/connectors" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/discovery" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" "github.com/mimecast/dtail/internal/ssh/client" @@ -17,7 +18,7 @@ import ( // This is the main client data structure. type baseClient struct { - Args + config.Args // To display client side stats stats *stats // List of remote servers to connect to. @@ -39,7 +40,8 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Debug("Initiating base client") + dlog.Client.Debug("Initiating base client") + dlog.Client.Debug(c.Args.String()) flag := regex.Default if c.Args.RegexInvert { @@ -47,11 +49,14 @@ func (c *baseClient) init() { } regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { - logger.FatalExit(c.Regex, "invalid regex!", err, regex) + dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex) } c.Regex = regex - logger.Debug("Regex", c.Regex) + dlog.Client.Debug("Regex", c.Regex) + if c.Args.Serverless { + return + } c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile) } @@ -67,8 +72,11 @@ func (c *baseClient) makeConnections(maker maker) { } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { - // Periodically check for unknown hosts, and ask the user whether to trust them or not. - go c.hostKeyCallback.PromptAddHosts(ctx) + // Can be nil when serverless. + if c.hostKeyCallback != nil { + // Periodically check for unknown hosts, and ask the user whether to trust them or not. + go c.hostKeyCallback.PromptAddHosts(ctx) + } // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) @@ -112,7 +120,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con } time.Sleep(time.Second * 2) - logger.Debug(conn.Server(), "Reconnecting") + dlog.Client.Debug(conn.Server(), "Reconnecting") conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) c.connections[i] = conn } @@ -127,7 +135,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Debug("Terminated connection") + defer dlog.Client.Debug("Terminated connection") // We want to have at least one active connection <-active @@ -143,7 +151,7 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { if numActive == 0 { return } - logger.Debug("Active connections", numActive) + dlog.Client.Debug("Active connections", numActive) time.Sleep(time.Second) } } diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index d14cdcc..2726e7e 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) @@ -16,7 +18,7 @@ type CatClient struct { } // NewCatClient returns a new cat client. -func NewCatClient(args Args) (*CatClient, error) { +func NewCatClient(args config.Args) (*CatClient, error) { if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } @@ -42,11 +44,13 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), - c.Args.SerializeOptions(), - file, c.Regex.Serialize())) + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } return } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 0904ba1..5bc63ee 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -10,7 +10,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/ssh/client" "golang.org/x/crypto/ssh" @@ -30,7 +30,7 @@ type ServerConnection struct { // NewServerConnection returns a new DTail SSH server connection. func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - logger.Debug(server, "Creating new connection", server, handler, commands) + dlog.Client.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, @@ -81,10 +81,10 @@ func (c *ServerConnection) initServerPort() { parts := strings.Split(c.server, ":") if len(parts) == 2 { - logger.Debug("Parsing port from hostname", parts) + dlog.Client.Debug("Parsing port from hostname", parts) port, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit("Unable to parse client port", c.server, parts, err) + dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err) } c.server = parts[0] c.port = port @@ -93,21 +93,21 @@ func (c *ServerConnection) initServerPort() { func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { // Throttle how many connections can be established concurrently (based on ch length) - logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) select { case throttleCh <- struct{}{}: case <-ctx.Done(): - logger.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) return } - logger.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) go func() { defer func() { if !c.throttlingDone { - logger.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } @@ -115,9 +115,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, }() if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { - logger.Warn(c.server, c.port, err) + dlog.Client.Warn(c.server, c.port, err) if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.server, c.port)) { - logger.Debug(c.server, "Not trusting host") + dlog.Client.Debug(c.server, "Not trusting host") } } }() @@ -127,14 +127,14 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, // Dail into a new SSH connection. Close connection in case of an error. func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { - logger.Debug(c.server, "Incrementing connection stats") + dlog.Client.Debug(c.server, "Incrementing connection stats") statsCh <- struct{}{} defer func() { - logger.Debug(c.server, "Decrementing connection stats") + dlog.Client.Debug(c.server, "Decrementing connection stats") <-statsCh }() - logger.Debug(c.server, "Dialing into the connection") + dlog.Client.Debug(c.server, "Dialing into the connection") address := fmt.Sprintf("%s:%d", c.server, c.port) client, err := ssh.Dial("tcp", address, c.config) @@ -148,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, // Create the SSH session. Close the session in case of an error. func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - logger.Debug(c.server, "Creating SSH session") + dlog.Client.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { @@ -160,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun } func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - logger.Debug(c.server, "Creating handler for SSH session") + dlog.Client.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { @@ -196,12 +196,12 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc // Send all commands to client. for _, command := range c.commands { - logger.Debug(command) + dlog.Client.Debug(command) c.handler.SendMessage(command) } if !c.throttlingDone { - logger.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 0500645..c7b5f62 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -6,7 +6,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" serverHandlers "github.com/mimecast/dtail/internal/server/handlers" user "github.com/mimecast/dtail/internal/user/server" ) @@ -26,7 +26,7 @@ func NewServerless(userName string, handler handlers.Handler, commands []string) commands: commands, } - logger.Debug("Creating new serverless connector", handler, commands) + dlog.Client.Debug("Creating new serverless connector", handler, commands) return &s } @@ -43,7 +43,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt defer cancel() if err := s.handle(ctx, cancel); err != nil { - logger.Warn(err) + dlog.Client.Warn(err) } }() @@ -51,7 +51,7 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt } func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { - logger.Debug("Creating server handler for a serverless session") + dlog.Client.Debug("Creating server handler for a serverless session") serverHandler := serverHandlers.NewServerHandler( user.New(s.userName, s.Server()), @@ -59,14 +59,19 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro make(chan struct{}, config.Server.MaxConcurrentTails), ) + terminate := func() { + serverHandler.Shutdown() + cancel() + } + go func() { io.Copy(serverHandler, s.handler) - cancel() + terminate() }() go func() { io.Copy(s.handler, serverHandler) - cancel() + terminate() }() go func() { @@ -74,12 +79,12 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro case <-s.handler.Done(): case <-ctx.Done(): } - cancel() + terminate() }() // Send all commands to client. for _, command := range s.commands { - logger.Debug(command) + dlog.Client.Debug(command) s.handler.SendMessage(command) } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index ea5022b..ae21ff2 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) @@ -16,7 +18,7 @@ type GrepClient struct { } // NewGrepClient creates a new grep client. -func NewGrepClient(args Args) (*GrepClient, error) { +func NewGrepClient(args config.Args) (*GrepClient, error) { if args.RegexStr == "" { return nil, errors.New("No regex specified, use '-regex' flag") } @@ -41,12 +43,13 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), - c.Args.SerializeOptions(), - file, - c.Regex.Serialize())) + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } return diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index af1ad62..3291b43 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "time" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -51,7 +51,7 @@ func (h *baseHandler) Shutdown() { // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) - logger.Debug("Sending command", h.server, command, encoded) + dlog.Client.Debug("Sending command", h.server, command, encoded) select { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): @@ -112,7 +112,7 @@ func (h *baseHandler) handleMessageType(message string) { return } - logger.Raw(message) + dlog.Client.Raw(message) } // Handle messages received from server which are not meant to be displayed diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 2bcb038..27ac85e 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -2,7 +2,7 @@ package handlers import ( "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ClientHandler is the basic client handler interface. @@ -12,7 +12,7 @@ type ClientHandler struct { // NewClientHandler creates a new client handler. func NewClientHandler(server string) *ClientHandler { - logger.Debug(server, "Creating new client handler") + dlog.Client.Debug(server, "Creating new client handler") return &ClientHandler{ baseHandler{ diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 65b1454..848e7f0 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,7 +4,7 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/protocol" @@ -40,7 +40,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { continue case protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() - logger.Debug(message) + dlog.Client.Debug(message) if message[0] == 'A' { h.handleAggregateMessage(message) } else { @@ -60,10 +60,10 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { - logger.Error("Unable to aggregate data", h.server, message, err) + dlog.Client.Error("Unable to aggregate data", h.server, message, err) } } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index e6ab96b..f23aa08 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -11,7 +11,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" ) @@ -44,14 +44,14 @@ type MaprClient struct { } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { +func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { if queryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } query, err := mapr.NewQuery(queryStr) if err != nil { - logger.FatalExit(queryStr, "Can't parse mapr query", err) + dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err) } // Don't retry connection if in tail mode and no outfile specified. @@ -68,7 +68,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* cumulative = args.Mode == omode.MapClient || query.HasOutfile() } - logger.Debug("Cumulative mapreduce mode?", cumulative) + dlog.Client.Debug("Cumulative mapreduce mode?", cumulative) c := MaprClient{ baseClient: baseClient{ @@ -103,7 +103,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Debug("Received final mapreduce result") + dlog.Client.Debug("Received final mapreduce result") c.reportResults() } @@ -123,15 +123,17 @@ func (c MaprClient) makeCommands() (commands []string) { } for _, file := range strings.Split(c.What, ",") { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, + modeStr, file, regex)) continue } commands = append(commands, fmt.Sprintf("%s:%s %s %s", - modeStr, - c.Args.SerializeOptions(), - file, - c.Regex.Serialize())) + modeStr, c.Args.SerializeOptions(), file, regex)) } return @@ -141,7 +143,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): - logger.Debug("Gathering interim mapreduce result") + dlog.Client.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -177,17 +179,17 @@ func (c *MaprClient) printResults() { } if err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } if result == c.lastResult { - logger.Debug("Result hasn't changed compared to last time...") + dlog.Client.Debug("Result hasn't changed compared to last time...") return } c.lastResult = result if numRows == 0 { - logger.Debug("Empty result set this time...") + dlog.Client.Debug("Empty result set this time...") return } @@ -198,24 +200,24 @@ func (c *MaprClient) printResults() { config.Client.TermColors.MaprTable.RawQueryBg, config.Client.TermColors.MaprTable.RawQueryAttr) } - logger.Raw(rawQuery) + dlog.Client.Raw(rawQuery) if rowsLimit > 0 && numRows > rowsLimit { - logger.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!", + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } - logger.Raw(result) + dlog.Client.Raw(result) } func (c *MaprClient) writeResultsToOutfile() { if c.cumulative { if err := c.globalGroup.WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } return } if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 6da443c..fbef572 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -10,7 +10,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -67,7 +67,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < s.printStatsDueInterrupt(messages) default: data := s.statsData(connected, newConnections, throttle) - logger.Mapreduce("STATS", data) + dlog.Client.Mapreduce("STATS", data) } connectedLast = connected @@ -78,7 +78,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < } func (s *stats) printStatsDueInterrupt(messages []string) { - logger.Pause() + dlog.Client.Pause() for i, message := range messages { if i > 0 && config.Client.TermColorsEnable { fmt.Println(color.PaintStrWithAttr(message, @@ -91,7 +91,7 @@ func (s *stats) printStatsDueInterrupt(messages []string) { fmt.Println(fmt.Sprintf(" %s", message)) } time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) - logger.Resume() + dlog.Client.Resume() } func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} { diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 360354b..d42a0e4 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -6,7 +6,8 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) @@ -16,7 +17,7 @@ type TailClient struct { } // NewTailClient returns a new TailClient. -func NewTailClient(args Args) (*TailClient, error) { +func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient c := TailClient{ @@ -38,14 +39,15 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), - c.Args.SerializeOptions(), - file, - c.Regex.Serialize())) + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - logger.Debug(commands) + dlog.Client.Debug(commands) return } -- cgit v1.2.3 From fcaa94c7453efa0d74e330128c0f5c2cde8f11b3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 26 Sep 2021 16:42:47 +0300 Subject: refactor config reader - also looks in additional search paths for config file unless NONE is specified --- internal/clients/connectors/serverless.go | 7 ++++++- internal/clients/handlers/basehandler.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index c7b5f62..7740aab 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -53,8 +53,13 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { dlog.Client.Debug("Creating server handler for a serverless session") + user, err := user.New(s.userName, s.Server()) + if err != nil { + return err + } + serverHandler := serverHandlers.NewServerHandler( - user.New(s.userName, s.Server()), + user, make(chan struct{}, config.Server.MaxConcurrentCats), make(chan struct{}, config.Server.MaxConcurrentTails), ) diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 3291b43..8acb45f 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { /* - // TODO: Next DTail version make it so that '\n' gets ignored. For now + // NEXT: Next DTail version make it so that '\n' gets ignored. For now // leave it for compatibility with older DTail server + ability to display // the protocol mismatch warn message. case '\n' { -- cgit v1.2.3 From 07e1470892beacf0722276f94bacbd822b002540 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 3 Oct 2021 13:09:32 +0300 Subject: add dmap tests --- internal/clients/maprclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index f23aa08..19fe119 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -203,7 +203,7 @@ func (c *MaprClient) printResults() { dlog.Client.Raw(rawQuery) if rowsLimit > 0 && numRows > rowsLimit { - dlog.Client.Warn(fmt.Sprintf("Got %d results but limited output to %d rows! Use 'limit' clause to override!", + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } dlog.Client.Raw(result) -- cgit v1.2.3 From f70622f307629a2542ea5eb128dea8c1043d3a40 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 10:00:38 +0300 Subject: more on this --- internal/clients/baseclient.go | 8 +- internal/clients/connectors/serverconnection.go | 19 ---- internal/clients/connectors/serverless.go | 22 +++-- internal/clients/handlers/healthhandler.go | 114 ++++++++++-------------- internal/clients/healthclient.go | 97 +++++--------------- 5 files changed, 86 insertions(+), 174 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index fc01955..5ac298f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -86,7 +86,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var mutex sync.Mutex for i, conn := range c.connections { go func(i int, conn connectors.Connector) { - connStatus := c.start(ctx, active, i, conn) + connStatus := c.startConnection(ctx, active, i, conn) // Update global status. mutex.Lock() @@ -97,11 +97,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } + time.Sleep(time.Second * 2) c.waitUntilDone(ctx, active) return } -func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { +func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { // Increment connection count active <- struct{}{} // Derement connection count @@ -146,12 +147,13 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { <-ctx.Done() } + // TODO: Rewrite this to use a wait group. for { numActive := len(active) if numActive == 0 { return } dlog.Client.Debug("Active connections", numActive) - time.Sleep(time.Second) + time.Sleep(time.Second * time.Millisecond * 100) } } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 5bc63ee..1666a79 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -23,7 +23,6 @@ type ServerConnection struct { config *ssh.ClientConfig handler handlers.Handler commands []string - isOneOff bool hostKeyCallback client.HostKeyCallback throttlingDone bool } @@ -49,24 +48,6 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM return &c } -// NewOneOffServerConnection creates new one-off connection (only for sending a series of commands and then quit). -func NewOneOffServerConnection(server string, userName string, authMethods []ssh.AuthMethod, handler handlers.Handler, commands []string) *ServerConnection { - c := ServerConnection{ - server: server, - handler: handler, - commands: commands, - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - }, - isOneOff: true, - } - - c.initServerPort() - return &c -} - func (c *ServerConnection) Server() string { return c.server } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 7740aab..ae72c9b 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -20,14 +20,12 @@ type Serverless struct { // NewServerConnection returns a new connection. func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { - s := Serverless{ + dlog.Client.Debug("Creating new serverless connector", handler, commands) + return &Serverless{ userName: userName, handler: handler, commands: commands, } - - dlog.Client.Debug("Creating new serverless connector", handler, commands) - return &s } func (s *Serverless) Server() string { @@ -58,11 +56,17 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro return err } - serverHandler := serverHandlers.NewServerHandler( - user, - make(chan struct{}, config.Server.MaxConcurrentCats), - make(chan struct{}, config.Server.MaxConcurrentTails), - ) + var serverHandler serverHandlers.Handler + switch s.userName { + case config.ControlUser: + serverHandler = serverHandlers.NewControlHandler(user) + default: + serverHandler = serverHandlers.NewServerHandler( + user, + make(chan struct{}, config.Server.MaxConcurrentCats), + make(chan struct{}, config.Server.MaxConcurrentTails), + ) + } terminate := func() { serverHandler.Shutdown() diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index eca0348..4949985 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,90 +1,72 @@ package handlers import ( - "bytes" - "errors" "fmt" - "time" + "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler implements the handler required for health checks. +// HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { - done *internal.Done - // Buffer of incoming data from server. - receiveBuf bytes.Buffer - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string - // The return status. - status int + baseHandler + HealthStatusCh chan<- int } -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - status: -1, - done: internal.NewDone(), +// NewHealthHandler returns a new health client handler. +func NewHealthHandler(server string) *HealthHandler { + dlog.Client.Debug(server, "Creating new health handler") + return &HealthHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + status: -1, + done: internal.NewDone(), + }, + HealthStatusCh: make(chan int), } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Status of the handler. -func (h *HealthHandler) Status() int { - return h.status -} - -// Done returns done channel of the handler. -func (h *HealthHandler) Done() <-chan struct{} { - return h.done.Done() } -// Shutdown the handler. -func (h *HealthHandler) Shutdown() { - h.done.Shutdown() -} - -// SendMessage sends a DTail command to the server. -func (h *HealthHandler) SendMessage(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - case <-h.Done(): - } - - return nil -} - -// Server writes byte stream to client. +// Read data from the dtail server via Writer interface. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf.WriteByte(b) - if b == protocol.MessageDelimiter { - h.receive <- h.receiveBuf.String() - h.receiveBuf.Reset() + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + h.handleHealthMessage(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return +func (h *HealthHandler) handleHealthMessage(message string) { + s := strings.Split(message, protocol.FieldDelimiter) + message = s[len(s)-1] + status := strings.Split(message, ":") + fmt.Println(status) + /* + switch status { + case "OK": + h.HealthStatusCh <- 0 + case "WARNING": + h.HealthStatusCh <- 1 + case "CRITICAL": + h.HealthStatusCh <- 2 + case "UNKNOWN": + h.HealthStatusCh <- 3 + default: + fmt.Println("CRITICAL: Unexpected server response: '%s'") + h.HealthStatusCh <- 2 + } + */ } diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 47007b6..df919ae 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -1,101 +1,44 @@ package clients import ( - "context" - "fmt" "runtime" - "strings" - "time" - "github.com/mimecast/dtail/internal/clients/connectors" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/protocol" gossh "golang.org/x/crypto/ssh" ) -// HealthClient is used for health checking (e.g. via Nagios) +// HealthClient is used to perform a basic server health check. type HealthClient struct { - // Client operating mode - mode omode.Mode - // The remote server address - server string - // SSH user name - userName string - // SSH auth methods to use to connect to the remote servers. - sshAuthMethods []gossh.AuthMethod + baseClient } -// NewHealthClient returns a new healh client. -func NewHealthClient(mode omode.Mode) (*HealthClient, error) { +// NewHealthClient returns a new health client. +func NewHealthClient(args config.Args) (*HealthClient, error) { + args.Mode = omode.HealthClient + args.UserName = config.ControlUser c := HealthClient{ - mode: mode, - server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort), - userName: config.ControlUser, + baseClient: baseClient{ + Args: args, + throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), + retry: false, + }, } - c.initSSHAuthMethods() + + c.init() + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + c.makeConnections(c) return &c, nil } -// Start the health client. -func (c *HealthClient) Start(ctx context.Context) (status int) { - receive := make(chan string) - - throttleCh := make(chan struct{}, runtime.NumCPU()) - statsCh := make(chan struct{}, 1) - - conn := connectors.NewOneOffServerConnection( - c.server, - c.userName, - c.sshAuthMethods, - handlers.NewHealthHandler(c.server, receive), - []string{c.mode.String()}, - ) - - connCtx, cancel := context.WithCancel(ctx) - go conn.Start(connCtx, cancel, throttleCh, statsCh) - - for { - select { - case data := <-receive: - // Parse recieved data. - s := strings.Split(data, protocol.FieldDelimiter) - message := s[len(s)-1] - if strings.HasPrefix(message, "done;") { - return - } - - // Set severity. - s = strings.Split(message, ":") - switch s[0] { - case "OK": - case "WARNING": - if status < 1 { - status = 1 - } - case "CRITICAL": - status = 2 - case "UNKNOWN": - status = 3 - default: - fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message) - status = 2 - return - } - fmt.Print(message) - - case <-time.After(time.Second * 2): - status = 2 - fmt.Println("CRITICAL: Could not communicate with DTail server") - return - } - } +func (c HealthClient) makeHandler(server string) handlers.Handler { + return handlers.NewHealthHandler(server) } -// Initialize SSH auth methods. -func (c *HealthClient) initSSHAuthMethods() { - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) +func (c HealthClient) makeCommands() (commands []string) { + commands = append(commands, "health") + return } -- cgit v1.2.3 From 9f395a03f25941d8ed98ec43035688daa1e8877f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 22:39:58 +0300 Subject: more on this --- internal/clients/baseclient.go | 47 +++++++------------------------ internal/clients/connectors/serverless.go | 1 + internal/clients/handlers/basehandler.go | 23 ++++++--------- internal/clients/maprclient.go | 2 ++ 4 files changed, 22 insertions(+), 51 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 5ac298f..9574e13 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -9,7 +9,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/discovery" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" "github.com/mimecast/dtail/internal/ssh/client" @@ -80,13 +79,14 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) - // Keep count of active connections - active := make(chan struct{}, len(c.connections)) + var wg sync.WaitGroup + wg.Add(len(c.connections)) var mutex sync.Mutex for i, conn := range c.connections { go func(i int, conn connectors.Connector) { - connStatus := c.startConnection(ctx, active, i, conn) + defer wg.Done() + connStatus := c.startConnection(ctx, i, conn) // Update global status. mutex.Lock() @@ -97,17 +97,11 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } - time.Sleep(time.Second * 2) - c.waitUntilDone(ctx, active) + wg.Wait() return } -func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { - // Increment connection count - active <- struct{}{} - // Derement connection count - defer func() { <-active }() - +func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { for { connCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -127,33 +121,12 @@ func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, } } -func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { +func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, + hostKeyCallback client.HostKeyCallback) connectors.Connector { if c.Args.Serverless { - return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands()) + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), + c.maker.makeCommands()) } return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } - -func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer dlog.Client.Debug("Terminated connection") - - // We want to have at least one active connection - <-active - // Put it back on the channel - active <- struct{}{} - - if c.Mode == omode.TailClient && c.retry { - <-ctx.Done() - } - - // TODO: Rewrite this to use a wait group. - for { - numActive := len(active) - if numActive == 0 { - return - } - dlog.Client.Debug("Active connections", numActive) - time.Sleep(time.Second * time.Millisecond * 100) - } -} diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index ae72c9b..2a1cec4 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -69,6 +69,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro } terminate := func() { + dlog.Client.Debug("Terminating serverless connection") serverHandler.Shutdown() cancel() } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 8acb45f..04124e7 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -40,14 +40,6 @@ func (h *baseHandler) Status() int { return h.status } -func (h *baseHandler) Done() <-chan struct{} { - return h.done.Done() -} - -func (h *baseHandler) Shutdown() { - h.done.Shutdown() -} - // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -77,12 +69,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - /* - // dcat/grep should actually display empty lines. - if len(message) == 0 { - continue - } - */ h.handleMessageType(message) h.receiveBuf.Reset() default: @@ -121,5 +107,14 @@ func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") + h.Shutdown() } } + +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 19fe119..92bbe39 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -110,6 +110,8 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i return } +// NEXT: Make this a callback function rather trying to use polymorphism to call this. +// This applies to all clients. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -- cgit v1.2.3 From fab5dc3e70434ea0abc7a0976487a1973b662331 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 09:50:41 +0300 Subject: enable faster shutdown - useful for dgrep/dmap and dcat commands --- internal/clients/baseclient.go | 14 +++++++------- internal/clients/connectors/serverless.go | 15 +++++++++++---- internal/clients/handlers/basehandler.go | 8 +++----- internal/clients/handlers/healthhandler.go | 14 +++++++------- internal/clients/handlers/maprhandler.go | 2 +- internal/clients/healthclient.go | 4 ++-- 6 files changed, 31 insertions(+), 26 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 9574e13..b474208 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,8 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - dlog.Client.Debug("Initiating base client") - dlog.Client.Debug(c.Args.String()) + dlog.Client.Debug("Initiating base client", c.Args.String()) flag := regex.Default if c.Args.RegexInvert { @@ -48,15 +47,16 @@ func (c *baseClient) init() { } regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { - dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex) + dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex) } c.Regex = regex - dlog.Client.Debug("Regex", c.Regex) if c.Args.Serverless { return } - c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile) + c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods( + c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, + c.throttleCh, c.Args.PrivateKeyPathFile) } func (c *baseClient) makeConnections(maker maker) { @@ -71,6 +71,7 @@ func (c *baseClient) makeConnections(maker maker) { } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { + dlog.Client.Trace("Starting base client") // Can be nil when serverless. if c.hostKeyCallback != nil { // Periodically check for unknown hosts, and ask the user whether to trust them or not. @@ -81,13 +82,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var wg sync.WaitGroup wg.Add(len(c.connections)) - var mutex sync.Mutex + for i, conn := range c.connections { go func(i int, conn connectors.Connector) { defer wg.Done() connStatus := c.startConnection(ctx, i, conn) - // Update global status. mutex.Lock() defer mutex.Unlock() diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 2a1cec4..768a5ce 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -37,6 +37,7 @@ func (s *Serverless) Handler() handlers.Handler { } func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -44,7 +45,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt dlog.Client.Warn(err) } }() - <-ctx.Done() } @@ -58,9 +58,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro var serverHandler serverHandlers.Handler switch s.userName { - case config.ControlUser: - serverHandler = serverHandlers.NewControlHandler(user) + case config.HealthUser: + dlog.Client.Debug("Creating serverless health handler") + serverHandler = serverHandlers.NewHealthHandler(user) default: + dlog.Client.Debug("Creating serverless server handler") serverHandler = serverHandlers.NewServerHandler( user, make(chan struct{}, config.Server.MaxConcurrentCats), @@ -76,29 +78,34 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro go func() { io.Copy(serverHandler, s.handler) + dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() go func() { io.Copy(s.handler, serverHandler) + dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() go func() { select { case <-s.handler.Done(): + dlog.Client.Trace("<-s.handler.Done()") case <-ctx.Done(): + dlog.Client.Trace("<-ctx.Done()") } terminate() }() // Send all commands to client. for _, command := range s.commands { - dlog.Client.Debug(command) + dlog.Client.Debug("Sending command to serverless server", command) s.handler.SendMessage(command) } <-ctx.Done() + dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() return nil diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 04124e7..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - h.handleMessageType(message) + h.handleMessage(message) h.receiveBuf.Reset() default: h.receiveBuf.WriteByte(b) @@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - // Hidden server commands starti with a dot "." +func (h *baseHandler) handleMessage(message string) { if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return @@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) { func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): - h.SendMessage(".ack close connection") + go h.SendMessage(".ack close connection") h.Shutdown() } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4949985..4b16ce4 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -12,7 +12,6 @@ import ( // HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { baseHandler - HealthStatusCh chan<- int } // NewHealthHandler returns a new health client handler. @@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler { status: -1, done: internal.NewDone(), }, - HealthStatusCh: make(chan int), } } @@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { - case '\n': - continue - case protocol.MessageDelimiter: + case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() dlog.Client.Debug(message) - h.handleHealthMessage(message) + h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) @@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { return len(p), nil } -func (h *HealthHandler) handleHealthMessage(message string) { +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] status := strings.Split(message, ":") diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 848e7f0..d1acfbd 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { if message[0] == 'A' { h.handleAggregateMessage(message) } else { - h.baseHandler.handleMessageType(message) + h.baseHandler.handleMessage(message) } h.baseHandler.receiveBuf.Reset() default: diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index df919ae..e2c8ccb 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -18,7 +18,7 @@ type HealthClient struct { // NewHealthClient returns a new health client. func NewHealthClient(args config.Args) (*HealthClient, error) { args.Mode = omode.HealthClient - args.UserName = config.ControlUser + args.UserName = config.HealthUser c := HealthClient{ baseClient: baseClient{ Args: args, @@ -28,7 +28,7 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { } c.init() - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) return &c, nil -- cgit v1.2.3 From 7306afe9ab073c424ddca0ddc57950f237948118 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 10:55:50 +0300 Subject: move health check to separate client binary --- internal/clients/baseclient.go | 1 - internal/clients/handlers/healthhandler.go | 25 ++++--------------------- internal/clients/healthclient.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 22 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index b474208..d5d7c2c 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -88,7 +88,6 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i go func(i int, conn connectors.Connector) { defer wg.Done() connStatus := c.startConnection(ctx, i, conn) - // Update global status. mutex.Lock() defer mutex.Unlock() if connStatus > status { diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4b16ce4..10ba1f7 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,7 +1,6 @@ package handlers import ( - "fmt" "strings" "github.com/mimecast/dtail/internal" @@ -22,7 +21,7 @@ func NewHealthHandler(server string) *HealthHandler { server: server, shellStarted: false, commands: make(chan string), - status: -1, + status: 2, // Assume CRITICAL status by default. done: internal.NewDone(), }, } @@ -34,14 +33,12 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { switch b { case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() - dlog.Client.Debug(message) h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) } } - return len(p), nil } @@ -52,21 +49,7 @@ func (h *HealthHandler) handleMessage(message string) { } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] - status := strings.Split(message, ":") - fmt.Println(status) - /* - switch status { - case "OK": - h.HealthStatusCh <- 0 - case "WARNING": - h.HealthStatusCh <- 1 - case "CRITICAL": - h.HealthStatusCh <- 2 - case "UNKNOWN": - h.HealthStatusCh <- 3 - default: - fmt.Println("CRITICAL: Unexpected server response: '%s'") - h.HealthStatusCh <- 2 - } - */ + if message == "OK" { + h.baseHandler.status = 0 + } } diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index e2c8ccb..ac1dc20 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -1,6 +1,8 @@ package clients import ( + "context" + "fmt" "runtime" "github.com/mimecast/dtail/internal/clients/handlers" @@ -42,3 +44,30 @@ func (c HealthClient) makeCommands() (commands []string) { commands = append(commands, "health") return } + +func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int { + status := c.baseClient.Start(ctx, statsCh) + + switch status { + case 0: + if c.Serverless { + fmt.Printf("WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port\n") + return 1 + } + fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr) + case 2: + if c.Serverless { + fmt.Printf("CRITICAL: DTail server not operating properly (using serverless connction)!\n") + return 2 + } + fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", c.ServersStr) + default: + if c.Serverless { + fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless connection)\n", status) + return status + } + fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", status, c.ServersStr) + } + + return status +} -- cgit v1.2.3 From 2d7ddbeae8286373ac19787dc7dde598a7cb0598 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 8 Oct 2021 11:43:43 +0300 Subject: refactor --- internal/clients/maprclient.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 92bbe39..412a219 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -31,8 +31,6 @@ const ( // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient - // Query string for mapr aggregations - queryStr string // Global group set for merged mapr aggregation results globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) @@ -44,14 +42,14 @@ type MaprClient struct { } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { - if queryStr == "" { +func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) { + if args.QueryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } - query, err := mapr.NewQuery(queryStr) + query, err := mapr.NewQuery(args.QueryStr) if err != nil { - dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err) + dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err) } // Don't retry connection if in tail mode and no outfile specified. @@ -77,7 +75,6 @@ func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientM retry: retry, }, query: query, - queryStr: queryStr, cumulative: cumulative, } -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/clients/baseclient.go | 7 ++-- internal/clients/catclient.go | 2 -- internal/clients/connectors/serverconnection.go | 48 +++++++++++++++---------- internal/clients/connectors/serverless.go | 16 +++++---- internal/clients/grepclient.go | 5 ++- internal/clients/handlers/healthhandler.go | 3 +- internal/clients/handlers/maprhandler.go | 13 ++++--- internal/clients/healthclient.go | 17 +++++---- internal/clients/maprclient.go | 13 +++---- internal/clients/stats.go | 11 +++--- internal/clients/tailclient.go | 3 -- 11 files changed, 77 insertions(+), 61 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d5d7c2c..4a7bd84 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -64,7 +64,8 @@ func (c *baseClient) makeConnections(maker maker) { discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle) for _, server := range discoveryService.ServerList() { - c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) + c.connections = append(c.connections, c.makeConnection(server, + c.sshAuthMethods, c.hostKeyCallback)) } c.stats = newTailStats(len(c.connections)) @@ -100,7 +101,9 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { +func (c *baseClient) startConnection(ctx context.Context, i int, + conn connectors.Connector) (status int) { + for { connCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 2726e7e..bd65560 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -22,7 +22,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } - args.Mode = omode.CatClient c := CatClient{ @@ -35,7 +34,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { c.init() c.makeConnections(c) - return &c, nil } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 1666a79..2d7b45a 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -16,7 +16,8 @@ import ( "golang.org/x/crypto/ssh" ) -// ServerConnection represents a connection to a single remote dtail server via SSH protocol. +// ServerConnection represents a connection to a single remote dtail server via +// SSH protocol. type ServerConnection struct { server string port int @@ -28,9 +29,11 @@ type ServerConnection struct { } // NewServerConnection returns a new DTail SSH server connection. -func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - dlog.Client.Debug(server, "Creating new connection", server, handler, commands) +func NewServerConnection(server string, userName string, + authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, + handler handlers.Handler, commands []string) *ServerConnection { + dlog.Client.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, @@ -48,10 +51,12 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM return &c } +// Server returns the server hostname connected to. func (c *ServerConnection) Server() string { return c.server } +// Handler returns the handler used for the connection. func (c *ServerConnection) Handler() handlers.Handler { return c.handler } @@ -72,23 +77,29 @@ func (c *ServerConnection) initServerPort() { } } -func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the connection to the server. +func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + // Throttle how many connections can be established concurrently (based on ch length) dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) select { case throttleCh <- struct{}{}: case <-ctx.Done(): - dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Not establishing connection as context is done", + len(throttleCh), cap(throttleCh)) return } - dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Throttling says that the connection can be established", + len(throttleCh), cap(throttleCh)) go func() { defer func() { if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (1)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } @@ -107,7 +118,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, } // Dail into a new SSH connection. Close connection in case of an error. -func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { +func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) error { + dlog.Client.Debug(c.server, "Incrementing connection stats") statsCh <- struct{}{} defer func() { @@ -128,31 +141,30 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, } // Create the SSH session. Close the session in case of an error. -func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating SSH session") +func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, + client *ssh.Client, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { return err } defer session.Close() - return c.handle(ctx, cancel, session, throttleCh) } -func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating handler for SSH session") +func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, + session *ssh.Session, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { return err } - stdoutPipe, err := session.StdoutPipe() if err != nil { return err } - if err := session.Shell(); err != nil { return err } @@ -161,12 +173,10 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc io.Copy(stdinPipe, c.handler) cancel() }() - go func() { io.Copy(c.handler, stdoutPipe) cancel() }() - go func() { select { case <-c.handler.Done(): @@ -182,13 +192,13 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc } if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (2)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } <-ctx.Done() c.handler.Shutdown() - return nil } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 768a5ce..2ff490a 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -18,8 +18,10 @@ type Serverless struct { userName string } -// NewServerConnection returns a new connection. -func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { +// NewServerless starts a new serverless session. +func NewServerless(userName string, handler handlers.Handler, + commands []string) *Serverless { + dlog.Client.Debug("Creating new serverless connector", handler, commands) return &Serverless{ userName: userName, @@ -28,15 +30,20 @@ func NewServerless(userName string, handler handlers.Handler, commands []string) } } +// Server returns serverless server indicator. func (s *Serverless) Server() string { return "local(serverless)" } +// Handler returns the handler used for the serverless connection. func (s *Serverless) Handler() handlers.Handler { return s.handler } -func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the serverless connection. +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -81,13 +88,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() - go func() { io.Copy(s.handler, serverHandler) dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() - go func() { select { case <-s.handler.Done(): @@ -107,6 +112,5 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro <-ctx.Done() dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() - return nil } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index ae21ff2..7521c67 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -12,7 +12,8 @@ import ( "github.com/mimecast/dtail/internal/omode" ) -// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. +// GrepClient searches a remote file for all lines matching a regular +// expression. Only the matching lines are displayed. type GrepClient struct { baseClient } @@ -34,7 +35,6 @@ func NewGrepClient(args config.Args) (*GrepClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -51,6 +51,5 @@ func (c GrepClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - return } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 10ba1f7..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,7 +8,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler is the handler used on the client side for running mapreduce aggregations. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { baseHandler } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d1acfbd..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -10,7 +10,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate @@ -18,7 +19,9 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index ac1dc20..1a02827 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -32,7 +32,6 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { c.init() c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) - return &c, nil } @@ -45,28 +44,34 @@ func (c HealthClient) makeCommands() (commands []string) { return } +// Start the health client. func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int { status := c.baseClient.Start(ctx, statsCh) switch status { case 0: if c.Serverless { - fmt.Printf("WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port\n") + fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" + + ", please specify a remote server via --server hostname:port\n") return 1 } fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr) case 2: if c.Serverless { - fmt.Printf("CRITICAL: DTail server not operating properly (using serverless connction)!\n") + fmt.Printf("CRITICAL: DTail server not operating properly (using " + + "serverless connction)!\n") return 2 } - fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", c.ServersStr) + fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", + c.ServersStr) default: if c.Serverless { - fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless connection)\n", status) + fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+ + "connection)\n", status) return status } - fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", status, c.ServersStr) + fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", + status, c.ServersStr) } return status diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 412a219..04f258d 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -107,15 +107,14 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -// NEXT: Make this a callback function rather trying to use polymorphism to call this. -// This applies to all clients. +// NEXT: Make this a callback function rather trying to use polymorphism to call +// this. This applies to all clients. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" @@ -134,7 +133,6 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, c.Args.SerializeOptions(), file, regex)) } - return } @@ -155,7 +153,6 @@ func (c *MaprClient) reportResults() { c.writeResultsToOutfile() return } - c.printResults() } @@ -176,7 +173,6 @@ func (c *MaprClient) printResults() { } else { result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } - if err != nil { dlog.Client.FatalPanic(err) } @@ -202,8 +198,8 @@ func (c *MaprClient) printResults() { dlog.Client.Raw(rawQuery) if rowsLimit > 0 && numRows > rowsLimit { - dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output to %d rows! Use 'limit' clause to override!", - numRows, rowsLimit)) + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+ + "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } dlog.Client.Raw(result) } @@ -215,7 +211,6 @@ func (c *MaprClient) writeResultsToOutfile() { } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { dlog.Client.FatalPanic(err) } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index fbef572..1315aea 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -36,9 +36,10 @@ func newTailStats(servers int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { - var connectedLast int +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, + statsCh <-chan string, quiet bool) { + var connectedLast int for { var force bool var messages []string @@ -94,7 +95,9 @@ func (s *stats) printStatsDueInterrupt(messages []string) { dlog.Client.Resume() } -func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} { +func (s *stats) statsData(connected, newConnections int, + throttle int) map[string]interface{} { + percConnected := percentOf(float64(s.servers), float64(connected)) data := make(map[string]interface{}) @@ -112,7 +115,6 @@ func (s *stats) statsData(connected, newConnections int, throttle int) map[strin func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb := strings.Builder{} - i := 0 for k, v := range s.statsData(connected, newConnections, throttle) { if i > 0 { @@ -123,7 +125,6 @@ func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb.WriteString(fmt.Sprintf("%v", v)) i++ } - return sb.String() } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index d42a0e4..35c01d4 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -19,7 +19,6 @@ type TailClient struct { // NewTailClient returns a new TailClient. func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient - c := TailClient{ baseClient: baseClient{ Args: args, @@ -30,7 +29,6 @@ func NewTailClient(args config.Args) (*TailClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -48,6 +46,5 @@ func (c TailClient) makeCommands() (commands []string) { c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } dlog.Client.Debug(commands) - return } -- cgit v1.2.3 From f44792c9102488774c9993b080f35c65287a64b1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 10 Oct 2021 14:02:12 +0300 Subject: add another dmap test - reading 100 source files at once fix a data race when reading multiple files on one server from the same session at once --- internal/clients/maprclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 04f258d..074494c 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -108,7 +108,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i } // NEXT: Make this a callback function rather trying to use polymorphism to call -// this. This applies to all clients. +// this. This applies to all clients. It will make the code easier to read. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -- cgit v1.2.3 From 71f89dc7ec7cf993d1eca98771212afe6310e9c8 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 10 Oct 2021 19:42:48 +0300 Subject: refactor --- internal/clients/connectors/serverconnection.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 2d7b45a..2737ede 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -37,6 +37,7 @@ func NewServerConnection(server string, userName string, c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, + port: config.Common.SSHPort, handler: handler, commands: commands, config: &ssh.ClientConfig{ @@ -47,25 +48,20 @@ func NewServerConnection(server string, userName string, }, } + // TODO: After reconnecting the port is wrong! Due to string slicing? c.initServerPort() return &c } // Server returns the server hostname connected to. -func (c *ServerConnection) Server() string { - return c.server -} +func (c *ServerConnection) Server() string { return c.server } // Handler returns the handler used for the connection. -func (c *ServerConnection) Handler() handlers.Handler { - return c.handler -} +func (c *ServerConnection) Handler() handlers.Handler { return c.handler } // Attempt to parse the server port address from the provided server FQDN. func (c *ServerConnection) initServerPort() { - c.port = config.Common.SSHPort parts := strings.Split(c.server, ":") - if len(parts) == 2 { dlog.Client.Debug("Parsing port from hostname", parts) port, err := strconv.Atoi(parts[1]) -- cgit v1.2.3 From c0f4ebc9b3773c37e22c686024c8cc7ce4e71f9f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 11 Oct 2021 11:51:30 +0300 Subject: add dtail integration test --- internal/clients/connectors/serverconnection.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 2737ede..1df4d73 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -19,7 +19,11 @@ import ( // ServerConnection represents a connection to a single remote dtail server via // SSH protocol. type ServerConnection struct { - server string + // The full server string as received from the server discovery (can be with port number) + server string + // Only the hostname or FQDN (without the port number) + hostname string + // Only the port number. port int config *ssh.ClientConfig handler handlers.Handler @@ -37,7 +41,6 @@ func NewServerConnection(server string, userName string, c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, - port: config.Common.SSHPort, handler: handler, commands: commands, config: &ssh.ClientConfig{ @@ -48,7 +51,6 @@ func NewServerConnection(server string, userName string, }, } - // TODO: After reconnecting the port is wrong! Due to string slicing? c.initServerPort() return &c } @@ -61,6 +63,7 @@ func (c *ServerConnection) Handler() handlers.Handler { return c.handler } // Attempt to parse the server port address from the provided server FQDN. func (c *ServerConnection) initServerPort() { + c.port = config.Common.SSHPort parts := strings.Split(c.server, ":") if len(parts) == 2 { dlog.Client.Debug("Parsing port from hostname", parts) @@ -68,7 +71,7 @@ func (c *ServerConnection) initServerPort() { if err != nil { dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err) } - c.server = parts[0] + c.hostname = parts[0] c.port = port } } @@ -103,8 +106,8 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, }() if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { - dlog.Client.Warn(c.server, c.port, err) - if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.server, c.port)) { + dlog.Client.Warn(c.server, err) + if c.hostKeyCallback.Untrusted(c.server) { dlog.Client.Debug(c.server, "Not trusting host") } } @@ -125,7 +128,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, }() dlog.Client.Debug(c.server, "Dialing into the connection") - address := fmt.Sprintf("%s:%d", c.server, c.port) + address := fmt.Sprintf("%s:%d", c.hostname, c.port) client, err := ssh.Dial("tcp", address, c.config) if err != nil { -- cgit v1.2.3 From 3c16894e997bdb235f1c0ce1291cc85a7b56118c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 19 Oct 2021 20:33:59 +0300 Subject: Bugfix: Use correct hostname when no port specified --- internal/clients/connectors/serverconnection.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 1df4d73..aeb2a41 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -63,17 +63,20 @@ func (c *ServerConnection) Handler() handlers.Handler { return c.handler } // Attempt to parse the server port address from the provided server FQDN. func (c *ServerConnection) initServerPort() { - c.port = config.Common.SSHPort parts := strings.Split(c.server, ":") - if len(parts) == 2 { - dlog.Client.Debug("Parsing port from hostname", parts) - port, err := strconv.Atoi(parts[1]) - if err != nil { - dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err) - } - c.hostname = parts[0] - c.port = port + if len(parts) == 1 { + c.hostname = c.server + c.port = config.Common.SSHPort + return + } + + dlog.Client.Debug("Parsing port from hostname", parts) + port, err := strconv.Atoi(parts[1]) + if err != nil { + dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err) } + c.hostname = parts[0] + c.port = port } // Start the connection to the server. @@ -127,8 +130,8 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, <-statsCh }() - dlog.Client.Debug(c.server, "Dialing into the connection") address := fmt.Sprintf("%s:%d", c.hostname, c.port) + dlog.Client.Debug(c.server, "Dialing into the connection", address) client, err := ssh.Dial("tcp", address, c.config) if err != nil { -- cgit v1.2.3 From 739205206d63bf42f4e843b39d04d4c8cd8207c3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 21 Oct 2021 21:02:58 +0300 Subject: backport mapreduce reporter rampup from master --- internal/clients/maprclient.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'internal/clients') diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 074494c..246946f 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -137,6 +137,10 @@ func (c MaprClient) makeCommands() (commands []string) { } func (c *MaprClient) periodicReportResults(ctx context.Context) { + rampUpSleep := c.query.Interval / 2 + dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) + time.Sleep(rampUpSleep) + for { select { case <-time.After(c.query.Interval): -- cgit v1.2.3