package parser

import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"regexp"
	"strconv"
	"strings"
	"time"

	"epimetheus/internal/metrics"
	"epimetheus/internal/resolver"
)

// TabularCSVParser parses tabular CSV files where the first row is headers
// and all subsequent rows are data. Numeric columns become metrics, string
// columns become labels.
type TabularCSVParser struct {
	metricName    string                // Prefix for every generated metric name
	timestamp     time.Time             // Base timestamp; each row adds a small offset to it
	resolver      *resolver.DNSResolver // DNS resolver for IP addresses
	resolveLabels map[string]bool       // Set of label names to resolve via DNS
}

// SaveDNSCache saves the DNS cache to disk (called after successful ingestion).
// A failure is logged rather than returned because cache persistence is not
// critical. It is a no-op when the parser has no resolver.
func (p *TabularCSVParser) SaveDNSCache() {
	if p.resolver == nil {
		return
	}

	cacheSize := p.resolver.GetCacheSize()
	if err := p.resolver.SaveCache(); err != nil {
		// Log error but don't fail - cache saving is not critical
		log.Printf("⚠️ Failed to save DNS cache: %v", err)
		return
	}
	if cacheSize > 0 {
		log.Printf("💾 Saved DNS cache with %d entries", cacheSize)
	}
}

// numericLabelColumns is the set of column names that are treated as labels
// even if they contain numeric values. This is useful for categorical codes
// like HTTP status codes (200, 404, 500) or date periods (20160101, 20160102)
// that should be labels, not metrics.
//
// Column names are matched case-insensitively (the header is lowercased
// before the lookup).
//
// Common examples:
//   - responsecode: HTTP response codes (200, 404, 500)
//   - statuscode: Application status codes
//   - status: Status identifiers
//   - code: Generic codes
//   - period: Date periods (20160101, 20160102, YYYYMMDD format)
var numericLabelColumns = map[string]bool{
	"responsecode":  true,
	"statuscode":    true,
	"status_code":   true,
	"response_code": true,
	"status":        true,
	"code":          true,
	"period":        true,
}

// labelNameInvalidRun matches every maximal run of characters that are not
// ASCII letters or digits (underscores included), so one replacement both
// substitutes invalid characters and collapses repeated underscores.
// Compiled once at package scope instead of on every sanitizeLabelName call.
var labelNameInvalidRun = regexp.MustCompile(`[^a-zA-Z0-9]+`)

// NewTabularCSVParser creates a new tabular CSV parser.
//
// metricName is the prefix of the metrics to create, timestamp is the base
// timestamp used for all samples, and resolveIPLabels is a list of label
// names containing IP addresses to resolve via DNS.
//
// Each entry of resolveIPLabels is registered both verbatim and in its
// sanitized form: Parse sanitizes CSV headers before they become label
// names, so a raw name such as "src-ip" would otherwise never match the
// resulting "src_ip" label.
func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser {
	// Build set for O(1) lookup
	resolveMap := make(map[string]bool, len(resolveIPLabels))
	for _, label := range resolveIPLabels {
		resolveMap[label] = true
		if trimmed := strings.TrimSpace(label); trimmed != "" {
			resolveMap[sanitizeLabelName(trimmed)] = true
		}
	}

	return &TabularCSVParser{
		metricName:    metricName,
		timestamp:     timestamp,
		resolver:      resolver.NewDNSResolver(),
		resolveLabels: resolveMap,
	}
}

// Parse reads metrics from a tabular CSV format.
//
// First row: column headers (trimmed and sanitized into label names).
// Subsequent rows: data values. Within each row, numeric columns become
// separate metrics and the remaining columns become labels (see parseRecord).
//
// Lines starting with '#' are treated as comments. Rows whose field count
// differs from the header row are skipped silently. Parsing stops with
// ctx.Err() if ctx is cancelled between rows; on any error the returned
// slice is nil.
func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
	var samples []metrics.Sample

	csvReader := csv.NewReader(reader)
	csvReader.Comment = '#'
	csvReader.TrimLeadingSpace = true
	csvReader.FieldsPerRecord = -1 // Allow variable number of fields

	// Read header row
	headers, err := csvReader.Read()
	if err != nil {
		return nil, fmt.Errorf("failed to read header row: %w", err)
	}
	if len(headers) == 0 {
		return nil, fmt.Errorf("empty header row")
	}

	// Trim whitespace and sanitize headers for Prometheus label compatibility
	for i := range headers {
		headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i]))
	}

	// Read data rows. rowNum counts records read so far; the header is record 1.
	rowNum := 1
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("line %d: %w", rowNum+1, err)
		}
		rowNum++

		if len(record) != len(headers) {
			// Skip rows with mismatched column counts
			continue
		}

		// Add microsecond offset to ensure unique timestamps per row.
		// This prevents duplicate timestamp errors when multiple rows
		// have the same label combinations.
		rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond)
		samples = append(samples, p.parseRecord(headers, record, rowTimestamp)...)
	}

	return samples, nil
}

// parseRecord converts a single CSV row into multiple Samples.
// Numeric columns become separate metrics named "<metricName>_<column>",
// string columns become labels attached to every metric of the row.
//
// Exception: Column names in numericLabelColumns are always treated as labels,
// even if they contain numbers (e.g., responsecode: 200, 404, 500).
//
// DNS Resolution: values of labels listed in resolveLabels are passed to the
// resolver and replaced by the returned hostname when resolution succeeds.
// Resolution is skipped entirely when the parser has no resolver.
func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample {
	// numericColumn is one parsed numeric cell together with its column name.
	type numericColumn struct {
		name  string
		value float64
	}

	var samples []metrics.Sample

	// First pass: identify numeric vs string columns and collect label values
	labels := make(map[string]string)
	var numericCols []numericColumn

	for i, header := range headers {
		if i >= len(record) {
			// Record is shorter than the header row; nothing left to read.
			break
		}
		value := strings.TrimSpace(record[i])

		// Check if this column is in the exception list (label even if numeric)
		if numericLabelColumns[strings.ToLower(header)] {
			labels[header] = value
			continue
		}

		// Try to parse as number
		floatVal, err := strconv.ParseFloat(value, 64)
		if err != nil {
			// It's a string column - becomes a label
			labels[header] = value
			continue
		}
		// It's a numeric column - will become a metric
		numericCols = append(numericCols, numericColumn{name: header, value: floatVal})
	}

	// Resolve IP addresses to hostnames. The nil guard mirrors SaveDNSCache so
	// a parser without a resolver does not panic here.
	if p.resolver != nil {
		for labelName := range p.resolveLabels {
			ipValue, exists := labels[labelName]
			if !exists {
				continue
			}
			// Attempt DNS resolution; if it fails, the IP value stays unchanged
			if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" {
				labels[labelName] = hostname
			}
		}
	}

	// Create one sample per numeric column
	for _, numCol := range numericCols {
		metricName := p.metricName + "_" + numCol.name

		// Copy labels so each sample owns an independent map
		metricLabels := make(map[string]string, len(labels))
		for k, v := range labels {
			metricLabels[k] = v
		}

		samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp))
	}

	return samples
}

// sanitizeLabelName converts a string to a valid Prometheus label name.
// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]*
//
// Every run of characters other than ASCII letters and digits is replaced by
// a single underscore, leading and trailing underscores are removed, and a
// result that starts with a digit is prefixed with "label_". The digit check
// runs after trimming so that inputs like "(123)" cannot end up as the
// invalid name "123". An empty result yields "unknown".
//
// Examples:
//
//	"avg(totaltime)" -> "avg_totaltime"
//	"sum(rcv)"       -> "sum_rcv"
//	"response-code"  -> "response_code"
//	"123invalid"     -> "label_123invalid"
//	"(123)"          -> "label_123"
//	""               -> "unknown"
func sanitizeLabelName(name string) string {
	sanitized := strings.Trim(labelNameInvalidRun.ReplaceAllString(name, "_"), "_")
	if sanitized == "" {
		return "unknown"
	}

	// Ensure it starts with a letter (underscores were trimmed above)
	if sanitized[0] >= '0' && sanitized[0] <= '9' {
		sanitized = "label_" + sanitized
	}

	return sanitized
}

// parseFloat64 tries to parse a string as float64 after trimming surrounding
// whitespace; it returns 0.0 if parsing fails.
func parseFloat64(s string) float64 {
	f, err := strconv.ParseFloat(strings.TrimSpace(s), 64)
	if err != nil {
		return 0.0
	}
	return f
}