summaryrefslogtreecommitdiff
path: root/internal/server/handlers/serverhandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-08 19:10:50 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit16dc57e1e1c28e9d762424e596223a980770e059 (patch)
treeea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/server/handlers/serverhandler.go
parentc83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff)
mapreduce tables are in colors now too
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
-rw-r--r--internal/server/handlers/serverhandler.go118
1 files changed, 58 insertions, 60 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 14f46a3..e74e686 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -38,7 +38,6 @@ type ServerHandler struct {
aggregate *server.Aggregate
aggregatedMessages chan string
serverMessages chan string
- payload []byte
hostname string
user *user.User
catLimiter chan struct{}
@@ -47,6 +46,8 @@ type ServerHandler struct {
activeCommands int32
activeReaders int32
quiet bool
+ readBuf bytes.Buffer
+ writeBuf bytes.Buffer
}
// NewServerHandler returns the server handler.
@@ -86,77 +87,74 @@ func (h *ServerHandler) Done() <-chan struct{} {
// Read is to send data to the dtail client via Reader interface.
func (h *ServerHandler) Read(p []byte) (n int, err error) {
- for {
- select {
- case message := <-h.serverMessages:
- if len(message) == 0 {
- logger.Warn(h.user, "Empty message recieved")
- return
- }
- if message[0] == '.' {
- // Handle hidden message (don't display to the user, interpreted by dtail client)
- payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter)))
- n = copy(p, payload)
- return
- }
-
- // Handle normal server message (display to the user)
- payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter)))
- n = copy(p, payload)
- return
+ defer h.readBuf.Reset()
- case message := <-h.aggregatedMessages:
- // Send mapreduce-aggregated data as a message.
- buf := pool.BytesBuffer.Get().(*bytes.Buffer)
- buf.WriteString("AGGREGATE")
- buf.WriteString(protocol.AggregateDelimiter)
- buf.WriteString(h.hostname)
- buf.WriteString(protocol.AggregateDelimiter)
- buf.WriteString(message)
- buf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, buf.Bytes())
- pool.RecycleBytesBuffer(buf)
+ select {
+ case message := <-h.serverMessages:
+ if message[0] == '.' {
+ // Handle hidden message (don't display to the user, interpreted by dtail client)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
return
+ }
- case line := <-h.lines:
- buf := pool.BytesBuffer.Get().(*bytes.Buffer)
- buf.WriteString("REMOTE")
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(h.hostname)
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc))
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(fmt.Sprintf("%v", line.Count))
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(line.SourceID)
- buf.WriteString(protocol.FieldDelimiter)
- payload := append(buf.Bytes(), line.Content.Bytes()...)
- n = copy(p, payload)
- pool.RecycleBytesBuffer(buf)
- pool.RecycleBytesBuffer(line.Content)
+ // Handle normal server message (display to the user)
+ h.readBuf.WriteString("SERVER")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+
+ case message := <-h.aggregatedMessages:
+ // Send mapreduce-aggregated data as a message.
+ h.readBuf.WriteString("AGGREGATE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+
+ case line := <-h.lines:
+ h.readBuf.WriteString("REMOTE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc))
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(fmt.Sprintf("%v", line.Count))
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(line.SourceID)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(line.Content.String())
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+ pool.RecycleBytesBuffer(line.Content)
+
+ case <-time.After(time.Second):
+ // Once in a while check whether we are done.
+ select {
+ case <-h.done.Done():
+ err = io.EOF
return
-
- case <-time.After(time.Second):
- // Once in a while check whether we are done.
- select {
- case <-h.done.Done():
- return 0, io.EOF
- default:
- }
+ default:
}
}
+ return
}
// Write is to receive data from the dtail client via Writer interface.
func (h *ServerHandler) Write(p []byte) (n int, err error) {
- for _, c := range p {
- switch c {
+ for _, b := range p {
+ switch b {
case ';':
- commandStr := strings.TrimSpace(string(h.payload))
- h.handleCommand(commandStr)
- h.payload = nil
+ h.handleCommand(string(h.writeBuf.Bytes()))
+ h.writeBuf.Reset()
default:
- h.payload = append(h.payload, c)
+ h.writeBuf.WriteByte(b)
}
}