diff options
Diffstat (limited to 'internal/ingester/auto.go')
| -rw-r--r-- | internal/ingester/auto.go | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/internal/ingester/auto.go b/internal/ingester/auto.go new file mode 100644 index 0000000..315d767 --- /dev/null +++ b/internal/ingester/auto.go @@ -0,0 +1,145 @@ +package ingester + +import ( + "context" + "fmt" + "log" + "time" + + "epimetheus/internal/config" + "epimetheus/internal/metrics" +) + +const ageThreshold = 5 * time.Minute + +// DetermineMode automatically determines which ingestion mode to use based on timestamp age. +// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway). +func DetermineMode(timestamp time.Time) config.Mode { + age := time.Since(timestamp) + if age > ageThreshold { + return config.ModeHistoric + } + return config.ModeRealtime +} + +// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters. +type AutoIngester struct { + pushgateway PushgatewayIngester + remoteWrite RemoteWriteIngester + collectors metrics.Collectors +} + +// NewAutoIngester creates a new auto ingester. +func NewAutoIngester(collectors metrics.Collectors) AutoIngester { + return AutoIngester{ + pushgateway: NewPushgatewayIngester(), + remoteWrite: NewRemoteWriteIngester(), + collectors: collectors, + } +} + +// Ingest automatically routes samples to appropriate ingestion method based on timestamp age. +func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + realtimeSamples, historicSamples := groupSamplesByMode(samples) + + logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples)) + + if len(realtimeSamples) > 0 { + if err := a.ingestRealtime(ctx, cfg); err != nil { + return fmt.Errorf("failed to ingest realtime samples: %w", err) + } + } + + if len(historicSamples) > 0 { + if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil { + return fmt.Errorf("failed to ingest historic samples: %w", err) + } + } + + log.Printf("\nš Auto-ingest complete!") + return nil +} + +// groupSamplesByMode separates samples into realtime and historic groups. +func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) { + realtimeSamples := make([]metrics.Sample, 0) + historicSamples := make([]metrics.Sample, 0) + + for _, sample := range samples { + if DetermineMode(sample.Timestamp) == config.ModeRealtime { + realtimeSamples = append(realtimeSamples, sample) + } else { + historicSamples = append(historicSamples, sample) + } + } + + return realtimeSamples, historicSamples +} + +// logIngestSummary logs the ingestion summary. +func logIngestSummary(total, realtime, historic int) { + log.Printf("š Auto-ingest summary:") + log.Printf(" Total samples: %d", total) + log.Printf(" Realtime samples (< 5min old): %d", realtime) + log.Printf(" Historic samples (> 5min old): %d", historic) +} + +// ingestRealtime ingests realtime samples via Pushgateway. +func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error { + log.Printf("\nš Ingesting REALTIME samples via Pushgateway...") + log.Printf(" Note: Pushgateway uses current timestamp (original timestamps ignored)") + + if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + return err + } + + log.Printf("ā
Successfully ingested realtime samples") + return nil +} + +// ingestHistoric ingests historic samples via Remote Write. +func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(samples)) + + // Log a few sample details instead of all samples + samplesToLog := 3 + if len(samples) < samplesToLog { + samplesToLog = len(samples) + } + + for i := 0; i < samplesToLog; i++ { + age := time.Since(samples[i].Timestamp) + log.Printf(" Sample %d: %s (age: %s)", i+1, samples[i].MetricName, formatDuration(age)) + } + + if len(samples) > samplesToLog { + // Show oldest and newest sample ages + oldestAge := time.Since(samples[0].Timestamp) + newestAge := time.Since(samples[len(samples)-1].Timestamp) + log.Printf(" ... (%d more samples)", len(samples)-samplesToLog) + log.Printf(" Age range: %s (oldest) to %s (newest)", formatDuration(oldestAge), formatDuration(newestAge)) + } + + if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil { + return err + } + + log.Printf("ā
Successfully ingested %d historic samples", len(samples)) + return nil +} + +// formatDuration formats a duration in human-readable form. +func formatDuration(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%.0f seconds", d.Seconds()) + } else if d < time.Hour { + return fmt.Sprintf("%.0f minutes", d.Minutes()) + } else if d < 24*time.Hour { + return fmt.Sprintf("%.1f hours", d.Hours()) + } + return fmt.Sprintf("%.1f days", d.Hours()/24) +} |
