1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
package ingester
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"
"epimetheus/internal/metrics"
)
// Tunables for the ClickHouse ingester.
const (
	// defaultTableName is the table used when NewClickHouseIngester is given
	// an empty table name.
	defaultTableName = "epimetheus_metrics"

	// clickhouseBatchSize is the maximum number of samples sent in a single
	// INSERT request.
	clickhouseBatchSize = 10000

	// clickhouseHTTPTimeout is the http.Client timeout applied to every
	// request the ingester makes.
	clickhouseHTTPTimeout = 30 * time.Second
)
// ClickHouseIngester ingests metrics into ClickHouse through its HTTP
// interface. Use NewClickHouseIngester to obtain a configured value.
type ClickHouseIngester struct {
	// client performs every HTTP request issued by the ingester.
	client *http.Client
	// baseURL is the ClickHouse HTTP endpoint; request paths such as
	// "/?query=..." are appended to it. tableName is the target table.
	baseURL, tableName string
}
// NewClickHouseIngester creates a new ClickHouse ingester.
//
// baseURL is the ClickHouse HTTP endpoint, e.g. "http://clickhouse:8123".
// Surrounding whitespace and a single trailing slash are removed so query
// strings can be appended directly. "http://" is prepended when no scheme is
// given. An empty tableName selects defaultTableName. The returned ingester's
// HTTP client uses clickhouseHTTPTimeout as its request timeout.
func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester {
	if tableName == "" {
		tableName = defaultTableName
	}

	baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/")
	// Without an explicit scheme, net/url parses "host:port" with "host" as the
	// scheme and every request fails with "unsupported protocol scheme".
	if baseURL != "" && !strings.Contains(baseURL, "://") {
		baseURL = "http://" + baseURL
	}

	return ClickHouseIngester{
		client: &http.Client{
			Timeout: clickhouseHTTPTimeout,
		},
		baseURL:   baseURL,
		tableName: tableName,
	}
}
// EnsureTable creates the metrics table if it does not exist.
//
// It POSTs a CREATE TABLE IF NOT EXISTS statement for c.tableName with the
// columns metric (String), labels (Map(String, String)), value (Float64) and
// timestamp (DateTime64(3)), using a MergeTree engine ordered by
// (metric, timestamp). Any non-200 response is returned as an error that
// includes a bounded prefix of the server's response body.
func (c ClickHouseIngester) EnsureTable(ctx context.Context) error {
	// maxErrorBodyBytes caps how much of an error response is read into memory.
	const maxErrorBodyBytes = 64 << 10

	createSQL := fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			metric String,
			labels Map(String, String),
			value Float64,
			timestamp DateTime64(3)
		) ENGINE = MergeTree()
		ORDER BY (metric, timestamp)
	`, c.tableName)

	reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, nil)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("execute create table: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the status code alone is still reported if the read fails.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}

	// Drain the body so the transport can reuse the keep-alive connection. A
	// drain failure does not change the DDL outcome, so its error is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// Ingest inserts samples into ClickHouse in batches.
//
// The target table is created first if needed (see EnsureTable). Samples are
// split into batches of at most clickhouseBatchSize rows. Batches are inserted
// concurrently, with at most maxConcurrentInserts requests in flight.
//
// Every batch that was started runs to completion even if another batch
// fails. Each batch error is logged, and the first one received is returned.
//
// If ctx is cancelled, no further batches are started. Ingest waits for
// in-flight batches to finish and then returns ctx.Err(), so no goroutine
// outlives the call.
//
// An empty samples slice is reported as an error.
func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error {
	// maxConcurrentInserts bounds simultaneous INSERT requests so a large ingest
	// does not issue one concurrent request per batch against the server.
	const maxConcurrentInserts = 4

	if len(samples) == 0 {
		return fmt.Errorf("no samples to ingest")
	}
	if err := c.EnsureTable(ctx); err != nil {
		return fmt.Errorf("ensure table: %w", err)
	}

	batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize)
	totalBatches := len(batches)
	log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches)

	var wg sync.WaitGroup
	// Buffered to totalBatches so a goroutine never blocks on send, even after
	// the launch loop stopped early.
	errChan := make(chan error, totalBatches)
	// Counting semaphore: a send acquires a slot, a receive releases it.
	sem := make(chan struct{}, maxConcurrentInserts)

	var cancelErr error
launch:
	for i, batch := range batches {
		// Check first: select picks randomly among ready cases, so a free
		// semaphore slot could otherwise win over an already-cancelled ctx.
		if err := ctx.Err(); err != nil {
			cancelErr = err
			break launch
		}
		select {
		case <-ctx.Done():
			cancelErr = ctx.Err()
			break launch
		case sem <- struct{}{}:
		}

		wg.Add(1)
		go func(idx int, b []metrics.Sample) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := c.insertBatch(ctx, b); err != nil {
				errChan <- fmt.Errorf("batch %d: %w", idx+1, err)
			}
		}(i, batch)
	}

	// Always wait, including on cancellation, so launched batches finish
	// before Ingest returns.
	wg.Wait()
	close(errChan)

	var firstErr error
	for err := range errChan {
		if firstErr == nil {
			firstErr = err
		}
		log.Printf("ClickHouse batch error: %v", err)
	}
	if cancelErr != nil {
		return cancelErr
	}
	return firstErr
}
// insertBatch sends a single batch via HTTP JSONEachRow.
//
// Each sample is encoded as one JSON object per line with the keys metric,
// labels, value and timestamp. The lines are POSTed as the body of an
// "INSERT INTO <table> FORMAT JSONEachRow" query. A non-200 response is
// returned as an error that includes a bounded prefix of the server's
// response body.
func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error {
	// maxErrorBodyBytes caps how much of an error response is read into memory.
	const maxErrorBodyBytes = 64 << 10

	type row struct {
		Metric    string            `json:"metric"`
		Labels    map[string]string `json:"labels"`
		Value     float64           `json:"value"`
		Timestamp string            `json:"timestamp"`
	}

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, s := range samples {
		// Copy into a fresh, non-nil map[string]string so a nil label set
		// encodes as {} rather than null. NOTE(review): this also normalizes
		// s.Labels to a plain map in case its declared type differs — confirm
		// against metrics.Sample.
		labels := make(map[string]string, len(s.Labels))
		for k, v := range s.Labels {
			labels[k] = v
		}
		r := row{
			Metric: s.MetricName,
			Labels: labels,
			Value:  s.Value,
			// RFC 3339 with an explicit "Z" so the instant is parsed as UTC. A
			// zone-less "YYYY-MM-DD hh:mm:ss" string would be interpreted in the
			// column/server time zone, shifting data whenever that is not UTC.
			// Parsing the "T"/"Z" form relies on date_time_input_format=best_effort,
			// set on the query below.
			Timestamp: s.Timestamp.UTC().Format("2006-01-02T15:04:05.000Z"),
		}
		if err := enc.Encode(r); err != nil {
			return fmt.Errorf("encode row: %w", err)
		}
	}

	params := url.Values{}
	params.Set("query", fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName))
	params.Set("date_time_input_format", "best_effort")
	reqURL := c.baseURL + "/?" + params.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, &buf)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the status code alone is still reported if the read fails.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}

	// Drain the body so the transport can reuse the keep-alive connection. A
	// drain failure does not change the insert outcome, so its error is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// chunkSamplesForClickHouse splits samples into consecutive batches of at
// most size elements, preserving order.
//
// The batches are sub-slices of samples, so no elements are copied. Each
// batch's capacity is clipped to its length, so appending to one batch cannot
// overwrite the next. A non-positive size yields a single batch holding every
// sample. An empty input yields nil.
func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample {
	if len(samples) == 0 {
		return nil
	}
	if size <= 0 {
		// Without this guard, size == 0 never advances the loop (unbounded
		// growth) and a negative size produces an invalid slice expression.
		size = len(samples)
	}

	// 1 + (n-1)/size is ceil(n/size) without the overflow risk of n+size-1.
	batches := make([][]metrics.Sample, 0, 1+(len(samples)-1)/size)
	for start := 0; start < len(samples); start += size {
		end := start + size
		if end > len(samples) {
			end = len(samples)
		}
		batches = append(batches, samples[start:end:end])
	}
	return batches
}
|