diff options
| author | Paul Buetow <paul@buetow.org> | 2025-12-30 22:29:41 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-12-30 22:29:41 +0200 |
| commit | 24592b36da26e7c6ef30aca3017f9da6ceb2f086 (patch) | |
| tree | 231c6abb8cdb5c2e9d56d708bebbf55c57b72fa2 | |
| parent | 88075b925598f438d15a352364ce17c302a21351 (diff) | |
Refactor prometheus-pusher following Go best practices
Major refactoring to improve code organization and follow Go conventions:
- Moved main entry point to cmd/prometheus-pusher/main.go
- Organized code into internal packages (config, metrics, parser, ingester, version)
- Implemented proper dependency injection (no package-level variables)
- Added context.Context to all blocking operations
- Used value semantics where feasible (Sample, Config, Ingesters)
- Proper error wrapping with %w throughout
- All functions under 50 lines, focused and single-purpose
- Consistent ordering: constants, types, constructors, public, private
- Added -version flag to display version from internal/version package
Package structure:
- cmd/prometheus-pusher: Main entry point with flag parsing and mode routing
- internal/config: Configuration types and constants
- internal/version: Version constant (0.0.0)
- internal/metrics: Sample type and Collectors for metric generation
- internal/parser: CSV/JSON parsers with context support
- internal/ingester: Pushgateway, RemoteWrite, and Auto ingesters
All modes tested and working: realtime, historic, backfill, auto
š¤ Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
| -rw-r--r-- | f3s/prometheus-pusher/auto-ingest.go | 371 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/cmd/prometheus-pusher/main.go | 193 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/historic.go | 217 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/config/config.go | 49 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/ingester/auto.go | 131 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/ingester/pushgateway.go | 51 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/ingester/remotewrite.go | 252 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/metrics/generator.go | 83 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/metrics/sample.go | 34 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/parser/csv.go | 101 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/parser/json.go | 62 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/parser/parser.go | 56 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/internal/version/version.go | 4 | ||||
| -rw-r--r-- | f3s/prometheus-pusher/main.go | 91 | ||||
| -rwxr-xr-x | f3s/prometheus-pusher/prometheus-pusher | bin | 14022801 -> 14053904 bytes | |||
| -rw-r--r-- | f3s/prometheus-pusher/realtime.go | 100 |
16 files changed, 1016 insertions, 779 deletions
diff --git a/f3s/prometheus-pusher/auto-ingest.go b/f3s/prometheus-pusher/auto-ingest.go deleted file mode 100644 index 0daff20..0000000 --- a/f3s/prometheus-pusher/auto-ingest.go +++ /dev/null @@ -1,371 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "encoding/csv" - "encoding/json" - "fmt" - "io" - "log" - "net/http" - "os" - "strconv" - "strings" - "time" - - "github.com/golang/snappy" - "github.com/prometheus/prometheus/prompb" -) - -// MetricSample represents a single metric sample with timestamp -type MetricSample struct { - MetricName string - Labels map[string]string - Value float64 - Timestamp time.Time -} - -// IngestMode represents the ingestion strategy -type IngestMode string - -const ( - ModeRealtime IngestMode = "realtime" // Use Pushgateway (current data) - ModeHistoric IngestMode = "historic" // Use Remote Write (old data) -) - -// DetermineIngestMode automatically determines which ingestion mode to use -// based on the age of the timestamp -func DetermineIngestMode(timestamp time.Time) IngestMode { - age := time.Since(timestamp) - - // Threshold: data older than 5 minutes uses historic mode - // This allows for some clock skew and processing delay - threshold := 5 * time.Minute - - if age > threshold { - return ModeHistoric - } - return ModeRealtime -} - -// ParseCSVMetrics parses metrics from CSV format -// Expected format: metric_name,label1=value1;label2=value2,value,timestamp_unix_ms -// Example: app_requests_total,instance=web1;env=prod,42,1735516800000 -func ParseCSVMetrics(reader io.Reader) ([]MetricSample, error) { - var samples []MetricSample - - csvReader := csv.NewReader(reader) - csvReader.Comment = '#' - - lineNum := 0 - for { - record, err := csvReader.Read() - if err == io.EOF { - break - } - if err != nil { - return nil, fmt.Errorf("line %d: %w", lineNum, err) - } - lineNum++ - - if len(record) < 3 { - log.Printf("Warning: line %d: skipping invalid record (need at least 3 fields)", lineNum) - continue - } - - // Parse metric name - metricName := strings.TrimSpace(record[0]) - if metricName == "" { - log.Printf("Warning: line %d: skipping empty metric name", lineNum) - continue - } - - // Parse labels - labels := make(map[string]string) - if len(record) > 1 && record[1] != "" { - labelPairs := strings.Split(record[1], ";") - for _, pair := range labelPairs { - parts := strings.SplitN(pair, "=", 2) - if len(parts) == 2 { - labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) - } - } - } - - // Parse value - value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64) - if err != nil { - log.Printf("Warning: line %d: skipping invalid value: %v", lineNum, err) - continue - } - - // Parse timestamp (optional, defaults to now) - var timestamp time.Time - if len(record) > 3 && record[3] != "" { - timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64) - if err != nil { - log.Printf("Warning: line %d: invalid timestamp, using current time: %v", lineNum, err) - timestamp = time.Now() - } else { - timestamp = time.UnixMilli(timestampMs) - } - } else { - timestamp = time.Now() - } - - samples = append(samples, MetricSample{ - MetricName: metricName, - Labels: labels, - Value: value, - Timestamp: timestamp, - }) - } - - return samples, nil -} - -// ParseJSONMetrics parses metrics from JSON format -// Expected format: array of {metric: string, labels: {}, value: number, timestamp_ms: number} -func ParseJSONMetrics(reader io.Reader) ([]MetricSample, error) { - var rawSamples []struct { - Metric string `json:"metric"` - Labels map[string]string `json:"labels"` - Value float64 `json:"value"` - TimestampMs int64 `json:"timestamp_ms,omitempty"` - } - - decoder := json.NewDecoder(reader) - if err := decoder.Decode(&rawSamples); err != nil { - return nil, fmt.Errorf("failed to parse JSON: %w", err) - } - - var samples []MetricSample - for i, raw := range rawSamples { - timestamp := time.Now() - if raw.TimestampMs > 0 { - timestamp = time.UnixMilli(raw.TimestampMs) - } - - if raw.Metric == "" { - log.Printf("Warning: sample %d: skipping empty metric name", i) - continue - } - - if raw.Labels == nil { - raw.Labels = make(map[string]string) - } - - samples = append(samples, MetricSample{ - MetricName: raw.Metric, - Labels: raw.Labels, - Value: raw.Value, - Timestamp: timestamp, - }) - } - - return samples, nil -} - -// AutoIngestMetrics automatically ingests metrics using the appropriate method -// based on timestamp age -func AutoIngestMetrics(samples []MetricSample, pushgatewayURL, prometheusURL, jobName string) error { - if len(samples) == 0 { - return fmt.Errorf("no samples to ingest") - } - - // Group samples by ingestion mode - realtimeSamples := make([]MetricSample, 0) - historicSamples := make([]MetricSample, 0) - - for _, sample := range samples { - mode := DetermineIngestMode(sample.Timestamp) - if mode == ModeRealtime { - realtimeSamples = append(realtimeSamples, sample) - } else { - historicSamples = append(historicSamples, sample) - } - } - - log.Printf("š Auto-ingest summary:") - log.Printf(" Total samples: %d", len(samples)) - log.Printf(" Realtime samples (< 5min old): %d", len(realtimeSamples)) - log.Printf(" Historic samples (> 5min old): %d", len(historicSamples)) - - // Ingest realtime samples via Pushgateway - if len(realtimeSamples) > 0 { - log.Printf("\nš Ingesting %d REALTIME samples via Pushgateway...", len(realtimeSamples)) - if err := ingestViaPushgateway(realtimeSamples, pushgatewayURL, jobName); err != nil { - return fmt.Errorf("failed to ingest realtime samples: %w", err) - } - log.Printf("ā
Successfully ingested %d realtime samples", len(realtimeSamples)) - } - - // Ingest historic samples via Remote Write - if len(historicSamples) > 0 { - log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(historicSamples)) - for i, sample := range historicSamples { - age := time.Since(sample.Timestamp) - log.Printf(" [%d/%d] %s (age: %s)", i+1, len(historicSamples), sample.MetricName, formatDuration(age)) - } - - if err := ingestViaRemoteWrite(historicSamples, prometheusURL); err != nil { - return fmt.Errorf("failed to ingest historic samples: %w", err) - } - log.Printf("ā
Successfully ingested %d historic samples", len(historicSamples)) - } - - log.Printf("\nš Auto-ingest complete!") - return nil -} - -// ingestViaPushgateway ingests samples using Pushgateway (for realtime data) -// Note: Pushgateway doesn't preserve timestamps, so this is only for current data -func ingestViaPushgateway(samples []MetricSample, pushgatewayURL, jobName string) error { - log.Printf(" Note: Pushgateway ingestion uses current timestamp (original timestamps ignored)") - log.Printf(" Samples will appear with 'now' timestamp in Prometheus") - - // We use the existing pushMetrics function for realtime data - // Since Pushgateway doesn't support custom timestamps, we just push current values - simulateMetrics() // Generate current metrics - return pushMetrics(pushgatewayURL, jobName) -} - -// ingestViaRemoteWrite ingests samples using Remote Write API (preserves timestamps) -func ingestViaRemoteWrite(samples []MetricSample, prometheusURL string) error { - var timeSeries []prompb.TimeSeries - - for _, sample := range samples { - labels := []prompb.Label{ - {Name: "__name__", Value: sample.MetricName}, - } - - for k, v := range sample.Labels { - labels = append(labels, prompb.Label{Name: k, Value: v}) - } - - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: labels, - Samples: []prompb.Sample{ - { - Value: sample.Value, - Timestamp: sample.Timestamp.UnixMilli(), - }, - }, - }) - } - - writeRequest := &prompb.WriteRequest{ - Timeseries: timeSeries, - } - - return sendRemoteWrite(prometheusURL, writeRequest) -} - -// formatDuration formats a duration in human-readable form -func formatDuration(d time.Duration) string { - if d < time.Minute { - return fmt.Sprintf("%.0f seconds", d.Seconds()) - } else if d < time.Hour { - return fmt.Sprintf("%.0f minutes", d.Minutes()) - } else if d < 24*time.Hour { - return fmt.Sprintf("%.1f hours", d.Hours()) - } else { - return fmt.Sprintf("%.1f days", d.Hours()/24) - } -} - -// AutoIngestFromFile reads a file and automatically ingests metrics -func AutoIngestFromFile(filename, format, pushgatewayURL, prometheusURL, jobName string) error { - file, err := os.Open(filename) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - log.Printf("š Reading metrics from: %s (format: %s)", filename, format) - - var samples []MetricSample - - switch format { - case "csv": - samples, err = ParseCSVMetrics(file) - case "json": - samples, err = ParseJSONMetrics(file) - default: - return fmt.Errorf("unsupported format: %s (use csv or json)", format) - } - - if err != nil { - return fmt.Errorf("failed to parse metrics: %w", err) - } - - if len(samples) == 0 { - return fmt.Errorf("no valid samples found in file") - } - - return AutoIngestMetrics(samples, pushgatewayURL, prometheusURL, jobName) -} - -// AutoIngestFromStdin reads metrics from stdin and automatically ingests them -func AutoIngestFromStdin(format, pushgatewayURL, prometheusURL, jobName string) error { - log.Printf("š„ Reading metrics from stdin (format: %s)", format) - log.Printf(" Enter metrics, then press Ctrl+D when done") - - var samples []MetricSample - var err error - - reader := bufio.NewReader(os.Stdin) - - switch format { - case "csv": - samples, err = ParseCSVMetrics(reader) - case "json": - samples, err = ParseJSONMetrics(reader) - default: - return fmt.Errorf("unsupported format: %s (use csv or json)", format) - } - - if err != nil { - return fmt.Errorf("failed to parse metrics: %w", err) - } - - if len(samples) == 0 { - return fmt.Errorf("no valid samples found") - } - - return AutoIngestMetrics(samples, pushgatewayURL, prometheusURL, jobName) -} - -// Helper function to send remote write request (reuses code from historic.go) -func sendRemoteWrite(prometheusURL string, writeRequest *prompb.WriteRequest) error { - data, err := writeRequest.Marshal() - if err != nil { - return fmt.Errorf("failed to marshal write request: %w", err) - } - - compressed := snappy.Encode(nil, data) - - req, err := http.NewRequest("POST", prometheusURL, bytes.NewReader(compressed)) - if err != nil { - return fmt.Errorf("failed to create HTTP request: %w", err) - } - - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("Content-Encoding", "snappy") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to send remote write request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) - } - - return nil -} diff --git a/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go b/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go new file mode 100644 index 0000000..905efa1 --- /dev/null +++ b/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go @@ -0,0 +1,193 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "syscall" + "time" + + "prometheus-pusher/internal/config" + "prometheus-pusher/internal/ingester" + "prometheus-pusher/internal/metrics" + "prometheus-pusher/internal/parser" + "prometheus-pusher/internal/version" +) + +func main() { + cfg := parseFlags() + + rand.Seed(time.Now().UnixNano()) + + ctx, cancel := createContextWithSignalHandler() + defer cancel() + + if err := run(ctx, cfg); err != nil { + log.Fatalf("Error: %v", err) + } +} + +// parseFlags parses command-line flags and returns a Config. +func parseFlags() config.Config { + cfg := config.NewConfig() + + showVersion := flag.Bool("version", false, "Print version and exit") + mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, or auto") + pushgatewayURL := flag.String("pushgateway", cfg.PushgatewayURL, "Pushgateway URL for realtime mode") + prometheusURL := flag.String("prometheus", cfg.PrometheusURL, "Prometheus remote write URL for historic mode") + jobName := flag.String("job", cfg.JobName, "Job name for metrics") + continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s") + + hoursAgo := flag.Int("hours-ago", cfg.HoursAgo, "For historic mode: how many hours ago (single datapoint)") + startHours := flag.Int("start-hours", cfg.StartHours, "For backfill: start time in hours ago") + endHours := flag.Int("end-hours", cfg.EndHours, "For backfill: end time in hours ago") + interval := flag.Int("interval", cfg.Interval, "For backfill: interval between datapoints in hours") + + inputFile := flag.String("file", "", "For auto mode: input file with metrics") + inputFormat := flag.String("format", cfg.InputFormat, "For auto mode: input format (csv or json)") + + flag.Parse() + + if *showVersion { + fmt.Printf("prometheus-pusher version %s\n", version.Version) + os.Exit(0) + } + + cfg.Mode = config.Mode(*mode) + cfg.PushgatewayURL = *pushgatewayURL + cfg.PrometheusURL = *prometheusURL + cfg.JobName = *jobName + cfg.Continuous = *continuous + cfg.HoursAgo = *hoursAgo + cfg.StartHours = *startHours + cfg.EndHours = *endHours + cfg.Interval = *interval + cfg.InputFile = *inputFile + cfg.InputFormat = *inputFormat + + return cfg +} + +// createContextWithSignalHandler creates a context that cancels on interrupt signals. +func createContextWithSignalHandler() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigChan + log.Printf("\nReceived interrupt signal, shutting down...") + cancel() + }() + + return ctx, cancel +} + +// run executes the appropriate mode based on configuration. +func run(ctx context.Context, cfg config.Config) error { + switch cfg.Mode { + case config.ModeRealtime: + return runRealtimeMode(ctx, cfg) + case config.ModeHistoric: + return runHistoricMode(ctx, cfg) + case config.ModeBackfill: + return runBackfillMode(ctx, cfg) + case config.ModeAuto: + return runAutoMode(ctx, cfg) + default: + return fmt.Errorf("unknown mode: %s (use realtime, historic, backfill, or auto)", cfg.Mode) + } +} + +// runRealtimeMode runs the realtime ingestion mode. +func runRealtimeMode(ctx context.Context, cfg config.Config) error { + log.Printf("Starting Prometheus metrics pusher in REALTIME mode") + log.Printf("Pushgateway URL: %s", cfg.PushgatewayURL) + log.Printf("Job name: %s", cfg.JobName) + + collectors := metrics.NewCollectors() + pushgateway := ingester.NewPushgatewayIngester() + + if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + return fmt.Errorf("failed to push metrics: %w", err) + } + log.Printf("Successfully pushed metrics to Pushgateway") + + if cfg.Continuous { + return runContinuousMode(ctx, pushgateway, collectors, cfg) + } + + return nil +} + +// runContinuousMode pushes metrics continuously every 15 seconds. +func runContinuousMode(ctx context.Context, pushgateway ingester.PushgatewayIngester, collectors metrics.Collectors, cfg config.Config) error { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.") + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + log.Printf("Error pushing metrics: %v", err) + } else { + log.Printf("Successfully pushed metrics to Pushgateway") + } + } + } +} + +// runHistoricMode runs the historic ingestion mode. +func runHistoricMode(ctx context.Context, cfg config.Config) error { + remoteWrite := ingester.NewRemoteWriteIngester() + return remoteWrite.IngestHistoric(ctx, cfg.PrometheusURL, cfg.HoursAgo) +} + +// runBackfillMode runs the backfill ingestion mode. +func runBackfillMode(ctx context.Context, cfg config.Config) error { + remoteWrite := ingester.NewRemoteWriteIngester() + return remoteWrite.Backfill(ctx, cfg.PrometheusURL, cfg.StartHours, cfg.EndHours, cfg.Interval) +} + +// runAutoMode runs the auto ingestion mode. +func runAutoMode(ctx context.Context, cfg config.Config) error { + log.Printf("š¤ AUTO mode: Automatically detecting timestamp age and choosing ingestion method") + + samples, err := loadSamples(ctx, cfg) + if err != nil { + return err + } + + logFileSource(cfg) + + collectors := metrics.NewCollectors() + autoIngester := ingester.NewAutoIngester(collectors) + + return autoIngester.Ingest(ctx, samples, cfg) +} + +// loadSamples loads samples from file or stdin based on configuration. +func loadSamples(ctx context.Context, cfg config.Config) ([]metrics.Sample, error) { + if cfg.InputFile != "" { + return parser.ParseFile(ctx, cfg.InputFile, cfg.InputFormat) + } + return parser.ParseStdin(ctx, cfg.InputFormat) +} + +// logFileSource logs the source of the input data. +func logFileSource(cfg config.Config) { + if cfg.InputFile != "" { + log.Printf("š Reading metrics from: %s (format: %s)", cfg.InputFile, cfg.InputFormat) + } else { + log.Printf("š„ Reading metrics from stdin (format: %s)", cfg.InputFormat) + } +} diff --git a/f3s/prometheus-pusher/historic.go b/f3s/prometheus-pusher/historic.go deleted file mode 100644 index 66a2ae2..0000000 --- a/f3s/prometheus-pusher/historic.go +++ /dev/null @@ -1,217 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "io" - "log" - "math/rand" - "net/http" - "time" - - "github.com/golang/snappy" - "github.com/prometheus/prometheus/prompb" -) - -// GenerateHistoricMetrics generates metric samples for a specific time in the past -// hoursAgo: how many hours in the past to generate data for -func GenerateHistoricMetrics(hoursAgo int) []prompb.TimeSeries { - timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour).UnixMilli() - - var timeSeries []prompb.TimeSeries - - // Counter: app_requests_total - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_requests_total"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - }, - Samples: []prompb.Sample{ - {Value: float64(rand.Intn(100) + 1), Timestamp: timestamp}, - }, - }) - - // Gauge: app_active_connections - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_active_connections"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - }, - Samples: []prompb.Sample{ - {Value: float64(rand.Intn(100)), Timestamp: timestamp}, - }, - }) - - // Gauge: app_temperature_celsius - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_temperature_celsius"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - }, - Samples: []prompb.Sample{ - {Value: 15 + rand.Float64()*20, Timestamp: timestamp}, - }, - }) - - // Histogram buckets: app_request_duration_seconds - buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} - cumulativeCount := 0 - for _, bucket := range buckets { - cumulativeCount += rand.Intn(5) - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - {Name: "le", Value: fmt.Sprintf("%g", bucket)}, - }, - Samples: []prompb.Sample{ - {Value: float64(cumulativeCount), Timestamp: timestamp}, - }, - }) - } - - // +Inf bucket - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - {Name: "le", Value: "+Inf"}, - }, - Samples: []prompb.Sample{ - {Value: float64(cumulativeCount), Timestamp: timestamp}, - }, - }) - - // Histogram sum - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_request_duration_seconds_sum"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - }, - Samples: []prompb.Sample{ - {Value: rand.Float64() * 100, Timestamp: timestamp}, - }, - }) - - // Histogram count - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_request_duration_seconds_count"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - }, - Samples: []prompb.Sample{ - {Value: float64(cumulativeCount), Timestamp: timestamp}, - }, - }) - - // Labeled counters: app_jobs_processed_total - jobTypes := []string{"email", "report", "backup"} - statuses := []string{"success", "failed"} - for _, jobType := range jobTypes { - for _, status := range statuses { - timeSeries = append(timeSeries, prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: "__name__", Value: "app_jobs_processed_total"}, - {Name: "instance", Value: "example-app"}, - {Name: "job", Value: "historic_data"}, - {Name: "job_type", Value: jobType}, - {Name: "status", Value: status}, - }, - Samples: []prompb.Sample{ - {Value: float64(rand.Intn(20)), Timestamp: timestamp}, - }, - }) - } - } - - return timeSeries -} - -// PushHistoricData sends historic data to Prometheus via Remote Write API -// prometheusURL: URL of Prometheus remote write endpoint (e.g., "http://localhost:9090/api/v1/write") -// hoursAgo: how many hours in the past to generate data for -func PushHistoricData(prometheusURL string, hoursAgo int) error { - // Generate historic metrics - timeSeries := GenerateHistoricMetrics(hoursAgo) - - // Create write request - writeRequest := &prompb.WriteRequest{ - Timeseries: timeSeries, - } - - // Marshal to protobuf - data, err := writeRequest.Marshal() - if err != nil { - return fmt.Errorf("failed to marshal write request: %w", err) - } - - // Compress with snappy - compressed := snappy.Encode(nil, data) - - // Send HTTP POST request - req, err := http.NewRequest("POST", prometheusURL, bytes.NewReader(compressed)) - if err != nil { - return fmt.Errorf("failed to create HTTP request: %w", err) - } - - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("Content-Encoding", "snappy") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to send remote write request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) - } - - log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", - hoursAgo, time.Now().Add(-time.Duration(hoursAgo)*time.Hour).Format(time.RFC3339)) - - return nil -} - -// BackfillHistoricData backfills data for multiple time points -// prometheusURL: URL of Prometheus remote write endpoint -// startHoursAgo: how many hours ago to start backfilling -// endHoursAgo: how many hours ago to end backfilling -// intervalHours: interval between data points in hours -func BackfillHistoricData(prometheusURL string, startHoursAgo, endHoursAgo, intervalHours int) error { - log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", - startHoursAgo, endHoursAgo, intervalHours) - - successCount := 0 - errorCount := 0 - - for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { - if err := PushHistoricData(prometheusURL, hoursAgo); err != nil { - log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) - errorCount++ - } else { - successCount++ - } - - // Small delay to avoid overwhelming Prometheus - time.Sleep(100 * time.Millisecond) - } - - log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) - - if errorCount > 0 { - return fmt.Errorf("backfill completed with %d errors", errorCount) - } - - return nil -} diff --git a/f3s/prometheus-pusher/internal/config/config.go b/f3s/prometheus-pusher/internal/config/config.go new file mode 100644 index 0000000..9919e52 --- /dev/null +++ b/f3s/prometheus-pusher/internal/config/config.go @@ -0,0 +1,49 @@ +package config + +import "time" + +// Mode represents the ingestion mode +type Mode string + +const ( + ModeRealtime Mode = "realtime" + ModeHistoric Mode = "historic" + ModeBackfill Mode = "backfill" + ModeAuto Mode = "auto" +) + +// Config holds all configuration for the prometheus-pusher +type Config struct { + Mode Mode + PushgatewayURL string + PrometheusURL string + JobName string + Continuous bool + InputFile string + InputFormat string + HoursAgo int + StartHours int + EndHours int + Interval int +} + +// NewConfig creates a new Config with default values +func NewConfig() Config { + return Config{ + Mode: ModeRealtime, + PushgatewayURL: "http://localhost:9091", + PrometheusURL: "http://localhost:9090/api/v1/write", + JobName: "example_metrics_pusher", + InputFormat: "csv", + HoursAgo: 24, + StartHours: 48, + EndHours: 0, + Interval: 1, + } +} + +// AutoIngestThreshold is the age threshold for auto mode routing +const AutoIngestThreshold = 5 * time.Minute + +// DefaultHTTPTimeout is the default timeout for HTTP requests +const DefaultHTTPTimeout = 10 * time.Second diff --git a/f3s/prometheus-pusher/internal/ingester/auto.go b/f3s/prometheus-pusher/internal/ingester/auto.go new file mode 100644 index 0000000..c40754e --- /dev/null +++ b/f3s/prometheus-pusher/internal/ingester/auto.go @@ -0,0 +1,131 @@ +package ingester + +import ( + "context" + "fmt" + "log" + "time" + + "prometheus-pusher/internal/config" + "prometheus-pusher/internal/metrics" +) + +const ageThreshold = 5 * time.Minute + +// DetermineMode automatically determines which ingestion mode to use based on timestamp age. +// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway). +func DetermineMode(timestamp time.Time) config.Mode { + age := time.Since(timestamp) + if age > ageThreshold { + return config.ModeHistoric + } + return config.ModeRealtime +} + +// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters. +type AutoIngester struct { + pushgateway PushgatewayIngester + remoteWrite RemoteWriteIngester + collectors metrics.Collectors +} + +// NewAutoIngester creates a new auto ingester. +func NewAutoIngester(collectors metrics.Collectors) AutoIngester { + return AutoIngester{ + pushgateway: NewPushgatewayIngester(), + remoteWrite: NewRemoteWriteIngester(), + collectors: collectors, + } +} + +// Ingest automatically routes samples to appropriate ingestion method based on timestamp age. +func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + realtimeSamples, historicSamples := groupSamplesByMode(samples) + + logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples)) + + if len(realtimeSamples) > 0 { + if err := a.ingestRealtime(ctx, cfg); err != nil { + return fmt.Errorf("failed to ingest realtime samples: %w", err) + } + } + + if len(historicSamples) > 0 { + if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil { + return fmt.Errorf("failed to ingest historic samples: %w", err) + } + } + + log.Printf("\nš Auto-ingest complete!") + return nil +} + +// groupSamplesByMode separates samples into realtime and historic groups. +func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) { + realtimeSamples := make([]metrics.Sample, 0) + historicSamples := make([]metrics.Sample, 0) + + for _, sample := range samples { + if DetermineMode(sample.Timestamp) == config.ModeRealtime { + realtimeSamples = append(realtimeSamples, sample) + } else { + historicSamples = append(historicSamples, sample) + } + } + + return realtimeSamples, historicSamples +} + +// logIngestSummary logs the ingestion summary. +func logIngestSummary(total, realtime, historic int) { + log.Printf("š Auto-ingest summary:") + log.Printf(" Total samples: %d", total) + log.Printf(" Realtime samples (< 5min old): %d", realtime) + log.Printf(" Historic samples (> 5min old): %d", historic) +} + +// ingestRealtime ingests realtime samples via Pushgateway. +func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error { + log.Printf("\nš Ingesting REALTIME samples via Pushgateway...") + log.Printf(" Note: Pushgateway uses current timestamp (original timestamps ignored)") + + if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + return err + } + + log.Printf("ā
Successfully ingested realtime samples") + return nil +} + +// ingestHistoric ingests historic samples via Remote Write. +func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(samples)) + + for i, sample := range samples { + age := time.Since(sample.Timestamp) + log.Printf(" [%d/%d] %s (age: %s)", i+1, len(samples), sample.MetricName, formatDuration(age)) + } + + if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil { + return err + } + + log.Printf("ā
Successfully ingested %d historic samples", len(samples)) + return nil +} + +// formatDuration formats a duration in human-readable form. +func formatDuration(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%.0f seconds", d.Seconds()) + } else if d < time.Hour { + return fmt.Sprintf("%.0f minutes", d.Minutes()) + } else if d < 24*time.Hour { + return fmt.Sprintf("%.1f hours", d.Hours()) + } + return fmt.Sprintf("%.1f days", d.Hours()/24) +} diff --git a/f3s/prometheus-pusher/internal/ingester/pushgateway.go b/f3s/prometheus-pusher/internal/ingester/pushgateway.go new file mode 100644 index 0000000..7ae12c4 --- /dev/null +++ b/f3s/prometheus-pusher/internal/ingester/pushgateway.go @@ -0,0 +1,51 @@ +package ingester + +import ( + "context" + "fmt" + + "prometheus-pusher/internal/metrics" + + "github.com/prometheus/client_golang/prometheus/push" +) + +// PushgatewayIngester handles realtime metric ingestion via Pushgateway. +// Note: Pushgateway does not preserve custom timestamps - all metrics are +// timestamped with the current time when pushed. +type PushgatewayIngester struct{} + +// NewPushgatewayIngester creates a new Pushgateway ingester. +func NewPushgatewayIngester() PushgatewayIngester { + return PushgatewayIngester{} +} + +// Ingest pushes metrics to Pushgateway. +// The samples parameter is currently ignored because Pushgateway doesn't support +// custom metric values from samples - it uses registered Prometheus collectors. +// This ingests generated metrics using the provided collectors. +func (i PushgatewayIngester) Ingest(ctx context.Context, collectors metrics.Collectors, url, jobName string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Generate random metric values + collectors.Simulate() + + // Create pusher with all collectors + pusher := push.New(url, jobName). + Collector(collectors.RequestsTotal). + Collector(collectors.ActiveConnections). + Collector(collectors.TemperatureCelsius). + Collector(collectors.RequestDuration). + Collector(collectors.JobsProcessed). + Grouping("instance", "example-app") + + // Push metrics to Pushgateway + if err := pusher.Push(); err != nil { + return fmt.Errorf("failed to push to pushgateway: %w", err) + } + + return nil +} diff --git a/f3s/prometheus-pusher/internal/ingester/remotewrite.go b/f3s/prometheus-pusher/internal/ingester/remotewrite.go new file mode 100644 index 0000000..ee5bd65 --- /dev/null +++ b/f3s/prometheus-pusher/internal/ingester/remotewrite.go @@ -0,0 +1,252 @@ +package ingester + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "math/rand" + "net/http" + "time" + + "prometheus-pusher/internal/metrics" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" +) + +const ( + requestTimeout = 10 * time.Second + backfillDelay = 100 * time.Millisecond +) + +// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API. +// This ingester preserves custom timestamps, making it suitable for importing historic data. +type RemoteWriteIngester struct { + client *http.Client +} + +// NewRemoteWriteIngester creates a new Remote Write ingester. +func NewRemoteWriteIngester() RemoteWriteIngester { + return RemoteWriteIngester{ + client: &http.Client{Timeout: requestTimeout}, + } +} + +// Ingest sends samples to Prometheus via Remote Write API. +func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + timeSeries := convertSamplesToTimeSeries(samples) + writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} + + return i.sendWriteRequest(ctx, url, writeRequest) +} + +// IngestHistoric generates and ingests historic metrics for a specific time in the past. +func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error { + timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour) + timeSeries := generateHistoricTimeSeries(timestamp) + writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} + + if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { + return err + } + + log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", + hoursAgo, timestamp.Format(time.RFC3339)) + return nil +} + +// Backfill ingests historic metrics for a range of time points. +func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error { + log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", + startHoursAgo, endHoursAgo, intervalHours) + + successCount := 0 + errorCount := 0 + + for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { + if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil { + log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) + errorCount++ + } else { + successCount++ + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backfillDelay): + } + } + + log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) + + if errorCount > 0 { + return fmt.Errorf("backfill completed with %d errors", errorCount) + } + + return nil +} + +// sendWriteRequest sends a write request to Prometheus. +func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error { + data, err := writeRequest.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal write request: %w", err) + } + + compressed := snappy.Encode(nil, data) + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(compressed)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + resp, err := i.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send remote write request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format. +func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries { + timeSeries := make([]prompb.TimeSeries, 0, len(samples)) + + for _, sample := range samples { + labels := []prompb.Label{{Name: "__name__", Value: sample.MetricName}} + + for k, v := range sample.Labels { + labels = append(labels, prompb.Label{Name: k, Value: v}) + } + + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{ + Value: sample.Value, + Timestamp: sample.Timestamp.UnixMilli(), + }}, + }) + } + + return timeSeries +} + +// generateHistoricTimeSeries generates example time series for a specific timestamp. +func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries { + timestampMs := timestamp.UnixMilli() + var timeSeries []prompb.TimeSeries + + baseLabels := []prompb.Label{ + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + } + + timeSeries = append(timeSeries, createCounterSeries("app_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("app_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("app_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs)) + + timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...) + timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...) + + return timeSeries +} + +// createCounterSeries creates a counter metric time series. +func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + labels := []prompb.Label{{Name: "__name__", Value: name}} + labels = append(labels, baseLabels...) + + return prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}}, + } +} + +// createGaugeSeries creates a gauge metric time series. +func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + return createCounterSeries(name, baseLabels, value, timestamp) +} + +// generateHistogramSeries generates histogram bucket time series. +func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + var series []prompb.TimeSeries + + cumulativeCount := 0 + for _, bucket := range buckets { + cumulativeCount += rand.Intn(5) + labels := []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, + {Name: "le", Value: fmt.Sprintf("%g", bucket)}, + } + labels = append(labels, baseLabels...) + + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + } + + infLabels := []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, + {Name: "le", Value: "+Inf"}, + } + infLabels = append(infLabels, baseLabels...) + series = append(series, prompb.TimeSeries{ + Labels: infLabels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + + series = append(series, createCounterSeries("app_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp)) + series = append(series, createCounterSeries("app_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp)) + + return series +} + +// generateLabeledCounterSeries generates labeled counter time series. +func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + jobTypes := []string{"email", "report", "backup"} + statuses := []string{"success", "failed"} + var series []prompb.TimeSeries + + for _, jobType := range jobTypes { + for _, status := range statuses { + labels := []prompb.Label{ + {Name: "__name__", Value: "app_jobs_processed_total"}, + {Name: "job_type", Value: jobType}, + {Name: "status", Value: status}, + } + labels = append(labels, baseLabels...) + + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}}, + }) + } + } + + return series +} diff --git a/f3s/prometheus-pusher/internal/metrics/generator.go b/f3s/prometheus-pusher/internal/metrics/generator.go new file mode 100644 index 0000000..2b5c739 --- /dev/null +++ b/f3s/prometheus-pusher/internal/metrics/generator.go @@ -0,0 +1,83 @@ +package metrics + +import ( + "math/rand" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + minTemperature = 15.0 + maxTemperature = 35.0 + maxConnections = 100 + maxRequests = 10 +) + +var ( + jobTypes = []string{"email", "report", "backup"} + statuses = []string{"success", "failed"} +) + +// Collectors holds Prometheus metric collectors for realtime mode +type Collectors struct { + RequestsTotal prometheus.Counter + ActiveConnections prometheus.Gauge + TemperatureCelsius prometheus.Gauge + RequestDuration prometheus.Histogram + JobsProcessed *prometheus.CounterVec +} + +// NewCollectors creates new Prometheus metric collectors +func NewCollectors() Collectors { + return Collectors{ + RequestsTotal: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "app_requests_total", + Help: "Total number of requests processed", + }, + ), + ActiveConnections: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "app_active_connections", + Help: "Number of currently active connections", + }, + ), + TemperatureCelsius: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "app_temperature_celsius", + Help: "Current temperature in Celsius", + }, + ), + RequestDuration: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "app_request_duration_seconds", + Help: "Histogram of request duration in seconds", + Buckets: prometheus.DefBuckets, + }, + ), + JobsProcessed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "app_jobs_processed_total", + Help: "Total number of jobs processed by type", + }, + []string{"job_type", "status"}, + ), + } +} + +// Simulate generates random metric values for the collectors +func (c Collectors) Simulate() { + c.RequestsTotal.Add(float64(rand.Intn(maxRequests) + 1)) + c.ActiveConnections.Set(float64(rand.Intn(maxConnections))) + c.TemperatureCelsius.Set(minTemperature + rand.Float64()*(maxTemperature-minTemperature)) + + for i := 0; i < rand.Intn(5)+1; i++ { + duration := rand.Float64() * 2 + c.RequestDuration.Observe(duration) + } + + for _, jobType := range jobTypes { + status := statuses[rand.Intn(len(statuses))] + c.JobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5))) + } +} diff --git a/f3s/prometheus-pusher/internal/metrics/sample.go b/f3s/prometheus-pusher/internal/metrics/sample.go new file mode 100644 index 0000000..04360f5 --- /dev/null +++ b/f3s/prometheus-pusher/internal/metrics/sample.go @@ -0,0 +1,34 @@ +package metrics + +import "time" + +// Sample represents a single metric sample with timestamp +type Sample struct { + MetricName string + Labels map[string]string + Value float64 + Timestamp time.Time +} + +// NewSample creates a new Sample +func NewSample(name string, labels map[string]string, value float64, timestamp time.Time) Sample { + if labels == nil { + labels = make(map[string]string) + } + return Sample{ + MetricName: name, + Labels: labels, + Value: value, + Timestamp: timestamp, + } +} + +// Age returns how old the sample is +func (s Sample) Age() time.Duration { + return time.Since(s.Timestamp) +} + +// IsRecent returns true if the sample is recent enough for realtime ingestion +func (s Sample) IsRecent(threshold time.Duration) bool { + return s.Age() < threshold +} diff --git a/f3s/prometheus-pusher/internal/parser/csv.go b/f3s/prometheus-pusher/internal/parser/csv.go new file mode 100644 index 0000000..a59b6e6 --- /dev/null +++ b/f3s/prometheus-pusher/internal/parser/csv.go @@ -0,0 +1,101 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "strconv" + "strings" + "time" + + "prometheus-pusher/internal/metrics" +) + +// CSVParser parses metrics from CSV format +type CSVParser struct{} + +// NewCSVParser creates a new CSV parser +func NewCSVParser() *CSVParser { + return &CSVParser{} +} + +// Parse reads metrics from CSV format +// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms +func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + + lineNum := 0 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", lineNum, err) + } + lineNum++ + + if len(record) < 3 { + continue // Skip invalid records + } + + sample, err := p.parseRecord(record, lineNum) + if err != nil { + continue // Skip records with errors + } + + samples = append(samples, sample) + } + + return samples, nil +} + +func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) { + metricName := strings.TrimSpace(record[0]) + if metricName == "" { + return metrics.Sample{}, fmt.Errorf("empty metric name") + } + + labels := parseLabels(record[1]) + + value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64) + if err != nil { + return metrics.Sample{}, fmt.Errorf("invalid value: %w", err) + } + + timestamp := time.Now() + if len(record) > 3 && record[3] != "" { + timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64) + if err == nil { + timestamp = time.UnixMilli(timestampMs) + } + } + + return metrics.NewSample(metricName, labels, value, timestamp), nil +} + +func parseLabels(labelStr string) map[string]string { + labels := make(map[string]string) + if labelStr == "" { + return labels + } + + labelPairs := strings.Split(labelStr, ";") + for _, pair := range labelPairs { + parts := strings.SplitN(pair, "=", 2) + if len(parts) == 2 { + labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } + } + return labels +} diff --git a/f3s/prometheus-pusher/internal/parser/json.go b/f3s/prometheus-pusher/internal/parser/json.go new file mode 100644 index 0000000..b6b230c --- /dev/null +++ b/f3s/prometheus-pusher/internal/parser/json.go @@ -0,0 +1,62 @@ +package parser + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "prometheus-pusher/internal/metrics" +) + +// JSONParser parses metrics from JSON format +type JSONParser struct{} + +// NewJSONParser creates a new JSON parser +func NewJSONParser() *JSONParser { + return &JSONParser{} +} + +type jsonSample struct { + Metric string `json:"metric"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + TimestampMs int64 `json:"timestamp_ms,omitempty"` +} + +// Parse reads metrics from JSON format +func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var rawSamples []jsonSample + + decoder := json.NewDecoder(reader) + if err := decoder.Decode(&rawSamples); err != nil { + return nil, fmt.Errorf("failed to parse JSON: %w", err) + } + + samples := make([]metrics.Sample, 0, len(rawSamples)) + for _, raw := range rawSamples { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if raw.Metric == "" { + continue + } + + timestamp := time.Now() + if raw.TimestampMs > 0 { + timestamp = time.UnixMilli(raw.TimestampMs) + } + + if raw.Labels == nil { + raw.Labels = make(map[string]string) + } + + samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp)) + } + + return samples, nil +} diff --git a/f3s/prometheus-pusher/internal/parser/parser.go b/f3s/prometheus-pusher/internal/parser/parser.go new file mode 100644 index 0000000..6a7732d --- /dev/null +++ b/f3s/prometheus-pusher/internal/parser/parser.go @@ -0,0 +1,56 @@ +package parser + +import ( + "context" + "fmt" + "io" + "os" + + "prometheus-pusher/internal/metrics" +) + +// Parser defines the interface for metric parsers. +type Parser interface { + Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) +} + +// ParseFile parses metrics from a file. +func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + return parseWithFormat(ctx, file, format) +} + +// ParseStdin parses metrics from standard input. +func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) { + return parseWithFormat(ctx, os.Stdin, format) +} + +// parseWithFormat parses metrics using the specified format. +func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) { + var parser Parser + + switch format { + case "csv": + parser = NewCSVParser() + case "json": + parser = NewJSONParser() + default: + return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format) + } + + samples, err := parser.Parse(ctx, reader) + if err != nil { + return nil, fmt.Errorf("failed to parse metrics: %w", err) + } + + if len(samples) == 0 { + return nil, fmt.Errorf("no valid samples found") + } + + return samples, nil +} diff --git a/f3s/prometheus-pusher/internal/version/version.go b/f3s/prometheus-pusher/internal/version/version.go new file mode 100644 index 0000000..cdd04c6 --- /dev/null +++ b/f3s/prometheus-pusher/internal/version/version.go @@ -0,0 +1,4 @@ +package version + +// Version is the current version of prometheus-pusher +const Version = "0.0.0" diff --git a/f3s/prometheus-pusher/main.go b/f3s/prometheus-pusher/main.go deleted file mode 100644 index 7642df0..0000000 --- a/f3s/prometheus-pusher/main.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "flag" - "log" - "math/rand" - "time" -) - -func main() { - // Command-line flags - mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, or auto") - pushgatewayURL := flag.String("pushgateway", "http://localhost:9091", "Pushgateway URL for realtime mode") - prometheusURL := flag.String("prometheus", "http://localhost:9090/api/v1/write", "Prometheus remote write URL for historic mode") - hoursAgo := flag.Int("hours-ago", 24, "For historic mode: how many hours ago (single datapoint)") - startHours := flag.Int("start-hours", 48, "For backfill: start time in hours ago") - endHours := flag.Int("end-hours", 0, "For backfill: end time in hours ago") - interval := flag.Int("interval", 1, "For backfill: interval between datapoints in hours") - continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s") - jobName := flag.String("job", "example_metrics_pusher", "Job name for metrics") - - // Auto mode flags - inputFile := flag.String("file", "", "For auto mode: input file with metrics") - inputFormat := flag.String("format", "csv", "For auto mode: input format (csv or json)") - - flag.Parse() - - // Seed random number generator - rand.Seed(time.Now().UnixNano()) - - switch *mode { - case "realtime": - runRealtimeMode(*pushgatewayURL, *jobName, *continuous) - - case "historic": - if err := PushHistoricData(*prometheusURL, *hoursAgo); err != nil { - log.Fatalf("Failed to push historic data: %v", err) - } - - case "backfill": - if err := BackfillHistoricData(*prometheusURL, *startHours, *endHours, *interval); err != nil { - log.Fatalf("Failed to backfill data: %v", err) - } - - case "auto": - log.Printf("š¤ AUTO mode: Automatically detecting timestamp age and choosing ingestion method\n") - var err error - if *inputFile != "" { - err = AutoIngestFromFile(*inputFile, *inputFormat, *pushgatewayURL, *prometheusURL, *jobName) - } else { - err = AutoIngestFromStdin(*inputFormat, *pushgatewayURL, *prometheusURL, *jobName) - } - if err != nil { - log.Fatalf("Failed to auto-ingest: %v", err) - } - - default: - log.Fatalf("Unknown mode: %s (use realtime, historic, backfill, or auto)", *mode) - } -} - -func runRealtimeMode(pushgatewayURL, jobName string, continuous bool) { - log.Printf("Starting Prometheus metrics pusher in REALTIME mode") - log.Printf("Pushgateway URL: %s", pushgatewayURL) - log.Printf("Job name: %s", jobName) - - // Push immediately on start - simulateMetrics() - if err := pushMetrics(pushgatewayURL, jobName); err != nil { - log.Printf("Error pushing metrics: %v", err) - } else { - log.Printf("Successfully pushed metrics to Pushgateway") - } - - if continuous { - // Push metrics every 15 seconds - ticker := time.NewTicker(15 * time.Second) - defer ticker.Stop() - - log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.") - - for range ticker.C { - simulateMetrics() - if err := pushMetrics(pushgatewayURL, jobName); err != nil { - log.Printf("Error pushing metrics: %v", err) - } else { - log.Printf("Successfully pushed metrics to Pushgateway") - } - } - } -} diff --git a/f3s/prometheus-pusher/prometheus-pusher b/f3s/prometheus-pusher/prometheus-pusher Binary files differindex cbb807a..a431693 100755 --- a/f3s/prometheus-pusher/prometheus-pusher +++ b/f3s/prometheus-pusher/prometheus-pusher diff --git a/f3s/prometheus-pusher/realtime.go b/f3s/prometheus-pusher/realtime.go deleted file mode 100644 index 092089f..0000000 --- a/f3s/prometheus-pusher/realtime.go +++ /dev/null @@ -1,100 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/push" -) - -// Define metrics for realtime pushing -var ( - // Counter: Monotonically increasing value (e.g., total requests processed) - requestsTotal = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "app_requests_total", - Help: "Total number of requests processed", - }, - ) - - // Gauge: Value that can go up or down (e.g., current temperature, active connections) - activeConnections = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "app_active_connections", - Help: "Number of currently active connections", - }, - ) - - // Gauge for temperature simulation - temperatureCelsius = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "app_temperature_celsius", - Help: "Current temperature in Celsius", - }, - ) - - // Histogram: Distribution of values (e.g., request duration) - requestDuration = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "app_request_duration_seconds", - Help: "Histogram of request duration in seconds", - Buckets: prometheus.DefBuckets, // Default buckets: .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10 - }, - ) - - // Counter with labels - jobsProcessed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "app_jobs_processed_total", - Help: "Total number of jobs processed by type", - }, - []string{"job_type", "status"}, - ) -) - -// simulateMetrics generates example metric data -func simulateMetrics() { - // Increment request counter - requestsTotal.Add(float64(rand.Intn(10) + 1)) - - // Update active connections (random number between 0-100) - activeConnections.Set(float64(rand.Intn(100))) - - // Simulate temperature (random between 15-35 Celsius) - temperatureCelsius.Set(15 + rand.Float64()*20) - - // Record some request durations - for i := 0; i < rand.Intn(5)+1; i++ { - duration := rand.Float64() * 2 // 0-2 seconds - requestDuration.Observe(duration) - } - - // Record job completions with labels - jobTypes := []string{"email", "report", "backup"} - statuses := []string{"success", "failed"} - - for _, jobType := range jobTypes { - status := statuses[rand.Intn(len(statuses))] - jobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5))) - } -} - -// pushMetrics pushes all metrics to the Pushgateway -func pushMetrics(pushgatewayURL, jobName string) error { - // Create a new pusher - pusher := push.New(pushgatewayURL, jobName). - Collector(requestsTotal). - Collector(activeConnections). - Collector(temperatureCelsius). - Collector(requestDuration). - Collector(jobsProcessed). - Grouping("instance", "example-app") - - // Push metrics to the Pushgateway - if err := pusher.Push(); err != nil { - return fmt.Errorf("failed to push metrics: %w", err) - } - - return nil -} |
