summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-12-30 22:29:41 +0200
committerPaul Buetow <paul@buetow.org>2025-12-30 22:29:41 +0200
commit24592b36da26e7c6ef30aca3017f9da6ceb2f086 (patch)
tree231c6abb8cdb5c2e9d56d708bebbf55c57b72fa2
parent88075b925598f438d15a352364ce17c302a21351 (diff)
Refactor prometheus-pusher following Go best practices
Major refactoring to improve code organization and follow Go conventions: - Moved main entry point to cmd/prometheus-pusher/main.go - Organized code into internal packages (config, metrics, parser, ingester, version) - Implemented proper dependency injection (no package-level variables) - Added context.Context to all blocking operations - Used value semantics where feasible (Sample, Config, Ingesters) - Proper error wrapping with %w throughout - All functions under 50 lines, focused and single-purpose - Consistent ordering: constants, types, constructors, public, private - Added -version flag to display version from internal/version package Package structure: - cmd/prometheus-pusher: Main entry point with flag parsing and mode routing - internal/config: Configuration types and constants - internal/version: Version constant (0.0.0) - internal/metrics: Sample type and Collectors for metric generation - internal/parser: CSV/JSON parsers with context support - internal/ingester: Pushgateway, RemoteWrite, and Auto ingesters All modes tested and working: realtime, historic, backfill, auto šŸ¤– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
-rw-r--r--f3s/prometheus-pusher/auto-ingest.go371
-rw-r--r--f3s/prometheus-pusher/cmd/prometheus-pusher/main.go193
-rw-r--r--f3s/prometheus-pusher/historic.go217
-rw-r--r--f3s/prometheus-pusher/internal/config/config.go49
-rw-r--r--f3s/prometheus-pusher/internal/ingester/auto.go131
-rw-r--r--f3s/prometheus-pusher/internal/ingester/pushgateway.go51
-rw-r--r--f3s/prometheus-pusher/internal/ingester/remotewrite.go252
-rw-r--r--f3s/prometheus-pusher/internal/metrics/generator.go83
-rw-r--r--f3s/prometheus-pusher/internal/metrics/sample.go34
-rw-r--r--f3s/prometheus-pusher/internal/parser/csv.go101
-rw-r--r--f3s/prometheus-pusher/internal/parser/json.go62
-rw-r--r--f3s/prometheus-pusher/internal/parser/parser.go56
-rw-r--r--f3s/prometheus-pusher/internal/version/version.go4
-rw-r--r--f3s/prometheus-pusher/main.go91
-rwxr-xr-xf3s/prometheus-pusher/prometheus-pusherbin14022801 -> 14053904 bytes
-rw-r--r--f3s/prometheus-pusher/realtime.go100
16 files changed, 1016 insertions, 779 deletions
diff --git a/f3s/prometheus-pusher/auto-ingest.go b/f3s/prometheus-pusher/auto-ingest.go
deleted file mode 100644
index 0daff20..0000000
--- a/f3s/prometheus-pusher/auto-ingest.go
+++ /dev/null
@@ -1,371 +0,0 @@
-package main
-
-import (
- "bufio"
- "bytes"
- "encoding/csv"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "os"
- "strconv"
- "strings"
- "time"
-
- "github.com/golang/snappy"
- "github.com/prometheus/prometheus/prompb"
-)
-
-// MetricSample represents a single metric sample with timestamp
-type MetricSample struct {
- MetricName string
- Labels map[string]string
- Value float64
- Timestamp time.Time
-}
-
-// IngestMode represents the ingestion strategy
-type IngestMode string
-
-const (
- ModeRealtime IngestMode = "realtime" // Use Pushgateway (current data)
- ModeHistoric IngestMode = "historic" // Use Remote Write (old data)
-)
-
-// DetermineIngestMode automatically determines which ingestion mode to use
-// based on the age of the timestamp
-func DetermineIngestMode(timestamp time.Time) IngestMode {
- age := time.Since(timestamp)
-
- // Threshold: data older than 5 minutes uses historic mode
- // This allows for some clock skew and processing delay
- threshold := 5 * time.Minute
-
- if age > threshold {
- return ModeHistoric
- }
- return ModeRealtime
-}
-
-// ParseCSVMetrics parses metrics from CSV format
-// Expected format: metric_name,label1=value1;label2=value2,value,timestamp_unix_ms
-// Example: app_requests_total,instance=web1;env=prod,42,1735516800000
-func ParseCSVMetrics(reader io.Reader) ([]MetricSample, error) {
- var samples []MetricSample
-
- csvReader := csv.NewReader(reader)
- csvReader.Comment = '#'
-
- lineNum := 0
- for {
- record, err := csvReader.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, fmt.Errorf("line %d: %w", lineNum, err)
- }
- lineNum++
-
- if len(record) < 3 {
- log.Printf("Warning: line %d: skipping invalid record (need at least 3 fields)", lineNum)
- continue
- }
-
- // Parse metric name
- metricName := strings.TrimSpace(record[0])
- if metricName == "" {
- log.Printf("Warning: line %d: skipping empty metric name", lineNum)
- continue
- }
-
- // Parse labels
- labels := make(map[string]string)
- if len(record) > 1 && record[1] != "" {
- labelPairs := strings.Split(record[1], ";")
- for _, pair := range labelPairs {
- parts := strings.SplitN(pair, "=", 2)
- if len(parts) == 2 {
- labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
- }
- }
- }
-
- // Parse value
- value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64)
- if err != nil {
- log.Printf("Warning: line %d: skipping invalid value: %v", lineNum, err)
- continue
- }
-
- // Parse timestamp (optional, defaults to now)
- var timestamp time.Time
- if len(record) > 3 && record[3] != "" {
- timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64)
- if err != nil {
- log.Printf("Warning: line %d: invalid timestamp, using current time: %v", lineNum, err)
- timestamp = time.Now()
- } else {
- timestamp = time.UnixMilli(timestampMs)
- }
- } else {
- timestamp = time.Now()
- }
-
- samples = append(samples, MetricSample{
- MetricName: metricName,
- Labels: labels,
- Value: value,
- Timestamp: timestamp,
- })
- }
-
- return samples, nil
-}
-
-// ParseJSONMetrics parses metrics from JSON format
-// Expected format: array of {metric: string, labels: {}, value: number, timestamp_ms: number}
-func ParseJSONMetrics(reader io.Reader) ([]MetricSample, error) {
- var rawSamples []struct {
- Metric string `json:"metric"`
- Labels map[string]string `json:"labels"`
- Value float64 `json:"value"`
- TimestampMs int64 `json:"timestamp_ms,omitempty"`
- }
-
- decoder := json.NewDecoder(reader)
- if err := decoder.Decode(&rawSamples); err != nil {
- return nil, fmt.Errorf("failed to parse JSON: %w", err)
- }
-
- var samples []MetricSample
- for i, raw := range rawSamples {
- timestamp := time.Now()
- if raw.TimestampMs > 0 {
- timestamp = time.UnixMilli(raw.TimestampMs)
- }
-
- if raw.Metric == "" {
- log.Printf("Warning: sample %d: skipping empty metric name", i)
- continue
- }
-
- if raw.Labels == nil {
- raw.Labels = make(map[string]string)
- }
-
- samples = append(samples, MetricSample{
- MetricName: raw.Metric,
- Labels: raw.Labels,
- Value: raw.Value,
- Timestamp: timestamp,
- })
- }
-
- return samples, nil
-}
-
-// AutoIngestMetrics automatically ingests metrics using the appropriate method
-// based on timestamp age
-func AutoIngestMetrics(samples []MetricSample, pushgatewayURL, prometheusURL, jobName string) error {
- if len(samples) == 0 {
- return fmt.Errorf("no samples to ingest")
- }
-
- // Group samples by ingestion mode
- realtimeSamples := make([]MetricSample, 0)
- historicSamples := make([]MetricSample, 0)
-
- for _, sample := range samples {
- mode := DetermineIngestMode(sample.Timestamp)
- if mode == ModeRealtime {
- realtimeSamples = append(realtimeSamples, sample)
- } else {
- historicSamples = append(historicSamples, sample)
- }
- }
-
- log.Printf("šŸ“Š Auto-ingest summary:")
- log.Printf(" Total samples: %d", len(samples))
- log.Printf(" Realtime samples (< 5min old): %d", len(realtimeSamples))
- log.Printf(" Historic samples (> 5min old): %d", len(historicSamples))
-
- // Ingest realtime samples via Pushgateway
- if len(realtimeSamples) > 0 {
- log.Printf("\nšŸ”„ Ingesting %d REALTIME samples via Pushgateway...", len(realtimeSamples))
- if err := ingestViaPushgateway(realtimeSamples, pushgatewayURL, jobName); err != nil {
- return fmt.Errorf("failed to ingest realtime samples: %w", err)
- }
- log.Printf("āœ… Successfully ingested %d realtime samples", len(realtimeSamples))
- }
-
- // Ingest historic samples via Remote Write
- if len(historicSamples) > 0 {
- log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(historicSamples))
- for i, sample := range historicSamples {
- age := time.Since(sample.Timestamp)
- log.Printf(" [%d/%d] %s (age: %s)", i+1, len(historicSamples), sample.MetricName, formatDuration(age))
- }
-
- if err := ingestViaRemoteWrite(historicSamples, prometheusURL); err != nil {
- return fmt.Errorf("failed to ingest historic samples: %w", err)
- }
- log.Printf("āœ… Successfully ingested %d historic samples", len(historicSamples))
- }
-
- log.Printf("\nšŸŽ‰ Auto-ingest complete!")
- return nil
-}
-
-// ingestViaPushgateway ingests samples using Pushgateway (for realtime data)
-// Note: Pushgateway doesn't preserve timestamps, so this is only for current data
-func ingestViaPushgateway(samples []MetricSample, pushgatewayURL, jobName string) error {
- log.Printf(" Note: Pushgateway ingestion uses current timestamp (original timestamps ignored)")
- log.Printf(" Samples will appear with 'now' timestamp in Prometheus")
-
- // We use the existing pushMetrics function for realtime data
- // Since Pushgateway doesn't support custom timestamps, we just push current values
- simulateMetrics() // Generate current metrics
- return pushMetrics(pushgatewayURL, jobName)
-}
-
-// ingestViaRemoteWrite ingests samples using Remote Write API (preserves timestamps)
-func ingestViaRemoteWrite(samples []MetricSample, prometheusURL string) error {
- var timeSeries []prompb.TimeSeries
-
- for _, sample := range samples {
- labels := []prompb.Label{
- {Name: "__name__", Value: sample.MetricName},
- }
-
- for k, v := range sample.Labels {
- labels = append(labels, prompb.Label{Name: k, Value: v})
- }
-
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: labels,
- Samples: []prompb.Sample{
- {
- Value: sample.Value,
- Timestamp: sample.Timestamp.UnixMilli(),
- },
- },
- })
- }
-
- writeRequest := &prompb.WriteRequest{
- Timeseries: timeSeries,
- }
-
- return sendRemoteWrite(prometheusURL, writeRequest)
-}
-
-// formatDuration formats a duration in human-readable form
-func formatDuration(d time.Duration) string {
- if d < time.Minute {
- return fmt.Sprintf("%.0f seconds", d.Seconds())
- } else if d < time.Hour {
- return fmt.Sprintf("%.0f minutes", d.Minutes())
- } else if d < 24*time.Hour {
- return fmt.Sprintf("%.1f hours", d.Hours())
- } else {
- return fmt.Sprintf("%.1f days", d.Hours()/24)
- }
-}
-
-// AutoIngestFromFile reads a file and automatically ingests metrics
-func AutoIngestFromFile(filename, format, pushgatewayURL, prometheusURL, jobName string) error {
- file, err := os.Open(filename)
- if err != nil {
- return fmt.Errorf("failed to open file: %w", err)
- }
- defer file.Close()
-
- log.Printf("šŸ“ Reading metrics from: %s (format: %s)", filename, format)
-
- var samples []MetricSample
-
- switch format {
- case "csv":
- samples, err = ParseCSVMetrics(file)
- case "json":
- samples, err = ParseJSONMetrics(file)
- default:
- return fmt.Errorf("unsupported format: %s (use csv or json)", format)
- }
-
- if err != nil {
- return fmt.Errorf("failed to parse metrics: %w", err)
- }
-
- if len(samples) == 0 {
- return fmt.Errorf("no valid samples found in file")
- }
-
- return AutoIngestMetrics(samples, pushgatewayURL, prometheusURL, jobName)
-}
-
-// AutoIngestFromStdin reads metrics from stdin and automatically ingests them
-func AutoIngestFromStdin(format, pushgatewayURL, prometheusURL, jobName string) error {
- log.Printf("šŸ“„ Reading metrics from stdin (format: %s)", format)
- log.Printf(" Enter metrics, then press Ctrl+D when done")
-
- var samples []MetricSample
- var err error
-
- reader := bufio.NewReader(os.Stdin)
-
- switch format {
- case "csv":
- samples, err = ParseCSVMetrics(reader)
- case "json":
- samples, err = ParseJSONMetrics(reader)
- default:
- return fmt.Errorf("unsupported format: %s (use csv or json)", format)
- }
-
- if err != nil {
- return fmt.Errorf("failed to parse metrics: %w", err)
- }
-
- if len(samples) == 0 {
- return fmt.Errorf("no valid samples found")
- }
-
- return AutoIngestMetrics(samples, pushgatewayURL, prometheusURL, jobName)
-}
-
-// Helper function to send remote write request (reuses code from historic.go)
-func sendRemoteWrite(prometheusURL string, writeRequest *prompb.WriteRequest) error {
- data, err := writeRequest.Marshal()
- if err != nil {
- return fmt.Errorf("failed to marshal write request: %w", err)
- }
-
- compressed := snappy.Encode(nil, data)
-
- req, err := http.NewRequest("POST", prometheusURL, bytes.NewReader(compressed))
- if err != nil {
- return fmt.Errorf("failed to create HTTP request: %w", err)
- }
-
- req.Header.Set("Content-Type", "application/x-protobuf")
- req.Header.Set("Content-Encoding", "snappy")
- req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
-
- client := &http.Client{Timeout: 10 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- return fmt.Errorf("failed to send remote write request: %w", err)
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
- body, _ := io.ReadAll(resp.Body)
- return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
- }
-
- return nil
-}
diff --git a/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go b/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go
new file mode 100644
index 0000000..905efa1
--- /dev/null
+++ b/f3s/prometheus-pusher/cmd/prometheus-pusher/main.go
@@ -0,0 +1,193 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "prometheus-pusher/internal/config"
+ "prometheus-pusher/internal/ingester"
+ "prometheus-pusher/internal/metrics"
+ "prometheus-pusher/internal/parser"
+ "prometheus-pusher/internal/version"
+)
+
+func main() {
+ cfg := parseFlags()
+
+ rand.Seed(time.Now().UnixNano())
+
+ ctx, cancel := createContextWithSignalHandler()
+ defer cancel()
+
+ if err := run(ctx, cfg); err != nil {
+ log.Fatalf("Error: %v", err)
+ }
+}
+
+// parseFlags parses command-line flags and returns a Config.
+func parseFlags() config.Config {
+ cfg := config.NewConfig()
+
+ showVersion := flag.Bool("version", false, "Print version and exit")
+ mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, or auto")
+ pushgatewayURL := flag.String("pushgateway", cfg.PushgatewayURL, "Pushgateway URL for realtime mode")
+ prometheusURL := flag.String("prometheus", cfg.PrometheusURL, "Prometheus remote write URL for historic mode")
+ jobName := flag.String("job", cfg.JobName, "Job name for metrics")
+ continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s")
+
+ hoursAgo := flag.Int("hours-ago", cfg.HoursAgo, "For historic mode: how many hours ago (single datapoint)")
+ startHours := flag.Int("start-hours", cfg.StartHours, "For backfill: start time in hours ago")
+ endHours := flag.Int("end-hours", cfg.EndHours, "For backfill: end time in hours ago")
+ interval := flag.Int("interval", cfg.Interval, "For backfill: interval between datapoints in hours")
+
+ inputFile := flag.String("file", "", "For auto mode: input file with metrics")
+ inputFormat := flag.String("format", cfg.InputFormat, "For auto mode: input format (csv or json)")
+
+ flag.Parse()
+
+ if *showVersion {
+ fmt.Printf("prometheus-pusher version %s\n", version.Version)
+ os.Exit(0)
+ }
+
+ cfg.Mode = config.Mode(*mode)
+ cfg.PushgatewayURL = *pushgatewayURL
+ cfg.PrometheusURL = *prometheusURL
+ cfg.JobName = *jobName
+ cfg.Continuous = *continuous
+ cfg.HoursAgo = *hoursAgo
+ cfg.StartHours = *startHours
+ cfg.EndHours = *endHours
+ cfg.Interval = *interval
+ cfg.InputFile = *inputFile
+ cfg.InputFormat = *inputFormat
+
+ return cfg
+}
+
+// createContextWithSignalHandler creates a context that cancels on interrupt signals.
+func createContextWithSignalHandler() (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+
+ go func() {
+ <-sigChan
+ log.Printf("\nReceived interrupt signal, shutting down...")
+ cancel()
+ }()
+
+ return ctx, cancel
+}
+
+// run executes the appropriate mode based on configuration.
+func run(ctx context.Context, cfg config.Config) error {
+ switch cfg.Mode {
+ case config.ModeRealtime:
+ return runRealtimeMode(ctx, cfg)
+ case config.ModeHistoric:
+ return runHistoricMode(ctx, cfg)
+ case config.ModeBackfill:
+ return runBackfillMode(ctx, cfg)
+ case config.ModeAuto:
+ return runAutoMode(ctx, cfg)
+ default:
+ return fmt.Errorf("unknown mode: %s (use realtime, historic, backfill, or auto)", cfg.Mode)
+ }
+}
+
+// runRealtimeMode runs the realtime ingestion mode.
+func runRealtimeMode(ctx context.Context, cfg config.Config) error {
+ log.Printf("Starting Prometheus metrics pusher in REALTIME mode")
+ log.Printf("Pushgateway URL: %s", cfg.PushgatewayURL)
+ log.Printf("Job name: %s", cfg.JobName)
+
+ collectors := metrics.NewCollectors()
+ pushgateway := ingester.NewPushgatewayIngester()
+
+ if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ return fmt.Errorf("failed to push metrics: %w", err)
+ }
+ log.Printf("Successfully pushed metrics to Pushgateway")
+
+ if cfg.Continuous {
+ return runContinuousMode(ctx, pushgateway, collectors, cfg)
+ }
+
+ return nil
+}
+
+// runContinuousMode pushes metrics continuously every 15 seconds.
+func runContinuousMode(ctx context.Context, pushgateway ingester.PushgatewayIngester, collectors metrics.Collectors, cfg config.Config) error {
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
+
+ log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.")
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ log.Printf("Error pushing metrics: %v", err)
+ } else {
+ log.Printf("Successfully pushed metrics to Pushgateway")
+ }
+ }
+ }
+}
+
+// runHistoricMode runs the historic ingestion mode.
+func runHistoricMode(ctx context.Context, cfg config.Config) error {
+ remoteWrite := ingester.NewRemoteWriteIngester()
+ return remoteWrite.IngestHistoric(ctx, cfg.PrometheusURL, cfg.HoursAgo)
+}
+
+// runBackfillMode runs the backfill ingestion mode.
+func runBackfillMode(ctx context.Context, cfg config.Config) error {
+ remoteWrite := ingester.NewRemoteWriteIngester()
+ return remoteWrite.Backfill(ctx, cfg.PrometheusURL, cfg.StartHours, cfg.EndHours, cfg.Interval)
+}
+
+// runAutoMode runs the auto ingestion mode.
+func runAutoMode(ctx context.Context, cfg config.Config) error {
+ log.Printf("šŸ¤– AUTO mode: Automatically detecting timestamp age and choosing ingestion method")
+
+ samples, err := loadSamples(ctx, cfg)
+ if err != nil {
+ return err
+ }
+
+ logFileSource(cfg)
+
+ collectors := metrics.NewCollectors()
+ autoIngester := ingester.NewAutoIngester(collectors)
+
+ return autoIngester.Ingest(ctx, samples, cfg)
+}
+
+// loadSamples loads samples from file or stdin based on configuration.
+func loadSamples(ctx context.Context, cfg config.Config) ([]metrics.Sample, error) {
+ if cfg.InputFile != "" {
+ return parser.ParseFile(ctx, cfg.InputFile, cfg.InputFormat)
+ }
+ return parser.ParseStdin(ctx, cfg.InputFormat)
+}
+
+// logFileSource logs the source of the input data.
+func logFileSource(cfg config.Config) {
+ if cfg.InputFile != "" {
+ log.Printf("šŸ“ Reading metrics from: %s (format: %s)", cfg.InputFile, cfg.InputFormat)
+ } else {
+ log.Printf("šŸ“„ Reading metrics from stdin (format: %s)", cfg.InputFormat)
+ }
+}
diff --git a/f3s/prometheus-pusher/historic.go b/f3s/prometheus-pusher/historic.go
deleted file mode 100644
index 66a2ae2..0000000
--- a/f3s/prometheus-pusher/historic.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package main
-
-import (
- "bytes"
- "fmt"
- "io"
- "log"
- "math/rand"
- "net/http"
- "time"
-
- "github.com/golang/snappy"
- "github.com/prometheus/prometheus/prompb"
-)
-
-// GenerateHistoricMetrics generates metric samples for a specific time in the past
-// hoursAgo: how many hours in the past to generate data for
-func GenerateHistoricMetrics(hoursAgo int) []prompb.TimeSeries {
- timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour).UnixMilli()
-
- var timeSeries []prompb.TimeSeries
-
- // Counter: app_requests_total
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_requests_total"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- },
- Samples: []prompb.Sample{
- {Value: float64(rand.Intn(100) + 1), Timestamp: timestamp},
- },
- })
-
- // Gauge: app_active_connections
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_active_connections"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- },
- Samples: []prompb.Sample{
- {Value: float64(rand.Intn(100)), Timestamp: timestamp},
- },
- })
-
- // Gauge: app_temperature_celsius
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_temperature_celsius"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- },
- Samples: []prompb.Sample{
- {Value: 15 + rand.Float64()*20, Timestamp: timestamp},
- },
- })
-
- // Histogram buckets: app_request_duration_seconds
- buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
- cumulativeCount := 0
- for _, bucket := range buckets {
- cumulativeCount += rand.Intn(5)
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_request_duration_seconds_bucket"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- {Name: "le", Value: fmt.Sprintf("%g", bucket)},
- },
- Samples: []prompb.Sample{
- {Value: float64(cumulativeCount), Timestamp: timestamp},
- },
- })
- }
-
- // +Inf bucket
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_request_duration_seconds_bucket"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- {Name: "le", Value: "+Inf"},
- },
- Samples: []prompb.Sample{
- {Value: float64(cumulativeCount), Timestamp: timestamp},
- },
- })
-
- // Histogram sum
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_request_duration_seconds_sum"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- },
- Samples: []prompb.Sample{
- {Value: rand.Float64() * 100, Timestamp: timestamp},
- },
- })
-
- // Histogram count
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_request_duration_seconds_count"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- },
- Samples: []prompb.Sample{
- {Value: float64(cumulativeCount), Timestamp: timestamp},
- },
- })
-
- // Labeled counters: app_jobs_processed_total
- jobTypes := []string{"email", "report", "backup"}
- statuses := []string{"success", "failed"}
- for _, jobType := range jobTypes {
- for _, status := range statuses {
- timeSeries = append(timeSeries, prompb.TimeSeries{
- Labels: []prompb.Label{
- {Name: "__name__", Value: "app_jobs_processed_total"},
- {Name: "instance", Value: "example-app"},
- {Name: "job", Value: "historic_data"},
- {Name: "job_type", Value: jobType},
- {Name: "status", Value: status},
- },
- Samples: []prompb.Sample{
- {Value: float64(rand.Intn(20)), Timestamp: timestamp},
- },
- })
- }
- }
-
- return timeSeries
-}
-
-// PushHistoricData sends historic data to Prometheus via Remote Write API
-// prometheusURL: URL of Prometheus remote write endpoint (e.g., "http://localhost:9090/api/v1/write")
-// hoursAgo: how many hours in the past to generate data for
-func PushHistoricData(prometheusURL string, hoursAgo int) error {
- // Generate historic metrics
- timeSeries := GenerateHistoricMetrics(hoursAgo)
-
- // Create write request
- writeRequest := &prompb.WriteRequest{
- Timeseries: timeSeries,
- }
-
- // Marshal to protobuf
- data, err := writeRequest.Marshal()
- if err != nil {
- return fmt.Errorf("failed to marshal write request: %w", err)
- }
-
- // Compress with snappy
- compressed := snappy.Encode(nil, data)
-
- // Send HTTP POST request
- req, err := http.NewRequest("POST", prometheusURL, bytes.NewReader(compressed))
- if err != nil {
- return fmt.Errorf("failed to create HTTP request: %w", err)
- }
-
- req.Header.Set("Content-Type", "application/x-protobuf")
- req.Header.Set("Content-Encoding", "snappy")
- req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
-
- client := &http.Client{Timeout: 10 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- return fmt.Errorf("failed to send remote write request: %w", err)
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
- body, _ := io.ReadAll(resp.Body)
- return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
- }
-
- log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)",
- hoursAgo, time.Now().Add(-time.Duration(hoursAgo)*time.Hour).Format(time.RFC3339))
-
- return nil
-}
-
-// BackfillHistoricData backfills data for multiple time points
-// prometheusURL: URL of Prometheus remote write endpoint
-// startHoursAgo: how many hours ago to start backfilling
-// endHoursAgo: how many hours ago to end backfilling
-// intervalHours: interval between data points in hours
-func BackfillHistoricData(prometheusURL string, startHoursAgo, endHoursAgo, intervalHours int) error {
- log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)",
- startHoursAgo, endHoursAgo, intervalHours)
-
- successCount := 0
- errorCount := 0
-
- for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours {
- if err := PushHistoricData(prometheusURL, hoursAgo); err != nil {
- log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err)
- errorCount++
- } else {
- successCount++
- }
-
- // Small delay to avoid overwhelming Prometheus
- time.Sleep(100 * time.Millisecond)
- }
-
- log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount)
-
- if errorCount > 0 {
- return fmt.Errorf("backfill completed with %d errors", errorCount)
- }
-
- return nil
-}
diff --git a/f3s/prometheus-pusher/internal/config/config.go b/f3s/prometheus-pusher/internal/config/config.go
new file mode 100644
index 0000000..9919e52
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/config/config.go
@@ -0,0 +1,49 @@
+package config
+
+import "time"
+
+// Mode represents the ingestion mode
+type Mode string
+
+const (
+ ModeRealtime Mode = "realtime"
+ ModeHistoric Mode = "historic"
+ ModeBackfill Mode = "backfill"
+ ModeAuto Mode = "auto"
+)
+
+// Config holds all configuration for the prometheus-pusher
+type Config struct {
+ Mode Mode
+ PushgatewayURL string
+ PrometheusURL string
+ JobName string
+ Continuous bool
+ InputFile string
+ InputFormat string
+ HoursAgo int
+ StartHours int
+ EndHours int
+ Interval int
+}
+
+// NewConfig creates a new Config with default values
+func NewConfig() Config {
+ return Config{
+ Mode: ModeRealtime,
+ PushgatewayURL: "http://localhost:9091",
+ PrometheusURL: "http://localhost:9090/api/v1/write",
+ JobName: "example_metrics_pusher",
+ InputFormat: "csv",
+ HoursAgo: 24,
+ StartHours: 48,
+ EndHours: 0,
+ Interval: 1,
+ }
+}
+
+// AutoIngestThreshold is the age threshold for auto mode routing
+const AutoIngestThreshold = 5 * time.Minute
+
+// DefaultHTTPTimeout is the default timeout for HTTP requests
+const DefaultHTTPTimeout = 10 * time.Second
diff --git a/f3s/prometheus-pusher/internal/ingester/auto.go b/f3s/prometheus-pusher/internal/ingester/auto.go
new file mode 100644
index 0000000..c40754e
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/ingester/auto.go
@@ -0,0 +1,131 @@
+package ingester
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "time"
+
+ "prometheus-pusher/internal/config"
+ "prometheus-pusher/internal/metrics"
+)
+
+const ageThreshold = 5 * time.Minute
+
+// DetermineMode automatically determines which ingestion mode to use based on timestamp age.
+// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway).
+func DetermineMode(timestamp time.Time) config.Mode {
+ age := time.Since(timestamp)
+ if age > ageThreshold {
+ return config.ModeHistoric
+ }
+ return config.ModeRealtime
+}
+
+// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters.
+type AutoIngester struct {
+ pushgateway PushgatewayIngester
+ remoteWrite RemoteWriteIngester
+ collectors metrics.Collectors
+}
+
+// NewAutoIngester creates a new auto ingester.
+func NewAutoIngester(collectors metrics.Collectors) AutoIngester {
+ return AutoIngester{
+ pushgateway: NewPushgatewayIngester(),
+ remoteWrite: NewRemoteWriteIngester(),
+ collectors: collectors,
+ }
+}
+
+// Ingest automatically routes samples to appropriate ingestion method based on timestamp age.
+func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ realtimeSamples, historicSamples := groupSamplesByMode(samples)
+
+ logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples))
+
+ if len(realtimeSamples) > 0 {
+ if err := a.ingestRealtime(ctx, cfg); err != nil {
+ return fmt.Errorf("failed to ingest realtime samples: %w", err)
+ }
+ }
+
+ if len(historicSamples) > 0 {
+ if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil {
+ return fmt.Errorf("failed to ingest historic samples: %w", err)
+ }
+ }
+
+ log.Printf("\nšŸŽ‰ Auto-ingest complete!")
+ return nil
+}
+
+// groupSamplesByMode separates samples into realtime and historic groups.
+func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) {
+ realtimeSamples := make([]metrics.Sample, 0)
+ historicSamples := make([]metrics.Sample, 0)
+
+ for _, sample := range samples {
+ if DetermineMode(sample.Timestamp) == config.ModeRealtime {
+ realtimeSamples = append(realtimeSamples, sample)
+ } else {
+ historicSamples = append(historicSamples, sample)
+ }
+ }
+
+ return realtimeSamples, historicSamples
+}
+
+// logIngestSummary logs the ingestion summary.
+func logIngestSummary(total, realtime, historic int) {
+ log.Printf("šŸ“Š Auto-ingest summary:")
+ log.Printf(" Total samples: %d", total)
+ log.Printf(" Realtime samples (< 5min old): %d", realtime)
+ log.Printf(" Historic samples (> 5min old): %d", historic)
+}
+
+// ingestRealtime ingests realtime samples via Pushgateway.
+func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error {
+ log.Printf("\nšŸ”„ Ingesting REALTIME samples via Pushgateway...")
+ log.Printf(" Note: Pushgateway uses current timestamp (original timestamps ignored)")
+
+ if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ return err
+ }
+
+ log.Printf("āœ… Successfully ingested realtime samples")
+ return nil
+}
+
+// ingestHistoric ingests historic samples via Remote Write.
+func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
+ log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(samples))
+
+ for i, sample := range samples {
+ age := time.Since(sample.Timestamp)
+ log.Printf(" [%d/%d] %s (age: %s)", i+1, len(samples), sample.MetricName, formatDuration(age))
+ }
+
+ if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil {
+ return err
+ }
+
+ log.Printf("āœ… Successfully ingested %d historic samples", len(samples))
+ return nil
+}
+
+// formatDuration formats a duration in human-readable form.
+func formatDuration(d time.Duration) string {
+ if d < time.Minute {
+ return fmt.Sprintf("%.0f seconds", d.Seconds())
+ } else if d < time.Hour {
+ return fmt.Sprintf("%.0f minutes", d.Minutes())
+ } else if d < 24*time.Hour {
+ return fmt.Sprintf("%.1f hours", d.Hours())
+ }
+ return fmt.Sprintf("%.1f days", d.Hours()/24)
+}
diff --git a/f3s/prometheus-pusher/internal/ingester/pushgateway.go b/f3s/prometheus-pusher/internal/ingester/pushgateway.go
new file mode 100644
index 0000000..7ae12c4
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/ingester/pushgateway.go
@@ -0,0 +1,51 @@
+package ingester
+
+import (
+ "context"
+ "fmt"
+
+ "prometheus-pusher/internal/metrics"
+
+ "github.com/prometheus/client_golang/prometheus/push"
+)
+
+// PushgatewayIngester handles realtime metric ingestion via Pushgateway.
+// Note: Pushgateway does not preserve custom timestamps - all metrics are
+// timestamped with the current time when pushed.
+type PushgatewayIngester struct{}
+
+// NewPushgatewayIngester creates a new Pushgateway ingester.
+func NewPushgatewayIngester() PushgatewayIngester {
+ return PushgatewayIngester{}
+}
+
+// Ingest pushes metrics to Pushgateway.
+// The samples parameter is currently ignored because Pushgateway doesn't support
+// custom metric values from samples - it uses registered Prometheus collectors.
+// This ingests generated metrics using the provided collectors.
+func (i PushgatewayIngester) Ingest(ctx context.Context, collectors metrics.Collectors, url, jobName string) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ // Generate random metric values
+ collectors.Simulate()
+
+ // Create pusher with all collectors
+ pusher := push.New(url, jobName).
+ Collector(collectors.RequestsTotal).
+ Collector(collectors.ActiveConnections).
+ Collector(collectors.TemperatureCelsius).
+ Collector(collectors.RequestDuration).
+ Collector(collectors.JobsProcessed).
+ Grouping("instance", "example-app")
+
+ // Push metrics to Pushgateway
+ if err := pusher.Push(); err != nil {
+ return fmt.Errorf("failed to push to pushgateway: %w", err)
+ }
+
+ return nil
+}
diff --git a/f3s/prometheus-pusher/internal/ingester/remotewrite.go b/f3s/prometheus-pusher/internal/ingester/remotewrite.go
new file mode 100644
index 0000000..ee5bd65
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/ingester/remotewrite.go
@@ -0,0 +1,252 @@
+package ingester
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net/http"
+ "time"
+
+ "prometheus-pusher/internal/metrics"
+
+ "github.com/golang/snappy"
+ "github.com/prometheus/prometheus/prompb"
+)
+
+const (
+ requestTimeout = 10 * time.Second
+ backfillDelay = 100 * time.Millisecond
+)
+
+// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API.
+// This ingester preserves custom timestamps, making it suitable for importing historic data.
+type RemoteWriteIngester struct {
+ client *http.Client
+}
+
+// NewRemoteWriteIngester creates a new Remote Write ingester.
+func NewRemoteWriteIngester() RemoteWriteIngester {
+ return RemoteWriteIngester{
+ client: &http.Client{Timeout: requestTimeout},
+ }
+}
+
+// Ingest sends samples to Prometheus via Remote Write API.
+func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ timeSeries := convertSamplesToTimeSeries(samples)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ return i.sendWriteRequest(ctx, url, writeRequest)
+}
+
+// IngestHistoric generates and ingests historic metrics for a specific time in the past.
+func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error {
+ timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour)
+ timeSeries := generateHistoricTimeSeries(timestamp)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
+ return err
+ }
+
+ log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)",
+ hoursAgo, timestamp.Format(time.RFC3339))
+ return nil
+}
+
+// Backfill ingests historic metrics for a range of time points.
+func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error {
+ log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)",
+ startHoursAgo, endHoursAgo, intervalHours)
+
+ successCount := 0
+ errorCount := 0
+
+ for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours {
+ if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil {
+ log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err)
+ errorCount++
+ } else {
+ successCount++
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(backfillDelay):
+ }
+ }
+
+ log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount)
+
+ if errorCount > 0 {
+ return fmt.Errorf("backfill completed with %d errors", errorCount)
+ }
+
+ return nil
+}
+
+// sendWriteRequest sends a write request to Prometheus.
+func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error {
+ data, err := writeRequest.Marshal()
+ if err != nil {
+ return fmt.Errorf("failed to marshal write request: %w", err)
+ }
+
+ compressed := snappy.Encode(nil, data)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(compressed))
+ if err != nil {
+ return fmt.Errorf("failed to create HTTP request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/x-protobuf")
+ req.Header.Set("Content-Encoding", "snappy")
+ req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+
+ resp, err := i.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("failed to send remote write request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format.
+func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries {
+ timeSeries := make([]prompb.TimeSeries, 0, len(samples))
+
+ for _, sample := range samples {
+ labels := []prompb.Label{{Name: "__name__", Value: sample.MetricName}}
+
+ for k, v := range sample.Labels {
+ labels = append(labels, prompb.Label{Name: k, Value: v})
+ }
+
+ timeSeries = append(timeSeries, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{
+ Value: sample.Value,
+ Timestamp: sample.Timestamp.UnixMilli(),
+ }},
+ })
+ }
+
+ return timeSeries
+}
+
+// generateHistoricTimeSeries generates example time series for a specific timestamp.
+func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries {
+ timestampMs := timestamp.UnixMilli()
+ var timeSeries []prompb.TimeSeries
+
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "example-app"},
+ {Name: "job", Value: "historic_data"},
+ }
+
+ timeSeries = append(timeSeries, createCounterSeries("app_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("app_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("app_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs))
+
+ timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...)
+ timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...)
+
+ return timeSeries
+}
+
+// createCounterSeries creates a counter metric time series.
+func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
+ labels := []prompb.Label{{Name: "__name__", Value: name}}
+ labels = append(labels, baseLabels...)
+
+ return prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}},
+ }
+}
+
+// createGaugeSeries creates a gauge metric time series.
+func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
+ return createCounterSeries(name, baseLabels, value, timestamp)
+}
+
+// generateHistogramSeries generates histogram bucket time series.
+func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
+ var series []prompb.TimeSeries
+
+ cumulativeCount := 0
+ for _, bucket := range buckets {
+ cumulativeCount += rand.Intn(5)
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "app_request_duration_seconds_bucket"},
+ {Name: "le", Value: fmt.Sprintf("%g", bucket)},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+ }
+
+ infLabels := []prompb.Label{
+ {Name: "__name__", Value: "app_request_duration_seconds_bucket"},
+ {Name: "le", Value: "+Inf"},
+ }
+ infLabels = append(infLabels, baseLabels...)
+ series = append(series, prompb.TimeSeries{
+ Labels: infLabels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+
+ series = append(series, createCounterSeries("app_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp))
+ series = append(series, createCounterSeries("app_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp))
+
+ return series
+}
+
+// generateLabeledCounterSeries generates labeled counter time series.
+func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ jobTypes := []string{"email", "report", "backup"}
+ statuses := []string{"success", "failed"}
+ var series []prompb.TimeSeries
+
+ for _, jobType := range jobTypes {
+ for _, status := range statuses {
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "app_jobs_processed_total"},
+ {Name: "job_type", Value: jobType},
+ {Name: "status", Value: status},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}},
+ })
+ }
+ }
+
+ return series
+}
diff --git a/f3s/prometheus-pusher/internal/metrics/generator.go b/f3s/prometheus-pusher/internal/metrics/generator.go
new file mode 100644
index 0000000..2b5c739
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/metrics/generator.go
@@ -0,0 +1,83 @@
+package metrics
+
+import (
+ "math/rand"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+ minTemperature = 15.0
+ maxTemperature = 35.0
+ maxConnections = 100
+ maxRequests = 10
+)
+
+var (
+ jobTypes = []string{"email", "report", "backup"}
+ statuses = []string{"success", "failed"}
+)
+
+// Collectors holds Prometheus metric collectors for realtime mode
+type Collectors struct {
+ RequestsTotal prometheus.Counter
+ ActiveConnections prometheus.Gauge
+ TemperatureCelsius prometheus.Gauge
+ RequestDuration prometheus.Histogram
+ JobsProcessed *prometheus.CounterVec
+}
+
+// NewCollectors creates new Prometheus metric collectors
+func NewCollectors() Collectors {
+ return Collectors{
+ RequestsTotal: prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "app_requests_total",
+ Help: "Total number of requests processed",
+ },
+ ),
+ ActiveConnections: prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "app_active_connections",
+ Help: "Number of currently active connections",
+ },
+ ),
+ TemperatureCelsius: prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "app_temperature_celsius",
+ Help: "Current temperature in Celsius",
+ },
+ ),
+ RequestDuration: prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "app_request_duration_seconds",
+ Help: "Histogram of request duration in seconds",
+ Buckets: prometheus.DefBuckets,
+ },
+ ),
+ JobsProcessed: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "app_jobs_processed_total",
+ Help: "Total number of jobs processed by type",
+ },
+ []string{"job_type", "status"},
+ ),
+ }
+}
+
+// Simulate generates random metric values for the collectors
+func (c Collectors) Simulate() {
+ c.RequestsTotal.Add(float64(rand.Intn(maxRequests) + 1))
+ c.ActiveConnections.Set(float64(rand.Intn(maxConnections)))
+ c.TemperatureCelsius.Set(minTemperature + rand.Float64()*(maxTemperature-minTemperature))
+
+ for i := 0; i < rand.Intn(5)+1; i++ {
+ duration := rand.Float64() * 2
+ c.RequestDuration.Observe(duration)
+ }
+
+ for _, jobType := range jobTypes {
+ status := statuses[rand.Intn(len(statuses))]
+ c.JobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5)))
+ }
+}
diff --git a/f3s/prometheus-pusher/internal/metrics/sample.go b/f3s/prometheus-pusher/internal/metrics/sample.go
new file mode 100644
index 0000000..04360f5
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/metrics/sample.go
@@ -0,0 +1,34 @@
+package metrics
+
+import "time"
+
+// Sample represents a single metric sample with timestamp
+type Sample struct {
+ MetricName string
+ Labels map[string]string
+ Value float64
+ Timestamp time.Time
+}
+
+// NewSample creates a new Sample
+func NewSample(name string, labels map[string]string, value float64, timestamp time.Time) Sample {
+ if labels == nil {
+ labels = make(map[string]string)
+ }
+ return Sample{
+ MetricName: name,
+ Labels: labels,
+ Value: value,
+ Timestamp: timestamp,
+ }
+}
+
+// Age returns how old the sample is
+func (s Sample) Age() time.Duration {
+ return time.Since(s.Timestamp)
+}
+
+// IsRecent returns true if the sample is recent enough for realtime ingestion
+func (s Sample) IsRecent(threshold time.Duration) bool {
+ return s.Age() < threshold
+}
diff --git a/f3s/prometheus-pusher/internal/parser/csv.go b/f3s/prometheus-pusher/internal/parser/csv.go
new file mode 100644
index 0000000..a59b6e6
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/parser/csv.go
@@ -0,0 +1,101 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "prometheus-pusher/internal/metrics"
+)
+
+// CSVParser parses metrics from CSV format
+type CSVParser struct{}
+
+// NewCSVParser creates a new CSV parser
+func NewCSVParser() *CSVParser {
+ return &CSVParser{}
+}
+
+// Parse reads metrics from CSV format
+// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms
+func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var samples []metrics.Sample
+
+ csvReader := csv.NewReader(reader)
+ csvReader.Comment = '#'
+
+ lineNum := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("line %d: %w", lineNum, err)
+ }
+ lineNum++
+
+ if len(record) < 3 {
+ continue // Skip invalid records
+ }
+
+ sample, err := p.parseRecord(record, lineNum)
+ if err != nil {
+ continue // Skip records with errors
+ }
+
+ samples = append(samples, sample)
+ }
+
+ return samples, nil
+}
+
+func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) {
+ metricName := strings.TrimSpace(record[0])
+ if metricName == "" {
+ return metrics.Sample{}, fmt.Errorf("empty metric name")
+ }
+
+ labels := parseLabels(record[1])
+
+ value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64)
+ if err != nil {
+ return metrics.Sample{}, fmt.Errorf("invalid value: %w", err)
+ }
+
+ timestamp := time.Now()
+ if len(record) > 3 && record[3] != "" {
+ timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64)
+ if err == nil {
+ timestamp = time.UnixMilli(timestampMs)
+ }
+ }
+
+ return metrics.NewSample(metricName, labels, value, timestamp), nil
+}
+
+func parseLabels(labelStr string) map[string]string {
+ labels := make(map[string]string)
+ if labelStr == "" {
+ return labels
+ }
+
+ labelPairs := strings.Split(labelStr, ";")
+ for _, pair := range labelPairs {
+ parts := strings.SplitN(pair, "=", 2)
+ if len(parts) == 2 {
+ labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
+ }
+ }
+ return labels
+}
diff --git a/f3s/prometheus-pusher/internal/parser/json.go b/f3s/prometheus-pusher/internal/parser/json.go
new file mode 100644
index 0000000..b6b230c
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/parser/json.go
@@ -0,0 +1,62 @@
+package parser
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "time"
+
+ "prometheus-pusher/internal/metrics"
+)
+
+// JSONParser parses metrics from JSON format
+type JSONParser struct{}
+
+// NewJSONParser creates a new JSON parser
+func NewJSONParser() *JSONParser {
+ return &JSONParser{}
+}
+
+type jsonSample struct {
+ Metric string `json:"metric"`
+ Labels map[string]string `json:"labels"`
+ Value float64 `json:"value"`
+ TimestampMs int64 `json:"timestamp_ms,omitempty"`
+}
+
+// Parse reads metrics from JSON format
+func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var rawSamples []jsonSample
+
+ decoder := json.NewDecoder(reader)
+ if err := decoder.Decode(&rawSamples); err != nil {
+ return nil, fmt.Errorf("failed to parse JSON: %w", err)
+ }
+
+ samples := make([]metrics.Sample, 0, len(rawSamples))
+ for _, raw := range rawSamples {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ if raw.Metric == "" {
+ continue
+ }
+
+ timestamp := time.Now()
+ if raw.TimestampMs > 0 {
+ timestamp = time.UnixMilli(raw.TimestampMs)
+ }
+
+ if raw.Labels == nil {
+ raw.Labels = make(map[string]string)
+ }
+
+ samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp))
+ }
+
+ return samples, nil
+}
diff --git a/f3s/prometheus-pusher/internal/parser/parser.go b/f3s/prometheus-pusher/internal/parser/parser.go
new file mode 100644
index 0000000..6a7732d
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/parser/parser.go
@@ -0,0 +1,56 @@
+package parser
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+
+ "prometheus-pusher/internal/metrics"
+)
+
+// Parser defines the interface for metric parsers.
+type Parser interface {
+ Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error)
+}
+
+// ParseFile parses metrics from a file.
+func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) {
+ file, err := os.Open(filename)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open file: %w", err)
+ }
+ defer file.Close()
+
+ return parseWithFormat(ctx, file, format)
+}
+
+// ParseStdin parses metrics from standard input.
+func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) {
+ return parseWithFormat(ctx, os.Stdin, format)
+}
+
+// parseWithFormat parses metrics using the specified format.
+func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) {
+ var parser Parser
+
+ switch format {
+ case "csv":
+ parser = NewCSVParser()
+ case "json":
+ parser = NewJSONParser()
+ default:
+ return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format)
+ }
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse metrics: %w", err)
+ }
+
+ if len(samples) == 0 {
+ return nil, fmt.Errorf("no valid samples found")
+ }
+
+ return samples, nil
+}
diff --git a/f3s/prometheus-pusher/internal/version/version.go b/f3s/prometheus-pusher/internal/version/version.go
new file mode 100644
index 0000000..cdd04c6
--- /dev/null
+++ b/f3s/prometheus-pusher/internal/version/version.go
@@ -0,0 +1,4 @@
+package version
+
+// Version is the current version of prometheus-pusher
+const Version = "0.0.0"
diff --git a/f3s/prometheus-pusher/main.go b/f3s/prometheus-pusher/main.go
deleted file mode 100644
index 7642df0..0000000
--- a/f3s/prometheus-pusher/main.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package main
-
-import (
- "flag"
- "log"
- "math/rand"
- "time"
-)
-
-func main() {
- // Command-line flags
- mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, or auto")
- pushgatewayURL := flag.String("pushgateway", "http://localhost:9091", "Pushgateway URL for realtime mode")
- prometheusURL := flag.String("prometheus", "http://localhost:9090/api/v1/write", "Prometheus remote write URL for historic mode")
- hoursAgo := flag.Int("hours-ago", 24, "For historic mode: how many hours ago (single datapoint)")
- startHours := flag.Int("start-hours", 48, "For backfill: start time in hours ago")
- endHours := flag.Int("end-hours", 0, "For backfill: end time in hours ago")
- interval := flag.Int("interval", 1, "For backfill: interval between datapoints in hours")
- continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s")
- jobName := flag.String("job", "example_metrics_pusher", "Job name for metrics")
-
- // Auto mode flags
- inputFile := flag.String("file", "", "For auto mode: input file with metrics")
- inputFormat := flag.String("format", "csv", "For auto mode: input format (csv or json)")
-
- flag.Parse()
-
- // Seed random number generator
- rand.Seed(time.Now().UnixNano())
-
- switch *mode {
- case "realtime":
- runRealtimeMode(*pushgatewayURL, *jobName, *continuous)
-
- case "historic":
- if err := PushHistoricData(*prometheusURL, *hoursAgo); err != nil {
- log.Fatalf("Failed to push historic data: %v", err)
- }
-
- case "backfill":
- if err := BackfillHistoricData(*prometheusURL, *startHours, *endHours, *interval); err != nil {
- log.Fatalf("Failed to backfill data: %v", err)
- }
-
- case "auto":
- log.Printf("šŸ¤– AUTO mode: Automatically detecting timestamp age and choosing ingestion method\n")
- var err error
- if *inputFile != "" {
- err = AutoIngestFromFile(*inputFile, *inputFormat, *pushgatewayURL, *prometheusURL, *jobName)
- } else {
- err = AutoIngestFromStdin(*inputFormat, *pushgatewayURL, *prometheusURL, *jobName)
- }
- if err != nil {
- log.Fatalf("Failed to auto-ingest: %v", err)
- }
-
- default:
- log.Fatalf("Unknown mode: %s (use realtime, historic, backfill, or auto)", *mode)
- }
-}
-
-func runRealtimeMode(pushgatewayURL, jobName string, continuous bool) {
- log.Printf("Starting Prometheus metrics pusher in REALTIME mode")
- log.Printf("Pushgateway URL: %s", pushgatewayURL)
- log.Printf("Job name: %s", jobName)
-
- // Push immediately on start
- simulateMetrics()
- if err := pushMetrics(pushgatewayURL, jobName); err != nil {
- log.Printf("Error pushing metrics: %v", err)
- } else {
- log.Printf("Successfully pushed metrics to Pushgateway")
- }
-
- if continuous {
- // Push metrics every 15 seconds
- ticker := time.NewTicker(15 * time.Second)
- defer ticker.Stop()
-
- log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.")
-
- for range ticker.C {
- simulateMetrics()
- if err := pushMetrics(pushgatewayURL, jobName); err != nil {
- log.Printf("Error pushing metrics: %v", err)
- } else {
- log.Printf("Successfully pushed metrics to Pushgateway")
- }
- }
- }
-}
diff --git a/f3s/prometheus-pusher/prometheus-pusher b/f3s/prometheus-pusher/prometheus-pusher
index cbb807a..a431693 100755
--- a/f3s/prometheus-pusher/prometheus-pusher
+++ b/f3s/prometheus-pusher/prometheus-pusher
Binary files differ
diff --git a/f3s/prometheus-pusher/realtime.go b/f3s/prometheus-pusher/realtime.go
deleted file mode 100644
index 092089f..0000000
--- a/f3s/prometheus-pusher/realtime.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package main
-
-import (
- "fmt"
- "math/rand"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/push"
-)
-
-// Define metrics for realtime pushing
-var (
- // Counter: Monotonically increasing value (e.g., total requests processed)
- requestsTotal = prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "app_requests_total",
- Help: "Total number of requests processed",
- },
- )
-
- // Gauge: Value that can go up or down (e.g., current temperature, active connections)
- activeConnections = prometheus.NewGauge(
- prometheus.GaugeOpts{
- Name: "app_active_connections",
- Help: "Number of currently active connections",
- },
- )
-
- // Gauge for temperature simulation
- temperatureCelsius = prometheus.NewGauge(
- prometheus.GaugeOpts{
- Name: "app_temperature_celsius",
- Help: "Current temperature in Celsius",
- },
- )
-
- // Histogram: Distribution of values (e.g., request duration)
- requestDuration = prometheus.NewHistogram(
- prometheus.HistogramOpts{
- Name: "app_request_duration_seconds",
- Help: "Histogram of request duration in seconds",
- Buckets: prometheus.DefBuckets, // Default buckets: .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10
- },
- )
-
- // Counter with labels
- jobsProcessed = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "app_jobs_processed_total",
- Help: "Total number of jobs processed by type",
- },
- []string{"job_type", "status"},
- )
-)
-
-// simulateMetrics generates example metric data
-func simulateMetrics() {
- // Increment request counter
- requestsTotal.Add(float64(rand.Intn(10) + 1))
-
- // Update active connections (random number between 0-100)
- activeConnections.Set(float64(rand.Intn(100)))
-
- // Simulate temperature (random between 15-35 Celsius)
- temperatureCelsius.Set(15 + rand.Float64()*20)
-
- // Record some request durations
- for i := 0; i < rand.Intn(5)+1; i++ {
- duration := rand.Float64() * 2 // 0-2 seconds
- requestDuration.Observe(duration)
- }
-
- // Record job completions with labels
- jobTypes := []string{"email", "report", "backup"}
- statuses := []string{"success", "failed"}
-
- for _, jobType := range jobTypes {
- status := statuses[rand.Intn(len(statuses))]
- jobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5)))
- }
-}
-
-// pushMetrics pushes all metrics to the Pushgateway
-func pushMetrics(pushgatewayURL, jobName string) error {
- // Create a new pusher
- pusher := push.New(pushgatewayURL, jobName).
- Collector(requestsTotal).
- Collector(activeConnections).
- Collector(temperatureCelsius).
- Collector(requestDuration).
- Collector(jobsProcessed).
- Grouping("instance", "example-app")
-
- // Push metrics to the Pushgateway
- if err := pusher.Push(); err != nil {
- return fmt.Errorf("failed to push metrics: %w", err)
- }
-
- return nil
-}