diff options
| author | Paul Buetow <paul@buetow.org> | 2026-01-24 00:10:15 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-01-24 00:10:15 +0200 |
| commit | 65ec49a97b1fcf633c1c6ba92e3db71ecd477196 (patch) | |
| tree | 48b47389e1384f86ff81f849fd526a09ab9c4e0d | |
| parent | ecb963eb98dd9174d017dd504a48fb2ea048c42d (diff) | |
refactor: split large functions for maintainability
Split functions exceeding 50 lines into smaller, focused helpers:
- DirectTurboWriter.WriteLineData (~97 lines) split into:
- WriteLineData (dispatcher, 9 lines)
- writeServerlessLine (serverless mode, 48 lines)
- writeNetworkLine (network mode, 40 lines)
- TurboNetworkWriter.WriteLineData (~60 lines) split into:
- WriteLineData (builds protocol line, 33 lines)
- sendToTurboChannel (channel send with retry, 28 lines)
- Server.handleRequests (~67 lines) split into:
- handleRequests (request loop, 23 lines)
- handleShellRequest (shell session setup, 57 lines)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 166 | ||||
| -rw-r--r-- | internal/server/server.go | 107 |
2 files changed, 150 insertions, 123 deletions
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index 2d3131c..d8ee2ad 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -51,67 +51,73 @@ func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless b } } -// WriteLineData writes formatted line data directly to output +// WriteLineData writes formatted line data directly to output. +// Dispatches to serverless or network mode handlers based on configuration. func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { w.mutex.Lock() defer w.mutex.Unlock() - // Optimized serverless mode path if w.serverless { - // In serverless mode, we still need protocol formatting for consistency - // but we can optimize by batching writes - - if w.plain { - // For plain serverless mode, just write the line content - w.writeBuf.Write(lineContent) - - // Ensure line has a newline if it doesn't already - if len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' { - w.writeBuf.WriteByte('\n') - } - } else { - // For colored serverless mode with test compatibility - // We need to maintain the protocol formatting for integration tests - // Build the complete line in a temporary buffer - var lineBuf bytes.Buffer - lineBuf.WriteString("REMOTE") - lineBuf.WriteString(protocol.FieldDelimiter) - lineBuf.WriteString(w.hostname) - lineBuf.WriteString(protocol.FieldDelimiter) - lineBuf.WriteString("100") - lineBuf.WriteString(protocol.FieldDelimiter) - lineBuf.WriteString(fmt.Sprintf("%v", lineNum)) - lineBuf.WriteString(protocol.FieldDelimiter) - lineBuf.WriteString(sourceID) - lineBuf.WriteString(protocol.FieldDelimiter) - - // Remove trailing newline if present (it will be added back after coloring) - content := lineContent - if len(content) > 0 && content[len(content)-1] == '\n' { - content = content[:len(content)-1] - } - lineBuf.Write(content) - - // Apply color formatting - coloredLine := brush.Colorfy(lineBuf.String()) - w.writeBuf.WriteString(coloredLine) + return w.writeServerlessLine(lineContent, lineNum, sourceID) + } + return w.writeNetworkLine(lineContent, lineNum, sourceID) +} + +// writeServerlessLine handles serverless mode output with buffered writes. +// Supports both plain and colored output modes. Must be called with mutex held. +func (w *DirectTurboWriter) writeServerlessLine(lineContent []byte, lineNum uint64, sourceID string) error { + if w.plain { + // For plain serverless mode, just write the line content + w.writeBuf.Write(lineContent) + + // Ensure line has a newline if it doesn't already + if len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' { w.writeBuf.WriteByte('\n') } - - // Update stats - w.linesWritten++ - w.bytesWritten += uint64(w.writeBuf.Len()) - - // Buffer writes for better performance - only flush when buffer is full - // This is a key optimization: we don't force immediate flush in serverless mode - if w.writeBuf.Len() >= w.bufSize { - return w.flushBuffer() + } else { + // For colored serverless mode with test compatibility + // Build the complete line with protocol formatting for integration tests + var lineBuf bytes.Buffer + lineBuf.WriteString("REMOTE") + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(w.hostname) + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString("100") + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(fmt.Sprintf("%v", lineNum)) + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(sourceID) + lineBuf.WriteString(protocol.FieldDelimiter) + + // Remove trailing newline if present (it will be added back after coloring) + content := lineContent + if len(content) > 0 && content[len(content)-1] == '\n' { + content = content[:len(content)-1] } - - return nil + lineBuf.Write(content) + + // Apply color formatting + coloredLine := brush.Colorfy(lineBuf.String()) + w.writeBuf.WriteString(coloredLine) + w.writeBuf.WriteByte('\n') + } + + // Update stats + w.linesWritten++ + w.bytesWritten += uint64(w.writeBuf.Len()) + + // Buffer writes for better performance - only flush when buffer is full + if w.writeBuf.Len() >= w.bufSize { + return w.flushBuffer() } - // Non-serverless mode: include protocol formatting for network transmission + return nil +} + +// writeNetworkLine handles network mode output with protocol formatting. +// Adds protocol headers for non-plain mode. Must be called with mutex held. +func (w *DirectTurboWriter) writeNetworkLine(lineContent []byte, lineNum uint64, sourceID string) error { + // Include protocol formatting for network transmission if !w.plain { w.writeBuf.WriteString("REMOTE") w.writeBuf.WriteString(protocol.FieldDelimiter) @@ -372,14 +378,15 @@ type TurboNetworkWriter struct { bytesWritten uint64 } -// WriteLineData formats and writes line data directly +// WriteLineData formats and writes line data directly to the turbo channel. +// Builds the protocol-formatted line and sends it via sendToTurboChannel. func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { w.mutex.Lock() defer w.mutex.Unlock() dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "lineNum", lineNum, "sourceID", sourceID, "contentLen", len(lineContent)) - // Build the output line + // Build the output line with protocol formatting if !w.plain && !w.serverless { w.writeBuf.WriteString("REMOTE") w.writeBuf.WriteString(protocol.FieldDelimiter) @@ -404,33 +411,38 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "linesWritten", w.linesWritten, "bytesWritten", w.bytesWritten, "bufSize", w.writeBuf.Len()) - // Write directly to the turbo channel - if w.handler.turboLines != nil { - data := make([]byte, w.writeBuf.Len()) - copy(data, w.writeBuf.Bytes()) - - dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sending to turboLines channel", "dataLen", len(data)) + return w.sendToTurboChannel() +} - // Send data to turbo channel with a larger buffer - select { - case w.handler.turboLines <- data: - dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel successfully") - w.writeBuf.Reset() - return nil - default: - // Channel full, wait a bit and retry - dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "channel full, waiting before retry") - time.Sleep(time.Millisecond) - w.handler.turboLines <- data - dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel after retry") - w.writeBuf.Reset() - return nil - } +// sendToTurboChannel sends buffered data to the turbo channel with retry logic. +// Handles channel backpressure by waiting and retrying. Must be called with mutex held. +func (w *TurboNetworkWriter) sendToTurboChannel() error { + if w.handler.turboLines == nil { + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil") + w.writeBuf.Reset() + return nil } - dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "turboLines channel is nil") - w.writeBuf.Reset() - return nil + data := make([]byte, w.writeBuf.Len()) + copy(data, w.writeBuf.Bytes()) + + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sending to turboLines channel", "dataLen", len(data)) + + // Send data to turbo channel, retry once if full + select { + case w.handler.turboLines <- data: + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") + w.writeBuf.Reset() + return nil + default: + // Channel full, wait a bit and retry + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") + time.Sleep(time.Millisecond) + w.handler.turboLines <- data + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry") + w.writeBuf.Reset() + return nil + } } // WriteServerMessage writes a server message diff --git a/internal/server/server.go b/internal/server/server.go index 8b581b1..38b042f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -170,52 +170,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, switch req.Type { case "shell": - var handler handlers.Handler - switch user.Name { - case config.HealthUser: - handler = handlers.NewHealthHandler(user) - default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) - } - terminate := func() { - handler.Shutdown() - sshConn.Close() - } - - go func() { - defer terminate() - // Broken pipe, cancel - if _, err := io.Copy(channel, handler); err != nil { - dlog.Server.Trace(user, fmt.Errorf("channel->handler: %w", err)) - } - }() - go func() { - defer terminate() - // Broken pipe, cancel - if _, err := io.Copy(handler, channel); err != nil { - dlog.Server.Trace(user, fmt.Errorf("handler->channel: %w", err)) - } - }() - go func() { - select { - case <-ctx.Done(): - case <-handler.Done(): - } - terminate() - }() - go func() { - if err := sshConn.Wait(); err != nil && err != io.EOF { - dlog.Server.Error(user, err) - } - s.stats.decrementConnections() - dlog.Server.Info(user, "Good bye Mister!") - terminate() - }() - - // Only serving shell type - if err := req.Reply(true, nil); err != nil { - dlog.Server.Trace(user, fmt.Errorf("reply(true): %w", err)) - } + s.handleShellRequest(ctx, sshConn, channel, user, req) default: if err := req.Reply(false, nil); err != nil { dlog.Server.Trace(user, fmt.Errorf("reply(false): %w", err)) @@ -227,6 +182,66 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, return nil } +// handleShellRequest sets up the shell session with handler goroutines for I/O, +// context cancellation, and connection lifecycle management. +func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn, + channel gossh.Channel, user *user.User, req *gossh.Request) { + + // Create the appropriate handler based on user type + var handler handlers.Handler + switch user.Name { + case config.HealthUser: + handler = handlers.NewHealthHandler(user) + default: + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) + } + + terminate := func() { + handler.Shutdown() + sshConn.Close() + } + + // Start goroutine to copy data from channel to handler + go func() { + defer terminate() + if _, err := io.Copy(channel, handler); err != nil { + dlog.Server.Trace(user, fmt.Errorf("channel->handler: %w", err)) + } + }() + + // Start goroutine to copy data from handler to channel + go func() { + defer terminate() + if _, err := io.Copy(handler, channel); err != nil { + dlog.Server.Trace(user, fmt.Errorf("handler->channel: %w", err)) + } + }() + + // Start goroutine to handle context or handler completion + go func() { + select { + case <-ctx.Done(): + case <-handler.Done(): + } + terminate() + }() + + // Start goroutine to handle connection lifecycle and cleanup + go func() { + if err := sshConn.Wait(); err != nil && err != io.EOF { + dlog.Server.Error(user, err) + } + s.stats.decrementConnections() + dlog.Server.Info(user, "Good bye Mister!") + terminate() + }() + + // Reply to indicate shell request was accepted + if err := req.Reply(true, nil); err != nil { + dlog.Server.Trace(user, fmt.Errorf("reply(true): %w", err)) + } +} + // Callback for SSH authentication. func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { |
