diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:22:11 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:22:11 +0200 |
| commit | 8c08e4e60219782e50c3a5f20a051e706196f48c (patch) | |
| tree | ccc953967f594eb423c4567579ba5f1eb63d1b45 /internal/server/handlers/readcommand_server.go | |
| parent | 3389e64c2fc2d7bdafb8d1d48118bdaae94a8190 (diff) | |
refactor: add readcommand facade for server dependencies
Diffstat (limited to 'internal/server/handlers/readcommand_server.go')
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go new file mode 100644 index 0000000..5160c5c --- /dev/null +++ b/internal/server/handlers/readcommand_server.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "sync/atomic" + + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/mapr/server" +) + +type readCommandServer interface { + LogContext() interface{} + SendServerMessage(message string) + CanReadFile(path string) bool + ServerMessagesChannel() chan string + CatLimiter() chan struct{} + TailLimiter() chan struct{} + Hostname() string + PlainOutput() bool + Serverless() bool + TurboBoostDisabled() bool + HasRegularAggregate() bool + RegisterAggregateLines(lines chan *line.Line) + SharedLinesChannel() chan *line.Line + TurboAggregate() *server.TurboAggregate + AddPendingFiles(delta int32) int32 + CompletePendingFile() (remaining int32, activeCommands int32) + PendingAndActive() (pending int32, activeCommands int32) + TriggerShutdown() + IsTurboMode() bool + EnableTurboMode() + HasTurboEOF() bool + FlushTurboData() + SignalTurboEOF() + GetTurboChannel() chan []byte +} + +var _ readCommandServer = (*ServerHandler)(nil) + +func (h *ServerHandler) LogContext() interface{} { + return h.user +} + +func (h *ServerHandler) SendServerMessage(message string) { + h.sendln(h.serverMessages, message) +} + +func (h *ServerHandler) CanReadFile(path string) bool { + return h.user.HasFilePermission(path, "readfiles") +} + +func (h *ServerHandler) ServerMessagesChannel() chan string { + return h.serverMessages +} + +func (h *ServerHandler) CatLimiter() chan struct{} { + return h.catLimiter +} + +func (h *ServerHandler) TailLimiter() chan struct{} { + return h.tailLimiter +} + +func (h *ServerHandler) Hostname() string { + return h.hostname +} + +func (h *ServerHandler) PlainOutput() bool { + return h.plain +} + +func (h *ServerHandler) Serverless() bool { + return h.serverless +} + +func (h *ServerHandler) TurboBoostDisabled() bool { + return h.serverCfg.TurboBoostDisable +} + +func (h *ServerHandler) HasRegularAggregate() bool { + return h.aggregate != nil +} + +func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) { + if h.aggregate != nil { + h.aggregate.NextLinesCh <- lines + } +} + +func (h *ServerHandler) SharedLinesChannel() chan *line.Line { + return h.lines +} + +func (h *ServerHandler) TurboAggregate() *server.TurboAggregate { + return h.turboAggregate +} + +func (h *ServerHandler) AddPendingFiles(delta int32) int32 { + return atomic.AddInt32(&h.pendingFiles, delta) +} + +func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) { + remaining = atomic.AddInt32(&h.pendingFiles, -1) + activeCommands = atomic.LoadInt32(&h.activeCommands) + return remaining, activeCommands +} + +func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) { + pending = atomic.LoadInt32(&h.pendingFiles) + activeCommands = atomic.LoadInt32(&h.activeCommands) + return pending, activeCommands +} + +func (h *ServerHandler) TriggerShutdown() { + h.shutdown() +} + +func (h *ServerHandler) FlushTurboData() { + h.flushTurboData() +} |
