diff options
Diffstat (limited to 'internal/ingester/remotewrite.go')
| -rw-r--r-- | internal/ingester/remotewrite.go | 455 |
1 files changed, 455 insertions, 0 deletions
diff --git a/internal/ingester/remotewrite.go b/internal/ingester/remotewrite.go new file mode 100644 index 0000000..b88b7fa --- /dev/null +++ b/internal/ingester/remotewrite.go @@ -0,0 +1,455 @@ +package ingester + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "math/rand" + "net/http" + "sync" + "sync/atomic" + "time" + + "epimetheus/internal/metrics" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" +) + +const ( + requestTimeout = 10 * time.Second + backfillDelay = 100 * time.Millisecond + // BatchSize defines the maximum number of samples per Remote Write request + // Prometheus has a 32MB limit, so we keep batches small to avoid rejection + BatchSize = 5000 + // NumWorkers defines the number of concurrent goroutines for batch processing + // Higher values increase throughput but may overwhelm Prometheus + NumWorkers = 10 +) + +// Buffer pool for reusing buffers across requests (reduces GC pressure) +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +// Preallocated buffer pool for protobuf marshaling +var protoBufferPool = sync.Pool{ + New: func() interface{} { + // Preallocate ~500KB buffer (typical batch size) + buf := make([]byte, 0, 512*1024) + return &buf + }, +} + +// TimeSeries object pool for reuse +var timeSeriesPool = sync.Pool{ + New: func() interface{} { + return &prompb.TimeSeries{ + Labels: make([]prompb.Label, 0, 10), // Typical label count + Samples: make([]prompb.Sample, 0, 1), + } + }, +} + +// Label slice pool for reuse +var labelSlicePool = sync.Pool{ + New: func() interface{} { + labels := make([]prompb.Label, 0, 10) + return &labels + }, +} + +// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API. +// This ingester preserves custom timestamps, making it suitable for importing historic data. +type RemoteWriteIngester struct { + client *http.Client +} + +// NewRemoteWriteIngester creates a new Remote Write ingester with optimized HTTP client. +func NewRemoteWriteIngester() RemoteWriteIngester { + // Optimized HTTP transport with connection pooling + transport := &http.Transport{ + MaxIdleConns: 100, // Global connection pool + MaxIdleConnsPerHost: NumWorkers, // Match worker count + MaxConnsPerHost: NumWorkers, // Limit concurrent connections per host + IdleConnTimeout: 90 * time.Second, // Keep connections alive longer + DisableKeepAlives: false, // CRITICAL: enable keep-alive + DisableCompression: true, // We compress manually with Snappy + ForceAttemptHTTP2: true, // Use HTTP/2 if available + WriteBufferSize: 64 * 1024, // 64KB write buffer + ReadBufferSize: 64 * 1024, // 64KB read buffer + } + + return RemoteWriteIngester{ + client: &http.Client{ + Timeout: requestTimeout, + Transport: transport, + }, + } +} + +// Ingest sends samples to Prometheus via Remote Write API with batching and concurrency. +// Large datasets are automatically split into batches and processed by a worker pool +// to avoid exceeding Prometheus's 32MB limit and maximize throughput. +func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Split samples into batches + batches := chunkSamples(samples, BatchSize) + totalBatches := len(batches) + + log.Printf("Splitting %d samples into %d batches (batch size: %d)", len(samples), totalBatches, BatchSize) + log.Printf("Processing batches with %d concurrent workers", NumWorkers) + + // Counters for tracking progress (atomic for thread safety) + var successCount int32 + var errorCount int32 + var processedCount int32 + + // Worker pool pattern + batchChan := make(chan batchJob, totalBatches) + errorsChan := make(chan error, totalBatches) + var wg sync.WaitGroup + + // Start workers + for w := 0; w < NumWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for job := range batchChan { + // Check context before processing + select { + case <-ctx.Done(): + errorsChan <- fmt.Errorf("worker %d cancelled", workerID) + return + default: + } + + // Convert and send batch + timeSeries := convertSamplesToTimeSeries(job.batch) + writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} + + if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { + atomic.AddInt32(&errorCount, 1) + errorsChan <- fmt.Errorf("batch %d: %w", job.index, err) + } else { + atomic.AddInt32(&successCount, 1) + } + + // Update progress + processed := atomic.AddInt32(&processedCount, 1) + if processed%10 == 0 || int(processed) == totalBatches { + progress := float64(processed) / float64(totalBatches) * 100 + log.Printf("Progress: %.1f%% (%d/%d batches, %d success, %d errors)", + progress, processed, totalBatches, atomic.LoadInt32(&successCount), atomic.LoadInt32(&errorCount)) + } + } + }(w) + } + + // Send batches to workers + for idx, batch := range batches { + batchChan <- batchJob{index: idx + 1, batch: batch} + } + close(batchChan) + + // Wait for all workers to finish + wg.Wait() + close(errorsChan) + + // Collect errors + var firstError error + errorList := make([]error, 0) + for err := range errorsChan { + errorList = append(errorList, err) + if firstError == nil { + firstError = err + } + } + + finalSuccess := int(atomic.LoadInt32(&successCount)) + finalErrors := int(atomic.LoadInt32(&errorCount)) + + log.Printf("Batch ingestion complete: %d successful, %d errors", finalSuccess, finalErrors) + + if finalErrors > 0 { + // Log first few errors as examples + numToLog := 5 + if len(errorList) < numToLog { + numToLog = len(errorList) + } + log.Printf("Sample errors (showing %d of %d):", numToLog, finalErrors) + for i := 0; i < numToLog; i++ { + log.Printf(" - %v", errorList[i]) + } + return fmt.Errorf("completed with %d/%d batches failed", finalErrors, totalBatches) + } + + return nil +} + +// batchJob represents a batch to be processed by a worker. +type batchJob struct { + index int + batch []metrics.Sample +} + +// chunkSamples splits samples into batches of the specified size. +func chunkSamples(samples []metrics.Sample, batchSize int) [][]metrics.Sample { + var batches [][]metrics.Sample + + for i := 0; i < len(samples); i += batchSize { + end := i + batchSize + if end > len(samples) { + end = len(samples) + } + batches = append(batches, samples[i:end]) + } + + return batches +} + +// IngestHistoric generates and ingests historic metrics for a specific time in the past. +func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error { + timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour) + timeSeries := generateHistoricTimeSeries(timestamp) + writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} + + if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { + return err + } + + log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", + hoursAgo, timestamp.Format(time.RFC3339)) + return nil +} + +// Backfill ingests historic metrics for a range of time points. +func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error { + log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", + startHoursAgo, endHoursAgo, intervalHours) + + successCount := 0 + errorCount := 0 + + for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { + if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil { + log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) + errorCount++ + } else { + successCount++ + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backfillDelay): + } + } + + log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) + + if errorCount > 0 { + return fmt.Errorf("backfill completed with %d errors", errorCount) + } + + return nil +} + +// sendWriteRequest sends a write request to Prometheus using pooled buffers. +func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error { + // Get protobuf buffer from pool + protoBufPtr := protoBufferPool.Get().(*[]byte) + protoBuf := (*protoBufPtr)[:0] // Reset length but keep capacity + defer protoBufferPool.Put(protoBufPtr) + + // Marshal into pooled buffer + data, err := writeRequest.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal write request: %w", err) + } + + // Compress using pooled buffer + compressed := snappy.Encode(protoBuf, data) + + // Get request buffer from pool + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufferPool.Put(buf) + + buf.Write(compressed) + + req, err := http.NewRequestWithContext(ctx, "POST", url, buf) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + req.Header.Set("Content-Length", fmt.Sprintf("%d", buf.Len())) + + resp, err := i.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send remote write request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format with pooling. +func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries { + // Preallocate with exact capacity to avoid reallocation + timeSeries := make([]prompb.TimeSeries, 0, len(samples)) + + // Reusable label slice + labelsPtr := labelSlicePool.Get().(*[]prompb.Label) + labels := *labelsPtr + defer labelSlicePool.Put(labelsPtr) + + for i := range samples { + sample := &samples[i] // Avoid copying + + // Reset labels slice for reuse + labels = labels[:0] + + // Add __name__ label + labels = append(labels, prompb.Label{Name: "__name__", Value: sample.MetricName}) + + // Add custom labels + for k, v := range sample.Labels { + labels = append(labels, prompb.Label{Name: k, Value: v}) + } + + // Copy labels (must not share slice across time series) + labelsCopy := make([]prompb.Label, len(labels)) + copy(labelsCopy, labels) + + // Create time series (reuse pattern, but we need unique objects) + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: labelsCopy, + Samples: []prompb.Sample{{ + Value: sample.Value, + Timestamp: sample.Timestamp.UnixMilli(), + }}, + }) + } + + return timeSeries +} + +// generateHistoricTimeSeries generates example time series for a specific timestamp. +func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries { + timestampMs := timestamp.UnixMilli() + var timeSeries []prompb.TimeSeries + + baseLabels := []prompb.Label{ + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + } + + timeSeries = append(timeSeries, createCounterSeries("epimetheus_test_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs)) + + timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...) + timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...) + + return timeSeries +} + +// createCounterSeries creates a counter metric time series. +func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + labels := []prompb.Label{{Name: "__name__", Value: name}} + labels = append(labels, baseLabels...) + + return prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}}, + } +} + +// createGaugeSeries creates a gauge metric time series. +func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + return createCounterSeries(name, baseLabels, value, timestamp) +} + +// generateHistogramSeries generates histogram bucket time series. +func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + var series []prompb.TimeSeries + + cumulativeCount := 0 + for _, bucket := range buckets { + cumulativeCount += rand.Intn(5) + labels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, + {Name: "le", Value: fmt.Sprintf("%g", bucket)}, + } + labels = append(labels, baseLabels...) + + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + } + + infLabels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, + {Name: "le", Value: "+Inf"}, + } + infLabels = append(infLabels, baseLabels...) + series = append(series, prompb.TimeSeries{ + Labels: infLabels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + + series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp)) + series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp)) + + return series +} + +// generateLabeledCounterSeries generates labeled counter time series. +func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + jobTypes := []string{"email", "report", "backup"} + statuses := []string{"success", "failed"} + var series []prompb.TimeSeries + + for _, jobType := range jobTypes { + for _, status := range statuses { + labels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_jobs_processed_total"}, + {Name: "job_type", Value: jobType}, + {Name: "status", Value: status}, + } + labels = append(labels, baseLabels...) + + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}}, + }) + } + } + + return series +} |
