diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:42:33 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:42:33 +0200 |
| commit | a426a2f9f33b1125a05d3aac29e7b98afdc36a99 (patch) | |
| tree | 13d21a5aef7ec1e586e364ce5bebabb65fd77523 /internal/server | |
| parent | 3002bdcaa4ec22aa46b6c98eefda2f926dfff618 (diff) | |
server: use auth strategy registry and stabilize turbo EOF sync
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 36 | ||||
| -rw-r--r-- | internal/server/server.go | 64 |
4 files changed, 79 insertions, 24 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 6078e37..7cd9a63 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -147,6 +147,8 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, timeout := r.server.TurboEOFAckTimeout() if r.server.WaitForTurboEOFAck(timeout) { dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged") + // Allow transport buffers to flush after acknowledgement. + time.Sleep(r.server.ShutdownTurboSerializeWait()) } else { dlog.Server.Warn( r.server.LogContext(), diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 2c0a616..650dcf2 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -192,5 +192,6 @@ func (h *ServerHandler) turboManagerConfig() turboManagerConfig { flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout), flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval), readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval), + eofAckQuietPeriod: durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, defaultTurboEOFAckQuietPeriod), } } diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index e5cbf1a..deed383 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -12,6 +12,7 @@ const ( defaultTurboFlushTimeout = 2 * time.Second defaultTurboFlushPollInterval = 10 * time.Millisecond defaultTurboReadRetryInterval = time.Millisecond + defaultTurboEOFAckQuietPeriod = 50 * time.Millisecond ) type turboManagerConfig struct { @@ -19,6 +20,7 @@ type turboManagerConfig struct { flushTimeout time.Duration flushPollInterval time.Duration readRetryInterval time.Duration + eofAckQuietPeriod time.Duration } type turboManager struct { @@ -32,6 +34,9 @@ type turboManager struct { flushTimeout time.Duration flushPollInterval time.Duration readRetryInterval time.Duration + eofAckQuietPeriod time.Duration + + eofEmptySince time.Time } func (t *turboManager) configure(cfg turboManagerConfig) { @@ -47,6 +52,9 @@ func (t *turboManager) configure(cfg turboManagerConfig) { if cfg.readRetryInterval > 0 { t.readRetryInterval = cfg.readRetryInterval } + if cfg.eofAckQuietPeriod > 0 { + t.eofAckQuietPeriod = cfg.eofAckQuietPeriod + } } func (t *turboManager) resolvedChannelBufferSize() int { @@ -77,6 +85,13 @@ func (t *turboManager) resolvedReadRetryInterval() time.Duration { return defaultTurboReadRetryInterval } +func (t *turboManager) resolvedEOFAckQuietPeriod() time.Duration { + if t.eofAckQuietPeriod > 0 { + return t.eofAckQuietPeriod + } + return defaultTurboEOFAckQuietPeriod +} + func (t *turboManager) enable() { t.mode = true if t.lines == nil { @@ -85,6 +100,7 @@ func (t *turboManager) enable() { // Always create a new EOF channel for each batch of files. t.eof = make(chan struct{}) t.eofAck = make(chan struct{}) + t.eofEmptySince = time.Time{} } func (t *turboManager) enabled() bool { @@ -202,6 +218,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) select { case turboData := <-t.lines: dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) + t.eofEmptySince = time.Time{} n = copy(p, turboData) if n < len(turboData) { t.buffer = turboData[n:] @@ -215,6 +232,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) select { case turboData := <-t.lines: dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + t.eofEmptySince = time.Time{} n = copy(p, turboData) if n < len(turboData) { t.buffer = turboData[n:] @@ -228,9 +246,21 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) if t.eof != nil { select { case <-t.eof: - dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") - t.mode = false - t.signalEOFAck() + if len(t.lines) > 0 { + t.eofEmptySince = time.Time{} + break + } + + if t.eofEmptySince.IsZero() { + t.eofEmptySince = time.Now() + break + } + + if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { + dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") + t.mode = false + t.signalEOFAck() + } default: } } diff --git a/internal/server/server.go b/internal/server/server.go index 53aeec6..e00dba9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -36,8 +36,12 @@ type Server struct { sched *scheduler // Mointor log files for pattern (if configured) cont *continuous + // Authentication strategies keyed by SSH username. + authStrategies map[string]authStrategy } +type authStrategy func(*user.User, string, string) bool + // New returns a new server. func New(cfg config.RuntimeConfig) *Server { if cfg.Server == nil || cfg.Common == nil { @@ -61,6 +65,7 @@ func New(cfg config.RuntimeConfig) *Server { sched: newScheduler(cfg), cont: newContinuous(cfg), } + s.authStrategies = s.newAuthStrategies() s.sshServerConfig.PasswordCallback = s.Callback s.sshServerConfig.PublicKeyCallback = server.PublicKeyCallback @@ -279,32 +284,49 @@ func (s *Server) Callback(c gossh.ConnMetadata, splitted := strings.Split(c.RemoteAddr().String(), ":") remoteIP := splitted[0] - switch user.Name { - case config.HealthUser: - if authInfo == config.HealthUser { - dlog.Server.Debug(user, "Granting permissions to health user") - return nil, nil - } - case config.ScheduleUser: - for _, job := range s.cfg.Server.Schedule { - if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { - dlog.Server.Debug(user, "Granting SSH connection") - return nil, nil - } - } - case config.ContinuousUser: - for _, job := range s.cfg.Server.Continuous { - if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { - dlog.Server.Debug(user, "Granting SSH connection") - return nil, nil - } - } - default: + if strategy, found := s.authStrategies[user.Name]; found && strategy(user, authInfo, remoteIP) { + return nil, nil } return nil, fmt.Errorf("user %s not authorized", user) } +func (s *Server) newAuthStrategies() map[string]authStrategy { + return map[string]authStrategy{ + config.HealthUser: s.authorizeHealthUser, + config.ScheduleUser: s.authorizeScheduleUser, + config.ContinuousUser: s.authorizeContinuousUser, + } +} + +func (s *Server) authorizeHealthUser(user *user.User, authInfo, _ string) bool { + if authInfo != config.HealthUser { + return false + } + dlog.Server.Debug(user, "Granting permissions to health user") + return true +} + +func (s *Server) authorizeScheduleUser(user *user.User, authInfo, remoteIP string) bool { + for _, job := range s.cfg.Server.Schedule { + if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { + dlog.Server.Debug(user, "Granting SSH connection") + return true + } + } + return false +} + +func (s *Server) authorizeContinuousUser(user *user.User, authInfo, remoteIP string) bool { + for _, job := range s.cfg.Server.Continuous { + if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { + dlog.Server.Debug(user, "Granting SSH connection") + return true + } + } + return false +} + func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool { |
