summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:42:33 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:42:33 +0200
commita426a2f9f33b1125a05d3aac29e7b98afdc36a99 (patch)
tree13d21a5aef7ec1e586e364ce5bebabb65fd77523 /internal/server
parent3002bdcaa4ec22aa46b6c98eefda2f926dfff618 (diff)
server: use auth strategy registry and stabilize turbo EOF sync
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go2
-rw-r--r--internal/server/handlers/readcommand_server.go1
-rw-r--r--internal/server/handlers/turbo_manager.go36
-rw-r--r--internal/server/server.go64
4 files changed, 79 insertions, 24 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 6078e37..7cd9a63 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -147,6 +147,8 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
timeout := r.server.TurboEOFAckTimeout()
if r.server.WaitForTurboEOFAck(timeout) {
dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged")
+ // Allow transport buffers to flush after acknowledgement.
+ time.Sleep(r.server.ShutdownTurboSerializeWait())
} else {
dlog.Server.Warn(
r.server.LogContext(),
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 2c0a616..650dcf2 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -192,5 +192,6 @@ func (h *ServerHandler) turboManagerConfig() turboManagerConfig {
flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout),
flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval),
readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval),
+ eofAckQuietPeriod: durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, defaultTurboEOFAckQuietPeriod),
}
}
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
index e5cbf1a..deed383 100644
--- a/internal/server/handlers/turbo_manager.go
+++ b/internal/server/handlers/turbo_manager.go
@@ -12,6 +12,7 @@ const (
defaultTurboFlushTimeout = 2 * time.Second
defaultTurboFlushPollInterval = 10 * time.Millisecond
defaultTurboReadRetryInterval = time.Millisecond
+ defaultTurboEOFAckQuietPeriod = 50 * time.Millisecond
)
type turboManagerConfig struct {
@@ -19,6 +20,7 @@ type turboManagerConfig struct {
flushTimeout time.Duration
flushPollInterval time.Duration
readRetryInterval time.Duration
+ eofAckQuietPeriod time.Duration
}
type turboManager struct {
@@ -32,6 +34,9 @@ type turboManager struct {
flushTimeout time.Duration
flushPollInterval time.Duration
readRetryInterval time.Duration
+ eofAckQuietPeriod time.Duration
+
+ eofEmptySince time.Time
}
func (t *turboManager) configure(cfg turboManagerConfig) {
@@ -47,6 +52,9 @@ func (t *turboManager) configure(cfg turboManagerConfig) {
if cfg.readRetryInterval > 0 {
t.readRetryInterval = cfg.readRetryInterval
}
+ if cfg.eofAckQuietPeriod > 0 {
+ t.eofAckQuietPeriod = cfg.eofAckQuietPeriod
+ }
}
func (t *turboManager) resolvedChannelBufferSize() int {
@@ -77,6 +85,13 @@ func (t *turboManager) resolvedReadRetryInterval() time.Duration {
return defaultTurboReadRetryInterval
}
+func (t *turboManager) resolvedEOFAckQuietPeriod() time.Duration {
+ if t.eofAckQuietPeriod > 0 {
+ return t.eofAckQuietPeriod
+ }
+ return defaultTurboEOFAckQuietPeriod
+}
+
func (t *turboManager) enable() {
t.mode = true
if t.lines == nil {
@@ -85,6 +100,7 @@ func (t *turboManager) enable() {
// Always create a new EOF channel for each batch of files.
t.eof = make(chan struct{})
t.eofAck = make(chan struct{})
+ t.eofEmptySince = time.Time{}
}
func (t *turboManager) enabled() bool {
@@ -202,6 +218,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
select {
case turboData := <-t.lines:
dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ t.eofEmptySince = time.Time{}
n = copy(p, turboData)
if n < len(turboData) {
t.buffer = turboData[n:]
@@ -215,6 +232,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
select {
case turboData := <-t.lines:
dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ t.eofEmptySince = time.Time{}
n = copy(p, turboData)
if n < len(turboData) {
t.buffer = turboData[n:]
@@ -228,9 +246,21 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
if t.eof != nil {
select {
case <-t.eof:
- dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
- t.mode = false
- t.signalEOFAck()
+ if len(t.lines) > 0 {
+ t.eofEmptySince = time.Time{}
+ break
+ }
+
+ if t.eofEmptySince.IsZero() {
+ t.eofEmptySince = time.Now()
+ break
+ }
+
+ if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
+ dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
+ t.mode = false
+ t.signalEOFAck()
+ }
default:
}
}
diff --git a/internal/server/server.go b/internal/server/server.go
index 53aeec6..e00dba9 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -36,8 +36,12 @@ type Server struct {
sched *scheduler
// Mointor log files for pattern (if configured)
cont *continuous
+ // Authentication strategies keyed by SSH username.
+ authStrategies map[string]authStrategy
}
+type authStrategy func(*user.User, string, string) bool
+
// New returns a new server.
func New(cfg config.RuntimeConfig) *Server {
if cfg.Server == nil || cfg.Common == nil {
@@ -61,6 +65,7 @@ func New(cfg config.RuntimeConfig) *Server {
sched: newScheduler(cfg),
cont: newContinuous(cfg),
}
+ s.authStrategies = s.newAuthStrategies()
s.sshServerConfig.PasswordCallback = s.Callback
s.sshServerConfig.PublicKeyCallback = server.PublicKeyCallback
@@ -279,32 +284,49 @@ func (s *Server) Callback(c gossh.ConnMetadata,
splitted := strings.Split(c.RemoteAddr().String(), ":")
remoteIP := splitted[0]
- switch user.Name {
- case config.HealthUser:
- if authInfo == config.HealthUser {
- dlog.Server.Debug(user, "Granting permissions to health user")
- return nil, nil
- }
- case config.ScheduleUser:
- for _, job := range s.cfg.Server.Schedule {
- if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
- dlog.Server.Debug(user, "Granting SSH connection")
- return nil, nil
- }
- }
- case config.ContinuousUser:
- for _, job := range s.cfg.Server.Continuous {
- if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
- dlog.Server.Debug(user, "Granting SSH connection")
- return nil, nil
- }
- }
- default:
+ if strategy, found := s.authStrategies[user.Name]; found && strategy(user, authInfo, remoteIP) {
+ return nil, nil
}
return nil, fmt.Errorf("user %s not authorized", user)
}
+func (s *Server) newAuthStrategies() map[string]authStrategy {
+ return map[string]authStrategy{
+ config.HealthUser: s.authorizeHealthUser,
+ config.ScheduleUser: s.authorizeScheduleUser,
+ config.ContinuousUser: s.authorizeContinuousUser,
+ }
+}
+
+func (s *Server) authorizeHealthUser(user *user.User, authInfo, _ string) bool {
+ if authInfo != config.HealthUser {
+ return false
+ }
+ dlog.Server.Debug(user, "Granting permissions to health user")
+ return true
+}
+
+func (s *Server) authorizeScheduleUser(user *user.User, authInfo, remoteIP string) bool {
+ for _, job := range s.cfg.Server.Schedule {
+ if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
+ dlog.Server.Debug(user, "Granting SSH connection")
+ return true
+ }
+ }
+ return false
+}
+
+func (s *Server) authorizeContinuousUser(user *user.User, authInfo, remoteIP string) bool {
+ for _, job := range s.cfg.Server.Continuous {
+ if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
+ dlog.Server.Debug(user, "Granting SSH connection")
+ return true
+ }
+ }
+ return false
+}
+
func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP,
allowedJobName string, allowFrom []string) bool {