summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
commitf6e23930da2900c43a5389a2e7d1e38d8221a76f (patch)
tree3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/server
parent1fc24f9affed5128702e4de80572cac8c82d399e (diff)
Refactor server-side config singleton reads
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/mapcommand.go8
-rw-r--r--internal/server/handlers/readcommand.go4
-rw-r--r--internal/server/handlers/readcommand_server.go6
-rw-r--r--internal/server/server.go6
4 files changed, 17 insertions, 7 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 36d9ef3..920c8dd 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -21,13 +21,17 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
+ defaultLogFormat := ""
+ if serverHandler.serverCfg != nil {
+ defaultLogFormat = serverHandler.serverCfg.MapreduceLogFormat
+ }
// If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate
// Turbo boost is enabled by default and is a server-side optimization
dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless)
if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless {
dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
- turboAggregate, err := server.NewTurboAggregate(queryStr)
+ turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
@@ -36,7 +40,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
}
// Otherwise, create a regular Aggregate
- aggregate, err := server.NewAggregate(queryStr)
+ aggregate, err := server.NewAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index c03900f..493f4b7 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -204,13 +204,13 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
switch r.mode {
case omode.GrepClient, omode.CatClient:
- catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel())
+ catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
reader = &catFile
limiter = r.server.CatLimiter()
case omode.TailClient:
fallthrough
default:
- tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel())
+ tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
reader = &tailFile
limiter = r.server.TailLimiter()
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 6d7a095..d073682 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -55,6 +55,7 @@ type readCommandTurbo interface {
type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
+ MaxLineLength() int
AggregateLinesChannelBufferSize() int
TurboDataTransmissionDelay() time.Duration
TurboEOFWaitDuration(fileCount int) time.Duration
@@ -205,6 +206,11 @@ func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
+// MaxLineLength returns the configured max line length for file readers.
+func (h *ServerHandler) MaxLineLength() int {
+ return positiveIntOrDefault(h.serverCfg.MaxLineLength, 1024*1024)
+}
+
// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
diff --git a/internal/server/server.go b/internal/server/server.go
index 72094ef..a788ba0 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -79,7 +79,7 @@ func New(cfg config.RuntimeConfig) *Server {
s.authKeyStore,
)
- private, err := gossh.ParsePrivateKey(server.PrivateHostKey())
+ private, err := gossh.ParsePrivateKey(server.PrivateHostKey(cfg.Server.HostKeyFile, cfg.Server.HostKeyBits))
if err != nil {
dlog.Server.FatalPanic(err)
}
@@ -165,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn,
newChannel gossh.NewChannel) {
- user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String())
+ user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String(), s.cfg.Server.UserPermissions)
if err != nil {
dlog.Server.Error(user, err)
if err := newChannel.Reject(gossh.Prohibited, err.Error()); err != nil {
@@ -290,7 +290,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn,
func (s *Server) Callback(c gossh.ConnMetadata,
authPayload []byte) (*gossh.Permissions, error) {
- user, err := user.New(c.User(), c.RemoteAddr().String())
+ user, err := user.New(c.User(), c.RemoteAddr().String(), s.cfg.Server.UserPermissions)
if err != nil {
return nil, err
}