summaryrefslogtreecommitdiff
path: root/internal/ingester/auto.go
blob: 315d76754bcc5500442af864e66fdcb75b50c91a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package ingester

import (
	"context"
	"fmt"
	"log"
	"time"

	"epimetheus/internal/config"
	"epimetheus/internal/metrics"
)

const ageThreshold = 5 * time.Minute

// DetermineMode automatically determines which ingestion mode to use based on timestamp age.
// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway).
func DetermineMode(timestamp time.Time) config.Mode {
	age := time.Since(timestamp)
	if age > ageThreshold {
		return config.ModeHistoric
	}
	return config.ModeRealtime
}

// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters.
type AutoIngester struct {
	pushgateway PushgatewayIngester
	remoteWrite RemoteWriteIngester
	collectors  metrics.Collectors
}

// NewAutoIngester creates a new auto ingester.
func NewAutoIngester(collectors metrics.Collectors) AutoIngester {
	return AutoIngester{
		pushgateway: NewPushgatewayIngester(),
		remoteWrite: NewRemoteWriteIngester(),
		collectors:  collectors,
	}
}

// Ingest automatically routes samples to appropriate ingestion method based on timestamp age.
func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
	if len(samples) == 0 {
		return fmt.Errorf("no samples to ingest")
	}

	realtimeSamples, historicSamples := groupSamplesByMode(samples)

	logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples))

	if len(realtimeSamples) > 0 {
		if err := a.ingestRealtime(ctx, cfg); err != nil {
			return fmt.Errorf("failed to ingest realtime samples: %w", err)
		}
	}

	if len(historicSamples) > 0 {
		if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil {
			return fmt.Errorf("failed to ingest historic samples: %w", err)
		}
	}

	log.Printf("\nšŸŽ‰ Auto-ingest complete!")
	return nil
}

// groupSamplesByMode separates samples into realtime and historic groups.
func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) {
	realtimeSamples := make([]metrics.Sample, 0)
	historicSamples := make([]metrics.Sample, 0)

	for _, sample := range samples {
		if DetermineMode(sample.Timestamp) == config.ModeRealtime {
			realtimeSamples = append(realtimeSamples, sample)
		} else {
			historicSamples = append(historicSamples, sample)
		}
	}

	return realtimeSamples, historicSamples
}

// logIngestSummary logs the ingestion summary.
func logIngestSummary(total, realtime, historic int) {
	log.Printf("šŸ“Š Auto-ingest summary:")
	log.Printf("  Total samples: %d", total)
	log.Printf("  Realtime samples (< 5min old): %d", realtime)
	log.Printf("  Historic samples (> 5min old): %d", historic)
}

// ingestRealtime ingests realtime samples via Pushgateway.
func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error {
	log.Printf("\nšŸ”„ Ingesting REALTIME samples via Pushgateway...")
	log.Printf("  Note: Pushgateway uses current timestamp (original timestamps ignored)")

	if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
		return err
	}

	log.Printf("āœ… Successfully ingested realtime samples")
	return nil
}

// ingestHistoric ingests historic samples via Remote Write.
func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
	log.Printf("\nā° Ingesting %d HISTORIC samples via Remote Write...", len(samples))

	// Log a few sample details instead of all samples
	samplesToLog := 3
	if len(samples) < samplesToLog {
		samplesToLog = len(samples)
	}

	for i := 0; i < samplesToLog; i++ {
		age := time.Since(samples[i].Timestamp)
		log.Printf("  Sample %d: %s (age: %s)", i+1, samples[i].MetricName, formatDuration(age))
	}

	if len(samples) > samplesToLog {
		// Show oldest and newest sample ages
		oldestAge := time.Since(samples[0].Timestamp)
		newestAge := time.Since(samples[len(samples)-1].Timestamp)
		log.Printf("  ... (%d more samples)", len(samples)-samplesToLog)
		log.Printf("  Age range: %s (oldest) to %s (newest)", formatDuration(oldestAge), formatDuration(newestAge))
	}

	if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil {
		return err
	}

	log.Printf("āœ… Successfully ingested %d historic samples", len(samples))
	return nil
}

// formatDuration formats a duration in human-readable form.
func formatDuration(d time.Duration) string {
	if d < time.Minute {
		return fmt.Sprintf("%.0f seconds", d.Seconds())
	} else if d < time.Hour {
		return fmt.Sprintf("%.0f minutes", d.Minutes())
	} else if d < 24*time.Hour {
		return fmt.Sprintf("%.1f hours", d.Hours())
	}
	return fmt.Sprintf("%.1f days", d.Hours()/24)
}