package handlers import ( "strings" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce // aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate removedNl bool } // NewMaprHandler returns a new mapreduce client handler. func NewMaprHandler(server string, session *client.SessionState) *MaprHandler { return &MaprHandler{ baseHandler: baseHandler{ server: server, shellStarted: false, commands: make(chan string), status: -1, done: internal.NewDone(), capabilities: make(map[string]struct{}), capabilitiesCh: make(chan struct{}), sessionAcks: make(chan SessionAck, 4), }, aggregate: client.NewAggregate(server, session), } } // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { case '\n': h.removedNl = true case protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() dlog.Client.Debug(message) if message[0] == 'A' { h.handleAggregateMessage(message) } else { if h.removedNl { h.baseHandler.handleMessage(message + "\n") } else { h.baseHandler.handleMessage(message) } } h.baseHandler.receiveBuf.Reset() h.removedNl = false default: h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } // Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { dlog.Client.Error("Unable to aggregate data", h.server, message, err) } } // Shutdown flushes any pending aggregate state before marking the handler done. func (h *MaprHandler) Shutdown() { if h.aggregate != nil { if err := h.aggregate.Flush(); err != nil { dlog.Client.Error("Unable to flush aggregate data on shutdown", h.server, err) } } h.baseHandler.Shutdown() }