diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-07 16:32:10 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-07 16:32:10 +0200 |
| commit | 3fd46f3977fb650974e5e936cba362c787c00637 (patch) | |
| tree | b49111ddd0b7af4a007bca6a304dba10efcd88ff /internal/parser | |
reimport this PoC
Diffstat (limited to 'internal/parser')
| -rw-r--r-- | internal/parser/csv.go | 101 | ||||
| -rw-r--r-- | internal/parser/csv_test.go | 175 | ||||
| -rw-r--r-- | internal/parser/json.go | 62 | ||||
| -rw-r--r-- | internal/parser/json_test.go | 177 | ||||
| -rw-r--r-- | internal/parser/parser.go | 56 | ||||
| -rw-r--r-- | internal/parser/parser_test.go | 99 | ||||
| -rw-r--r-- | internal/parser/tabular_csv.go | 256 | ||||
| -rw-r--r-- | internal/parser/tabular_csv_test.go | 469 |
8 files changed, 1395 insertions, 0 deletions
diff --git a/internal/parser/csv.go b/internal/parser/csv.go new file mode 100644 index 0000000..64d16e5 --- /dev/null +++ b/internal/parser/csv.go @@ -0,0 +1,101 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "strconv" + "strings" + "time" + + "epimetheus/internal/metrics" +) + +// CSVParser parses metrics from CSV format +type CSVParser struct{} + +// NewCSVParser creates a new CSV parser +func NewCSVParser() *CSVParser { + return &CSVParser{} +} + +// Parse reads metrics from CSV format +// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms +func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + + lineNum := 0 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", lineNum, err) + } + lineNum++ + + if len(record) < 3 { + continue // Skip invalid records + } + + sample, err := p.parseRecord(record, lineNum) + if err != nil { + continue // Skip records with errors + } + + samples = append(samples, sample) + } + + return samples, nil +} + +func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) { + metricName := strings.TrimSpace(record[0]) + if metricName == "" { + return metrics.Sample{}, fmt.Errorf("empty metric name") + } + + labels := parseLabels(record[1]) + + value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64) + if err != nil { + return metrics.Sample{}, fmt.Errorf("invalid value: %w", err) + } + + timestamp := time.Now() + if len(record) > 3 && record[3] != "" { + timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64) + if err == nil { + timestamp = time.UnixMilli(timestampMs) + } + } + + return metrics.NewSample(metricName, labels, value, timestamp), nil +} + +func 
parseLabels(labelStr string) map[string]string { + labels := make(map[string]string) + if labelStr == "" { + return labels + } + + labelPairs := strings.Split(labelStr, ";") + for _, pair := range labelPairs { + parts := strings.SplitN(pair, "=", 2) + if len(parts) == 2 { + labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } + } + return labels +} diff --git a/internal/parser/csv_test.go b/internal/parser/csv_test.go new file mode 100644 index 0000000..ffe9034 --- /dev/null +++ b/internal/parser/csv_test.go @@ -0,0 +1,175 @@ +package parser + +import ( + "context" + "strings" + "testing" + "time" +) + +func TestCSVParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + wantCount int + wantErr bool + }{ + { + name: "valid single line", + input: `test_metric,env=prod;host=server1,42.5,1234567890000`, + wantCount: 1, + wantErr: false, + }, + { + name: "multiple lines", + input: `metric1,label1=value1,100,1234567890000 +metric2,label2=value2,200,1234567891000 +metric3,label3=value3,300,1234567892000`, + wantCount: 3, + wantErr: false, + }, + { + name: "with comments", + input: `# This is a comment +metric1,env=test,50,1234567890000 +# Another comment +metric2,env=prod,75,1234567891000`, + wantCount: 2, + wantErr: false, + }, + { + name: "no timestamp defaults to now", + input: `metric1,env=test,100`, + wantCount: 1, + wantErr: false, + }, + { + name: "no labels", + input: `metric1,,100,1234567890000`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty input", + input: "", + wantCount: 0, + wantErr: false, + }, + { + name: "invalid line causes error", + input: `metric1,env=test,100,1234567890000 +invalid +metric2,env=prod,200,1234567891000`, + wantCount: 0, + wantErr: true, + }, + { + name: "invalid value skipped", + input: `metric1,env=test,not_a_number,1234567890000 +metric2,env=prod,200,1234567891000`, + wantCount: 1, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + 
parser := NewCSVParser() + reader := strings.NewReader(tt.input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if len(samples) != tt.wantCount { + t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount) + } + }) + } +} + +func TestCSVParser_ParseLabels(t *testing.T) { + tests := []struct { + name string + input string + want map[string]string + }{ + { + name: "single label", + input: "env=prod", + want: map[string]string{"env": "prod"}, + }, + { + name: "multiple labels", + input: "env=prod;host=server1;region=us-west", + want: map[string]string{"env": "prod", "host": "server1", "region": "us-west"}, + }, + { + name: "empty string", + input: "", + want: map[string]string{}, + }, + { + name: "invalid label format skipped", + input: "env=prod;invalid;host=server1", + want: map[string]string{"env": "prod", "host": "server1"}, + }, + { + name: "with spaces", + input: " env = prod ; host = server1 ", + want: map[string]string{"env": "prod", "host": "server1"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseLabels(tt.input) + if len(got) != len(tt.want) { + t.Errorf("parseLabels() returned %d labels, want %d", len(got), len(tt.want)) + } + for k, v := range tt.want { + if got[k] != v { + t.Errorf("parseLabels()[%s] = %v, want %v", k, got[k], v) + } + } + }) + } +} + +func TestCSVParser_ParseWithContext(t *testing.T) { + t.Run("context cancellation", func(t *testing.T) { + parser := NewCSVParser() + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + input := strings.NewReader(`metric1,env=test,100,1234567890000`) + _, err := parser.Parse(ctx, input) + + if err != context.Canceled { + t.Errorf("Expected context.Canceled error, got %v", err) + } + }) +} + +func TestCSVParser_ParseTimestamp(t *testing.T) { + parser := 
NewCSVParser() + input := `metric1,env=test,100,1234567890000` + reader := strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedTime := time.UnixMilli(1234567890000) + if !samples[0].Timestamp.Equal(expectedTime) { + t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime) + } +} diff --git a/internal/parser/json.go b/internal/parser/json.go new file mode 100644 index 0000000..3b8c2e8 --- /dev/null +++ b/internal/parser/json.go @@ -0,0 +1,62 @@ +package parser + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "epimetheus/internal/metrics" +) + +// JSONParser parses metrics from JSON format +type JSONParser struct{} + +// NewJSONParser creates a new JSON parser +func NewJSONParser() *JSONParser { + return &JSONParser{} +} + +type jsonSample struct { + Metric string `json:"metric"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + TimestampMs int64 `json:"timestamp_ms,omitempty"` +} + +// Parse reads metrics from JSON format +func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var rawSamples []jsonSample + + decoder := json.NewDecoder(reader) + if err := decoder.Decode(&rawSamples); err != nil { + return nil, fmt.Errorf("failed to parse JSON: %w", err) + } + + samples := make([]metrics.Sample, 0, len(rawSamples)) + for _, raw := range rawSamples { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if raw.Metric == "" { + continue + } + + timestamp := time.Now() + if raw.TimestampMs > 0 { + timestamp = time.UnixMilli(raw.TimestampMs) + } + + if raw.Labels == nil { + raw.Labels = make(map[string]string) + } + + samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp)) + } + + return samples, nil +} diff --git 
a/internal/parser/json_test.go b/internal/parser/json_test.go new file mode 100644 index 0000000..d521942 --- /dev/null +++ b/internal/parser/json_test.go @@ -0,0 +1,177 @@ +package parser + +import ( + "context" + "strings" + "testing" + "time" +) + +func TestJSONParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + wantCount int + wantErr bool + }{ + { + name: "valid single sample", + input: `[ + { + "metric": "test_metric", + "labels": {"env": "prod", "host": "server1"}, + "value": 42.5, + "timestamp_ms": 1234567890000 + } + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "multiple samples", + input: `[ + {"metric": "metric1", "labels": {"env": "prod"}, "value": 100, "timestamp_ms": 1234567890000}, + {"metric": "metric2", "labels": {"env": "test"}, "value": 200, "timestamp_ms": 1234567891000}, + {"metric": "metric3", "labels": {"env": "dev"}, "value": 300, "timestamp_ms": 1234567892000} + ]`, + wantCount: 3, + wantErr: false, + }, + { + name: "no timestamp defaults to now", + input: `[ + {"metric": "test_metric", "labels": {"env": "prod"}, "value": 100} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "no labels", + input: `[ + {"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty metric skipped", + input: `[ + {"metric": "", "labels": {"env": "prod"}, "value": 100}, + {"metric": "valid_metric", "labels": {"env": "test"}, "value": 200} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty array", + input: `[]`, + wantCount: 0, + wantErr: false, + }, + { + name: "invalid json", + input: `{not valid json}`, + wantCount: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := NewJSONParser() + reader := strings.NewReader(tt.input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr 
%v", err, tt.wantErr) + return + } + if len(samples) != tt.wantCount { + t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount) + } + }) + } +} + +func TestJSONParser_ParseWithContext(t *testing.T) { + t.Run("context check during parse", func(t *testing.T) { + parser := NewJSONParser() + ctx, cancel := context.WithCancel(context.Background()) + + // Create valid input with empty metrics that will be filtered + input := `[ + {"metric": "", "value": 1}, + {"metric": "", "value": 2}, + {"metric": "", "value": 3} + ]` + + cancel() // Cancel before parsing + + reader := strings.NewReader(input) + _, err := parser.Parse(ctx, reader) + + // Context cancellation should be detected during sample processing + if err != context.Canceled { + // Note: JSON decoder may finish before context is checked + // This test verifies context support exists, but timing is not guaranteed + t.Logf("Got error: %v (context may not be checked until after JSON decode)", err) + } + }) +} + +func TestJSONParser_ParseTimestamp(t *testing.T) { + parser := NewJSONParser() + input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]` + reader := strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedTime := time.UnixMilli(1234567890000) + if !samples[0].Timestamp.Equal(expectedTime) { + t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime) + } +} + +func TestJSONParser_ParseLabels(t *testing.T) { + parser := NewJSONParser() + input := `[{ + "metric": "test_metric", + "labels": {"env": "prod", "host": "server1", "region": "us-west"}, + "value": 100 + }]` + reader := strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) 
!= 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedLabels := map[string]string{ + "env": "prod", + "host": "server1", + "region": "us-west", + } + + if len(samples[0].Labels) != len(expectedLabels) { + t.Errorf("Got %d labels, want %d", len(samples[0].Labels), len(expectedLabels)) + } + + for k, v := range expectedLabels { + if samples[0].Labels[k] != v { + t.Errorf("Label[%s] = %v, want %v", k, samples[0].Labels[k], v) + } + } +} diff --git a/internal/parser/parser.go b/internal/parser/parser.go new file mode 100644 index 0000000..860baa3 --- /dev/null +++ b/internal/parser/parser.go @@ -0,0 +1,56 @@ +package parser + +import ( + "context" + "fmt" + "io" + "os" + + "epimetheus/internal/metrics" +) + +// Parser defines the interface for metric parsers. +type Parser interface { + Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) +} + +// ParseFile parses metrics from a file. +func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + return parseWithFormat(ctx, file, format) +} + +// ParseStdin parses metrics from standard input. +func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) { + return parseWithFormat(ctx, os.Stdin, format) +} + +// parseWithFormat parses metrics using the specified format. 
+func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) { + var parser Parser + + switch format { + case "csv": + parser = NewCSVParser() + case "json": + parser = NewJSONParser() + default: + return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format) + } + + samples, err := parser.Parse(ctx, reader) + if err != nil { + return nil, fmt.Errorf("failed to parse metrics: %w", err) + } + + if len(samples) == 0 { + return nil, fmt.Errorf("no valid samples found") + } + + return samples, nil +} diff --git a/internal/parser/parser_test.go b/internal/parser/parser_test.go new file mode 100644 index 0000000..05255a5 --- /dev/null +++ b/internal/parser/parser_test.go @@ -0,0 +1,99 @@ +package parser + +import ( + "context" + "strings" + "testing" +) + +func TestParseFile_CSV(t *testing.T) { + // We can't easily test file operations without creating temp files + // So we'll test the error case + ctx := context.Background() + _, err := ParseFile(ctx, "/nonexistent/file.csv", "csv") + + if err == nil { + t.Error("Expected error for nonexistent file") + } +} + +func TestParseWithFormat_CSV(t *testing.T) { + ctx := context.Background() + input := `test_metric,env=prod,100,1234567890000` + reader := strings.NewReader(input) + + samples, err := parseWithFormat(ctx, reader, "csv") + if err != nil { + t.Fatalf("parseWithFormat(csv) error = %v", err) + } + if len(samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(samples)) + } +} + +func TestParseWithFormat_JSON(t *testing.T) { + ctx := context.Background() + input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]` + reader := strings.NewReader(input) + + samples, err := parseWithFormat(ctx, reader, "json") + if err != nil { + t.Fatalf("parseWithFormat(json) error = %v", err) + } + if len(samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(samples)) + } +} + +func TestParseWithFormat_UnsupportedFormat(t *testing.T) { + ctx := 
context.Background() + reader := strings.NewReader("") + + _, err := parseWithFormat(ctx, reader, "xml") + if err == nil { + t.Error("Expected error for unsupported format") + } + if err.Error() != "unsupported format: xml (use csv or json)" { + t.Errorf("Unexpected error message: %v", err) + } +} + +func TestParseWithFormat_EmptyResult(t *testing.T) { + ctx := context.Background() + input := `[]` // Empty JSON array + reader := strings.NewReader(input) + + _, err := parseWithFormat(ctx, reader, "json") + if err == nil { + t.Error("Expected error for empty samples") + } + if err.Error() != "no valid samples found" { + t.Errorf("Expected 'no valid samples found' error, got: %v", err) + } +} + +func TestParseStdin_Format(t *testing.T) { + // We can't easily test stdin without mocking, + // but we can verify the error path + ctx := context.Background() + + // Test with invalid format + _, err := parseWithFormat(ctx, strings.NewReader(""), "invalid_format") + if err == nil { + t.Error("Expected error for invalid format") + } +} + +func TestNewCSVParser(t *testing.T) { + parser := NewCSVParser() + if parser == nil { + t.Error("NewCSVParser() returned nil") + } +} + +func TestNewJSONParser(t *testing.T) { + parser := NewJSONParser() + if parser == nil { + t.Error("NewJSONParser() returned nil") + } +} diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go new file mode 100644 index 0000000..bfa4fbe --- /dev/null +++ b/internal/parser/tabular_csv.go @@ -0,0 +1,256 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "log" + "regexp" + "strconv" + "strings" + "time" + + "epimetheus/internal/metrics" + "epimetheus/internal/resolver" +) + +// TabularCSVParser parses tabular CSV files where the first row is headers +// and all subsequent rows are data. Numeric columns become metrics, string columns become labels. 
+type TabularCSVParser struct { + metricName string + timestamp time.Time + resolver *resolver.DNSResolver // DNS resolver for IP addresses + resolveLabels map[string]bool // Set of label names to resolve via DNS +} + +// SaveDNSCache saves the DNS cache to disk (called after successful ingestion) +func (p *TabularCSVParser) SaveDNSCache() { + if p.resolver != nil { + cacheSize := p.resolver.GetCacheSize() + if err := p.resolver.SaveCache(); err != nil { + // Log error but don't fail - cache saving is not critical + log.Printf("⚠️ Failed to save DNS cache: %v", err) + } else if cacheSize > 0 { + log.Printf("💾 Saved DNS cache with %d entries", cacheSize) + } + } +} + +// numericLabelColumns is a list of column names that should be treated as labels +// even if they contain numeric values. This is useful for categorical codes like +// HTTP status codes (200, 404, 500) or date periods (20160101, 20160102) that should be labels, not metrics. +// +// Column names are matched case-insensitively. 
+// +// Common examples: +// - responsecode: HTTP response codes (200, 404, 500) +// - statuscode: Application status codes +// - status: Status identifiers +// - code: Generic codes +// - period: Date periods (20160101, 20160102, YYYYMMDD format) +var numericLabelColumns = map[string]bool{ + "responsecode": true, + "statuscode": true, + "status_code": true, + "response_code": true, + "status": true, + "code": true, + "period": true, +} + +// NewTabularCSVParser creates a new tabular CSV parser +// metricName is the prefix for generated metric names; each numeric column yields a metric named metricName_column +// timestamp is the base timestamp; each data row adds a per-row microsecond offset to it +// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS +func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser { + // Build set for O(1) lookup + resolveMap := make(map[string]bool) + for _, label := range resolveIPLabels { + resolveMap[label] = true + } + + return &TabularCSVParser{ + metricName: metricName, + timestamp: timestamp, + resolver: resolver.NewDNSResolver(), + resolveLabels: resolveMap, + } +} + +// Parse reads metrics from a tabular CSV format +// First row: column headers +// Subsequent rows: data values +// Numeric columns become metrics (except those listed in numericLabelColumns); all other columns become labels +func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + csvReader.TrimLeadingSpace = true + csvReader.FieldsPerRecord = -1 // Allow variable number of fields + + // Read header row + headers, err := csvReader.Read() + if err != nil { + return nil, fmt.Errorf("failed to read header row: %w", err) + } + + if len(headers) == 0 { + return nil, fmt.Errorf("empty header row") + } + + // Trim whitespace and sanitize headers for Prometheus label compatibility + for i := range headers { + headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i])) + } + + // Read data 
rows + rowNum := 1 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", rowNum+1, err) + } + rowNum++ + + if len(record) != len(headers) { + // Skip rows with mismatched column counts + continue + } + + // Add microsecond offset to ensure unique timestamps per row + // This prevents duplicate timestamp errors when multiple rows + // have the same label combinations + rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond) + + rowSamples := p.parseRecord(headers, record, rowTimestamp) + samples = append(samples, rowSamples...) + } + + return samples, nil +} + +// parseRecord converts a single CSV row into multiple Samples +// Numeric columns become separate metrics, string columns become labels. +// +// Exception: Column names in numericLabelColumns are always treated as labels, +// even if they contain numbers (e.g., responsecode: 200, 404, 500). +// +// DNS Resolution: Label values matching IP addresses in resolveLabels will be +// resolved to hostnames via reverse DNS lookup. 
+func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample { + var samples []metrics.Sample + + // First pass: identify numeric vs string columns and collect label values + labels := make(map[string]string) + var numericCols []struct { + name string + value float64 + } + + for i, header := range headers { + if i >= len(record) { + continue + } + + value := strings.TrimSpace(record[i]) + + // Check if this column is in the exception list (should be label even if numeric) + headerLower := strings.ToLower(header) + if numericLabelColumns[headerLower] { + // This column should always be a label, even if it's numeric + labels[header] = value + continue + } + + // Try to parse as number + if floatVal, err := strconv.ParseFloat(value, 64); err == nil { + // It's a numeric column - will become a metric + numericCols = append(numericCols, struct { + name string + value float64 + }{name: header, value: floatVal}) + } else { + // It's a string column - becomes a label + labels[header] = value + } + } + + // Resolve IP addresses to hostnames + for labelName := range p.resolveLabels { + if ipValue, exists := labels[labelName]; exists { + // Attempt DNS resolution + if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" { + // Replace IP with hostname + labels[labelName] = hostname + } + // If resolution failed, ipValue remains unchanged + } + } + + // Create one sample per numeric column + for _, numCol := range numericCols { + metricName := p.metricName + "_" + numCol.name + // Copy labels for each metric + metricLabels := make(map[string]string, len(labels)) + for k, v := range labels { + metricLabels[k] = v + } + samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp)) + } + + return samples +} + +// sanitizeLabelName converts a string to a valid Prometheus label name +// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]* +// Examples: +// "avg(totaltime)" -> 
"avg_totaltime" +// "sum(rcv)" -> "sum_rcv" +// "response-code" -> "response_code" +// "123invalid" -> "label_123invalid" +func sanitizeLabelName(name string) string { + if name == "" { + return "unknown" + } + + // Replace invalid characters with underscore + re := regexp.MustCompile(`[^a-zA-Z0-9_]`) + sanitized := re.ReplaceAllString(name, "_") + + // Ensure it starts with a letter or underscore + if len(sanitized) > 0 && sanitized[0] >= '0' && sanitized[0] <= '9' { + sanitized = "label_" + sanitized + } + + // Remove consecutive underscores + sanitized = regexp.MustCompile(`_+`).ReplaceAllString(sanitized, "_") + + // Remove leading/trailing underscores + sanitized = strings.Trim(sanitized, "_") + + if sanitized == "" { + return "unknown" + } + + return sanitized +} + +// ParseFloat64 tries to parse a string as float64, returns 0.0 if fails +func parseFloat64(s string) float64 { + f, err := strconv.ParseFloat(strings.TrimSpace(s), 64) + if err != nil { + return 0.0 + } + return f +} diff --git a/internal/parser/tabular_csv_test.go b/internal/parser/tabular_csv_test.go new file mode 100644 index 0000000..07818e8 --- /dev/null +++ b/internal/parser/tabular_csv_test.go @@ -0,0 +1,469 @@ +package parser + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "epimetheus/internal/metrics" +) + +func TestTabularCSVParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + metricName string + expectedCount int + wantErr bool + }{ + { + name: "simple tabular CSV with numeric and text columns", + input: `responsecode,httpmethod,user,totaltime +200,GET,alice,50.5 +404,POST,bob,100.2 +500,GET,charlie,75.0`, + metricName: "test_metric", + expectedCount: 6, // 3 rows * 2 numeric columns (responsecode, totaltime) + wantErr: false, + }, + { + name: "CSV with mixed data types", + input: `col1,col2,col3 +1,text,3.14 +2,more,2.71 +3,data,1.41`, + metricName: "mixed_metric", + expectedCount: 6, // 3 rows * 2 numeric columns (col1, col3) + 
wantErr: false, + }, + { + name: "CSV with whitespace", + input: ` col1 , col2 , col3 + 1 , value2 , 3 + 4 , value5 , 6 `, + metricName: "whitespace_metric", + expectedCount: 4, // 2 rows * 2 numeric columns (col1, col3) + wantErr: false, + }, + { + name: "CSV with comments", + input: `# This is a comment +col1,col2,col3 +# Another comment +1,value2,3`, + metricName: "comment_metric", + expectedCount: 2, // 1 row * 2 numeric columns (col1, col3) + wantErr: false, + }, + { + name: "empty CSV", + input: "", + metricName: "empty_metric", + expectedCount: 0, + wantErr: true, // No header row + }, + { + name: "header only", + input: `col1,col2,col3 +`, + metricName: "header_only", + expectedCount: 0, + wantErr: false, + }, + { + name: "mismatched columns - skipped", + input: `col1,col2,col3 +1,value2,3 +value4,value5 +6,value7,8`, + metricName: "mismatched_metric", + expectedCount: 4, // 2 matching rows * 2 numeric columns + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + parser := NewTabularCSVParser(tt.metricName, timestamp, []string{}) // No DNS resolution + + reader := strings.NewReader(tt.input) + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err == nil && len(samples) != tt.expectedCount { + t.Errorf("Parse() got %d samples, want %d", len(samples), tt.expectedCount) + } + + // Verify all samples have the correct base metric name and timestamp + for _, sample := range samples { + if !strings.HasPrefix(sample.MetricName, tt.metricName+"_") { + t.Errorf("Sample metric name = %s, want prefix %s_", sample.MetricName, tt.metricName) + } + if !sample.Timestamp.Equal(timestamp) { + t.Errorf("Sample timestamp = %v, want %v", sample.Timestamp, timestamp) + } + } + }) + } +} + +func TestTabularCSVParser_Labels(t *testing.T) { + ctx := context.Background() + 
timestamp := time.Now() + input := `responsecode,httpmethod,user,totaltime +200,GET,alice,50.5 +404,POST,bob,100.2` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 2 rows * 2 numeric columns (responsecode, totaltime) = 4 samples + if len(samples) != 4 { + t.Fatalf("Expected 4 samples, got %d", len(samples)) + } + + // Find the responsecode and totaltime metrics for first row + var responsecodeMetric, totaltimeMetric *metrics.Sample + for i := range samples { + if samples[i].Labels["httpmethod"] == "GET" && samples[i].Labels["user"] == "alice" { + if strings.HasSuffix(samples[i].MetricName, "_responsecode") { + responsecodeMetric = &samples[i] + } else if strings.HasSuffix(samples[i].MetricName, "_totaltime") { + totaltimeMetric = &samples[i] + } + } + } + + if responsecodeMetric == nil || totaltimeMetric == nil { + t.Fatalf("Could not find expected metrics") + } + + // Check responsecode metric + if responsecodeMetric.Value != 200 { + t.Errorf("responsecode value = %f, want 200", responsecodeMetric.Value) + } + expectedLabels := map[string]string{ + "httpmethod": "GET", + "user": "alice", + } + for key, expectedValue := range expectedLabels { + if actualValue, ok := responsecodeMetric.Labels[key]; !ok { + t.Errorf("responsecode metric missing label %s", key) + } else if actualValue != expectedValue { + t.Errorf("responsecode metric label %s = %s, want %s", key, actualValue, expectedValue) + } + } + + // Check totaltime metric + if totaltimeMetric.Value != 50.5 { + t.Errorf("totaltime value = %f, want 50.5", totaltimeMetric.Value) + } + for key, expectedValue := range expectedLabels { + if actualValue, ok := totaltimeMetric.Labels[key]; !ok { + t.Errorf("totaltime metric missing label %s", key) + } else if actualValue != expectedValue { + t.Errorf("totaltime metric label %s = %s, want %s", key, 
actualValue, expectedValue) + } + } +} + +func TestTabularCSVParser_ContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + timestamp := time.Now() + input := `col1,col2,col3 +value1,value2,value3 +value4,value5,value6` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + _, err := parser.Parse(ctx, reader) + + if err != context.Canceled { + t.Errorf("Expected context.Canceled error, got %v", err) + } +} + +func TestTabularCSVParser_LargeFile(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Generate a CSV with 1000 rows, 1 numeric column + var builder strings.Builder + builder.WriteString("col1,col2,col3\n") + for i := 0; i < 1000; i++ { + builder.WriteString(fmt.Sprintf("%d,value2,value3\n", i)) + } + + parser := NewTabularCSVParser("large_metric", timestamp, []string{}) + reader := strings.NewReader(builder.String()) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 1000 rows * 1 numeric column = 1000 samples + if len(samples) != 1000 { + t.Errorf("Expected 1000 samples, got %d", len(samples)) + } +} + +func TestSanitizeLabelName(t *testing.T) { + tests := []struct { + input string + expected string + }{ + {"avg(totaltime)", "avg_totaltime"}, + {"sum(rcv)", "sum_rcv"}, + {"response-code", "response_code"}, + {"valid_label", "valid_label"}, + {"ValidLabel123", "ValidLabel123"}, + {"123invalid", "label_123invalid"}, + {"label__with___underscores", "label_with_underscores"}, + {"_leading", "leading"}, + {"trailing_", "trailing"}, + {"special!@#$chars", "special_chars"}, + {"", "unknown"}, + {"___", "unknown"}, + {"http.method", "http_method"}, + {"status_code", "status_code"}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result := sanitizeLabelName(tt.input) + if result != tt.expected { + 
t.Errorf("sanitizeLabelName(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} + +func TestTabularCSVParser_WithSpecialCharacterHeaders(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + input := `avg(totaltime),sum(rcv),response-code,http.method +50.5,1102,200,GET +100.2,2204,404,POST` + + parser := NewTabularCSVParser("special_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 2 rows * 3 numeric columns (avg(totaltime), sum(rcv), response-code) = 6 samples + if len(samples) != 6 { + t.Fatalf("Expected 6 samples, got %d", len(samples)) + } + + // Check that metric names are sanitized + expectedMetrics := []string{"special_metric_avg_totaltime", "special_metric_sum_rcv", "special_metric_response_code"} + foundMetrics := make(map[string]bool) + for _, sample := range samples { + foundMetrics[sample.MetricName] = true + } + for _, expected := range expectedMetrics { + if !foundMetrics[expected] { + t.Errorf("Missing expected metric %s", expected) + } + } + + // Check that string column becomes label (http.method -> http_method) + for _, sample := range samples { + if _, ok := sample.Labels["http_method"]; !ok { + t.Errorf("Sample missing sanitized label http_method") + } + } + + // Check values are correct for first row + for _, sample := range samples { + if sample.Labels["http_method"] == "GET" { + if strings.HasSuffix(sample.MetricName, "_avg_totaltime") { + if sample.Value != 50.5 { + t.Errorf("avg_totaltime value = %f, want 50.5", sample.Value) + } + } else if strings.HasSuffix(sample.MetricName, "_sum_rcv") { + if sample.Value != 1102 { + t.Errorf("sum_rcv value = %f, want 1102", sample.Value) + } + } else if strings.HasSuffix(sample.MetricName, "_response_code") { + if sample.Value != 200 { + t.Errorf("response_code value = %f, want 200", sample.Value) + } + } + } + } +} + +func 
TestTabularCSVParser_DNSResolution(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Test with localhost IP which should resolve on most systems + input := `ip,responsecode,count +127.0.0.1,200,100` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // Should have 1 sample (count metric) + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // The ip label should either be resolved to a hostname or remain as IP if DNS failed + ipLabel := samples[0].Labels["ip"] + if ipLabel == "" { + t.Error("ip label is empty") + } + + // If resolution succeeded, it should not be the original IP + // If it failed, it should still be the IP + t.Logf("IP label value: %s (original: 127.0.0.1)", ipLabel) +} + +func TestTabularCSVParser_DNSResolutionDisabled(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + input := `ip,responsecode,count +192.168.1.1,200,100` + + // Pass empty slice - no DNS resolution + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // IP should remain unchanged + ipLabel := samples[0].Labels["ip"] + if ipLabel != "192.168.1.1" { + t.Errorf("IP label = %s, want 192.168.1.1 (DNS resolution should be disabled)", ipLabel) + } +} + +func TestTabularCSVParser_DNSResolutionMultipleLabels(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + input := `source_ip,dest_ip,count +127.0.0.1,192.168.1.1,100` + + // Resolve both source_ip and dest_ip + parser := NewTabularCSVParser("test_metric", timestamp, []string{"source_ip", "dest_ip"}) + reader := 
strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // Both labels should be present + if _, ok := samples[0].Labels["source_ip"]; !ok { + t.Error("source_ip label missing") + } + if _, ok := samples[0].Labels["dest_ip"]; !ok { + t.Error("dest_ip label missing") + } + + t.Logf("source_ip: %s", samples[0].Labels["source_ip"]) + t.Logf("dest_ip: %s", samples[0].Labels["dest_ip"]) +} + +func TestTabularCSVParser_DNSResolutionNonIPValue(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // ip column contains non-IP value + input := `ip,responsecode,count +not-an-ip,200,100` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // Non-IP value should remain unchanged + ipLabel := samples[0].Labels["ip"] + if ipLabel != "not-an-ip" { + t.Errorf("IP label = %s, want not-an-ip (should not resolve non-IP values)", ipLabel) + } +} + +func TestTabularCSVParser_DNSResolutionCaching(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Multiple rows with same IP + input := `ip,responsecode,count +127.0.0.1,200,100 +127.0.0.1,404,50 +127.0.0.1,500,5` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // Should have 3 samples + if len(samples) != 3 { + t.Fatalf("Expected 3 samples, got %d", len(samples)) + } + + // All should have same resolved IP (cached result) + firstIP := samples[0].Labels["ip"] + for i, sample := range samples { + if 
sample.Labels["ip"] != firstIP { + t.Errorf("Sample %d has different IP label: %s vs %s (caching issue)", i, sample.Labels["ip"], firstIP) + } + } + + // Check cache size + if parser.resolver.GetCacheSize() != 1 { + t.Errorf("Expected cache size 1 (one unique IP), got %d", parser.resolver.GetCacheSize()) + } +} |
