summaryrefslogtreecommitdiff
path: root/internal/parser/tabular_csv.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/parser/tabular_csv.go')
-rw-r--r--internal/parser/tabular_csv.go256
1 files changed, 256 insertions, 0 deletions
diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go
new file mode 100644
index 0000000..bfa4fbe
--- /dev/null
+++ b/internal/parser/tabular_csv.go
@@ -0,0 +1,256 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "log"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "epimetheus/internal/metrics"
+ "epimetheus/internal/resolver"
+)
+
+// TabularCSVParser parses tabular CSV files where the first row is headers
+// and all subsequent rows are data. Numeric columns become metrics, string columns become labels.
+type TabularCSVParser struct {
+ metricName string
+ timestamp time.Time
+ resolver *resolver.DNSResolver // DNS resolver for IP addresses
+ resolveLabels map[string]bool // Set of label names to resolve via DNS
+}
+
+// SaveDNSCache saves the DNS cache to disk (called after successful ingestion)
+func (p *TabularCSVParser) SaveDNSCache() {
+ if p.resolver != nil {
+ cacheSize := p.resolver.GetCacheSize()
+ if err := p.resolver.SaveCache(); err != nil {
+ // Log error but don't fail - cache saving is not critical
+ log.Printf("⚠️ Failed to save DNS cache: %v", err)
+ } else if cacheSize > 0 {
+ log.Printf("💾 Saved DNS cache with %d entries", cacheSize)
+ }
+ }
+}
+
// numericLabelColumns is the set of column names whose values are always
// emitted as labels, even when the value parses as a number. It exists for
// categorical codes that merely look numeric, such as HTTP status codes
// (200, 404, 500) or YYYYMMDD date periods (20160101).
//
// parseRecord lower-cases the column name before the lookup, so matching is
// case-insensitive; the keys here must therefore be lower case.
var numericLabelColumns = map[string]bool{
	// Response / status codes, with and without an underscore.
	"code":          true,
	"responsecode":  true,
	"response_code": true,
	"status":        true,
	"statuscode":    true,
	"status_code":   true,

	// Date periods in YYYYMMDD form.
	"period": true,
}
+
+// NewTabularCSVParser creates a new tabular CSV parser
+// metricName is the name of the metric to create
+// timestamp is the timestamp to use for all samples
+// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS
+func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser {
+ // Build set for O(1) lookup
+ resolveMap := make(map[string]bool)
+ for _, label := range resolveIPLabels {
+ resolveMap[label] = true
+ }
+
+ return &TabularCSVParser{
+ metricName: metricName,
+ timestamp: timestamp,
+ resolver: resolver.NewDNSResolver(),
+ resolveLabels: resolveMap,
+ }
+}
+
+// Parse reads metrics from a tabular CSV format
+// First row: column headers
+// Subsequent rows: data values
+// All columns become labels (numeric values are converted to strings)
+func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var samples []metrics.Sample
+
+ csvReader := csv.NewReader(reader)
+ csvReader.Comment = '#'
+ csvReader.TrimLeadingSpace = true
+ csvReader.FieldsPerRecord = -1 // Allow variable number of fields
+
+ // Read header row
+ headers, err := csvReader.Read()
+ if err != nil {
+ return nil, fmt.Errorf("failed to read header row: %w", err)
+ }
+
+ if len(headers) == 0 {
+ return nil, fmt.Errorf("empty header row")
+ }
+
+ // Trim whitespace and sanitize headers for Prometheus label compatibility
+ for i := range headers {
+ headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i]))
+ }
+
+ // Read data rows
+ rowNum := 1
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("line %d: %w", rowNum+1, err)
+ }
+ rowNum++
+
+ if len(record) != len(headers) {
+ // Skip rows with mismatched column counts
+ continue
+ }
+
+ // Add microsecond offset to ensure unique timestamps per row
+ // This prevents duplicate timestamp errors when multiple rows
+ // have the same label combinations
+ rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond)
+
+ rowSamples := p.parseRecord(headers, record, rowTimestamp)
+ samples = append(samples, rowSamples...)
+ }
+
+ return samples, nil
+}
+
+// parseRecord converts a single CSV row into multiple Samples
+// Numeric columns become separate metrics, string columns become labels.
+//
+// Exception: Column names in numericLabelColumns are always treated as labels,
+// even if they contain numbers (e.g., responsecode: 200, 404, 500).
+//
+// DNS Resolution: Label values matching IP addresses in resolveLabels will be
+// resolved to hostnames via reverse DNS lookup.
+func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample {
+ var samples []metrics.Sample
+
+ // First pass: identify numeric vs string columns and collect label values
+ labels := make(map[string]string)
+ var numericCols []struct {
+ name string
+ value float64
+ }
+
+ for i, header := range headers {
+ if i >= len(record) {
+ continue
+ }
+
+ value := strings.TrimSpace(record[i])
+
+ // Check if this column is in the exception list (should be label even if numeric)
+ headerLower := strings.ToLower(header)
+ if numericLabelColumns[headerLower] {
+ // This column should always be a label, even if it's numeric
+ labels[header] = value
+ continue
+ }
+
+ // Try to parse as number
+ if floatVal, err := strconv.ParseFloat(value, 64); err == nil {
+ // It's a numeric column - will become a metric
+ numericCols = append(numericCols, struct {
+ name string
+ value float64
+ }{name: header, value: floatVal})
+ } else {
+ // It's a string column - becomes a label
+ labels[header] = value
+ }
+ }
+
+ // Resolve IP addresses to hostnames
+ for labelName := range p.resolveLabels {
+ if ipValue, exists := labels[labelName]; exists {
+ // Attempt DNS resolution
+ if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" {
+ // Replace IP with hostname
+ labels[labelName] = hostname
+ }
+ // If resolution failed, ipValue remains unchanged
+ }
+ }
+
+ // Create one sample per numeric column
+ for _, numCol := range numericCols {
+ metricName := p.metricName + "_" + numCol.name
+ // Copy labels for each metric
+ metricLabels := make(map[string]string, len(labels))
+ for k, v := range labels {
+ metricLabels[k] = v
+ }
+ samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp))
+ }
+
+ return samples
+}
+
// Patterns used by sanitizeLabelName, compiled once at package
// initialisation instead of on every call.
var (
	// labelInvalidCharsRe matches any single character that is not allowed
	// in a Prometheus label name.
	labelInvalidCharsRe = regexp.MustCompile(`[^a-zA-Z0-9_]`)
	// labelUnderscoreRunsRe matches one or more consecutive underscores.
	labelUnderscoreRunsRe = regexp.MustCompile(`_+`)
)

// sanitizeLabelName converts a string to a valid Prometheus label name.
// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]*.
//
// Every character outside [a-zA-Z0-9_] is replaced by an underscore, runs of
// underscores are collapsed, and leading/trailing underscores are removed.
// If the result starts with a digit it is prefixed with "label_". An empty
// input, or an input with nothing left after cleaning, yields "unknown".
//
// Examples:
//
//	"avg(totaltime)" -> "avg_totaltime"
//	"sum(rcv)"       -> "sum_rcv"
//	"response-code"  -> "response_code"
//	"123invalid"     -> "label_123invalid"
//	"(5xx)"          -> "label_5xx"
func sanitizeLabelName(name string) string {
	if name == "" {
		return "unknown"
	}

	sanitized := labelInvalidCharsRe.ReplaceAllString(name, "_")
	sanitized = labelUnderscoreRunsRe.ReplaceAllString(sanitized, "_")
	sanitized = strings.Trim(sanitized, "_")

	if sanitized == "" {
		return "unknown"
	}

	// The digit check must run after trimming: stripping a leading
	// underscore (e.g. from "_123" or "(5xx)") can expose a leading digit,
	// which is not a valid first character for a label name.
	if first := sanitized[0]; first >= '0' && first <= '9' {
		sanitized = "label_" + sanitized
	}

	return sanitized
}
+
// parseFloat64 parses s as a 64-bit float after trimming surrounding
// whitespace. Any parse error, including an out-of-range value, results in 0
// rather than being reported to the caller.
func parseFloat64(s string) float64 {
	if parsed, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil {
		return parsed
	}
	return 0.0
}