diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
| commit | f6e23930da2900c43a5389a2e7d1e38d8221a76f (patch) | |
| tree | 3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/server | |
| parent | 1fc24f9affed5128702e4de80572cac8c82d399e (diff) | |
Refactor server-side config singleton reads
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 8 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 6 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
4 files changed, 17 insertions, 7 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 36d9ef3..920c8dd 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -21,13 +21,17 @@ func newMapCommand(serverHandler *ServerHandler, argc int, m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") + defaultLogFormat := "" + if serverHandler.serverCfg != nil { + defaultLogFormat = serverHandler.serverCfg.MapreduceLogFormat + } // If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate // Turbo boost is enabled by default and is a server-side optimization dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless) if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless { dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) - turboAggregate, err := server.NewTurboAggregate(queryStr) + turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat) if err != nil { return m, nil, nil, err } @@ -36,7 +40,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int, } // Otherwise, create a regular Aggregate - aggregate, err := server.NewAggregate(queryStr) + aggregate, err := server.NewAggregate(queryStr, defaultLogFormat) if err != nil { return m, nil, nil, err } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index c03900f..493f4b7 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -204,13 +204,13 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, switch r.mode { case omode.GrepClient, omode.CatClient: - catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel()) + catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) reader = &catFile limiter = r.server.CatLimiter() case omode.TailClient: fallthrough default: - tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel()) + tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) reader = &tailFile limiter = r.server.TailLimiter() } diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 6d7a095..d073682 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -55,6 +55,7 @@ type readCommandTurbo interface { type readCommandTiming interface { ReadGlobRetryInterval() time.Duration ReadRetryInterval() time.Duration + MaxLineLength() int AggregateLinesChannelBufferSize() int TurboDataTransmissionDelay() time.Duration TurboEOFWaitDuration(fileCount int) time.Duration @@ -205,6 +206,11 @@ func (h *ServerHandler) ReadRetryInterval() time.Duration { return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second) } +// MaxLineLength returns the configured max line length for file readers. +func (h *ServerHandler) MaxLineLength() int { + return positiveIntOrDefault(h.serverCfg.MaxLineLength, 1024*1024) +} + // AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size. func (h *ServerHandler) AggregateLinesChannelBufferSize() int { return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000) diff --git a/internal/server/server.go b/internal/server/server.go index 72094ef..a788ba0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -79,7 +79,7 @@ func New(cfg config.RuntimeConfig) *Server { s.authKeyStore, ) - private, err := gossh.ParsePrivateKey(server.PrivateHostKey()) + private, err := gossh.ParsePrivateKey(server.PrivateHostKey(cfg.Server.HostKeyFile, cfg.Server.HostKeyBits)) if err != nil { dlog.Server.FatalPanic(err) } @@ -165,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) { func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) { - user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String()) + user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String(), s.cfg.Server.UserPermissions) if err != nil { dlog.Server.Error(user, err) if err := newChannel.Reject(gossh.Prohibited, err.Error()); err != nil { @@ -290,7 +290,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn, func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { - user, err := user.New(c.User(), c.RemoteAddr().String()) + user, err := user.New(c.User(), c.RemoteAddr().String(), s.cfg.Server.UserPermissions) if err != nil { return nil, err } |
