summaryrefslogtreecommitdiff
path: root/internal/ingester/remotewrite.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingester/remotewrite.go')
-rw-r--r--internal/ingester/remotewrite.go455
1 files changed, 455 insertions, 0 deletions
diff --git a/internal/ingester/remotewrite.go b/internal/ingester/remotewrite.go
new file mode 100644
index 0000000..b88b7fa
--- /dev/null
+++ b/internal/ingester/remotewrite.go
@@ -0,0 +1,455 @@
+package ingester
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "epimetheus/internal/metrics"
+
+ "github.com/golang/snappy"
+ "github.com/prometheus/prometheus/prompb"
+)
+
+const (
+ requestTimeout = 10 * time.Second
+ backfillDelay = 100 * time.Millisecond
+ // BatchSize defines the maximum number of samples per Remote Write request
+ // Prometheus has a 32MB limit, so we keep batches small to avoid rejection
+ BatchSize = 5000
+ // NumWorkers defines the number of concurrent goroutines for batch processing
+ // Higher values increase throughput but may overwhelm Prometheus
+ NumWorkers = 10
+)
+
+// Buffer pool for reusing buffers across requests (reduces GC pressure)
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+// Preallocated buffer pool for protobuf marshaling
+var protoBufferPool = sync.Pool{
+ New: func() interface{} {
+ // Preallocate ~500KB buffer (typical batch size)
+ buf := make([]byte, 0, 512*1024)
+ return &buf
+ },
+}
+
+// TimeSeries object pool for reuse
+var timeSeriesPool = sync.Pool{
+ New: func() interface{} {
+ return &prompb.TimeSeries{
+ Labels: make([]prompb.Label, 0, 10), // Typical label count
+ Samples: make([]prompb.Sample, 0, 1),
+ }
+ },
+}
+
+// Label slice pool for reuse
+var labelSlicePool = sync.Pool{
+ New: func() interface{} {
+ labels := make([]prompb.Label, 0, 10)
+ return &labels
+ },
+}
+
+// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API.
+// This ingester preserves custom timestamps, making it suitable for importing historic data.
+type RemoteWriteIngester struct {
+ client *http.Client
+}
+
+// NewRemoteWriteIngester creates a new Remote Write ingester with optimized HTTP client.
+func NewRemoteWriteIngester() RemoteWriteIngester {
+ // Optimized HTTP transport with connection pooling
+ transport := &http.Transport{
+ MaxIdleConns: 100, // Global connection pool
+ MaxIdleConnsPerHost: NumWorkers, // Match worker count
+ MaxConnsPerHost: NumWorkers, // Limit concurrent connections per host
+ IdleConnTimeout: 90 * time.Second, // Keep connections alive longer
+ DisableKeepAlives: false, // CRITICAL: enable keep-alive
+ DisableCompression: true, // We compress manually with Snappy
+ ForceAttemptHTTP2: true, // Use HTTP/2 if available
+ WriteBufferSize: 64 * 1024, // 64KB write buffer
+ ReadBufferSize: 64 * 1024, // 64KB read buffer
+ }
+
+ return RemoteWriteIngester{
+ client: &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ },
+ }
+}
+
+// Ingest sends samples to Prometheus via Remote Write API with batching and concurrency.
+// Large datasets are automatically split into batches and processed by a worker pool
+// to avoid exceeding Prometheus's 32MB limit and maximize throughput.
+func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ // Split samples into batches
+ batches := chunkSamples(samples, BatchSize)
+ totalBatches := len(batches)
+
+ log.Printf("Splitting %d samples into %d batches (batch size: %d)", len(samples), totalBatches, BatchSize)
+ log.Printf("Processing batches with %d concurrent workers", NumWorkers)
+
+ // Counters for tracking progress (atomic for thread safety)
+ var successCount int32
+ var errorCount int32
+ var processedCount int32
+
+ // Worker pool pattern
+ batchChan := make(chan batchJob, totalBatches)
+ errorsChan := make(chan error, totalBatches)
+ var wg sync.WaitGroup
+
+ // Start workers
+ for w := 0; w < NumWorkers; w++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for job := range batchChan {
+ // Check context before processing
+ select {
+ case <-ctx.Done():
+ errorsChan <- fmt.Errorf("worker %d cancelled", workerID)
+ return
+ default:
+ }
+
+ // Convert and send batch
+ timeSeries := convertSamplesToTimeSeries(job.batch)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
+ atomic.AddInt32(&errorCount, 1)
+ errorsChan <- fmt.Errorf("batch %d: %w", job.index, err)
+ } else {
+ atomic.AddInt32(&successCount, 1)
+ }
+
+ // Update progress
+ processed := atomic.AddInt32(&processedCount, 1)
+ if processed%10 == 0 || int(processed) == totalBatches {
+ progress := float64(processed) / float64(totalBatches) * 100
+ log.Printf("Progress: %.1f%% (%d/%d batches, %d success, %d errors)",
+ progress, processed, totalBatches, atomic.LoadInt32(&successCount), atomic.LoadInt32(&errorCount))
+ }
+ }
+ }(w)
+ }
+
+ // Send batches to workers
+ for idx, batch := range batches {
+ batchChan <- batchJob{index: idx + 1, batch: batch}
+ }
+ close(batchChan)
+
+ // Wait for all workers to finish
+ wg.Wait()
+ close(errorsChan)
+
+ // Collect errors
+ var firstError error
+ errorList := make([]error, 0)
+ for err := range errorsChan {
+ errorList = append(errorList, err)
+ if firstError == nil {
+ firstError = err
+ }
+ }
+
+ finalSuccess := int(atomic.LoadInt32(&successCount))
+ finalErrors := int(atomic.LoadInt32(&errorCount))
+
+ log.Printf("Batch ingestion complete: %d successful, %d errors", finalSuccess, finalErrors)
+
+ if finalErrors > 0 {
+ // Log first few errors as examples
+ numToLog := 5
+ if len(errorList) < numToLog {
+ numToLog = len(errorList)
+ }
+ log.Printf("Sample errors (showing %d of %d):", numToLog, finalErrors)
+ for i := 0; i < numToLog; i++ {
+ log.Printf(" - %v", errorList[i])
+ }
+ return fmt.Errorf("completed with %d/%d batches failed", finalErrors, totalBatches)
+ }
+
+ return nil
+}
+
+// batchJob represents a batch to be processed by a worker.
+type batchJob struct {
+ index int
+ batch []metrics.Sample
+}
+
+// chunkSamples splits samples into batches of the specified size.
+func chunkSamples(samples []metrics.Sample, batchSize int) [][]metrics.Sample {
+ var batches [][]metrics.Sample
+
+ for i := 0; i < len(samples); i += batchSize {
+ end := i + batchSize
+ if end > len(samples) {
+ end = len(samples)
+ }
+ batches = append(batches, samples[i:end])
+ }
+
+ return batches
+}
+
+// IngestHistoric generates and ingests historic metrics for a specific time in the past.
+func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error {
+ timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour)
+ timeSeries := generateHistoricTimeSeries(timestamp)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
+ return err
+ }
+
+ log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)",
+ hoursAgo, timestamp.Format(time.RFC3339))
+ return nil
+}
+
+// Backfill ingests historic metrics for a range of time points.
+func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error {
+ log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)",
+ startHoursAgo, endHoursAgo, intervalHours)
+
+ successCount := 0
+ errorCount := 0
+
+ for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours {
+ if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil {
+ log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err)
+ errorCount++
+ } else {
+ successCount++
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(backfillDelay):
+ }
+ }
+
+ log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount)
+
+ if errorCount > 0 {
+ return fmt.Errorf("backfill completed with %d errors", errorCount)
+ }
+
+ return nil
+}
+
+// sendWriteRequest sends a write request to Prometheus using pooled buffers.
+func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error {
+ // Get protobuf buffer from pool
+ protoBufPtr := protoBufferPool.Get().(*[]byte)
+ protoBuf := (*protoBufPtr)[:0] // Reset length but keep capacity
+ defer protoBufferPool.Put(protoBufPtr)
+
+ // Marshal into pooled buffer
+ data, err := writeRequest.Marshal()
+ if err != nil {
+ return fmt.Errorf("failed to marshal write request: %w", err)
+ }
+
+ // Compress using pooled buffer
+ compressed := snappy.Encode(protoBuf, data)
+
+ // Get request buffer from pool
+ buf := bufferPool.Get().(*bytes.Buffer)
+ buf.Reset()
+ defer bufferPool.Put(buf)
+
+ buf.Write(compressed)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", url, buf)
+ if err != nil {
+ return fmt.Errorf("failed to create HTTP request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/x-protobuf")
+ req.Header.Set("Content-Encoding", "snappy")
+ req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+ req.Header.Set("Content-Length", fmt.Sprintf("%d", buf.Len()))
+
+ resp, err := i.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("failed to send remote write request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format with pooling.
+func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries {
+ // Preallocate with exact capacity to avoid reallocation
+ timeSeries := make([]prompb.TimeSeries, 0, len(samples))
+
+ // Reusable label slice
+ labelsPtr := labelSlicePool.Get().(*[]prompb.Label)
+ labels := *labelsPtr
+ defer labelSlicePool.Put(labelsPtr)
+
+ for i := range samples {
+ sample := &samples[i] // Avoid copying
+
+ // Reset labels slice for reuse
+ labels = labels[:0]
+
+ // Add __name__ label
+ labels = append(labels, prompb.Label{Name: "__name__", Value: sample.MetricName})
+
+ // Add custom labels
+ for k, v := range sample.Labels {
+ labels = append(labels, prompb.Label{Name: k, Value: v})
+ }
+
+ // Copy labels (must not share slice across time series)
+ labelsCopy := make([]prompb.Label, len(labels))
+ copy(labelsCopy, labels)
+
+ // Create time series (reuse pattern, but we need unique objects)
+ timeSeries = append(timeSeries, prompb.TimeSeries{
+ Labels: labelsCopy,
+ Samples: []prompb.Sample{{
+ Value: sample.Value,
+ Timestamp: sample.Timestamp.UnixMilli(),
+ }},
+ })
+ }
+
+ return timeSeries
+}
+
+// generateHistoricTimeSeries generates example time series for a specific timestamp.
+func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries {
+ timestampMs := timestamp.UnixMilli()
+ var timeSeries []prompb.TimeSeries
+
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "example-app"},
+ {Name: "job", Value: "historic_data"},
+ }
+
+ timeSeries = append(timeSeries, createCounterSeries("epimetheus_test_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs))
+
+ timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...)
+ timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...)
+
+ return timeSeries
+}
+
+// createCounterSeries creates a counter metric time series.
+func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
+ labels := []prompb.Label{{Name: "__name__", Value: name}}
+ labels = append(labels, baseLabels...)
+
+ return prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}},
+ }
+}
+
+// createGaugeSeries creates a gauge metric time series.
+func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
+ return createCounterSeries(name, baseLabels, value, timestamp)
+}
+
+// generateHistogramSeries generates histogram bucket time series.
+func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
+ var series []prompb.TimeSeries
+
+ cumulativeCount := 0
+ for _, bucket := range buckets {
+ cumulativeCount += rand.Intn(5)
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"},
+ {Name: "le", Value: fmt.Sprintf("%g", bucket)},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+ }
+
+ infLabels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"},
+ {Name: "le", Value: "+Inf"},
+ }
+ infLabels = append(infLabels, baseLabels...)
+ series = append(series, prompb.TimeSeries{
+ Labels: infLabels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+
+ series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp))
+ series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp))
+
+ return series
+}
+
+// generateLabeledCounterSeries generates labeled counter time series.
+func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ jobTypes := []string{"email", "report", "backup"}
+ statuses := []string{"success", "failed"}
+ var series []prompb.TimeSeries
+
+ for _, jobType := range jobTypes {
+ for _, status := range statuses {
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_jobs_processed_total"},
+ {Name: "job_type", Value: jobType},
+ {Name: "status", Value: status},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}},
+ })
+ }
+ }
+
+ return series
+}