diff options
Diffstat (limited to 'internal/parser/tabular_csv.go')
| -rw-r--r-- | internal/parser/tabular_csv.go | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go new file mode 100644 index 0000000..bfa4fbe --- /dev/null +++ b/internal/parser/tabular_csv.go @@ -0,0 +1,256 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "log" + "regexp" + "strconv" + "strings" + "time" + + "epimetheus/internal/metrics" + "epimetheus/internal/resolver" +) + +// TabularCSVParser parses tabular CSV files where the first row is headers +// and all subsequent rows are data. Numeric columns become metrics, string columns become labels. +type TabularCSVParser struct { + metricName string + timestamp time.Time + resolver *resolver.DNSResolver // DNS resolver for IP addresses + resolveLabels map[string]bool // Set of label names to resolve via DNS +} + +// SaveDNSCache saves the DNS cache to disk (called after successful ingestion) +func (p *TabularCSVParser) SaveDNSCache() { + if p.resolver != nil { + cacheSize := p.resolver.GetCacheSize() + if err := p.resolver.SaveCache(); err != nil { + // Log error but don't fail - cache saving is not critical + log.Printf("⚠️ Failed to save DNS cache: %v", err) + } else if cacheSize > 0 { + log.Printf("💾 Saved DNS cache with %d entries", cacheSize) + } + } +} + +// numericLabelColumns is a list of column names that should be treated as labels +// even if they contain numeric values. This is useful for categorical codes like +// HTTP status codes (200, 404, 500) or date periods (20160101, 20160102) that should be labels, not metrics. +// +// Column names are matched case-insensitively. +// +// Common examples: +// - responsecode: HTTP response codes (200, 404, 500) +// - statuscode: Application status codes +// - status: Status identifiers +// - code: Generic codes +// - period: Date periods (20160101, 20160102, YYYYMMDD format) +var numericLabelColumns = map[string]bool{ + "responsecode": true, + "statuscode": true, + "status_code": true, + "response_code": true, + "status": true, + "code": true, + "period": true, +} + +// NewTabularCSVParser creates a new tabular CSV parser +// metricName is the name of the metric to create +// timestamp is the timestamp to use for all samples +// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS +func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser { + // Build set for O(1) lookup + resolveMap := make(map[string]bool) + for _, label := range resolveIPLabels { + resolveMap[label] = true + } + + return &TabularCSVParser{ + metricName: metricName, + timestamp: timestamp, + resolver: resolver.NewDNSResolver(), + resolveLabels: resolveMap, + } +} + +// Parse reads metrics from a tabular CSV format +// First row: column headers +// Subsequent rows: data values +// All columns become labels (numeric values are converted to strings) +func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + csvReader.TrimLeadingSpace = true + csvReader.FieldsPerRecord = -1 // Allow variable number of fields + + // Read header row + headers, err := csvReader.Read() + if err != nil { + return nil, fmt.Errorf("failed to read header row: %w", err) + } + + if len(headers) == 0 { + return nil, fmt.Errorf("empty header row") + } + + // Trim whitespace and sanitize headers for Prometheus label compatibility + for i := range headers { + headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i])) + } + + // Read data rows + rowNum := 1 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", rowNum+1, err) + } + rowNum++ + + if len(record) != len(headers) { + // Skip rows with mismatched column counts + continue + } + + // Add microsecond offset to ensure unique timestamps per row + // This prevents duplicate timestamp errors when multiple rows + // have the same label combinations + rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond) + + rowSamples := p.parseRecord(headers, record, rowTimestamp) + samples = append(samples, rowSamples...) + } + + return samples, nil +} + +// parseRecord converts a single CSV row into multiple Samples +// Numeric columns become separate metrics, string columns become labels. +// +// Exception: Column names in numericLabelColumns are always treated as labels, +// even if they contain numbers (e.g., responsecode: 200, 404, 500). +// +// DNS Resolution: Label values matching IP addresses in resolveLabels will be +// resolved to hostnames via reverse DNS lookup. +func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample { + var samples []metrics.Sample + + // First pass: identify numeric vs string columns and collect label values + labels := make(map[string]string) + var numericCols []struct { + name string + value float64 + } + + for i, header := range headers { + if i >= len(record) { + continue + } + + value := strings.TrimSpace(record[i]) + + // Check if this column is in the exception list (should be label even if numeric) + headerLower := strings.ToLower(header) + if numericLabelColumns[headerLower] { + // This column should always be a label, even if it's numeric + labels[header] = value + continue + } + + // Try to parse as number + if floatVal, err := strconv.ParseFloat(value, 64); err == nil { + // It's a numeric column - will become a metric + numericCols = append(numericCols, struct { + name string + value float64 + }{name: header, value: floatVal}) + } else { + // It's a string column - becomes a label + labels[header] = value + } + } + + // Resolve IP addresses to hostnames + for labelName := range p.resolveLabels { + if ipValue, exists := labels[labelName]; exists { + // Attempt DNS resolution + if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" { + // Replace IP with hostname + labels[labelName] = hostname + } + // If resolution failed, ipValue remains unchanged + } + } + + // Create one sample per numeric column + for _, numCol := range numericCols { + metricName := p.metricName + "_" + numCol.name + // Copy labels for each metric + metricLabels := make(map[string]string, len(labels)) + for k, v := range labels { + metricLabels[k] = v + } + samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp)) + } + + return samples +} + +// sanitizeLabelName converts a string to a valid Prometheus label name +// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]* +// Examples: +// "avg(totaltime)" -> "avg_totaltime" +// "sum(rcv)" -> "sum_rcv" +// "response-code" -> "response_code" +// "123invalid" -> "label_123invalid" +func sanitizeLabelName(name string) string { + if name == "" { + return "unknown" + } + + // Replace invalid characters with underscore + re := regexp.MustCompile(`[^a-zA-Z0-9_]`) + sanitized := re.ReplaceAllString(name, "_") + + // Ensure it starts with a letter or underscore + if len(sanitized) > 0 && sanitized[0] >= '0' && sanitized[0] <= '9' { + sanitized = "label_" + sanitized + } + + // Remove consecutive underscores + sanitized = regexp.MustCompile(`_+`).ReplaceAllString(sanitized, "_") + + // Remove leading/trailing underscores + sanitized = strings.Trim(sanitized, "_") + + if sanitized == "" { + return "unknown" + } + + return sanitized +} + +// ParseFloat64 tries to parse a string as float64, returns 0.0 if fails +func parseFloat64(s string) float64 { + f, err := strconv.ParseFloat(strings.TrimSpace(s), 64) + if err != nil { + return 0.0 + } + return f +} |
