package ingester

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"epimetheus/internal/metrics"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

// Tuning parameters for Remote Write ingestion.
const (
	// requestTimeout is the overall timeout applied to the ingester's HTTP client.
	requestTimeout = 10 * time.Second

	// backfillDelay is the pause Backfill inserts after each historic push.
	backfillDelay = 100 * time.Millisecond

	// BatchSize is the maximum number of samples placed in one Remote Write
	// request. Prometheus has a 32MB limit, so batches are kept small to
	// avoid rejection.
	BatchSize = 5000

	// NumWorkers is the number of goroutines that send batches concurrently.
	// Higher values increase throughput but may overwhelm Prometheus.
	NumWorkers = 10
)

// Object pools shared by the ingestion hot path to reduce GC pressure.
var (
	// bufferPool hands out *bytes.Buffer values; sendWriteRequest uses them
	// to hold the HTTP request body.
	bufferPool = sync.Pool{
		New: func() interface{} {
			return new(bytes.Buffer)
		},
	}

	// protoBufferPool hands out *[]byte scratch slices created with ~512KB of
	// capacity (a typical batch size); sendWriteRequest uses them as the
	// destination for the Snappy-compressed payload.
	protoBufferPool = sync.Pool{
		New: func() interface{} {
			scratch := make([]byte, 0, 512*1024)
			return &scratch
		},
	}

	// timeSeriesPool hands out *prompb.TimeSeries values with room for a
	// typical label count and a single sample.
	// NOTE(review): no function visible in this file takes from this pool.
	timeSeriesPool = sync.Pool{
		New: func() interface{} {
			series := new(prompb.TimeSeries)
			series.Labels = make([]prompb.Label, 0, 10)
			series.Samples = make([]prompb.Sample, 0, 1)
			return series
		},
	}

	// labelSlicePool hands out *[]prompb.Label scratch slices;
	// convertSamplesToTimeSeries uses them while assembling label sets.
	labelSlicePool = sync.Pool{
		New: func() interface{} {
			scratch := make([]prompb.Label, 0, 10)
			return &scratch
		},
	}
)

// RemoteWriteIngester pushes metrics to Prometheus through the Remote Write
// API. Samples keep the timestamps they are given, which makes the ingester
// suitable for importing historic data.
type RemoteWriteIngester struct {
	// client is the HTTP client used for every Remote Write request.
	client *http.Client
}

// NewRemoteWriteIngester creates a new Remote Write ingester with optimized HTTP client.
func NewRemoteWriteIngester() RemoteWriteIngester { // Optimized HTTP transport with connection pooling transport := &http.Transport{ MaxIdleConns: 100, // Global connection pool MaxIdleConnsPerHost: NumWorkers, // Match worker count MaxConnsPerHost: NumWorkers, // Limit concurrent connections per host IdleConnTimeout: 90 * time.Second, // Keep connections alive longer DisableKeepAlives: false, // CRITICAL: enable keep-alive DisableCompression: true, // We compress manually with Snappy ForceAttemptHTTP2: true, // Use HTTP/2 if available WriteBufferSize: 64 * 1024, // 64KB write buffer ReadBufferSize: 64 * 1024, // 64KB read buffer } return RemoteWriteIngester{ client: &http.Client{ Timeout: requestTimeout, Transport: transport, }, } } // Ingest sends samples to Prometheus via Remote Write API with batching and concurrency. // Large datasets are automatically split into batches and processed by a worker pool // to avoid exceeding Prometheus's 32MB limit and maximize throughput. func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error { if len(samples) == 0 { return fmt.Errorf("no samples to ingest") } select { case <-ctx.Done(): return ctx.Err() default: } // Split samples into batches batches := chunkSamples(samples, BatchSize) totalBatches := len(batches) log.Printf("Splitting %d samples into %d batches (batch size: %d)", len(samples), totalBatches, BatchSize) log.Printf("Processing batches with %d concurrent workers", NumWorkers) // Counters for tracking progress (atomic for thread safety) var successCount int32 var errorCount int32 var processedCount int32 // Worker pool pattern batchChan := make(chan batchJob, totalBatches) errorsChan := make(chan error, totalBatches) var wg sync.WaitGroup // Start workers for w := 0; w < NumWorkers; w++ { wg.Add(1) go func(workerID int) { defer wg.Done() for job := range batchChan { // Check context before processing select { case <-ctx.Done(): errorsChan <- fmt.Errorf("worker %d 
cancelled", workerID) return default: } // Convert and send batch timeSeries := convertSamplesToTimeSeries(job.batch) writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { atomic.AddInt32(&errorCount, 1) errorsChan <- fmt.Errorf("batch %d: %w", job.index, err) } else { atomic.AddInt32(&successCount, 1) } // Update progress processed := atomic.AddInt32(&processedCount, 1) if processed%10 == 0 || int(processed) == totalBatches { progress := float64(processed) / float64(totalBatches) * 100 log.Printf("Progress: %.1f%% (%d/%d batches, %d success, %d errors)", progress, processed, totalBatches, atomic.LoadInt32(&successCount), atomic.LoadInt32(&errorCount)) } } }(w) } // Send batches to workers for idx, batch := range batches { batchChan <- batchJob{index: idx + 1, batch: batch} } close(batchChan) // Wait for all workers to finish wg.Wait() close(errorsChan) // Collect errors var firstError error errorList := make([]error, 0) for err := range errorsChan { errorList = append(errorList, err) if firstError == nil { firstError = err } } finalSuccess := int(atomic.LoadInt32(&successCount)) finalErrors := int(atomic.LoadInt32(&errorCount)) log.Printf("Batch ingestion complete: %d successful, %d errors", finalSuccess, finalErrors) if finalErrors > 0 { // Log first few errors as examples numToLog := 5 if len(errorList) < numToLog { numToLog = len(errorList) } log.Printf("Sample errors (showing %d of %d):", numToLog, finalErrors) for i := 0; i < numToLog; i++ { log.Printf(" - %v", errorList[i]) } return fmt.Errorf("completed with %d/%d batches failed", finalErrors, totalBatches) } return nil } // batchJob represents a batch to be processed by a worker. type batchJob struct { index int batch []metrics.Sample } // chunkSamples splits samples into batches of the specified size. 
func chunkSamples(samples []metrics.Sample, batchSize int) [][]metrics.Sample { var batches [][]metrics.Sample for i := 0; i < len(samples); i += batchSize { end := i + batchSize if end > len(samples) { end = len(samples) } batches = append(batches, samples[i:end]) } return batches } // IngestHistoric generates and ingests historic metrics for a specific time in the past. func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error { timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour) timeSeries := generateHistoricTimeSeries(timestamp) writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { return err } log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", hoursAgo, timestamp.Format(time.RFC3339)) return nil } // Backfill ingests historic metrics for a range of time points. func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error { log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", startHoursAgo, endHoursAgo, intervalHours) successCount := 0 errorCount := 0 for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil { log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) errorCount++ } else { successCount++ } select { case <-ctx.Done(): return ctx.Err() case <-time.After(backfillDelay): } } log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) if errorCount > 0 { return fmt.Errorf("backfill completed with %d errors", errorCount) } return nil } // sendWriteRequest sends a write request to Prometheus using pooled buffers. 
func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error { // Get protobuf buffer from pool protoBufPtr := protoBufferPool.Get().(*[]byte) protoBuf := (*protoBufPtr)[:0] // Reset length but keep capacity defer protoBufferPool.Put(protoBufPtr) // Marshal into pooled buffer data, err := writeRequest.Marshal() if err != nil { return fmt.Errorf("failed to marshal write request: %w", err) } // Compress using pooled buffer compressed := snappy.Encode(protoBuf, data) // Get request buffer from pool buf := bufferPool.Get().(*bytes.Buffer) buf.Reset() defer bufferPool.Put(buf) buf.Write(compressed) req, err := http.NewRequestWithContext(ctx, "POST", url, buf) if err != nil { return fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", "snappy") req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") req.Header.Set("Content-Length", fmt.Sprintf("%d", buf.Len())) resp, err := i.client.Do(req) if err != nil { return fmt.Errorf("failed to send remote write request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) } return nil } // convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format with pooling. 
func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries { // Preallocate with exact capacity to avoid reallocation timeSeries := make([]prompb.TimeSeries, 0, len(samples)) // Reusable label slice labelsPtr := labelSlicePool.Get().(*[]prompb.Label) labels := *labelsPtr defer labelSlicePool.Put(labelsPtr) for i := range samples { sample := &samples[i] // Avoid copying // Reset labels slice for reuse labels = labels[:0] // Add __name__ label labels = append(labels, prompb.Label{Name: "__name__", Value: sample.MetricName}) // Add custom labels for k, v := range sample.Labels { labels = append(labels, prompb.Label{Name: k, Value: v}) } // Copy labels (must not share slice across time series) labelsCopy := make([]prompb.Label, len(labels)) copy(labelsCopy, labels) // Create time series (reuse pattern, but we need unique objects) timeSeries = append(timeSeries, prompb.TimeSeries{ Labels: labelsCopy, Samples: []prompb.Sample{{ Value: sample.Value, Timestamp: sample.Timestamp.UnixMilli(), }}, }) } return timeSeries } // generateHistoricTimeSeries generates example time series for a specific timestamp. func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries { timestampMs := timestamp.UnixMilli() var timeSeries []prompb.TimeSeries baseLabels := []prompb.Label{ {Name: "instance", Value: "example-app"}, {Name: "job", Value: "historic_data"}, } timeSeries = append(timeSeries, createCounterSeries("epimetheus_test_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs)) timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs)) timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs)) timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...) timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...) 
return timeSeries } // createCounterSeries creates a counter metric time series. func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { labels := []prompb.Label{{Name: "__name__", Value: name}} labels = append(labels, baseLabels...) return prompb.TimeSeries{ Labels: labels, Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}}, } } // createGaugeSeries creates a gauge metric time series. func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { return createCounterSeries(name, baseLabels, value, timestamp) } // generateHistogramSeries generates histogram bucket time series. func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} var series []prompb.TimeSeries cumulativeCount := 0 for _, bucket := range buckets { cumulativeCount += rand.Intn(5) labels := []prompb.Label{ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, {Name: "le", Value: fmt.Sprintf("%g", bucket)}, } labels = append(labels, baseLabels...) series = append(series, prompb.TimeSeries{ Labels: labels, Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, }) } infLabels := []prompb.Label{ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, {Name: "le", Value: "+Inf"}, } infLabels = append(infLabels, baseLabels...) 
series = append(series, prompb.TimeSeries{ Labels: infLabels, Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, }) series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp)) series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp)) return series } // generateLabeledCounterSeries generates labeled counter time series. func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { jobTypes := []string{"email", "report", "backup"} statuses := []string{"success", "failed"} var series []prompb.TimeSeries for _, jobType := range jobTypes { for _, status := range statuses { labels := []prompb.Label{ {Name: "__name__", Value: "epimetheus_test_jobs_processed_total"}, {Name: "job_type", Value: jobType}, {Name: "status", Value: status}, } labels = append(labels, baseLabels...) series = append(series, prompb.TimeSeries{ Labels: labels, Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}}, }) } } return series }