From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/server/handlers/serverhandler.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 185e7c2..23e3aeb 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -17,8 +17,8 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/protocol" user "github.com/mimecast/dtail/internal/user/server" - "github.com/mimecast/dtail/internal/version" ) const ( @@ -92,24 +92,27 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { } if message[0] == '.' { // Handle hidden message (don't display to the user, interpreted by dtail client) - wholePayload := []byte(fmt.Sprintf("%s\n", message)) + wholePayload := []byte(fmt.Sprintf("%s%b", message, protocol.MessageDelimiter)) n = copy(p, wholePayload) return } // Handle normal server message (display to the user) - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) + wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter)) n = copy(p, wholePayload) return case message := <-h.aggregatedMessages: // Send mapreduce-aggregated data as a message. - data := fmt.Sprintf("AGGREGATE➔%s➔%s\n", h.hostname, message) + data := fmt.Sprintf("AGGREGATE%s%s%s%s%b", + protocol.AggregateDelimiter, h.hostname, + protocol.AggregateDelimiter, message, protocol.MessageDelimiter) wholePayload := []byte(data) n = copy(p, wholePayload) return case line := <-h.lines: + //fmt.Printf("<<<%d,%s>>>\n", len(line.Content), line.Content) // Send normal file content data as a message. serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|", h.hostname, line.TransmittedPerc, line.Count, line.SourceID)) @@ -182,8 +185,8 @@ func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, err return args, argc, errors.New("unable to determine protocol version") } - if args[1] != version.ProtocolCompat { - err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", version.ProtocolCompat, args[1]) + if args[1] != protocol.ProtocolCompat { + err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", protocol.ProtocolCompat, args[1]) return args, argc, err } -- cgit v1.2.3 From 9883a190109623b64e6d311dc2b462a6eae68003 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 22 Aug 2021 10:07:00 +0300 Subject: introduces the protocol package --- internal/server/handlers/serverhandler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 23e3aeb..9541a34 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -92,13 +92,13 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { } if message[0] == '.' { // Handle hidden message (don't display to the user, interpreted by dtail client) - wholePayload := []byte(fmt.Sprintf("%s%b", message, protocol.MessageDelimiter)) + wholePayload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) n = copy(p, wholePayload) return } // Handle normal server message (display to the user) - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter)) + wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) n = copy(p, wholePayload) return @@ -112,7 +112,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { return case line := <-h.lines: - //fmt.Printf("<<<%d,%s>>>\n", len(line.Content), line.Content) + //fmt.Printf("\t<<<%d,%s>>>\n", len(line.Content), line.Content) // Send normal file content data as a message. serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|", h.hostname, line.TransmittedPerc, line.Count, line.SourceID)) -- cgit v1.2.3 From 6d727b9bdbc387c8a5c34406a2c4de9140face38 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 19:36:46 +0100 Subject: use a byte.Buffer in the file reader --- internal/server/handlers/serverhandler.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 9541a34..62f3c2b 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -15,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/protocol" @@ -114,10 +115,10 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case line := <-h.lines: //fmt.Printf("\t<<<%d,%s>>>\n", len(line.Content), line.Content) // Send normal file content data as a message. - serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|", - h.hostname, line.TransmittedPerc, line.Count, line.SourceID)) - wholePayload := append(serverInfo, line.Content[:]...) - n = copy(p, wholePayload) + payload := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|%s", + h.hostname, line.TransmittedPerc, line.Count, line.SourceID, line.Content.String())) + n = copy(p, payload) + pool.RecycleBytesBuffer(line.Content) return case <-time.After(time.Second): -- cgit v1.2.3 From 8c2e94030d0e31289c35fcfb56499707fd4a7ccd Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 20:10:54 +0100 Subject: make use of more buffers on server side --- internal/server/handlers/serverhandler.go | 40 +++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 13 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 62f3c2b..f5aefa2 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "context" "encoding/base64" "errors" @@ -93,31 +94,44 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { } if message[0] == '.' { // Handle hidden message (don't display to the user, interpreted by dtail client) - wholePayload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) - n = copy(p, wholePayload) + payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) + n = copy(p, payload) return } // Handle normal server message (display to the user) - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) - n = copy(p, wholePayload) + payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) + n = copy(p, payload) return case message := <-h.aggregatedMessages: // Send mapreduce-aggregated data as a message. - data := fmt.Sprintf("AGGREGATE%s%s%s%s%b", - protocol.AggregateDelimiter, h.hostname, - protocol.AggregateDelimiter, message, protocol.MessageDelimiter) - wholePayload := []byte(data) - n = copy(p, wholePayload) + buf := pool.BytesBuffer.Get().(*bytes.Buffer) + buf.WriteString("AGGREGATE") + buf.WriteString(protocol.AggregateDelimiter) + buf.WriteString(h.hostname) + buf.WriteString(protocol.AggregateDelimiter) + buf.WriteString(message) + buf.WriteByte(protocol.MessageDelimiter) + n = copy(p, buf.Bytes()) + pool.RecycleBytesBuffer(buf) return case line := <-h.lines: - //fmt.Printf("\t<<<%d,%s>>>\n", len(line.Content), line.Content) - // Send normal file content data as a message. - payload := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|%s", - h.hostname, line.TransmittedPerc, line.Count, line.SourceID, line.Content.String())) + buf := pool.BytesBuffer.Get().(*bytes.Buffer) + buf.WriteString("REMOTE") + buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(h.hostname) + buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) + buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(fmt.Sprintf("%v", line.Count)) + buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(line.SourceID) + buf.WriteByte(protocol.FieldDelimiter) + payload := append(buf.Bytes(), line.Content.Bytes()...) n = copy(p, payload) + pool.RecycleBytesBuffer(buf) pool.RecycleBytesBuffer(line.Content) return -- cgit v1.2.3 From 23982f331c2154a66b86d596226c24454fd06be5 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 20:26:32 +0100 Subject: 1. Major performance gain by not checking for file truncation aftter each bytes read. 2. Introduce field separator to the protocol package. --- internal/server/handlers/serverhandler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index f5aefa2..14fc5d0 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -120,15 +120,15 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case line := <-h.lines: buf := pool.BytesBuffer.Get().(*bytes.Buffer) buf.WriteString("REMOTE") - buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(protocol.FieldDelimiter) buf.WriteString(h.hostname) - buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(protocol.FieldDelimiter) buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(protocol.FieldDelimiter) buf.WriteString(fmt.Sprintf("%v", line.Count)) - buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(protocol.FieldDelimiter) buf.WriteString(line.SourceID) - buf.WriteByte(protocol.FieldDelimiter) + buf.WriteString(protocol.FieldDelimiter) payload := append(buf.Bytes(), line.Content.Bytes()...) n = copy(p, payload) pool.RecycleBytesBuffer(buf) -- cgit v1.2.3 From cc89d3fb8be2465af276d7ef03ea2a8affd87b2e Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 6 Sep 2021 13:48:55 +0300 Subject: Print out client/server update notice even from dtail server 4 to dtail client 3. --- internal/server/handlers/serverhandler.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 14fc5d0..14f46a3 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "sync/atomic" "time" @@ -167,9 +168,9 @@ func (h *ServerHandler) handleCommand(commandStr string) { logger.Debug(h.user, commandStr) ctx := context.Background() - args, argc, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) + args, argc, add, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) if err != nil { - h.send(h.serverMessages, logger.Error(h.user, err)) + h.send(h.serverMessages, logger.Error(h.user, err)+add) return } @@ -193,19 +194,34 @@ func (h *ServerHandler) handleCommand(commandStr string) { h.handleUserCommand(ctx, argc, args) } -func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, error) { +func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { argc := len(args) + var add string if argc <= 2 || args[0] != "protocol" { - return args, argc, errors.New("unable to determine protocol version") + return args, argc, add, errors.New("unable to determine protocol version") } if args[1] != protocol.ProtocolCompat { - err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", protocol.ProtocolCompat, args[1]) - return args, argc, err + clientCompat, _ := strconv.Atoi(args[1]) + serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat) + if clientCompat <= 3 { + // Protocol version 3 or lower expect a newline as message separator + // One day (after 2 major versions) this exception may be removed! + add = "\n" + } + + toUpdate := "client" + if clientCompat > serverCompat { + toUpdate = "server" + } + + err := fmt.Errorf("DTail server protocol version '%s' does not match client protocol version '%s', please update DTail %s!", + protocol.ProtocolCompat, args[1], toUpdate) + return args, argc, add, err } - return args[2:], argc - 2, nil + return args[2:], argc - 2, add, nil } func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, error) { -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/server/handlers/serverhandler.go | 118 +++++++++++++++--------------- 1 file changed, 58 insertions(+), 60 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 14f46a3..e74e686 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -38,7 +38,6 @@ type ServerHandler struct { aggregate *server.Aggregate aggregatedMessages chan string serverMessages chan string - payload []byte hostname string user *user.User catLimiter chan struct{} @@ -47,6 +46,8 @@ type ServerHandler struct { activeCommands int32 activeReaders int32 quiet bool + readBuf bytes.Buffer + writeBuf bytes.Buffer } // NewServerHandler returns the server handler. @@ -86,77 +87,74 @@ func (h *ServerHandler) Done() <-chan struct{} { // Read is to send data to the dtail client via Reader interface. func (h *ServerHandler) Read(p []byte) (n int, err error) { - for { - select { - case message := <-h.serverMessages: - if len(message) == 0 { - logger.Warn(h.user, "Empty message recieved") - return - } - if message[0] == '.' { - // Handle hidden message (don't display to the user, interpreted by dtail client) - payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return - } - - // Handle normal server message (display to the user) - payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return + defer h.readBuf.Reset() - case message := <-h.aggregatedMessages: - // Send mapreduce-aggregated data as a message. - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("AGGREGATE") - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(message) - buf.WriteByte(protocol.MessageDelimiter) - n = copy(p, buf.Bytes()) - pool.RecycleBytesBuffer(buf) + select { + case message := <-h.serverMessages: + if message[0] == '.' { + // Handle hidden message (don't display to the user, interpreted by dtail client) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) return + } - case line := <-h.lines: - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("REMOTE") - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%v", line.Count)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(line.SourceID) - buf.WriteString(protocol.FieldDelimiter) - payload := append(buf.Bytes(), line.Content.Bytes()...) - n = copy(p, payload) - pool.RecycleBytesBuffer(buf) - pool.RecycleBytesBuffer(line.Content) + // Handle normal server message (display to the user) + h.readBuf.WriteString("SERVER") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case message := <-h.aggregatedMessages: + // Send mapreduce-aggregated data as a message. + h.readBuf.WriteString("AGGREGATE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case line := <-h.lines: + h.readBuf.WriteString("REMOTE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.SourceID) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.Content.String()) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + pool.RecycleBytesBuffer(line.Content) + + case <-time.After(time.Second): + // Once in a while check whether we are done. + select { + case <-h.done.Done(): + err = io.EOF return - - case <-time.After(time.Second): - // Once in a while check whether we are done. - select { - case <-h.done.Done(): - return 0, io.EOF - default: - } + default: } } + return } // Write is to receive data from the dtail client via Writer interface. func (h *ServerHandler) Write(p []byte) (n int, err error) { - for _, c := range p { - switch c { + for _, b := range p { + switch b { case ';': - commandStr := strings.TrimSpace(string(h.payload)) - h.handleCommand(commandStr) - h.payload = nil + h.handleCommand(string(h.writeBuf.Bytes())) + h.writeBuf.Reset() default: - h.payload = append(h.payload, c) + h.writeBuf.WriteByte(b) } } -- cgit v1.2.3 From 2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:04:42 +0300 Subject: bugfix: dmap skipped the last couple of mapreduce lines --- internal/server/handlers/serverhandler.go | 86 +++++++++++-------------------- 1 file changed, 30 insertions(+), 56 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index e74e686..ed19412 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -32,36 +32,35 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - activeReaders int32 - quiet bool - readBuf bytes.Buffer - writeBuf bytes.Buffer + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + maprMessages chan string + serverMessages chan string + hostname string + user *user.User + catLimiter chan struct{} + tailLimiter chan struct{} + ackCloseReceived chan struct{} + activeCommands int32 + quiet bool + readBuf bytes.Buffer + writeBuf bytes.Buffer } // NewServerHandler returns the server handler. func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - aggregatedMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - regex: ".", - user: user, + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + maprMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + user: user, } fqdn, err := os.Hostname() @@ -108,7 +107,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { h.readBuf.WriteByte(protocol.MessageDelimiter) n = copy(p, h.readBuf.Bytes()) - case message := <-h.aggregatedMessages: + case message := <-h.maprMessages: // Send mapreduce-aggregated data as a message. h.readBuf.WriteString("AGGREGATE") h.readBuf.WriteString(protocol.FieldDelimiter) @@ -260,14 +259,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] h.shutdown() } } - readerFinished := func() { - if h.decrementActiveReaders() == 0 { - if h.aggregate == nil { - return - } - h.aggregate.Shutdown() - } - } splitted := strings.Split(args[0], ":") commandName := splitted[0] @@ -289,18 +280,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "grep", "cat": command := newReadCommand(h, omode.CatClient) go func() { - h.incrementActiveReaders() command.Start(ctx, argc, args, 1) - readerFinished() commandFinished() }() case "tail": command := newReadCommand(h, omode.TailClient) go func() { - h.incrementActiveReaders() command.Start(ctx, argc, args, 10) - readerFinished() commandFinished() }() @@ -315,7 +302,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] h.aggregate = aggregate go func() { - command.Start(ctx, h.aggregatedMessages) + command.Start(ctx, h.maprMessages) commandFinished() }() @@ -361,15 +348,11 @@ func (h *ServerHandler) serverMessageC() chan<- string { return h.serverMessages } -func (h *ServerHandler) flush() { - logger.Debug(h.user, "flush()") - - if h.aggregate != nil { - h.aggregate.Flush() - } +func (h *ServerHandler) flushMessages() { + logger.Debug(h.user, "flushMessages()") unsentMessages := func() int { - return len(h.lines) + len(h.serverMessages) + len(h.aggregatedMessages) + return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } for i := 0; i < 3; i++ { if unsentMessages() == 0 { @@ -385,7 +368,7 @@ func (h *ServerHandler) flush() { func (h *ServerHandler) shutdown() { logger.Debug(h.user, "shutdown()") - h.flush() + h.flushMessages() go func() { select { @@ -413,15 +396,6 @@ func (h *ServerHandler) decrementActiveCommands() int32 { return atomic.LoadInt32(&h.activeCommands) } -func (h *ServerHandler) incrementActiveReaders() { - atomic.AddInt32(&h.activeReaders, 1) -} - -func (h *ServerHandler) decrementActiveReaders() int32 { - atomic.AddInt32(&h.activeReaders, -1) - return atomic.LoadInt32(&h.activeReaders) -} - func readOptions(opts []string) (map[string]string, error) { options := make(map[string]string, len(opts)) -- cgit v1.2.3 From 6506e20f6c80f4acb7434eb9dd14f784a67189cd Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 14:41:25 +0300 Subject: add spartan mode --- internal/server/handlers/serverhandler.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index ed19412..2f3b73b 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -45,6 +45,7 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 quiet bool + spartan bool readBuf bytes.Buffer writeBuf bytes.Buffer } @@ -118,16 +119,18 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { n = copy(p, h.readBuf.Bytes()) case line := <-h.lines: - h.readBuf.WriteString("REMOTE") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(line.SourceID) - h.readBuf.WriteString(protocol.FieldDelimiter) + if !h.spartan { + h.readBuf.WriteString("REMOTE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.SourceID) + h.readBuf.WriteString(protocol.FieldDelimiter) + } h.readBuf.WriteString(line.Content.String()) h.readBuf.WriteByte(protocol.MessageDelimiter) n = copy(p, h.readBuf.Bytes()) @@ -275,6 +278,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] h.quiet = true } } + if spartan, ok := options["spartan"]; ok { + if spartan == "true" { + logger.Debug(h.user, "Enabling spartan mode") + h.spartan = true + } + } switch commandName { case "grep", "cat": @@ -397,6 +406,7 @@ func (h *ServerHandler) decrementActiveCommands() int32 { } func readOptions(opts []string) (map[string]string, error) { + logger.Debug("Parsing options", opts) options := make(map[string]string, len(opts)) for _, o := range opts { @@ -416,6 +426,7 @@ func readOptions(opts []string) (map[string]string, error) { val = string(decoded) } + logger.Debug("Setting option", key, val) options[key] = val } -- cgit v1.2.3 From abeac87aec44249bf67f1b0eca471a31086265ca Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 19:27:50 +0300 Subject: fix auto reconnect --- internal/server/handlers/serverhandler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 2f3b73b..4820476 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -331,7 +331,11 @@ func (h *ServerHandler) handleAckCommand(argc int, args []string) { return } if args[1] == "close" && args[2] == "connection" { - close(h.ackCloseReceived) + select { + case <-h.ackCloseReceived: + default: + close(h.ackCloseReceived) + } } } -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/server/handlers/serverhandler.go | 46 +++++++++++++++---------------- 1 file changed, 23 insertions(+), 23 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 4820476..b664566 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -16,7 +16,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" @@ -66,7 +66,7 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S fqdn, err := os.Hostname() if err != nil { - logger.FatalExit(err) + dlog.Server.FatalPanic(err) } s := strings.Split(fqdn, ".") @@ -165,18 +165,18 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { } func (h *ServerHandler) handleCommand(commandStr string) { - logger.Debug(h.user, commandStr) + dlog.Server.Debug(h.user, commandStr) ctx := context.Background() args, argc, add, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) if err != nil { - h.send(h.serverMessages, logger.Error(h.user, err)+add) + h.send(h.serverMessages, dlog.Server.Error(h.user, err)+add) return } args, argc, err = h.handleBase64(args, argc) if err != nil { - h.send(h.serverMessages, logger.Error(h.user, err)) + h.send(h.serverMessages, dlog.Server.Error(h.user, err)) return } @@ -239,7 +239,7 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er args = strings.Split(decodedStr, " ") argc = len(decodedStr) - logger.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args) + dlog.Server.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args) return args, argc, nil } @@ -247,14 +247,14 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er func (h *ServerHandler) handleControlCommand(argc int, args []string) { switch args[0] { case "debug": - h.send(h.serverMessages, logger.Debug(h.user, "Receiving debug command", argc, args)) + h.send(h.serverMessages, dlog.Server.Debug(h.user, "Receiving debug command", argc, args)) default: - logger.Warn(h.user, "Received unknown control command", argc, args) + dlog.Server.Warn(h.user, "Received unknown control command", argc, args) } } func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { - logger.Debug(h.user, "handleUserCommand", argc, args) + dlog.Server.Debug(h.user, "handleUserCommand", argc, args) h.incrementActiveCommands() commandFinished := func() { @@ -268,19 +268,19 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] options, err := readOptions(splitted[1:]) if err != nil { - h.sendServerMessage(logger.Error(h.user, err)) + h.sendServerMessage(dlog.Server.Error(h.user, err)) commandFinished() return } if quiet, ok := options["quiet"]; ok { if quiet == "true" { - logger.Debug(h.user, "Enabling quiet mode") + dlog.Server.Debug(h.user, "Enabling quiet mode") h.quiet = true } } if spartan, ok := options["spartan"]; ok { if spartan == "true" { - logger.Debug(h.user, "Enabling spartan mode") + dlog.Server.Debug(h.user, "Enabling spartan mode") h.spartan = true } } @@ -304,7 +304,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command, aggregate, err := newMapCommand(h, argc, args) if err != nil { h.sendServerMessage(err.Error()) - logger.Error(h.user, err) + dlog.Server.Error(h.user, err) commandFinished() return } @@ -320,14 +320,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() default: - h.sendServerMessage(logger.Error(h.user, "Received unknown user command", commandName, argc, args, options)) + h.sendServerMessage(dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options)) commandFinished() } } func (h *ServerHandler) handleAckCommand(argc int, args []string) { if argc < 3 { - h.sendServerWarnMessage(logger.Warn(h.user, commandParseWarning, args, argc)) + h.sendServerWarnMessage(dlog.Server.Warn(h.user, commandParseWarning, args, argc)) return } if args[1] == "close" && args[2] == "connection" { @@ -362,25 +362,25 @@ func (h *ServerHandler) serverMessageC() chan<- string { } func (h *ServerHandler) flushMessages() { - logger.Debug(h.user, "flushMessages()") + dlog.Server.Debug(h.user, "flushMessages()") unsentMessages := func() int { return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } for i := 0; i < 3; i++ { if unsentMessages() == 0 { - logger.Debug(h.user, "All lines sent") + dlog.Server.Debug(h.user, "All lines sent") return } - logger.Debug(h.user, "Still lines to be sent") + dlog.Server.Debug(h.user, "Still lines to be sent") time.Sleep(time.Second) } - logger.Warn(h.user, "Some lines remain unsent", unsentMessages()) + dlog.Server.Warn(h.user, "Some lines remain unsent", unsentMessages()) } func (h *ServerHandler) shutdown() { - logger.Debug(h.user, "shutdown()") + dlog.Server.Debug(h.user, "shutdown()") h.flushMessages() go func() { @@ -393,7 +393,7 @@ func (h *ServerHandler) shutdown() { select { case <-h.ackCloseReceived: case <-time.After(time.Second * 5): - logger.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") + dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") case <-h.done.Done(): } @@ -410,7 +410,7 @@ func (h *ServerHandler) decrementActiveCommands() int32 { } func readOptions(opts []string) (map[string]string, error) { - logger.Debug("Parsing options", opts) + dlog.Server.Debug("Parsing options", opts) options := make(map[string]string, len(opts)) for _, o := range opts { @@ -430,7 +430,7 @@ func readOptions(opts []string) (map[string]string, error) { val = string(decoded) } - logger.Debug("Setting option", key, val) + dlog.Server.Debug("Setting option", key, val) options[key] = val } -- cgit v1.2.3 From 12c79f68bb5bda6673819d7b754820ecfe6d08ff Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 11:54:07 +0300 Subject: reduce logging in serverless mode --- internal/server/handlers/serverhandler.go | 93 ++++++++++--------------------- 1 file changed, 30 insertions(+), 63 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index b664566..ace2626 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -15,8 +15,8 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" @@ -46,6 +46,7 @@ type ServerHandler struct { activeCommands int32 quiet bool spartan bool + serverless bool readBuf bytes.Buffer writeBuf bytes.Buffer } @@ -99,6 +100,12 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { return } + if h.serverless { + // In serverless mode we have logged the server message already via the + // dlog logger, no need to send the message again to the client part. + return + } + // Handle normal server message (display to the user) h.readBuf.WriteString("SERVER") h.readBuf.WriteString(protocol.FieldDelimiter) @@ -266,23 +273,24 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] splitted := strings.Split(args[0], ":") commandName := splitted[0] - options, err := readOptions(splitted[1:]) + options, err := config.DeserializeOptions(splitted[1:]) if err != nil { - h.sendServerMessage(dlog.Server.Error(h.user, err)) + h.send(h.serverMessages, dlog.Server.Error(h.user, err)) commandFinished() return } - if quiet, ok := options["quiet"]; ok { - if quiet == "true" { - dlog.Server.Debug(h.user, "Enabling quiet mode") - h.quiet = true - } + + if quiet, _ := options["quiet"]; quiet == "true" { + dlog.Server.Debug(h.user, "Enabling quiet mode") + h.quiet = true } - if spartan, ok := options["spartan"]; ok { - if spartan == "true" { - dlog.Server.Debug(h.user, "Enabling spartan mode") - h.spartan = true - } + if spartan, _ := options["spartan"]; spartan == "true" { + dlog.Server.Debug(h.user, "Enabling spartan mode") + h.spartan = true + } + if serverless, _ := options["serverless"]; serverless == "true" { + dlog.Server.Debug(h.user, "Enabling serverless mode") + h.serverless = true } switch commandName { @@ -303,7 +311,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "map": command, aggregate, err := newMapCommand(h, argc, args) if err != nil { - h.sendServerMessage(err.Error()) + h.send(h.serverMessages, err.Error()) dlog.Server.Error(h.user, err) commandFinished() return @@ -320,14 +328,16 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() default: - h.sendServerMessage(dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options)) + h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options)) commandFinished() } } func (h *ServerHandler) handleAckCommand(argc int, args []string) { if argc < 3 { - h.sendServerWarnMessage(dlog.Server.Warn(h.user, commandParseWarning, args, argc)) + if !h.quiet { + h.send(h.serverMessages, dlog.Server.Warn(h.user, commandParseWarning, args, argc)) + } return } if args[1] == "close" && args[2] == "connection" { @@ -346,23 +356,8 @@ func (h *ServerHandler) send(ch chan<- string, message string) { } } -func (h *ServerHandler) sendServerMessage(message string) { - h.send(h.serverMessageC(), message) -} - -func (h *ServerHandler) sendServerWarnMessage(message string) { - if h.quiet { - return - } - h.send(h.serverMessageC(), message) -} - -func (h *ServerHandler) serverMessageC() chan<- string { - return h.serverMessages -} - -func (h *ServerHandler) flushMessages() { - dlog.Server.Debug(h.user, "flushMessages()") +func (h *ServerHandler) flush() { + dlog.Server.Debug(h.user, "flush()") unsentMessages := func() int { return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) @@ -381,11 +376,11 @@ func (h *ServerHandler) flushMessages() { func (h *ServerHandler) shutdown() { dlog.Server.Debug(h.user, "shutdown()") - h.flushMessages() + h.flush() go func() { select { - case h.serverMessageC() <- ".syn close connection": + case h.serverMessages <- ".syn close connection": case <-h.done.Done(): } }() @@ -408,31 +403,3 @@ func (h *ServerHandler) decrementActiveCommands() int32 { atomic.AddInt32(&h.activeCommands, -1) return atomic.LoadInt32(&h.activeCommands) } - -func readOptions(opts []string) (map[string]string, error) { - dlog.Server.Debug("Parsing options", opts) - options := make(map[string]string, len(opts)) - - for _, o := range opts { - kv := strings.SplitN(o, "=", 2) - if len(kv) != 2 { - return options, fmt.Errorf("Unable to parse options: %v", kv) - } - key := kv[0] - val := kv[1] - - if strings.HasPrefix(val, "base64%") { - s := strings.SplitN(val, "%", 2) - decoded, err := base64.StdEncoding.DecodeString(s[1]) - if err != nil { - return options, err - } - val = string(decoded) - } - - dlog.Server.Debug("Setting option", key, val) - options[key] = val - } - - return options, nil -} -- cgit v1.2.3 From f70622f307629a2542ea5eb128dea8c1043d3a40 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 10:00:38 +0300 Subject: more on this --- internal/server/handlers/serverhandler.go | 332 +++--------------------------- 1 file changed, 34 insertions(+), 298 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index ace2626..2ec4fbf 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -1,69 +1,60 @@ package handlers import ( - "bytes" "context" - "encoding/base64" - "errors" - "fmt" - "io" "os" - "strconv" "strings" - "sync/atomic" - "time" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/protocol" user "github.com/mimecast/dtail/internal/user/server" ) -const ( - commandParseWarning string = "Unable to parse command" -) - // ServerHandler implements the Reader and Writer interfaces to handle // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - maprMessages chan string - serverMessages chan string - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - quiet bool - spartan bool - serverless bool - readBuf bytes.Buffer - writeBuf bytes.Buffer + baseHandler + catLimiter chan struct{} + tailLimiter chan struct{} + regex string + /* + done *internal.Done + lines chan line.Line + aggregate *server.Aggregate + maprMessages chan string + serverMessages chan string + hostname string + user *user.User + ackCloseReceived chan struct{} + activeCommands int32 + quiet bool + spartan bool + serverless bool + readBuf bytes.Buffer + writeBuf bytes.Buffer + */ } // NewServerHandler returns the server handler. func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - maprMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - regex: ".", - user: user, - } + baseHandler: baseHandler{ + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + maprMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + user: user, + }, + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + } + h.handleCommandCb = h.handleUserCommand fqdn, err := os.Hostname() if err != nil { @@ -76,192 +67,8 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } -// Shutdown the handler. -func (h *ServerHandler) Shutdown() { - h.done.Shutdown() -} - -// Done channel of the handler. -func (h *ServerHandler) Done() <-chan struct{} { - return h.done.Done() -} - -// Read is to send data to the dtail client via Reader interface. -func (h *ServerHandler) Read(p []byte) (n int, err error) { - defer h.readBuf.Reset() - - select { - case message := <-h.serverMessages: - if message[0] == '.' { - // Handle hidden message (don't display to the user, interpreted by dtail client) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - return - } - - if h.serverless { - // In serverless mode we have logged the server message already via the - // dlog logger, no need to send the message again to the client part. - return - } - - // Handle normal server message (display to the user) - h.readBuf.WriteString("SERVER") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - - case message := <-h.maprMessages: - // Send mapreduce-aggregated data as a message. - h.readBuf.WriteString("AGGREGATE") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - - case line := <-h.lines: - if !h.spartan { - h.readBuf.WriteString("REMOTE") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(line.SourceID) - h.readBuf.WriteString(protocol.FieldDelimiter) - } - h.readBuf.WriteString(line.Content.String()) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - pool.RecycleBytesBuffer(line.Content) - - case <-time.After(time.Second): - // Once in a while check whether we are done. - select { - case <-h.done.Done(): - err = io.EOF - return - default: - } - } - return -} - -// Write is to receive data from the dtail client via Writer interface. -func (h *ServerHandler) Write(p []byte) (n int, err error) { - for _, b := range p { - switch b { - case ';': - h.handleCommand(string(h.writeBuf.Bytes())) - h.writeBuf.Reset() - default: - h.writeBuf.WriteByte(b) - } - } - - n = len(p) - return -} - -func (h *ServerHandler) handleCommand(commandStr string) { - dlog.Server.Debug(h.user, commandStr) - ctx := context.Background() - - args, argc, add, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) - if err != nil { - h.send(h.serverMessages, dlog.Server.Error(h.user, err)+add) - return - } - - args, argc, err = h.handleBase64(args, argc) - if err != nil { - h.send(h.serverMessages, dlog.Server.Error(h.user, err)) - return - } - - if h.user.Name == config.ControlUser { - h.handleControlCommand(argc, args) - return - } - - ctx, cancel := context.WithCancel(ctx) - go func() { - <-h.done.Done() - cancel() - }() - - h.handleUserCommand(ctx, argc, args) -} - -func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { - argc := len(args) - var add string - - if argc <= 2 || args[0] != "protocol" { - return args, argc, add, errors.New("unable to determine protocol version") - } - - if args[1] != protocol.ProtocolCompat { - clientCompat, _ := strconv.Atoi(args[1]) - serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat) - if clientCompat <= 3 { - // Protocol version 3 or lower expect a newline as message separator - // One day (after 2 major versions) this exception may be removed! - add = "\n" - } - - toUpdate := "client" - if clientCompat > serverCompat { - toUpdate = "server" - } - - err := fmt.Errorf("DTail server protocol version '%s' does not match client protocol version '%s', please update DTail %s!", - protocol.ProtocolCompat, args[1], toUpdate) - return args, argc, add, err - } - - return args[2:], argc - 2, add, nil -} - -func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, error) { - err := errors.New("Unable to decode client message, DTail server and client versions may not be compatible") - - if argc != 2 || args[0] != "base64" { - return args, argc, err - } - - decoded, err := base64.StdEncoding.DecodeString(args[1]) - if err != nil { - return args, argc, err - } - decodedStr := string(decoded) - - args = strings.Split(decodedStr, " ") - argc = len(decodedStr) - dlog.Server.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args) - - return args, argc, nil -} - -func (h *ServerHandler) handleControlCommand(argc int, args []string) { - switch args[0] { - case "debug": - h.send(h.serverMessages, dlog.Server.Debug(h.user, "Receiving debug command", argc, args)) - default: - dlog.Server.Warn(h.user, "Received unknown control command", argc, args) - } -} - func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { - dlog.Server.Debug(h.user, "handleUserCommand", argc, args) + dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() commandFinished := func() { @@ -332,74 +139,3 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() } } - -func (h *ServerHandler) handleAckCommand(argc int, args []string) { - if argc < 3 { - if !h.quiet { - h.send(h.serverMessages, dlog.Server.Warn(h.user, commandParseWarning, args, argc)) - } - return - } - if args[1] == "close" && args[2] == "connection" { - select { - case <-h.ackCloseReceived: - default: - close(h.ackCloseReceived) - } - } -} - -func (h *ServerHandler) send(ch chan<- string, message string) { - select { - case ch <- message: - case <-h.done.Done(): - } -} - -func (h *ServerHandler) flush() { - dlog.Server.Debug(h.user, "flush()") - - unsentMessages := func() int { - return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) - } - for i := 0; i < 3; i++ { - if unsentMessages() == 0 { - dlog.Server.Debug(h.user, "All lines sent") - return - } - dlog.Server.Debug(h.user, "Still lines to be sent") - time.Sleep(time.Second) - } - - dlog.Server.Warn(h.user, "Some lines remain unsent", unsentMessages()) -} - -func (h *ServerHandler) shutdown() { - dlog.Server.Debug(h.user, "shutdown()") - h.flush() - - go func() { - select { - case h.serverMessages <- ".syn close connection": - case <-h.done.Done(): - } - }() - - select { - case <-h.ackCloseReceived: - case <-time.After(time.Second * 5): - dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") - case <-h.done.Done(): - } - - h.done.Shutdown() -} - -func (h *ServerHandler) incrementActiveCommands() { - atomic.AddInt32(&h.activeCommands, 1) -} - -func (h *ServerHandler) decrementActiveCommands() int32 { - atomic.AddInt32(&h.activeCommands, -1) - return atomic.LoadInt32(&h.activeCommands) -} -- cgit v1.2.3 From 9f395a03f25941d8ed98ec43035688daa1e8877f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 22:39:58 +0300 Subject: more on this --- internal/server/handlers/serverhandler.go | 16 ---------------- 1 file changed, 16 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 2ec4fbf..25cb8ba 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -21,22 +21,6 @@ type ServerHandler struct { catLimiter chan struct{} tailLimiter chan struct{} regex string - /* - done *internal.Done - lines chan line.Line - aggregate *server.Aggregate - maprMessages chan string - serverMessages chan string - hostname string - user *user.User - ackCloseReceived chan struct{} - activeCommands int32 - quiet bool - spartan bool - serverless bool - readBuf bytes.Buffer - writeBuf bytes.Buffer - */ } // NewServerHandler returns the server handler. -- cgit v1.2.3 From fab5dc3e70434ea0abc7a0976487a1973b662331 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 09:50:41 +0300 Subject: enable faster shutdown - useful for dgrep/dmap and dcat commands --- internal/server/handlers/serverhandler.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 25cb8ba..aaffe14 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/omode" @@ -25,6 +24,7 @@ type ServerHandler struct { // NewServerHandler returns the server handler. func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { + dlog.Server.Debug(user, "Creating new server handler") h := ServerHandler{ baseHandler: baseHandler{ done: internal.NewDone(), @@ -51,7 +51,9 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, + commandName string, options map[string]string) { + dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -61,16 +63,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] } } - splitted := strings.Split(args[0], ":") - commandName := splitted[0] - - options, err := config.DeserializeOptions(splitted[1:]) - if err != nil { - h.send(h.serverMessages, dlog.Server.Error(h.user, err)) - commandFinished() - return - } - if quiet, _ := options["quiet"]; quiet == "true" { dlog.Server.Debug(h.user, "Enabling quiet mode") h.quiet = true -- cgit v1.2.3 From 7306afe9ab073c424ddca0ddc57950f237948118 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 10:55:50 +0300 Subject: move health check to separate client binary --- internal/server/handlers/serverhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index aaffe14..aed8956 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -106,7 +106,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() }() - case "ack", ".ack": + case ".ack": h.handleAckCommand(argc, args) commandFinished() -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/server/handlers/serverhandler.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index aed8956..f12d590 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,6 +4,7 @@ import ( "context" "os" "strings" + "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" @@ -23,7 +24,9 @@ type ServerHandler struct { } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, + tailLimiter chan struct{}) *ServerHandler { + dlog.Server.Debug(user, "Creating new server handler") h := ServerHandler{ baseHandler: baseHandler{ @@ -51,11 +54,10 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, - commandName string, options map[string]string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, + args []string, commandName string, options map[string]string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) - h.incrementActiveCommands() commandFinished := func() { if h.decrementActiveCommands() == 0 { @@ -73,7 +75,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] } if serverless, _ := options["serverless"]; serverless == "true" { dlog.Server.Debug(h.user, "Enabling serverless mode") - h.serverless = true + atomic.AddInt32(&h.serverless, 1) } switch commandName { @@ -83,14 +85,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command.Start(ctx, argc, args, 1) commandFinished() }() - case "tail": command := newReadCommand(h, omode.TailClient) go func() { command.Start(ctx, argc, args, 10) commandFinished() }() - case "map": command, aggregate, err := newMapCommand(h, argc, args) if err != nil { @@ -99,19 +99,17 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() return } - h.aggregate = aggregate go func() { command.Start(ctx, h.maprMessages) commandFinished() }() - case ".ack": h.handleAckCommand(argc, args) commandFinished() - default: - h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options)) + h.send(h.serverMessages, dlog.Server.Error(h.user, + "Received unknown user command", commandName, argc, args, options)) commandFinished() } } -- cgit v1.2.3 From f44792c9102488774c9993b080f35c65287a64b1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 10 Oct 2021 14:02:12 +0300 Subject: add another dmap test - reading 100 source files at once fix a data race when reading multiple files on one server from the same session at once --- internal/server/handlers/serverhandler.go | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index f12d590..52c4570 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,7 +4,6 @@ import ( "context" "os" "strings" - "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" @@ -55,7 +54,7 @@ func NewServerHandler(user *user.User, catLimiter, } func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, - args []string, commandName string, options map[string]string) { + args []string, commandName string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -65,19 +64,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, } } - if quiet, _ := options["quiet"]; quiet == "true" { - dlog.Server.Debug(h.user, "Enabling quiet mode") - h.quiet = true - } - if spartan, _ := options["spartan"]; spartan == "true" { - dlog.Server.Debug(h.user, "Enabling spartan mode") - h.spartan = true - } - if serverless, _ := options["serverless"]; serverless == "true" { - dlog.Server.Debug(h.user, "Enabling serverless mode") - atomic.AddInt32(&h.serverless, 1) - } - switch commandName { case "grep", "cat": command := newReadCommand(h, omode.CatClient) @@ -109,7 +95,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, commandFinished() default: h.send(h.serverMessages, dlog.Server.Error(h.user, - "Received unknown user command", commandName, argc, args, options)) + "Received unknown user command", commandName, argc, args)) commandFinished() } } -- cgit v1.2.3 From 1dead22129a26e4f532e68c2c63fe4122b519506 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 13 Oct 2021 21:10:28 +0300 Subject: Merging grep context from master --- internal/server/handlers/serverhandler.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'internal/server/handlers/serverhandler.go') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 52c4570..36574a9 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" user "github.com/mimecast/dtail/internal/user/server" ) @@ -53,8 +54,8 @@ func NewServerHandler(user *user.User, catLimiter, return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, - args []string, commandName string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LContext, + argc int, args []string, commandName string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -68,13 +69,13 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, case "grep", "cat": command := newReadCommand(h, omode.CatClient) go func() { - command.Start(ctx, argc, args, 1) + command.Start(ctx, ltx, argc, args, 1) commandFinished() }() case "tail": command := newReadCommand(h, omode.TailClient) go func() { - command.Start(ctx, argc, args, 10) + command.Start(ctx, ltx, argc, args, 10) commandFinished() }() case "map": -- cgit v1.2.3