summaryrefslogtreecommitdiff
path: root/internal/parser
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-07 16:32:10 +0200
committerPaul Buetow <paul@buetow.org>2026-02-07 16:32:10 +0200
commit3fd46f3977fb650974e5e936cba362c787c00637 (patch)
treeb49111ddd0b7af4a007bca6a304dba10efcd88ff /internal/parser
reimport this PoC
Diffstat (limited to 'internal/parser')
-rw-r--r--internal/parser/csv.go101
-rw-r--r--internal/parser/csv_test.go175
-rw-r--r--internal/parser/json.go62
-rw-r--r--internal/parser/json_test.go177
-rw-r--r--internal/parser/parser.go56
-rw-r--r--internal/parser/parser_test.go99
-rw-r--r--internal/parser/tabular_csv.go256
-rw-r--r--internal/parser/tabular_csv_test.go469
8 files changed, 1395 insertions, 0 deletions
diff --git a/internal/parser/csv.go b/internal/parser/csv.go
new file mode 100644
index 0000000..64d16e5
--- /dev/null
+++ b/internal/parser/csv.go
@@ -0,0 +1,101 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
+// CSVParser parses metrics from CSV format
+type CSVParser struct{}
+
+// NewCSVParser creates a new CSV parser
+func NewCSVParser() *CSVParser {
+ return &CSVParser{}
+}
+
+// Parse reads metrics from CSV format
+// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms
+func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var samples []metrics.Sample
+
+ csvReader := csv.NewReader(reader)
+ csvReader.Comment = '#'
+
+ lineNum := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("line %d: %w", lineNum, err)
+ }
+ lineNum++
+
+ if len(record) < 3 {
+ continue // Skip invalid records
+ }
+
+ sample, err := p.parseRecord(record, lineNum)
+ if err != nil {
+ continue // Skip records with errors
+ }
+
+ samples = append(samples, sample)
+ }
+
+ return samples, nil
+}
+
+func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) {
+ metricName := strings.TrimSpace(record[0])
+ if metricName == "" {
+ return metrics.Sample{}, fmt.Errorf("empty metric name")
+ }
+
+ labels := parseLabels(record[1])
+
+ value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64)
+ if err != nil {
+ return metrics.Sample{}, fmt.Errorf("invalid value: %w", err)
+ }
+
+ timestamp := time.Now()
+ if len(record) > 3 && record[3] != "" {
+ timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64)
+ if err == nil {
+ timestamp = time.UnixMilli(timestampMs)
+ }
+ }
+
+ return metrics.NewSample(metricName, labels, value, timestamp), nil
+}
+
+func parseLabels(labelStr string) map[string]string {
+ labels := make(map[string]string)
+ if labelStr == "" {
+ return labels
+ }
+
+ labelPairs := strings.Split(labelStr, ";")
+ for _, pair := range labelPairs {
+ parts := strings.SplitN(pair, "=", 2)
+ if len(parts) == 2 {
+ labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
+ }
+ }
+ return labels
+}
diff --git a/internal/parser/csv_test.go b/internal/parser/csv_test.go
new file mode 100644
index 0000000..ffe9034
--- /dev/null
+++ b/internal/parser/csv_test.go
@@ -0,0 +1,175 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestCSVParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ wantCount int
+ wantErr bool
+ }{
+ {
+ name: "valid single line",
+ input: `test_metric,env=prod;host=server1,42.5,1234567890000`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "multiple lines",
+ input: `metric1,label1=value1,100,1234567890000
+metric2,label2=value2,200,1234567891000
+metric3,label3=value3,300,1234567892000`,
+ wantCount: 3,
+ wantErr: false,
+ },
+ {
+ name: "with comments",
+ input: `# This is a comment
+metric1,env=test,50,1234567890000
+# Another comment
+metric2,env=prod,75,1234567891000`,
+ wantCount: 2,
+ wantErr: false,
+ },
+ {
+ name: "no timestamp defaults to now",
+ input: `metric1,env=test,100`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "no labels",
+ input: `metric1,,100,1234567890000`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "empty input",
+ input: "",
+ wantCount: 0,
+ wantErr: false,
+ },
+ {
+ name: "invalid line causes error",
+ input: `metric1,env=test,100,1234567890000
+invalid
+metric2,env=prod,200,1234567891000`,
+ wantCount: 0,
+ wantErr: true,
+ },
+ {
+ name: "invalid value skipped",
+ input: `metric1,env=test,not_a_number,1234567890000
+metric2,env=prod,200,1234567891000`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ parser := NewCSVParser()
+ reader := strings.NewReader(tt.input)
+ ctx := context.Background()
+
+ samples, err := parser.Parse(ctx, reader)
+
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if len(samples) != tt.wantCount {
+ t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount)
+ }
+ })
+ }
+}
+
+func TestCSVParser_ParseLabels(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ want map[string]string
+ }{
+ {
+ name: "single label",
+ input: "env=prod",
+ want: map[string]string{"env": "prod"},
+ },
+ {
+ name: "multiple labels",
+ input: "env=prod;host=server1;region=us-west",
+ want: map[string]string{"env": "prod", "host": "server1", "region": "us-west"},
+ },
+ {
+ name: "empty string",
+ input: "",
+ want: map[string]string{},
+ },
+ {
+ name: "invalid label format skipped",
+ input: "env=prod;invalid;host=server1",
+ want: map[string]string{"env": "prod", "host": "server1"},
+ },
+ {
+ name: "with spaces",
+ input: " env = prod ; host = server1 ",
+ want: map[string]string{"env": "prod", "host": "server1"},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := parseLabels(tt.input)
+ if len(got) != len(tt.want) {
+ t.Errorf("parseLabels() returned %d labels, want %d", len(got), len(tt.want))
+ }
+ for k, v := range tt.want {
+ if got[k] != v {
+ t.Errorf("parseLabels()[%s] = %v, want %v", k, got[k], v)
+ }
+ }
+ })
+ }
+}
+
+func TestCSVParser_ParseWithContext(t *testing.T) {
+ t.Run("context cancellation", func(t *testing.T) {
+ parser := NewCSVParser()
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel() // Cancel immediately
+
+ input := strings.NewReader(`metric1,env=test,100,1234567890000`)
+ _, err := parser.Parse(ctx, input)
+
+ if err != context.Canceled {
+ t.Errorf("Expected context.Canceled error, got %v", err)
+ }
+ })
+}
+
+func TestCSVParser_ParseTimestamp(t *testing.T) {
+ parser := NewCSVParser()
+ input := `metric1,env=test,100,1234567890000`
+ reader := strings.NewReader(input)
+ ctx := context.Background()
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ expectedTime := time.UnixMilli(1234567890000)
+ if !samples[0].Timestamp.Equal(expectedTime) {
+ t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime)
+ }
+}
diff --git a/internal/parser/json.go b/internal/parser/json.go
new file mode 100644
index 0000000..3b8c2e8
--- /dev/null
+++ b/internal/parser/json.go
@@ -0,0 +1,62 @@
+package parser
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
+// JSONParser parses metrics from JSON format
+type JSONParser struct{}
+
+// NewJSONParser creates a new JSON parser
+func NewJSONParser() *JSONParser {
+ return &JSONParser{}
+}
+
+type jsonSample struct {
+ Metric string `json:"metric"`
+ Labels map[string]string `json:"labels"`
+ Value float64 `json:"value"`
+ TimestampMs int64 `json:"timestamp_ms,omitempty"`
+}
+
+// Parse reads metrics from JSON format
+func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var rawSamples []jsonSample
+
+ decoder := json.NewDecoder(reader)
+ if err := decoder.Decode(&rawSamples); err != nil {
+ return nil, fmt.Errorf("failed to parse JSON: %w", err)
+ }
+
+ samples := make([]metrics.Sample, 0, len(rawSamples))
+ for _, raw := range rawSamples {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ if raw.Metric == "" {
+ continue
+ }
+
+ timestamp := time.Now()
+ if raw.TimestampMs > 0 {
+ timestamp = time.UnixMilli(raw.TimestampMs)
+ }
+
+ if raw.Labels == nil {
+ raw.Labels = make(map[string]string)
+ }
+
+ samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp))
+ }
+
+ return samples, nil
+}
diff --git a/internal/parser/json_test.go b/internal/parser/json_test.go
new file mode 100644
index 0000000..d521942
--- /dev/null
+++ b/internal/parser/json_test.go
@@ -0,0 +1,177 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestJSONParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ wantCount int
+ wantErr bool
+ }{
+ {
+ name: "valid single sample",
+ input: `[
+ {
+ "metric": "test_metric",
+ "labels": {"env": "prod", "host": "server1"},
+ "value": 42.5,
+ "timestamp_ms": 1234567890000
+ }
+ ]`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "multiple samples",
+ input: `[
+ {"metric": "metric1", "labels": {"env": "prod"}, "value": 100, "timestamp_ms": 1234567890000},
+ {"metric": "metric2", "labels": {"env": "test"}, "value": 200, "timestamp_ms": 1234567891000},
+ {"metric": "metric3", "labels": {"env": "dev"}, "value": 300, "timestamp_ms": 1234567892000}
+ ]`,
+ wantCount: 3,
+ wantErr: false,
+ },
+ {
+ name: "no timestamp defaults to now",
+ input: `[
+ {"metric": "test_metric", "labels": {"env": "prod"}, "value": 100}
+ ]`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "no labels",
+ input: `[
+ {"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}
+ ]`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "empty metric skipped",
+ input: `[
+ {"metric": "", "labels": {"env": "prod"}, "value": 100},
+ {"metric": "valid_metric", "labels": {"env": "test"}, "value": 200}
+ ]`,
+ wantCount: 1,
+ wantErr: false,
+ },
+ {
+ name: "empty array",
+ input: `[]`,
+ wantCount: 0,
+ wantErr: false,
+ },
+ {
+ name: "invalid json",
+ input: `{not valid json}`,
+ wantCount: 0,
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ parser := NewJSONParser()
+ reader := strings.NewReader(tt.input)
+ ctx := context.Background()
+
+ samples, err := parser.Parse(ctx, reader)
+
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if len(samples) != tt.wantCount {
+ t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount)
+ }
+ })
+ }
+}
+
+func TestJSONParser_ParseWithContext(t *testing.T) {
+ t.Run("context check during parse", func(t *testing.T) {
+ parser := NewJSONParser()
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Create valid input with empty metrics that will be filtered
+ input := `[
+ {"metric": "", "value": 1},
+ {"metric": "", "value": 2},
+ {"metric": "", "value": 3}
+ ]`
+
+ cancel() // Cancel before parsing
+
+ reader := strings.NewReader(input)
+ _, err := parser.Parse(ctx, reader)
+
+ // Context cancellation should be detected during sample processing
+ if err != context.Canceled {
+ // Note: JSON decoder may finish before context is checked
+ // This test verifies context support exists, but timing is not guaranteed
+ t.Logf("Got error: %v (context may not be checked until after JSON decode)", err)
+ }
+ })
+}
+
+func TestJSONParser_ParseTimestamp(t *testing.T) {
+ parser := NewJSONParser()
+ input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]`
+ reader := strings.NewReader(input)
+ ctx := context.Background()
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ expectedTime := time.UnixMilli(1234567890000)
+ if !samples[0].Timestamp.Equal(expectedTime) {
+ t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime)
+ }
+}
+
+func TestJSONParser_ParseLabels(t *testing.T) {
+ parser := NewJSONParser()
+ input := `[{
+ "metric": "test_metric",
+ "labels": {"env": "prod", "host": "server1", "region": "us-west"},
+ "value": 100
+ }]`
+ reader := strings.NewReader(input)
+ ctx := context.Background()
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ expectedLabels := map[string]string{
+ "env": "prod",
+ "host": "server1",
+ "region": "us-west",
+ }
+
+ if len(samples[0].Labels) != len(expectedLabels) {
+ t.Errorf("Got %d labels, want %d", len(samples[0].Labels), len(expectedLabels))
+ }
+
+ for k, v := range expectedLabels {
+ if samples[0].Labels[k] != v {
+ t.Errorf("Label[%s] = %v, want %v", k, samples[0].Labels[k], v)
+ }
+ }
+}
diff --git a/internal/parser/parser.go b/internal/parser/parser.go
new file mode 100644
index 0000000..860baa3
--- /dev/null
+++ b/internal/parser/parser.go
@@ -0,0 +1,56 @@
+package parser
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+
+ "epimetheus/internal/metrics"
+)
+
+// Parser defines the interface for metric parsers.
+type Parser interface {
+ Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error)
+}
+
+// ParseFile parses metrics from a file.
+func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) {
+ file, err := os.Open(filename)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open file: %w", err)
+ }
+ defer file.Close()
+
+ return parseWithFormat(ctx, file, format)
+}
+
+// ParseStdin parses metrics from standard input.
+func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) {
+ return parseWithFormat(ctx, os.Stdin, format)
+}
+
+// parseWithFormat parses metrics using the specified format.
+func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) {
+ var parser Parser
+
+ switch format {
+ case "csv":
+ parser = NewCSVParser()
+ case "json":
+ parser = NewJSONParser()
+ default:
+ return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format)
+ }
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse metrics: %w", err)
+ }
+
+ if len(samples) == 0 {
+ return nil, fmt.Errorf("no valid samples found")
+ }
+
+ return samples, nil
+}
diff --git a/internal/parser/parser_test.go b/internal/parser/parser_test.go
new file mode 100644
index 0000000..05255a5
--- /dev/null
+++ b/internal/parser/parser_test.go
@@ -0,0 +1,99 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+)
+
+func TestParseFile_CSV(t *testing.T) {
+ // We can't easily test file operations without creating temp files
+ // So we'll test the error case
+ ctx := context.Background()
+ _, err := ParseFile(ctx, "/nonexistent/file.csv", "csv")
+
+ if err == nil {
+ t.Error("Expected error for nonexistent file")
+ }
+}
+
+func TestParseWithFormat_CSV(t *testing.T) {
+ ctx := context.Background()
+ input := `test_metric,env=prod,100,1234567890000`
+ reader := strings.NewReader(input)
+
+ samples, err := parseWithFormat(ctx, reader, "csv")
+ if err != nil {
+ t.Fatalf("parseWithFormat(csv) error = %v", err)
+ }
+ if len(samples) != 1 {
+ t.Errorf("Expected 1 sample, got %d", len(samples))
+ }
+}
+
+func TestParseWithFormat_JSON(t *testing.T) {
+ ctx := context.Background()
+ input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]`
+ reader := strings.NewReader(input)
+
+ samples, err := parseWithFormat(ctx, reader, "json")
+ if err != nil {
+ t.Fatalf("parseWithFormat(json) error = %v", err)
+ }
+ if len(samples) != 1 {
+ t.Errorf("Expected 1 sample, got %d", len(samples))
+ }
+}
+
+func TestParseWithFormat_UnsupportedFormat(t *testing.T) {
+ ctx := context.Background()
+ reader := strings.NewReader("")
+
+ _, err := parseWithFormat(ctx, reader, "xml")
+ if err == nil {
+ t.Error("Expected error for unsupported format")
+ }
+ if err.Error() != "unsupported format: xml (use csv or json)" {
+ t.Errorf("Unexpected error message: %v", err)
+ }
+}
+
+func TestParseWithFormat_EmptyResult(t *testing.T) {
+ ctx := context.Background()
+ input := `[]` // Empty JSON array
+ reader := strings.NewReader(input)
+
+ _, err := parseWithFormat(ctx, reader, "json")
+ if err == nil {
+ t.Error("Expected error for empty samples")
+ }
+ if err.Error() != "no valid samples found" {
+ t.Errorf("Expected 'no valid samples found' error, got: %v", err)
+ }
+}
+
+func TestParseStdin_Format(t *testing.T) {
+ // We can't easily test stdin without mocking,
+ // but we can verify the error path
+ ctx := context.Background()
+
+ // Test with invalid format
+ _, err := parseWithFormat(ctx, strings.NewReader(""), "invalid_format")
+ if err == nil {
+ t.Error("Expected error for invalid format")
+ }
+}
+
+func TestNewCSVParser(t *testing.T) {
+ parser := NewCSVParser()
+ if parser == nil {
+ t.Error("NewCSVParser() returned nil")
+ }
+}
+
+func TestNewJSONParser(t *testing.T) {
+ parser := NewJSONParser()
+ if parser == nil {
+ t.Error("NewJSONParser() returned nil")
+ }
+}
diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go
new file mode 100644
index 0000000..bfa4fbe
--- /dev/null
+++ b/internal/parser/tabular_csv.go
@@ -0,0 +1,256 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "log"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "epimetheus/internal/metrics"
+ "epimetheus/internal/resolver"
+)
+
+// TabularCSVParser parses tabular CSV files where the first row is headers
+// and all subsequent rows are data. Numeric columns become metrics, string columns become labels.
+type TabularCSVParser struct {
+ metricName string
+ timestamp time.Time
+ resolver *resolver.DNSResolver // DNS resolver for IP addresses
+ resolveLabels map[string]bool // Set of label names to resolve via DNS
+}
+
+// SaveDNSCache saves the DNS cache to disk (called after successful ingestion)
+func (p *TabularCSVParser) SaveDNSCache() {
+ if p.resolver != nil {
+ cacheSize := p.resolver.GetCacheSize()
+ if err := p.resolver.SaveCache(); err != nil {
+ // Log error but don't fail - cache saving is not critical
+ log.Printf("⚠️ Failed to save DNS cache: %v", err)
+ } else if cacheSize > 0 {
+ log.Printf("💾 Saved DNS cache with %d entries", cacheSize)
+ }
+ }
+}
+
+// numericLabelColumns is a list of column names that should be treated as labels
+// even if they contain numeric values. This is useful for categorical codes like
+// HTTP status codes (200, 404, 500) or date periods (20160101, 20160102) that should be labels, not metrics.
+//
+// Column names are matched case-insensitively.
+//
+// Common examples:
+// - responsecode: HTTP response codes (200, 404, 500)
+// - statuscode: Application status codes
+// - status: Status identifiers
+// - code: Generic codes
+// - period: Date periods (20160101, 20160102, YYYYMMDD format)
+var numericLabelColumns = map[string]bool{
+ "responsecode": true,
+ "statuscode": true,
+ "status_code": true,
+ "response_code": true,
+ "status": true,
+ "code": true,
+ "period": true,
+}
+
+// NewTabularCSVParser creates a new tabular CSV parser
+// metricName is the name of the metric to create
+// timestamp is the timestamp to use for all samples
+// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS
+func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser {
+ // Build set for O(1) lookup
+ resolveMap := make(map[string]bool)
+ for _, label := range resolveIPLabels {
+ resolveMap[label] = true
+ }
+
+ return &TabularCSVParser{
+ metricName: metricName,
+ timestamp: timestamp,
+ resolver: resolver.NewDNSResolver(),
+ resolveLabels: resolveMap,
+ }
+}
+
+// Parse reads metrics from a tabular CSV format
+// First row: column headers
+// Subsequent rows: data values
+// All columns become labels (numeric values are converted to strings)
+func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var samples []metrics.Sample
+
+ csvReader := csv.NewReader(reader)
+ csvReader.Comment = '#'
+ csvReader.TrimLeadingSpace = true
+ csvReader.FieldsPerRecord = -1 // Allow variable number of fields
+
+ // Read header row
+ headers, err := csvReader.Read()
+ if err != nil {
+ return nil, fmt.Errorf("failed to read header row: %w", err)
+ }
+
+ if len(headers) == 0 {
+ return nil, fmt.Errorf("empty header row")
+ }
+
+ // Trim whitespace and sanitize headers for Prometheus label compatibility
+ for i := range headers {
+ headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i]))
+ }
+
+ // Read data rows
+ rowNum := 1
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("line %d: %w", rowNum+1, err)
+ }
+ rowNum++
+
+ if len(record) != len(headers) {
+ // Skip rows with mismatched column counts
+ continue
+ }
+
+ // Add microsecond offset to ensure unique timestamps per row
+ // This prevents duplicate timestamp errors when multiple rows
+ // have the same label combinations
+ rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond)
+
+ rowSamples := p.parseRecord(headers, record, rowTimestamp)
+ samples = append(samples, rowSamples...)
+ }
+
+ return samples, nil
+}
+
+// parseRecord converts a single CSV row into multiple Samples
+// Numeric columns become separate metrics, string columns become labels.
+//
+// Exception: Column names in numericLabelColumns are always treated as labels,
+// even if they contain numbers (e.g., responsecode: 200, 404, 500).
+//
+// DNS Resolution: Label values matching IP addresses in resolveLabels will be
+// resolved to hostnames via reverse DNS lookup.
+func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample {
+ var samples []metrics.Sample
+
+ // First pass: identify numeric vs string columns and collect label values
+ labels := make(map[string]string)
+ var numericCols []struct {
+ name string
+ value float64
+ }
+
+ for i, header := range headers {
+ if i >= len(record) {
+ continue
+ }
+
+ value := strings.TrimSpace(record[i])
+
+ // Check if this column is in the exception list (should be label even if numeric)
+ headerLower := strings.ToLower(header)
+ if numericLabelColumns[headerLower] {
+ // This column should always be a label, even if it's numeric
+ labels[header] = value
+ continue
+ }
+
+ // Try to parse as number
+ if floatVal, err := strconv.ParseFloat(value, 64); err == nil {
+ // It's a numeric column - will become a metric
+ numericCols = append(numericCols, struct {
+ name string
+ value float64
+ }{name: header, value: floatVal})
+ } else {
+ // It's a string column - becomes a label
+ labels[header] = value
+ }
+ }
+
+ // Resolve IP addresses to hostnames
+ for labelName := range p.resolveLabels {
+ if ipValue, exists := labels[labelName]; exists {
+ // Attempt DNS resolution
+ if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" {
+ // Replace IP with hostname
+ labels[labelName] = hostname
+ }
+ // If resolution failed, ipValue remains unchanged
+ }
+ }
+
+ // Create one sample per numeric column
+ for _, numCol := range numericCols {
+ metricName := p.metricName + "_" + numCol.name
+ // Copy labels for each metric
+ metricLabels := make(map[string]string, len(labels))
+ for k, v := range labels {
+ metricLabels[k] = v
+ }
+ samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp))
+ }
+
+ return samples
+}
+
+// sanitizeLabelName converts a string to a valid Prometheus label name
+// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]*
+// Examples:
+// "avg(totaltime)" -> "avg_totaltime"
+// "sum(rcv)" -> "sum_rcv"
+// "response-code" -> "response_code"
+// "123invalid" -> "label_123invalid"
+func sanitizeLabelName(name string) string {
+ if name == "" {
+ return "unknown"
+ }
+
+ // Replace invalid characters with underscore
+ re := regexp.MustCompile(`[^a-zA-Z0-9_]`)
+ sanitized := re.ReplaceAllString(name, "_")
+
+ // Ensure it starts with a letter or underscore
+ if len(sanitized) > 0 && sanitized[0] >= '0' && sanitized[0] <= '9' {
+ sanitized = "label_" + sanitized
+ }
+
+ // Remove consecutive underscores
+ sanitized = regexp.MustCompile(`_+`).ReplaceAllString(sanitized, "_")
+
+ // Remove leading/trailing underscores
+ sanitized = strings.Trim(sanitized, "_")
+
+ if sanitized == "" {
+ return "unknown"
+ }
+
+ return sanitized
+}
+
+// ParseFloat64 tries to parse a string as float64, returns 0.0 if fails
+func parseFloat64(s string) float64 {
+ f, err := strconv.ParseFloat(strings.TrimSpace(s), 64)
+ if err != nil {
+ return 0.0
+ }
+ return f
+}
diff --git a/internal/parser/tabular_csv_test.go b/internal/parser/tabular_csv_test.go
new file mode 100644
index 0000000..07818e8
--- /dev/null
+++ b/internal/parser/tabular_csv_test.go
@@ -0,0 +1,469 @@
+package parser
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
+func TestTabularCSVParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ metricName string
+ expectedCount int
+ wantErr bool
+ }{
+ {
+ name: "simple tabular CSV with numeric and text columns",
+ input: `responsecode,httpmethod,user,totaltime
+200,GET,alice,50.5
+404,POST,bob,100.2
+500,GET,charlie,75.0`,
+ metricName: "test_metric",
+ expectedCount: 6, // 3 rows * 2 numeric columns (responsecode, totaltime)
+ wantErr: false,
+ },
+ {
+ name: "CSV with mixed data types",
+ input: `col1,col2,col3
+1,text,3.14
+2,more,2.71
+3,data,1.41`,
+ metricName: "mixed_metric",
+ expectedCount: 6, // 3 rows * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "CSV with whitespace",
+ input: ` col1 , col2 , col3
+ 1 , value2 , 3
+ 4 , value5 , 6 `,
+ metricName: "whitespace_metric",
+ expectedCount: 4, // 2 rows * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "CSV with comments",
+ input: `# This is a comment
+col1,col2,col3
+# Another comment
+1,value2,3`,
+ metricName: "comment_metric",
+ expectedCount: 2, // 1 row * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "empty CSV",
+ input: "",
+ metricName: "empty_metric",
+ expectedCount: 0,
+ wantErr: true, // No header row
+ },
+ {
+ name: "header only",
+ input: `col1,col2,col3
+`,
+ metricName: "header_only",
+ expectedCount: 0,
+ wantErr: false,
+ },
+ {
+ name: "mismatched columns - skipped",
+ input: `col1,col2,col3
+1,value2,3
+value4,value5
+6,value7,8`,
+ metricName: "mismatched_metric",
+ expectedCount: 4, // 2 matching rows * 2 numeric columns
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ parser := NewTabularCSVParser(tt.metricName, timestamp, []string{}) // No DNS resolution
+
+ reader := strings.NewReader(tt.input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if err == nil && len(samples) != tt.expectedCount {
+ t.Errorf("Parse() got %d samples, want %d", len(samples), tt.expectedCount)
+ }
+
+ // Verify all samples have the correct base metric name and timestamp
+ for _, sample := range samples {
+ if !strings.HasPrefix(sample.MetricName, tt.metricName+"_") {
+ t.Errorf("Sample metric name = %s, want prefix %s_", sample.MetricName, tt.metricName)
+ }
+ if !sample.Timestamp.Equal(timestamp) {
+ t.Errorf("Sample timestamp = %v, want %v", sample.Timestamp, timestamp)
+ }
+ }
+ })
+ }
+}
+
+func TestTabularCSVParser_Labels(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ input := `responsecode,httpmethod,user,totaltime
+200,GET,alice,50.5
+404,POST,bob,100.2`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // 2 rows * 2 numeric columns (responsecode, totaltime) = 4 samples
+ if len(samples) != 4 {
+ t.Fatalf("Expected 4 samples, got %d", len(samples))
+ }
+
+ // Find the responsecode and totaltime metrics for first row
+ var responsecodeMetric, totaltimeMetric *metrics.Sample
+ for i := range samples {
+ if samples[i].Labels["httpmethod"] == "GET" && samples[i].Labels["user"] == "alice" {
+ if strings.HasSuffix(samples[i].MetricName, "_responsecode") {
+ responsecodeMetric = &samples[i]
+ } else if strings.HasSuffix(samples[i].MetricName, "_totaltime") {
+ totaltimeMetric = &samples[i]
+ }
+ }
+ }
+
+ if responsecodeMetric == nil || totaltimeMetric == nil {
+ t.Fatalf("Could not find expected metrics")
+ }
+
+ // Check responsecode metric
+ if responsecodeMetric.Value != 200 {
+ t.Errorf("responsecode value = %f, want 200", responsecodeMetric.Value)
+ }
+ expectedLabels := map[string]string{
+ "httpmethod": "GET",
+ "user": "alice",
+ }
+ for key, expectedValue := range expectedLabels {
+ if actualValue, ok := responsecodeMetric.Labels[key]; !ok {
+ t.Errorf("responsecode metric missing label %s", key)
+ } else if actualValue != expectedValue {
+ t.Errorf("responsecode metric label %s = %s, want %s", key, actualValue, expectedValue)
+ }
+ }
+
+ // Check totaltime metric
+ if totaltimeMetric.Value != 50.5 {
+ t.Errorf("totaltime value = %f, want 50.5", totaltimeMetric.Value)
+ }
+ for key, expectedValue := range expectedLabels {
+ if actualValue, ok := totaltimeMetric.Labels[key]; !ok {
+ t.Errorf("totaltime metric missing label %s", key)
+ } else if actualValue != expectedValue {
+ t.Errorf("totaltime metric label %s = %s, want %s", key, actualValue, expectedValue)
+ }
+ }
+}
+
+func TestTabularCSVParser_ContextCancellation(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel() // Cancel immediately
+
+ timestamp := time.Now()
+ input := `col1,col2,col3
+value1,value2,value3
+value4,value5,value6`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ _, err := parser.Parse(ctx, reader)
+
+ if err != context.Canceled {
+ t.Errorf("Expected context.Canceled error, got %v", err)
+ }
+}
+
+func TestTabularCSVParser_LargeFile(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // Generate a CSV with 1000 rows, 1 numeric column
+ var builder strings.Builder
+ builder.WriteString("col1,col2,col3\n")
+ for i := 0; i < 1000; i++ {
+ builder.WriteString(fmt.Sprintf("%d,value2,value3\n", i))
+ }
+
+ parser := NewTabularCSVParser("large_metric", timestamp, []string{})
+ reader := strings.NewReader(builder.String())
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // 1000 rows * 1 numeric column = 1000 samples
+ if len(samples) != 1000 {
+ t.Errorf("Expected 1000 samples, got %d", len(samples))
+ }
+}
+
+func TestSanitizeLabelName(t *testing.T) {
+ tests := []struct {
+ input string
+ expected string
+ }{
+ {"avg(totaltime)", "avg_totaltime"},
+ {"sum(rcv)", "sum_rcv"},
+ {"response-code", "response_code"},
+ {"valid_label", "valid_label"},
+ {"ValidLabel123", "ValidLabel123"},
+ {"123invalid", "label_123invalid"},
+ {"label__with___underscores", "label_with_underscores"},
+ {"_leading", "leading"},
+ {"trailing_", "trailing"},
+ {"special!@#$chars", "special_chars"},
+ {"", "unknown"},
+ {"___", "unknown"},
+ {"http.method", "http_method"},
+ {"status_code", "status_code"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.input, func(t *testing.T) {
+ result := sanitizeLabelName(tt.input)
+ if result != tt.expected {
+ t.Errorf("sanitizeLabelName(%q) = %q, want %q", tt.input, result, tt.expected)
+ }
+ })
+ }
+}
+
+func TestTabularCSVParser_WithSpecialCharacterHeaders(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ input := `avg(totaltime),sum(rcv),response-code,http.method
+50.5,1102,200,GET
+100.2,2204,404,POST`
+
+ parser := NewTabularCSVParser("special_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // 2 rows * 3 numeric columns (avg(totaltime), sum(rcv), response-code) = 6 samples
+ if len(samples) != 6 {
+ t.Fatalf("Expected 6 samples, got %d", len(samples))
+ }
+
+ // Check that metric names are sanitized
+ expectedMetrics := []string{"special_metric_avg_totaltime", "special_metric_sum_rcv", "special_metric_response_code"}
+ foundMetrics := make(map[string]bool)
+ for _, sample := range samples {
+ foundMetrics[sample.MetricName] = true
+ }
+ for _, expected := range expectedMetrics {
+ if !foundMetrics[expected] {
+ t.Errorf("Missing expected metric %s", expected)
+ }
+ }
+
+ // Check that string column becomes label (http.method -> http_method)
+ for _, sample := range samples {
+ if _, ok := sample.Labels["http_method"]; !ok {
+ t.Errorf("Sample missing sanitized label http_method")
+ }
+ }
+
+ // Check values are correct for first row
+ for _, sample := range samples {
+ if sample.Labels["http_method"] == "GET" {
+ if strings.HasSuffix(sample.MetricName, "_avg_totaltime") {
+ if sample.Value != 50.5 {
+ t.Errorf("avg_totaltime value = %f, want 50.5", sample.Value)
+ }
+ } else if strings.HasSuffix(sample.MetricName, "_sum_rcv") {
+ if sample.Value != 1102 {
+ t.Errorf("sum_rcv value = %f, want 1102", sample.Value)
+ }
+ } else if strings.HasSuffix(sample.MetricName, "_response_code") {
+ if sample.Value != 200 {
+ t.Errorf("response_code value = %f, want 200", sample.Value)
+ }
+ }
+ }
+ }
+}
+
+func TestTabularCSVParser_DNSResolution(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // Test with localhost IP which should resolve on most systems
+ input := `ip,responsecode,count
+127.0.0.1,200,100`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // Should have 1 sample (count metric)
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // The ip label should either be resolved to a hostname or remain as IP if DNS failed
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel == "" {
+ t.Error("ip label is empty")
+ }
+
+ // If resolution succeeded, it should not be the original IP
+ // If it failed, it should still be the IP
+ t.Logf("IP label value: %s (original: 127.0.0.1)", ipLabel)
+}
+
+func TestTabularCSVParser_DNSResolutionDisabled(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ input := `ip,responsecode,count
+192.168.1.1,200,100`
+
+ // Pass empty slice - no DNS resolution
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // IP should remain unchanged
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel != "192.168.1.1" {
+ t.Errorf("IP label = %s, want 192.168.1.1 (DNS resolution should be disabled)", ipLabel)
+ }
+}
+
+func TestTabularCSVParser_DNSResolutionMultipleLabels(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ input := `source_ip,dest_ip,count
+127.0.0.1,192.168.1.1,100`
+
+ // Resolve both source_ip and dest_ip
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"source_ip", "dest_ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // Both labels should be present
+ if _, ok := samples[0].Labels["source_ip"]; !ok {
+ t.Error("source_ip label missing")
+ }
+ if _, ok := samples[0].Labels["dest_ip"]; !ok {
+ t.Error("dest_ip label missing")
+ }
+
+ t.Logf("source_ip: %s", samples[0].Labels["source_ip"])
+ t.Logf("dest_ip: %s", samples[0].Labels["dest_ip"])
+}
+
+func TestTabularCSVParser_DNSResolutionNonIPValue(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // ip column contains non-IP value
+ input := `ip,responsecode,count
+not-an-ip,200,100`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // Non-IP value should remain unchanged
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel != "not-an-ip" {
+ t.Errorf("IP label = %s, want not-an-ip (should not resolve non-IP values)", ipLabel)
+ }
+}
+
+func TestTabularCSVParser_DNSResolutionCaching(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // Multiple rows with same IP
+ input := `ip,responsecode,count
+127.0.0.1,200,100
+127.0.0.1,404,50
+127.0.0.1,500,5`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // Should have 3 samples
+ if len(samples) != 3 {
+ t.Fatalf("Expected 3 samples, got %d", len(samples))
+ }
+
+ // All should have same resolved IP (cached result)
+ firstIP := samples[0].Labels["ip"]
+ for i, sample := range samples {
+ if sample.Labels["ip"] != firstIP {
+ t.Errorf("Sample %d has different IP label: %s vs %s (caching issue)", i, sample.Labels["ip"], firstIP)
+ }
+ }
+
+ // Check cache size
+ if parser.resolver.GetCacheSize() != 1 {
+ t.Errorf("Expected cache size 1 (one unique IP), got %d", parser.resolver.GetCacheSize())
+ }
+}