summaryrefslogtreecommitdiff
path: root/internal/server/handlers/serverhandler.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
-rw-r--r--internal/server/handlers/serverhandler.go86
1 files changed, 30 insertions, 56 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index e74e686..ed19412 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -32,36 +32,35 @@ const (
// the Bi-directional communication between SSH client and server.
// This handler implements the handler of the SSH server.
type ServerHandler struct {
- done *internal.Done
- lines chan line.Line
- regex string
- aggregate *server.Aggregate
- aggregatedMessages chan string
- serverMessages chan string
- hostname string
- user *user.User
- catLimiter chan struct{}
- tailLimiter chan struct{}
- ackCloseReceived chan struct{}
- activeCommands int32
- activeReaders int32
- quiet bool
- readBuf bytes.Buffer
- writeBuf bytes.Buffer
+ done *internal.Done
+ lines chan line.Line
+ regex string
+ aggregate *server.Aggregate
+ maprMessages chan string
+ serverMessages chan string
+ hostname string
+ user *user.User
+ catLimiter chan struct{}
+ tailLimiter chan struct{}
+ ackCloseReceived chan struct{}
+ activeCommands int32
+ quiet bool
+ readBuf bytes.Buffer
+ writeBuf bytes.Buffer
}
// NewServerHandler returns the server handler.
func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
h := ServerHandler{
- done: internal.NewDone(),
- lines: make(chan line.Line, 100),
- serverMessages: make(chan string, 10),
- aggregatedMessages: make(chan string, 10),
- ackCloseReceived: make(chan struct{}),
- catLimiter: catLimiter,
- tailLimiter: tailLimiter,
- regex: ".",
- user: user,
+ done: internal.NewDone(),
+ lines: make(chan line.Line, 100),
+ serverMessages: make(chan string, 10),
+ maprMessages: make(chan string, 10),
+ ackCloseReceived: make(chan struct{}),
+ catLimiter: catLimiter,
+ tailLimiter: tailLimiter,
+ regex: ".",
+ user: user,
}
fqdn, err := os.Hostname()
@@ -108,7 +107,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) {
h.readBuf.WriteByte(protocol.MessageDelimiter)
n = copy(p, h.readBuf.Bytes())
- case message := <-h.aggregatedMessages:
+ case message := <-h.maprMessages:
// Send mapreduce-aggregated data as a message.
h.readBuf.WriteString("AGGREGATE")
h.readBuf.WriteString(protocol.FieldDelimiter)
@@ -260,14 +259,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
h.shutdown()
}
}
- readerFinished := func() {
- if h.decrementActiveReaders() == 0 {
- if h.aggregate == nil {
- return
- }
- h.aggregate.Shutdown()
- }
- }
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
@@ -289,18 +280,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
go func() {
- h.incrementActiveReaders()
command.Start(ctx, argc, args, 1)
- readerFinished()
commandFinished()
}()
case "tail":
command := newReadCommand(h, omode.TailClient)
go func() {
- h.incrementActiveReaders()
command.Start(ctx, argc, args, 10)
- readerFinished()
commandFinished()
}()
@@ -315,7 +302,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
h.aggregate = aggregate
go func() {
- command.Start(ctx, h.aggregatedMessages)
+ command.Start(ctx, h.maprMessages)
commandFinished()
}()
@@ -361,15 +348,11 @@ func (h *ServerHandler) serverMessageC() chan<- string {
return h.serverMessages
}
-func (h *ServerHandler) flush() {
- logger.Debug(h.user, "flush()")
-
- if h.aggregate != nil {
- h.aggregate.Flush()
- }
+func (h *ServerHandler) flushMessages() {
+ logger.Debug(h.user, "flushMessages()")
unsentMessages := func() int {
- return len(h.lines) + len(h.serverMessages) + len(h.aggregatedMessages)
+ return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
for i := 0; i < 3; i++ {
if unsentMessages() == 0 {
@@ -385,7 +368,7 @@ func (h *ServerHandler) flush() {
func (h *ServerHandler) shutdown() {
logger.Debug(h.user, "shutdown()")
- h.flush()
+ h.flushMessages()
go func() {
select {
@@ -413,15 +396,6 @@ func (h *ServerHandler) decrementActiveCommands() int32 {
return atomic.LoadInt32(&h.activeCommands)
}
-func (h *ServerHandler) incrementActiveReaders() {
- atomic.AddInt32(&h.activeReaders, 1)
-}
-
-func (h *ServerHandler) decrementActiveReaders() int32 {
- atomic.AddInt32(&h.activeReaders, -1)
- return atomic.LoadInt32(&h.activeReaders)
-}
-
func readOptions(opts []string) (map[string]string, error) {
options := make(map[string]string, len(opts))