summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-01-24 00:10:15 +0200
committerPaul Buetow <paul@buetow.org>2026-01-24 00:10:15 +0200
commit65ec49a97b1fcf633c1c6ba92e3db71ecd477196 (patch)
tree48b47389e1384f86ff81f849fd526a09ab9c4e0d
parentecb963eb98dd9174d017dd504a48fb2ea048c42d (diff)
refactor: split large functions for maintainability
Split functions exceeding 50 lines into smaller, focused helpers: - DirectTurboWriter.WriteLineData (~97 lines) split into: - WriteLineData (dispatcher, 9 lines) - writeServerlessLine (serverless mode, 48 lines) - writeNetworkLine (network mode, 40 lines) - TurboNetworkWriter.WriteLineData (~60 lines) split into: - WriteLineData (builds protocol line, 33 lines) - sendToTurboChannel (channel send with retry, 28 lines) - Server.handleRequests (~67 lines) split into: - handleRequests (request loop, 23 lines) - handleShellRequest (shell session setup, 57 lines) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-rw-r--r--internal/server/handlers/turbo_writer.go166
-rw-r--r--internal/server/server.go107
2 files changed, 150 insertions, 123 deletions
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index 2d3131c..d8ee2ad 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -51,67 +51,73 @@ func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless b
}
}
-// WriteLineData writes formatted line data directly to output
+// WriteLineData writes formatted line data directly to output.
+// Dispatches to serverless or network mode handlers based on configuration.
func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
w.mutex.Lock()
defer w.mutex.Unlock()
- // Optimized serverless mode path
if w.serverless {
- // In serverless mode, we still need protocol formatting for consistency
- // but we can optimize by batching writes
-
- if w.plain {
- // For plain serverless mode, just write the line content
- w.writeBuf.Write(lineContent)
-
- // Ensure line has a newline if it doesn't already
- if len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' {
- w.writeBuf.WriteByte('\n')
- }
- } else {
- // For colored serverless mode with test compatibility
- // We need to maintain the protocol formatting for integration tests
- // Build the complete line in a temporary buffer
- var lineBuf bytes.Buffer
- lineBuf.WriteString("REMOTE")
- lineBuf.WriteString(protocol.FieldDelimiter)
- lineBuf.WriteString(w.hostname)
- lineBuf.WriteString(protocol.FieldDelimiter)
- lineBuf.WriteString("100")
- lineBuf.WriteString(protocol.FieldDelimiter)
- lineBuf.WriteString(fmt.Sprintf("%v", lineNum))
- lineBuf.WriteString(protocol.FieldDelimiter)
- lineBuf.WriteString(sourceID)
- lineBuf.WriteString(protocol.FieldDelimiter)
-
- // Remove trailing newline if present (it will be added back after coloring)
- content := lineContent
- if len(content) > 0 && content[len(content)-1] == '\n' {
- content = content[:len(content)-1]
- }
- lineBuf.Write(content)
-
- // Apply color formatting
- coloredLine := brush.Colorfy(lineBuf.String())
- w.writeBuf.WriteString(coloredLine)
+ return w.writeServerlessLine(lineContent, lineNum, sourceID)
+ }
+ return w.writeNetworkLine(lineContent, lineNum, sourceID)
+}
+
+// writeServerlessLine handles serverless mode output with buffered writes.
+// Supports both plain and colored output modes. Must be called with mutex held.
+func (w *DirectTurboWriter) writeServerlessLine(lineContent []byte, lineNum uint64, sourceID string) error {
+ if w.plain {
+ // For plain serverless mode, just write the line content
+ w.writeBuf.Write(lineContent)
+
+ // Ensure line has a newline if it doesn't already
+ if len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' {
w.writeBuf.WriteByte('\n')
}
-
- // Update stats
- w.linesWritten++
- w.bytesWritten += uint64(w.writeBuf.Len())
-
- // Buffer writes for better performance - only flush when buffer is full
- // This is a key optimization: we don't force immediate flush in serverless mode
- if w.writeBuf.Len() >= w.bufSize {
- return w.flushBuffer()
+ } else {
+ // For colored serverless mode with test compatibility
+ // Build the complete line with protocol formatting for integration tests
+ var lineBuf bytes.Buffer
+ lineBuf.WriteString("REMOTE")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(w.hostname)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString("100")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(sourceID)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+
+ // Remove trailing newline if present (it will be added back after coloring)
+ content := lineContent
+ if len(content) > 0 && content[len(content)-1] == '\n' {
+ content = content[:len(content)-1]
}
-
- return nil
+ lineBuf.Write(content)
+
+ // Apply color formatting
+ coloredLine := brush.Colorfy(lineBuf.String())
+ w.writeBuf.WriteString(coloredLine)
+ w.writeBuf.WriteByte('\n')
+ }
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ // Buffer writes for better performance - only flush when buffer is full
+ if w.writeBuf.Len() >= w.bufSize {
+ return w.flushBuffer()
}
- // Non-serverless mode: include protocol formatting for network transmission
+ return nil
+}
+
+// writeNetworkLine handles network mode output with protocol formatting.
+// Adds protocol headers for non-plain mode. Must be called with mutex held.
+func (w *DirectTurboWriter) writeNetworkLine(lineContent []byte, lineNum uint64, sourceID string) error {
+ // Include protocol formatting for network transmission
if !w.plain {
w.writeBuf.WriteString("REMOTE")
w.writeBuf.WriteString(protocol.FieldDelimiter)
@@ -372,14 +378,15 @@ type TurboNetworkWriter struct {
bytesWritten uint64
}
-// WriteLineData formats and writes line data directly
+// WriteLineData formats and writes line data directly to the turbo channel.
+// Builds the protocol-formatted line and sends it via sendToTurboChannel.
func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
w.mutex.Lock()
defer w.mutex.Unlock()
dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "lineNum", lineNum, "sourceID", sourceID, "contentLen", len(lineContent))
- // Build the output line
+ // Build the output line with protocol formatting
if !w.plain && !w.serverless {
w.writeBuf.WriteString("REMOTE")
w.writeBuf.WriteString(protocol.FieldDelimiter)
@@ -404,33 +411,38 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s
dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "linesWritten", w.linesWritten, "bytesWritten", w.bytesWritten, "bufSize", w.writeBuf.Len())
- // Write directly to the turbo channel
- if w.handler.turboLines != nil {
- data := make([]byte, w.writeBuf.Len())
- copy(data, w.writeBuf.Bytes())
-
- dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sending to turboLines channel", "dataLen", len(data))
+ return w.sendToTurboChannel()
+}
- // Send data to turbo channel with a larger buffer
- select {
- case w.handler.turboLines <- data:
- dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel successfully")
- w.writeBuf.Reset()
- return nil
- default:
- // Channel full, wait a bit and retry
- dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "channel full, waiting before retry")
- time.Sleep(time.Millisecond)
- w.handler.turboLines <- data
- dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel after retry")
- w.writeBuf.Reset()
- return nil
- }
+// sendToTurboChannel sends buffered data to the turbo channel with retry logic.
+// Handles channel backpressure by waiting and retrying. Must be called with mutex held.
+func (w *TurboNetworkWriter) sendToTurboChannel() error {
+ if w.handler.turboLines == nil {
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil")
+ w.writeBuf.Reset()
+ return nil
}
- dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "turboLines channel is nil")
- w.writeBuf.Reset()
- return nil
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sending to turboLines channel", "dataLen", len(data))
+
+ // Send data to turbo channel, retry once if full
+ select {
+ case w.handler.turboLines <- data:
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
+ w.writeBuf.Reset()
+ return nil
+ default:
+ // Channel full, wait a bit and retry
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
+ time.Sleep(time.Millisecond)
+ w.handler.turboLines <- data
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
+ w.writeBuf.Reset()
+ return nil
+ }
}
// WriteServerMessage writes a server message
diff --git a/internal/server/server.go b/internal/server/server.go
index 8b581b1..38b042f 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -170,52 +170,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn,
switch req.Type {
case "shell":
- var handler handlers.Handler
- switch user.Name {
- case config.HealthUser:
- handler = handlers.NewHealthHandler(user)
- default:
- handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
- }
- terminate := func() {
- handler.Shutdown()
- sshConn.Close()
- }
-
- go func() {
- defer terminate()
- // Broken pipe, cancel
- if _, err := io.Copy(channel, handler); err != nil {
- dlog.Server.Trace(user, fmt.Errorf("channel->handler: %w", err))
- }
- }()
- go func() {
- defer terminate()
- // Broken pipe, cancel
- if _, err := io.Copy(handler, channel); err != nil {
- dlog.Server.Trace(user, fmt.Errorf("handler->channel: %w", err))
- }
- }()
- go func() {
- select {
- case <-ctx.Done():
- case <-handler.Done():
- }
- terminate()
- }()
- go func() {
- if err := sshConn.Wait(); err != nil && err != io.EOF {
- dlog.Server.Error(user, err)
- }
- s.stats.decrementConnections()
- dlog.Server.Info(user, "Good bye Mister!")
- terminate()
- }()
-
- // Only serving shell type
- if err := req.Reply(true, nil); err != nil {
- dlog.Server.Trace(user, fmt.Errorf("reply(true): %w", err))
- }
+ s.handleShellRequest(ctx, sshConn, channel, user, req)
default:
if err := req.Reply(false, nil); err != nil {
dlog.Server.Trace(user, fmt.Errorf("reply(false): %w", err))
@@ -227,6 +182,66 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn,
return nil
}
+// handleShellRequest sets up the shell session with handler goroutines for I/O,
+// context cancellation, and connection lifecycle management.
+func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn,
+ channel gossh.Channel, user *user.User, req *gossh.Request) {
+
+ // Create the appropriate handler based on user type
+ var handler handlers.Handler
+ switch user.Name {
+ case config.HealthUser:
+ handler = handlers.NewHealthHandler(user)
+ default:
+ handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
+ }
+
+ terminate := func() {
+ handler.Shutdown()
+ sshConn.Close()
+ }
+
+ // Start goroutine to copy data from channel to handler
+ go func() {
+ defer terminate()
+ if _, err := io.Copy(channel, handler); err != nil {
+ dlog.Server.Trace(user, fmt.Errorf("channel->handler: %w", err))
+ }
+ }()
+
+ // Start goroutine to copy data from handler to channel
+ go func() {
+ defer terminate()
+ if _, err := io.Copy(handler, channel); err != nil {
+ dlog.Server.Trace(user, fmt.Errorf("handler->channel: %w", err))
+ }
+ }()
+
+ // Start goroutine to handle context or handler completion
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-handler.Done():
+ }
+ terminate()
+ }()
+
+ // Start goroutine to handle connection lifecycle and cleanup
+ go func() {
+ if err := sshConn.Wait(); err != nil && err != io.EOF {
+ dlog.Server.Error(user, err)
+ }
+ s.stats.decrementConnections()
+ dlog.Server.Info(user, "Good bye Mister!")
+ terminate()
+ }()
+
+ // Reply to indicate shell request was accepted
+ if err := req.Reply(true, nil); err != nil {
+ dlog.Server.Trace(user, fmt.Errorf("reply(true): %w", err))
+ }
+}
+
// Callback for SSH authentication.
func (s *Server) Callback(c gossh.ConnMetadata,
authPayload []byte) (*gossh.Permissions, error) {