diff options
| author | Paul Buetow <paul@buetow.org> | 2021-09-08 19:10:50 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-02 12:26:29 +0300 |
| commit | 16dc57e1e1c28e9d762424e596223a980770e059 (patch) | |
| tree | ea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/server/handlers/serverhandler.go | |
| parent | c83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff) | |
mapreduce tables are in colors now too
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 118 |
1 files changed, 58 insertions, 60 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 14f46a3..e74e686 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -38,7 +38,6 @@ type ServerHandler struct { aggregate *server.Aggregate aggregatedMessages chan string serverMessages chan string - payload []byte hostname string user *user.User catLimiter chan struct{} @@ -47,6 +46,8 @@ type ServerHandler struct { activeCommands int32 activeReaders int32 quiet bool + readBuf bytes.Buffer + writeBuf bytes.Buffer } // NewServerHandler returns the server handler. @@ -86,77 +87,74 @@ func (h *ServerHandler) Done() <-chan struct{} { // Read is to send data to the dtail client via Reader interface. func (h *ServerHandler) Read(p []byte) (n int, err error) { - for { - select { - case message := <-h.serverMessages: - if len(message) == 0 { - logger.Warn(h.user, "Empty message recieved") - return - } - if message[0] == '.' { - // Handle hidden message (don't display to the user, interpreted by dtail client) - payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return - } - - // Handle normal server message (display to the user) - payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return + defer h.readBuf.Reset() - case message := <-h.aggregatedMessages: - // Send mapreduce-aggregated data as a message. - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("AGGREGATE") - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(message) - buf.WriteByte(protocol.MessageDelimiter) - n = copy(p, buf.Bytes()) - pool.RecycleBytesBuffer(buf) + select { + case message := <-h.serverMessages: + if message[0] == '.' { + // Handle hidden message (don't display to the user, interpreted by dtail client) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) return + } - case line := <-h.lines: - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("REMOTE") - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%v", line.Count)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(line.SourceID) - buf.WriteString(protocol.FieldDelimiter) - payload := append(buf.Bytes(), line.Content.Bytes()...) - n = copy(p, payload) - pool.RecycleBytesBuffer(buf) - pool.RecycleBytesBuffer(line.Content) + // Handle normal server message (display to the user) + h.readBuf.WriteString("SERVER") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case message := <-h.aggregatedMessages: + // Send mapreduce-aggregated data as a message. + h.readBuf.WriteString("AGGREGATE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case line := <-h.lines: + h.readBuf.WriteString("REMOTE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.SourceID) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.Content.String()) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + pool.RecycleBytesBuffer(line.Content) + + case <-time.After(time.Second): + // Once in a while check whether we are done. + select { + case <-h.done.Done(): + err = io.EOF return - - case <-time.After(time.Second): - // Once in a while check whether we are done. - select { - case <-h.done.Done(): - return 0, io.EOF - default: - } + default: } } + return } // Write is to receive data from the dtail client via Writer interface. func (h *ServerHandler) Write(p []byte) (n int, err error) { - for _, c := range p { - switch c { + for _, b := range p { + switch b { case ';': - commandStr := strings.TrimSpace(string(h.payload)) - h.handleCommand(commandStr) - h.payload = nil + h.handleCommand(string(h.writeBuf.Bytes())) + h.writeBuf.Reset() default: - h.payload = append(h.payload, c) + h.writeBuf.WriteByte(b) } } |
