diff options
Diffstat (limited to 'internal/collector')
| -rw-r--r-- | internal/collector/collector.go | 152 | ||||
| -rw-r--r-- | internal/collector/script_embed.go | 9 |
2 files changed, 88 insertions, 73 deletions
diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 91f3d7b..0107d61 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -20,13 +20,13 @@ type StatsStore interface { SetNet(host, iface string, net NetLine, stamp float64) } -// Run starts a collector for one host: runs the embedded remote script (local or over SSH) and parses the stream into store. -// Host may be "host" or "host:user". It runs until ctx is cancelled or the command exits. -// The script is embedded in the binary; no external script file is required. +// Run starts a collector for one host: runs the embedded remote script (local or over SSH) +// and parses the output stream into store. Host may be "host" or "host:user". +// It runs until ctx is cancelled or the command exits. func Run(ctx context.Context, host string, cfg *config.Config, store StatsStore) error { hostKey, user := splitHostUser(host) - // Select script: Only Linux supported for local monitoring + // Select script: only Linux is supported for local monitoring. scriptBytes := LinuxScript if isLocal(hostKey) { scriptBytes = getLocalScript() @@ -35,45 +35,67 @@ func Run(ctx context.Context, host string, cfg *config.Config, store StatsStore) } } - script := bytes.NewReader(scriptBytes) - var scanner *bufio.Scanner + var ( + scanner *bufio.Scanner + cmd *exec.Cmd + err error + ) if isLocal(hostKey) { - cmd := exec.CommandContext(ctx, "bash", "-s") - cmd.Stdin = script - stdout, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("%s: %w", hostKey, err) - } - if err := cmd.Start(); err != nil { - return fmt.Errorf("%s: %w", hostKey, err) - } - defer cmd.Wait() - scanner = bufio.NewScanner(stdout) + scanner, cmd, err = startLocalScanner(ctx, scriptBytes) } else { - args := []string{"-o", "StrictHostKeyChecking=no"} - if cfg.SSHOpts != "" { - args = append(args, strings.Fields(cfg.SSHOpts)...) - } - if user != "" { - args = append(args, "-l", user) - } - args = append(args, hostKey, "bash -s") - cmd := exec.CommandContext(ctx, "ssh", args...) - cmd.Stdin = script - stdout, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("%s: %w", hostKey, err) - } - if err := cmd.Start(); err != nil { - return fmt.Errorf("%s: %w", hostKey, err) - } - defer cmd.Wait() - scanner = bufio.NewScanner(stdout) + scanner, cmd, err = startRemoteScanner(ctx, hostKey, user, scriptBytes, cfg) + } + if err != nil { + return fmt.Errorf("%s: %w", hostKey, err) } + defer cmd.Wait() + + return parseCollectorStream(ctx, scanner, hostKey, store) +} +// startLocalScanner spawns "bash -s" with scriptBytes on stdin and returns a scanner +// over its stdout along with the Cmd for deferred Wait. +func startLocalScanner(ctx context.Context, scriptBytes []byte) (*bufio.Scanner, *exec.Cmd, error) { + cmd := exec.CommandContext(ctx, "bash", "-s") + cmd.Stdin = bytes.NewReader(scriptBytes) + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, err + } + if err := cmd.Start(); err != nil { + return nil, nil, err + } + return bufio.NewScanner(stdout), cmd, nil +} + +// startRemoteScanner spawns "ssh host bash -s" with scriptBytes on stdin and returns +// a scanner over its stdout along with the Cmd for deferred Wait. +func startRemoteScanner(ctx context.Context, hostKey, user string, scriptBytes []byte, cfg *config.Config) (*bufio.Scanner, *exec.Cmd, error) { + args := []string{"-o", "StrictHostKeyChecking=no"} + if cfg.SSHOpts != "" { + args = append(args, strings.Fields(cfg.SSHOpts)...) + } + if user != "" { + args = append(args, "-l", user) + } + args = append(args, hostKey, "bash -s") + cmd := exec.CommandContext(ctx, "ssh", args...) + cmd.Stdin = bytes.NewReader(scriptBytes) + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, err + } + if err := cmd.Start(); err != nil { + return nil, nil, err + } + return bufio.NewScanner(stdout), cmd, nil +} + +// parseCollectorStream reads lines from scanner and dispatches parsed stats to store. +// Always collects all CPU lines (cpu, cpu0, cpu1, ...) so display can toggle per-core +// view with key 1 without a reconnect. +func parseCollectorStream(ctx context.Context, scanner *bufio.Scanner, hostKey string, store StatsStore) error { mode := "" - // Always collect all CPU lines (cpu, cpu0, cpu1, ...) so display can toggle per-core view with key 1 - cpustring := "cpu" for scanner.Scan() { select { case <-ctx.Done(): @@ -85,38 +107,40 @@ func Run(ctx context.Context, host string, cfg *config.Config, store StatsStore) mode = line continue } - switch mode { - case ModeLoadAvg: - l := ParseLoadAvg(line) - store.SetLoadAvg(hostKey, l.Load1, l.Load5, l.Load15) - case ModeMemStats: - if mem, ok := ParseMemLine(line); ok { - store.SetMem(hostKey, mem.Key, mem.Value) - } - case ModeNetStats: - if idx := strings.Index(line, ":"); idx >= 0 { - iface := strings.TrimSpace(line[:idx]) - rest := line[idx+1:] - net, err := ParseNetLine(iface + ":" + rest) - if err != nil { - continue - } + dispatchCollectorLine(mode, line, hostKey, store) + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("%s: read: %w", hostKey, err) + } + return nil +} + +// dispatchCollectorLine routes one parsed line to the appropriate store setter +// based on the current protocol mode marker. +func dispatchCollectorLine(mode, line, hostKey string, store StatsStore) { + switch mode { + case ModeLoadAvg: + l := ParseLoadAvg(line) + store.SetLoadAvg(hostKey, l.Load1, l.Load5, l.Load15) + case ModeMemStats: + if mem, ok := ParseMemLine(line); ok { + store.SetMem(hostKey, mem.Key, mem.Value) + } + case ModeNetStats: + if idx := strings.Index(line, ":"); idx >= 0 { + iface := strings.TrimSpace(line[:idx]) + net, err := ParseNetLine(iface + ":" + line[idx+1:]) + if err == nil { store.SetNet(hostKey, net.Iface, net, float64(time.Now().UnixNano())/1e9) } - case ModeCPUStats: - if strings.HasPrefix(line, cpustring) { - cu, err := ParseCPULine(line) - if err != nil { - continue - } + } + case ModeCPUStats: + if strings.HasPrefix(line, "cpu") { + if cu, err := ParseCPULine(line); err == nil { store.SetCPU(hostKey, cu.Name, cu) } } } - if err := scanner.Err(); err != nil { - return fmt.Errorf("%s: read: %w", hostKey, err) - } - return nil } // splitHostUser splits "host:user" into (host, user). If no colon, returns (host, ""). diff --git a/internal/collector/script_embed.go b/internal/collector/script_embed.go deleted file mode 100644 index 168ca37..0000000 --- a/internal/collector/script_embed.go +++ /dev/null @@ -1,9 +0,0 @@ -package collector - -import _ "embed" - -// RemoteScript is the loadbars-remote.sh script embedded for local and SSH execution. -// Path is relative to this file's directory (internal/collector). -// -//go:embed scriptdata/loadbars-remote.sh -var RemoteScript []byte |
