1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
package parser
import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
"regexp"
"strconv"
"strings"
"time"
"epimetheus/internal/metrics"
"epimetheus/internal/resolver"
)
// TabularCSVParser turns tabular CSV input into metric samples. The first row
// supplies the column headers and every following row is data: columns whose
// value parses as a number become metrics, the remaining columns become labels.
type TabularCSVParser struct {
	// metricName is the prefix joined with "_" and a numeric column's name
	// to form each sample's metric name.
	metricName string
	// timestamp is the base time from which each row's sample time is offset.
	timestamp time.Time
	// resolver is handed the values of the labels listed in resolveLabels so
	// that IP addresses can be replaced by hostnames.
	resolver *resolver.DNSResolver
	// resolveLabels is the set of label names whose values go through resolver.
	resolveLabels map[string]bool
}
// SaveDNSCache asks the resolver to persist its DNS cache. It is meant to be
// called once ingestion has finished successfully. Saving is best-effort: a
// failure is logged and otherwise ignored, and a success is only logged when
// the cache held at least one entry. It is a no-op when the parser has no
// resolver.
func (p *TabularCSVParser) SaveDNSCache() {
	if p.resolver == nil {
		return
	}

	// Read the size before saving, mirroring the order the log message relies on.
	entries := p.resolver.GetCacheSize()
	err := p.resolver.SaveCache()

	switch {
	case err != nil:
		// Cache persistence is not critical, so report and carry on.
		log.Printf("⚠️ Failed to save DNS cache: %v", err)
	case entries > 0:
		log.Printf("💾 Saved DNS cache with %d entries", entries)
	}
}
// numericLabelColumns is the set of column names that are always emitted as
// labels, even when their values parse as numbers. It exists for categorical
// codes whose numeric value carries no magnitude, such as HTTP status codes
// (200, 404, 500) or YYYYMMDD date periods (20160101, 20160102).
//
// Keys are lower-case; parseRecord lower-cases the header before the lookup,
// so matching is case-insensitive.
var numericLabelColumns = map[string]bool{
	// Generic codes and status identifiers.
	"code":   true,
	"status": true,

	// Application status codes, with and without a separator.
	"statuscode":  true,
	"status_code": true,

	// HTTP response codes, with and without a separator.
	"responsecode":  true,
	"response_code": true,

	// Date periods in YYYYMMDD form.
	"period": true,
}
// NewTabularCSVParser creates a new tabular CSV parser.
//
//   - metricName is the prefix used for every metric the parser creates.
//   - timestamp is the base timestamp applied to all samples.
//   - resolveIPLabels lists the label names whose values hold IP addresses that
//     should be resolved via DNS.
//
// Header names are sanitized before they are used as label names, so each
// entry of resolveIPLabels is registered both as given and in its sanitized
// form. This lets callers pass either the raw CSV header (e.g. "src-ip") or the
// resulting label name ("src_ip").
func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser {
	// Set for O(1) lookup; at most two keys per requested label.
	resolveMap := make(map[string]bool, 2*len(resolveIPLabels))
	for _, label := range resolveIPLabels {
		resolveMap[label] = true
		// Skip blank entries: they would sanitize to the "unknown" placeholder
		// and accidentally match unnamed columns.
		if trimmed := strings.TrimSpace(label); trimmed != "" {
			resolveMap[sanitizeLabelName(trimmed)] = true
		}
	}

	return &TabularCSVParser{
		metricName:    metricName,
		timestamp:     timestamp,
		resolver:      resolver.NewDNSResolver(),
		resolveLabels: resolveMap,
	}
}
// Parse reads samples from tabular CSV input.
//
// The first record is the header row; each header is trimmed and sanitized into
// a Prometheus-compatible label name. Every later record is one data row, which
// parseRecord expands into one sample per numeric column with the remaining
// columns attached as labels. Lines starting with '#' are treated as comments.
//
// Rows whose field count differs from the header are skipped rather than
// failing the whole parse; the number skipped is logged once at the end so
// dropped data does not go unnoticed.
//
// It returns an error if the header row cannot be read, if a CSV record is
// malformed, or if ctx is cancelled; in those cases no samples are returned.
func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
	csvReader := csv.NewReader(reader)
	csvReader.Comment = '#'
	csvReader.TrimLeadingSpace = true
	csvReader.FieldsPerRecord = -1 // Accept any width here; mismatches are filtered below.

	// Read header row.
	headers, err := csvReader.Read()
	if err != nil {
		return nil, fmt.Errorf("failed to read header row: %w", err)
	}
	if len(headers) == 0 {
		return nil, fmt.Errorf("empty header row")
	}

	// Trim whitespace and sanitize headers for Prometheus label compatibility.
	for i := range headers {
		headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i]))
	}

	var samples []metrics.Sample
	rowNum := 1  // The header counts as row 1.
	skipped := 0 // Data rows dropped because of a column-count mismatch.
	for {
		// Check for cancellation between rows without blocking.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("line %d: %w", rowNum+1, err)
		}
		rowNum++

		if len(record) != len(headers) {
			// Best-effort ingestion: drop the row but keep count of it.
			skipped++
			continue
		}

		// Offset each row by its row number in microseconds so that rows with
		// identical label combinations do not end up with duplicate timestamps.
		rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond)
		samples = append(samples, p.parseRecord(headers, record, rowTimestamp)...)
	}

	if skipped > 0 {
		log.Printf("⚠️ Skipped %d row(s) whose column count did not match the %d header column(s)", skipped, len(headers))
	}
	return samples, nil
}
// parseRecord expands one CSV row into zero or more samples.
//
// Each cell is trimmed and classified: a cell that parses as a float becomes a
// metric named "<metricName>_<column>", anything else becomes a label keyed by
// the column name. Columns listed in numericLabelColumns (matched on the
// lower-cased header) are always labels, even when the value is numeric
// (e.g. responsecode: 200, 404, 500).
//
// Before samples are built, the value of every label named in p.resolveLabels
// is passed to the resolver; when it reports a non-empty hostname, that
// hostname replaces the original value, otherwise the value is left as is.
//
// Every returned sample carries its own copy of the row's labels and the given
// timestamp. Headers beyond the end of record are ignored.
func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample {
	type numericColumn struct {
		name  string
		value float64
	}

	rowLabels := make(map[string]string)
	var numeric []numericColumn

	// Step 1: split the row into label values and numeric metric values.
	for idx := 0; idx < len(headers) && idx < len(record); idx++ {
		column := headers[idx]
		cell := strings.TrimSpace(record[idx])

		// Columns on the exception list skip number detection entirely.
		if !numericLabelColumns[strings.ToLower(column)] {
			if parsed, err := strconv.ParseFloat(cell, 64); err == nil {
				numeric = append(numeric, numericColumn{name: column, value: parsed})
				continue
			}
		}
		rowLabels[column] = cell
	}

	// Step 2: swap IP addresses for hostnames on the configured labels.
	for name := range p.resolveLabels {
		ip, present := rowLabels[name]
		if !present {
			continue
		}
		if host, ok := p.resolver.ResolveIP(ip); ok && host != "" {
			rowLabels[name] = host
		}
	}

	// Step 3: emit one sample per numeric column, each with a private label map.
	var samples []metrics.Sample
	for _, col := range numeric {
		ownLabels := make(map[string]string, len(rowLabels))
		for key, val := range rowLabels {
			ownLabels[key] = val
		}
		samples = append(samples, metrics.NewSample(p.metricName+"_"+col.name, ownLabels, col.value, timestamp))
	}
	return samples
}
// labelSeparatorRunRE matches every maximal run of characters that may not
// appear inside a sanitized label name, including underscores so that runs
// collapse to a single separator. Compiled once rather than on every call.
var labelSeparatorRunRE = regexp.MustCompile(`[^a-zA-Z0-9]+`)

// sanitizeLabelName converts a string to a valid Prometheus label name.
// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]*.
//
// Each run of invalid characters and/or underscores becomes a single
// underscore, leading and trailing underscores are dropped, and a result that
// would start with a digit is prefixed with "label_". An input with no usable
// characters yields "unknown".
//
// Examples:
//
//	"avg(totaltime)" -> "avg_totaltime"
//	"sum(rcv)"       -> "sum_rcv"
//	"response-code"  -> "response_code"
//	"123invalid"     -> "label_123invalid"
//	"(1)"            -> "label_1"
//	""               -> "unknown"
func sanitizeLabelName(name string) string {
	sanitized := labelSeparatorRunRE.ReplaceAllString(name, "_")
	sanitized = strings.Trim(sanitized, "_")
	if sanitized == "" {
		return "unknown"
	}

	// The digit check must come after trimming: stripping a leading underscore
	// can expose a digit (e.g. "_1" -> "1"), which is not a valid first character.
	if sanitized[0] >= '0' && sanitized[0] <= '9' {
		sanitized = "label_" + sanitized
	}
	return sanitized
}
// parseFloat64 parses s as a float64 after trimming surrounding whitespace.
// It returns 0.0 when s cannot be parsed.
func parseFloat64(s string) float64 {
	if parsed, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil {
		return parsed
	}
	return 0.0
}
|