summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
committerPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
commit5d1b9f1062d38c301c0995ec6da980bdf5e48332 (patch)
tree81e1a8963ea66cf06164e89beb6cd2da0ee325f7 /internal
parentbb46cfbccea301721fb93485ea7169f5841feda3 (diff)
Improve lint/vet reliability and refactor client runtime/bootstrap
Diffstat (limited to 'internal')
-rw-r--r--internal/cli/runtime.go84
-rw-r--r--internal/config/initializer.go17
-rw-r--r--internal/config/initializer_test.go68
-rw-r--r--internal/mapr/logformat/custom1.go1
-rw-r--r--internal/mapr/logformat/custom2.go1
-rw-r--r--internal/server/handlers/authkeycommand_test.go6
-rw-r--r--internal/server/handlers/basehandler.go10
-rw-r--r--internal/server/handlers/readcommand_server.go65
-rw-r--r--internal/server/handlers/serverhandler.go2
-rw-r--r--internal/ssh/client/knownhostscallback.go10
-rw-r--r--internal/ssh/server/authkeystore.go4
-rw-r--r--internal/tools/common/data_generator.go32
-rw-r--r--internal/tools/profile/analyze.go48
13 files changed, 282 insertions, 66 deletions
diff --git a/internal/cli/runtime.go b/internal/cli/runtime.go
new file mode 100644
index 0000000..5e577c2
--- /dev/null
+++ b/internal/cli/runtime.go
@@ -0,0 +1,84 @@
+package cli
+
+import (
+ "context"
+ "net/http"
+ _ "net/http/pprof" // Register pprof handlers when runtime pprof endpoint is enabled.
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/profiling"
+ "github.com/mimecast/dtail/internal/source"
+)
+
+// ClientRuntime owns common client command runtime components.
+type ClientRuntime struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+ profiler *profiling.Profiler
+ profileEnabled bool
+}
+
+// NewClientRuntime starts logging and profiling for a client command.
+func NewClientRuntime(parent context.Context, profileFlags profiling.Flags, profileName string) *ClientRuntime {
+ if parent == nil {
+ parent = context.Background()
+ }
+ ctx, cancel := context.WithCancel(parent)
+ runtime := &ClientRuntime{
+ ctx: ctx,
+ cancel: cancel,
+ profiler: profiling.NewProfiler(profileFlags.ToConfig(profileName)),
+ profileEnabled: profileFlags.Enabled(),
+ }
+
+ runtime.wg.Add(1)
+ dlog.Start(ctx, &runtime.wg, source.Client)
+ return runtime
+}
+
+// Context returns the runtime context.
+func (r *ClientRuntime) Context() context.Context {
+ return r.ctx
+}
+
+// Cancel cancels the runtime context.
+func (r *ClientRuntime) Cancel() {
+ r.cancel()
+}
+
+// StartPProf starts the pprof server if an address is provided.
+func (r *ClientRuntime) StartPProf(address string) {
+ if address == "" {
+ return
+ }
+
+ dlog.Client.Info("Starting PProf", address)
+ go func() {
+ if err := http.ListenAndServe(address, nil); err != nil {
+ dlog.Client.Error("PProf server exited", err)
+ }
+ }()
+}
+
+// LogStartupMetrics logs startup profiling metrics when enabled.
+func (r *ClientRuntime) LogStartupMetrics() {
+ if r.profileEnabled {
+ r.profiler.LogMetrics("startup")
+ }
+}
+
+// LogShutdownMetrics logs shutdown profiling metrics when enabled.
+func (r *ClientRuntime) LogShutdownMetrics() {
+ if r.profileEnabled {
+ r.profiler.LogMetrics("shutdown")
+ }
+}
+
+// Stop stops profiling and logging runtime goroutines.
+func (r *ClientRuntime) Stop() {
+ r.profiler.Stop()
+ r.cancel()
+ r.wg.Wait()
+}
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index b540457..6038705 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -29,13 +29,18 @@ func (in *initializer) parseConfig(args *Args) error {
return in.parseSpecificConfig(args.ConfigFile)
}
- if homeDir, err := os.UserHomeDir(); err != nil {
- var paths []string
- paths = append(paths, fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir))
- paths = append(paths, fmt.Sprintf("%s/.dtail.conf", homeDir))
+ homeDir, err := os.UserHomeDir()
+ if err == nil && homeDir != "" {
+ paths := []string{
+ fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir),
+ fmt.Sprintf("%s/.dtail.conf", homeDir),
+ }
for _, configPath := range paths {
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- continue
+ if _, err := os.Stat(configPath); err != nil {
+ if os.IsNotExist(err) {
+ continue
+ }
+ return err
}
if err := in.parseSpecificConfig(configPath); err != nil {
return err
diff --git a/internal/config/initializer_test.go b/internal/config/initializer_test.go
new file mode 100644
index 0000000..ea6b229
--- /dev/null
+++ b/internal/config/initializer_test.go
@@ -0,0 +1,68 @@
+package config
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+func TestParseConfigLoadsDefaultXDGConfig(t *testing.T) {
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ configPath := filepath.Join(home, ".config", "dtail", "dtail.conf")
+ if err := os.MkdirAll(filepath.Dir(configPath), 0o755); err != nil {
+ t.Fatalf("mkdir failed: %v", err)
+ }
+ writeTestConfig(t, configPath, `{"Common":{"LogLevel":"debug"}}`)
+
+ in := initializer{
+ Common: newDefaultCommonConfig(),
+ Server: newDefaultServerConfig(),
+ Client: newDefaultClientConfig(),
+ }
+
+ if err := in.parseConfig(&Args{}); err != nil {
+ t.Fatalf("parseConfig failed: %v", err)
+ }
+ if in.Common.LogLevel != "debug" {
+ t.Fatalf("expected log level debug, got %q", in.Common.LogLevel)
+ }
+}
+
+func TestParseConfigLoadsDefaultConfigsInOrder(t *testing.T) {
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ xdgPath := filepath.Join(home, ".config", "dtail", "dtail.conf")
+ if err := os.MkdirAll(filepath.Dir(xdgPath), 0o755); err != nil {
+ t.Fatalf("mkdir failed: %v", err)
+ }
+ writeTestConfig(t, xdgPath, `{"Common":{"Logger":"file","LogLevel":"warn"}}`)
+
+ homePath := filepath.Join(home, ".dtail.conf")
+ writeTestConfig(t, homePath, `{"Common":{"LogLevel":"error"}}`)
+
+ in := initializer{
+ Common: newDefaultCommonConfig(),
+ Server: newDefaultServerConfig(),
+ Client: newDefaultClientConfig(),
+ }
+
+ if err := in.parseConfig(&Args{}); err != nil {
+ t.Fatalf("parseConfig failed: %v", err)
+ }
+ if in.Common.LogLevel != "error" {
+ t.Fatalf("expected final log level error, got %q", in.Common.LogLevel)
+ }
+ if in.Common.Logger != "file" {
+ t.Fatalf("expected logger file from first config, got %q", in.Common.Logger)
+ }
+}
+
+func writeTestConfig(t *testing.T, path, body string) {
+ t.Helper()
+ if err := os.WriteFile(path, []byte(body), 0o644); err != nil {
+ t.Fatalf("write config failed: %v", err)
+ }
+}
diff --git a/internal/mapr/logformat/custom1.go b/internal/mapr/logformat/custom1.go
index 05e0867..e340dbf 100644
--- a/internal/mapr/logformat/custom1.go
+++ b/internal/mapr/logformat/custom1.go
@@ -2,6 +2,7 @@ package logformat
import "errors"
+// ErrCustom1NotImplemented indicates custom1 parser is only a template.
var ErrCustom1NotImplemented error = errors.New("custom1 log format is not implemented")
// Template for creating a custom log format.
diff --git a/internal/mapr/logformat/custom2.go b/internal/mapr/logformat/custom2.go
index cc8d5b9..4fddfcd 100644
--- a/internal/mapr/logformat/custom2.go
+++ b/internal/mapr/logformat/custom2.go
@@ -2,6 +2,7 @@ package logformat
import "errors"
+// ErrCustom2NotImplemented indicates custom2 parser is only a template.
var ErrCustom2NotImplemented error = errors.New("custom2 log format is not implemented")
// Template for creating a custom log format.
diff --git a/internal/server/handlers/authkeycommand_test.go b/internal/server/handlers/authkeycommand_test.go
index bb9488b..f510038 100644
--- a/internal/server/handlers/authkeycommand_test.go
+++ b/internal/server/handlers/authkeycommand_test.go
@@ -33,11 +33,11 @@ func TestHandleAuthKeyCommandSuccess(t *testing.T) {
if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY OK\n" {
t.Fatalf("Unexpected response: %q", message)
}
- if !sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) {
+ if !sshserver.AuthKeys().Has(handler.user.Name, key) {
t.Fatalf("Expected key to be stored for user")
}
- sshserver.ServerAuthKeyStore().Remove(handler.user.Name, key)
+ sshserver.AuthKeys().Remove(handler.user.Name, key)
}
func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) {
@@ -51,7 +51,7 @@ func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) {
if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY ERR feature disabled\n" {
t.Fatalf("Unexpected response: %q", message)
}
- if sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) {
+ if sshserver.AuthKeys().Has(handler.user.Name, key) {
t.Fatalf("Expected no key to be stored while feature is disabled")
}
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index f21262e..d510139 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -128,13 +128,9 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
pool.RecycleBytesBuffer(line.Content)
line.Recycle()
- case <-time.After(time.Second):
- select {
- case <-h.done.Done():
- err = io.EOF
- return
- default:
- }
+ case <-h.done.Done():
+ err = io.EOF
+ return
}
return
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 650dcf2..6d7a095 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -8,25 +8,40 @@ import (
"github.com/mimecast/dtail/internal/mapr/server"
)
-type readCommandServer interface {
+type readCommandContext interface {
LogContext() interface{}
- SendServerMessage(message string)
+}
+
+type readCommandFiles interface {
CanReadFile(path string) bool
- ServerMessagesChannel() chan string
CatLimiter() chan struct{}
TailLimiter() chan struct{}
+}
+
+type readCommandMessages interface {
+ SendServerMessage(message string)
+ ServerMessagesChannel() chan string
Hostname() string
PlainOutput() bool
Serverless() bool
- TurboBoostDisabled() bool
- HasRegularAggregate() bool
RegisterAggregateLines(lines chan *line.Line)
SharedLinesChannel() chan *line.Line
+}
+
+type readCommandAggregates interface {
+ HasRegularAggregate() bool
TurboAggregate() *server.TurboAggregate
+}
+
+type readCommandLifecycle interface {
AddPendingFiles(delta int32) int32
CompletePendingFile() (remaining int32, activeCommands int32)
PendingAndActive() (pending int32, activeCommands int32)
TriggerShutdown()
+}
+
+type readCommandTurbo interface {
+ TurboBoostDisabled() bool
IsTurboMode() bool
EnableTurboMode()
HasTurboEOF() bool
@@ -35,6 +50,9 @@ type readCommandServer interface {
GetTurboChannel() chan []byte
TurboChannelLen() int
WaitForTurboEOFAck(timeout time.Duration) bool
+}
+
+type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
AggregateLinesChannelBufferSize() int
@@ -45,90 +63,120 @@ type readCommandServer interface {
TurboEOFAckTimeout() time.Duration
}
+type readCommandServer interface {
+ readCommandContext
+ readCommandFiles
+ readCommandMessages
+ readCommandAggregates
+ readCommandLifecycle
+ readCommandTurbo
+ readCommandTiming
+}
+
var _ readCommandServer = (*ServerHandler)(nil)
+// LogContext returns the logger context associated with the current user/session.
func (h *ServerHandler) LogContext() interface{} {
return h.user
}
+// SendServerMessage sends a formatted server message to the client.
func (h *ServerHandler) SendServerMessage(message string) {
h.sendln(h.serverMessages, message)
}
+// CanReadFile reports whether the current user can read the given path.
func (h *ServerHandler) CanReadFile(path string) bool {
return h.user.HasFilePermission(path, "readfiles")
}
+// ServerMessagesChannel returns the server message channel.
func (h *ServerHandler) ServerMessagesChannel() chan string {
return h.serverMessages
}
+// CatLimiter returns the concurrency limiter for cat/grep style reads.
func (h *ServerHandler) CatLimiter() chan struct{} {
return h.catLimiter
}
+// TailLimiter returns the concurrency limiter for tail reads.
func (h *ServerHandler) TailLimiter() chan struct{} {
return h.tailLimiter
}
+// Hostname returns the short hostname used for response formatting.
func (h *ServerHandler) Hostname() string {
return h.hostname
}
+// PlainOutput reports whether plain output mode is enabled.
func (h *ServerHandler) PlainOutput() bool {
return h.plain
}
+// Serverless reports whether the current session is running in serverless mode.
func (h *ServerHandler) Serverless() bool {
return h.serverless
}
+// TurboBoostDisabled reports whether turbo mode is disabled by configuration.
func (h *ServerHandler) TurboBoostDisabled() bool {
return h.serverCfg.TurboBoostDisable
}
+// HasRegularAggregate reports whether the regular map-reduce aggregate is active.
func (h *ServerHandler) HasRegularAggregate() bool {
return h.aggregate != nil
}
+// RegisterAggregateLines attaches a file line channel to the active aggregate.
func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) {
if h.aggregate != nil {
h.aggregate.NextLinesCh <- lines
}
}
+// SharedLinesChannel returns the shared outbound line channel.
func (h *ServerHandler) SharedLinesChannel() chan *line.Line {
return h.lines
}
+// TurboAggregate returns the turbo aggregate if enabled for the session.
func (h *ServerHandler) TurboAggregate() *server.TurboAggregate {
return h.turboAggregate
}
+// AddPendingFiles increments or decrements the pending file counter.
func (h *ServerHandler) AddPendingFiles(delta int32) int32 {
return atomic.AddInt32(&h.pendingFiles, delta)
}
+// CompletePendingFile marks one file as completed and returns pending/active counters.
func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) {
remaining = atomic.AddInt32(&h.pendingFiles, -1)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return remaining, activeCommands
}
+// PendingAndActive returns the current pending file and active command counts.
func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) {
pending = atomic.LoadInt32(&h.pendingFiles)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return pending, activeCommands
}
+// TriggerShutdown starts the handler shutdown sequence.
func (h *ServerHandler) TriggerShutdown() {
h.shutdown()
}
+// FlushTurboData drains pending turbo data to the underlying writer.
func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
+// TurboEOFAckTimeout returns the timeout used while waiting for turbo EOF ACK.
func (h *ServerHandler) TurboEOFAckTimeout() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second)
}
@@ -147,22 +195,27 @@ func positiveIntOrDefault(value int, fallback int) int {
return value
}
+// ReadGlobRetryInterval returns the retry interval for glob expansion failures.
func (h *ServerHandler) ReadGlobRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second)
}
+// ReadRetryInterval returns the retry interval for repeated file reads.
func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
+// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
}
+// TurboDataTransmissionDelay returns the delay used after turbo flushes.
func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond)
}
+// TurboEOFWaitDuration returns the wait duration used before signaling turbo EOF.
func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond)
if fileCount <= 10 {
@@ -178,10 +231,12 @@ func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
return wait
}
+// ShutdownTurboSerializeWait returns the wait before final turbo shutdown checks.
func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond)
}
+// ShutdownIdleRecheckWait returns the wait used for the final idle recheck.
func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond)
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 5d5a78c..078fd27 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -180,6 +180,6 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont
return
}
- sshserver.ServerAuthKeyStore().Add(h.user.Name, pubKey)
+ sshserver.AuthKeys().Add(h.user.Name, pubKey)
h.sendln(h.serverMessages, "AUTHKEY OK")
}
diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go
index 9c73864..26ab245 100644
--- a/internal/ssh/client/knownhostscallback.go
+++ b/internal/ssh/client/knownhostscallback.go
@@ -243,8 +243,7 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) {
}
// Read old known hosts file, to see which are old and new entries
- os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0666)
- oldFd, err := os.Open(c.knownHostsPath)
+ oldFd, err := os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
@@ -257,9 +256,14 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) {
address := strings.SplitN(line, " ", 2)[0]
if _, ok := addresses[address]; !ok {
- newFd.WriteString(fmt.Sprintf("%s\n", line))
+ if _, err := newFd.WriteString(fmt.Sprintf("%s\n", line)); err != nil {
+ panic(err)
+ }
}
}
+ if err := scanner.Err(); err != nil {
+ panic(err)
+ }
// Now, replace old known hosts file
if err := os.Rename(tmpKnownHostsPath, c.knownHostsPath); err != nil {
diff --git a/internal/ssh/server/authkeystore.go b/internal/ssh/server/authkeystore.go
index c4b89fe..c96b207 100644
--- a/internal/ssh/server/authkeystore.go
+++ b/internal/ssh/server/authkeystore.go
@@ -28,8 +28,8 @@ type AuthKeyStore struct {
now func() time.Time
}
-// ServerAuthKeyStore returns the process-wide auth key cache used by the SSH server.
-func ServerAuthKeyStore() *AuthKeyStore {
+// AuthKeys returns the process-wide auth key cache used by the SSH server.
+func AuthKeys() *AuthKeyStore {
return authKeyStore
}
diff --git a/internal/tools/common/data_generator.go b/internal/tools/common/data_generator.go
index 9446d8a..d3d4225 100644
--- a/internal/tools/common/data_generator.go
+++ b/internal/tools/common/data_generator.go
@@ -12,11 +12,13 @@ import (
// DataFormat represents the format of generated data
type DataFormat string
+// Supported data generator output formats.
const (
- FormatLog DataFormat = "log"
- FormatCSV DataFormat = "csv"
- FormatDTail DataFormat = "dtail"
- FormatMapReduce DataFormat = "mapreduce"
+ // FormatLog generates generic log lines.
+ FormatLog DataFormat = "log"
+ FormatCSV DataFormat = "csv"
+ FormatDTail DataFormat = "dtail"
+ FormatMapReduce DataFormat = "mapreduce"
)
// DataGenerator generates test data for profiling and benchmarking
@@ -112,7 +114,7 @@ func (g *DataGenerator) generateLogFile(filename string, targetSize int64) error
line := fmt.Sprintf("[%s] %s - User %s performed %s action (duration: %dms, status: %s)\n",
timestamp, level, user, action, duration, status)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -157,7 +159,7 @@ func (g *DataGenerator) generateCSVFile(filename string, targetSize int64) error
}
line := fmt.Sprintf("%s,%s,%s,%d,%s\n", timestamp, user, action, duration, status)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -180,14 +182,14 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6
var currentSize int64
lineNum := 0
- hostnames := []string{"server01", "server02", "server03", "server04", "server05",
+ hostnames := []string{"server01", "server02", "server03", "server04", "server05",
"server06", "server07", "server08", "server09", "server10"}
for currentSize < targetSize {
lineNum++
hostname := hostnames[lineNum%len(hostnames)]
- timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
- 10+(lineNum/86400)%12, (lineNum/3600)%30+1,
+ timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
+ 10+(lineNum/86400)%12, (lineNum/3600)%30+1,
(lineNum/3600)%24, (lineNum/60)%60, lineNum%60)
goroutines := 10 + (lineNum % 50)
cgocalls := lineNum % 100
@@ -199,7 +201,7 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6
line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n",
timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -220,13 +222,13 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines
writer := bufio.NewWriter(file)
defer writer.Flush()
- hostnames := []string{"server01", "server02", "server03", "server04", "server05",
+ hostnames := []string{"server01", "server02", "server03", "server04", "server05",
"server06", "server07", "server08", "server09", "server10"}
for i := 1; i <= lines; i++ {
hostname := hostnames[i%len(hostnames)]
- timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
- 10+(i/86400)%12, (i/3600)%30+1,
+ timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
+ 10+(i/86400)%12, (i/3600)%30+1,
(i/3600)%24, (i/60)%60, i%60)
goroutines := 10 + (i % 50)
cgocalls := i % 100
@@ -238,7 +240,7 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines
line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n",
timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections)
-
+
if _, err := writer.WriteString(line); err != nil {
return err
}
@@ -263,4 +265,4 @@ func GenerateCSVFile(filename string, lines int) error {
// Estimate size based on average line length (about 50 bytes per line)
estimatedSize := int64(lines * 50)
return g.generateCSVFile(filename, estimatedSize)
-} \ No newline at end of file
+}
diff --git a/internal/tools/profile/analyze.go b/internal/tools/profile/analyze.go
index f27841a..59503b2 100644
--- a/internal/tools/profile/analyze.go
+++ b/internal/tools/profile/analyze.go
@@ -13,13 +13,13 @@ import (
"github.com/mimecast/dtail/internal/tools/common"
)
-// ProfileInfo holds information about a profile file
-type ProfileInfo struct {
- Path string
- Tool string
- Type string // cpu, mem, alloc
- ModTime string
- Size int64
+// Info holds information about a profile file.
+type Info struct {
+ Path string
+ Tool string
+ Type string // cpu, mem, alloc
+ ModTime string
+ Size int64
}
func runAnalyze(cfg *Config) error {
@@ -58,7 +58,7 @@ func listProfiles(cfg *Config) error {
}
// Group by tool
- byTool := make(map[string][]ProfileInfo)
+ byTool := make(map[string][]Info)
for _, p := range profiles {
byTool[p.Tool] = append(byTool[p.Tool], p)
}
@@ -74,26 +74,26 @@ func listProfiles(cfg *Config) error {
for _, tool := range tools {
fmt.Printf("\n%s profiles:\n", tool)
toolProfiles := byTool[tool]
-
+
// Sort by modification time (newest first)
sort.Slice(toolProfiles, func(i, j int) bool {
return toolProfiles[i].ModTime > toolProfiles[j].ModTime
})
for _, p := range toolProfiles {
- fmt.Printf(" %-8s %s %8s %s\n",
+ fmt.Printf(" %-8s %s %8s %s\n",
p.Type, p.ModTime, common.FormatSize(p.Size), filepath.Base(p.Path))
}
}
fmt.Printf("\nTotal: %d profiles\n", len(profiles))
fmt.Printf("\nUsage: dtail-tools profile -mode analyze <profile_file>\n")
-
+
return nil
}
-func findProfiles(dir string) ([]ProfileInfo, error) {
- var profiles []ProfileInfo
+func findProfiles(dir string) ([]Info, error) {
+ var profiles []Info
pattern := filepath.Join(dir, "*.prof")
matches, err := filepath.Glob(pattern)
@@ -117,7 +117,7 @@ func findProfiles(dir string) ([]ProfileInfo, error) {
tool := parts[0]
profType := parts[1]
- profiles = append(profiles, ProfileInfo{
+ profiles = append(profiles, Info{
Path: path,
Tool: tool,
Type: profType,
@@ -158,11 +158,11 @@ func analyzeProfile(profilePath string, args ...string) error {
func showTopFunctions(profilePath string, count int, isMemProfile bool) error {
args := []string{"tool", "pprof", "-top", fmt.Sprintf("-nodecount=%d", count)}
-
+
if isMemProfile {
args = append(args, "-alloc_space")
}
-
+
args = append(args, profilePath)
cmd := exec.Command("go", args...)
@@ -178,22 +178,22 @@ func showTopFunctions(profilePath string, count int, isMemProfile bool) error {
fmt.Printf("Top %d functions (sorted by flat):\n", count)
fmt.Println("================================================================")
-
+
for scanner.Scan() {
line := scanner.Text()
-
+
// Skip header lines
- if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") ||
- strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") {
+ if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") ||
+ strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") {
continue
}
-
+
// Start printing from the table header
if strings.Contains(line, "flat") && strings.Contains(line, "cum") {
inTop = true
fmt.Println("# Command: go " + strings.Join(args[1:], " "))
}
-
+
if inTop {
fmt.Println(line)
if line != "" {
@@ -216,6 +216,6 @@ func openWebProfile(profilePath string) error {
cmd := exec.Command("go", "tool", "pprof", "-http=:8080", profilePath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
-
+
return cmd.Run()
-} \ No newline at end of file
+}