diff options
Diffstat (limited to 'internal/ingester/clickhouse.go')
| -rw-r--r-- | internal/ingester/clickhouse.go | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/internal/ingester/clickhouse.go b/internal/ingester/clickhouse.go new file mode 100644 index 0000000..2439d4e --- /dev/null +++ b/internal/ingester/clickhouse.go @@ -0,0 +1,191 @@ +package ingester + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "epimetheus/internal/metrics" +) + +const ( + clickhouseBatchSize = 10000 + clickhouseHTTPTimeout = 30 * time.Second + defaultTableName = "epimetheus_metrics" +) + +// ClickHouseIngester ingests metrics into ClickHouse over HTTP. +// It carries the HTTP client, the server base URL and the target table name. +type ClickHouseIngester struct { + client *http.Client + baseURL string + tableName string +} + +// NewClickHouseIngester creates a new ClickHouse ingester. +// An empty tableName falls back to defaultTableName, and the HTTP client is created with clickhouseHTTPTimeout as its timeout. +func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester { + if tableName == "" { + tableName = defaultTableName + } + // Trim surrounding whitespace and a single trailing slash so a "/?query=..." suffix can be appended directly. + // NOTE(review): no scheme is added or validated here - presumably callers must pass a full URL; confirm. + baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/") + + return ClickHouseIngester{ + client: &http.Client{ + Timeout: clickhouseHTTPTimeout, + }, + baseURL: baseURL, + tableName: tableName, + } +} + +// EnsureTable creates the metrics table if it does not exist. 
+func (c ClickHouseIngester) EnsureTable(ctx context.Context) error { + createSQL := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + metric String, + labels Map(String, String), + value Float64, + timestamp DateTime64(3) + ) ENGINE = MergeTree() + ORDER BY (metric, timestamp) + `, c.tableName) + + reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL) + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("execute create table: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, string(body)) + } + + return nil +} + +// Ingest inserts samples into ClickHouse in batches. +func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + if err := c.EnsureTable(ctx); err != nil { + return fmt.Errorf("ensure table: %w", err) + } + + batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize) + totalBatches := len(batches) + + log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches) + + var wg sync.WaitGroup + errChan := make(chan error, totalBatches) + + for i, batch := range batches { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + wg.Add(1) + go func(idx int, b []metrics.Sample) { + defer wg.Done() + if err := c.insertBatch(ctx, b); err != nil { + errChan <- fmt.Errorf("batch %d: %w", idx+1, err) + } + }(i, batch) + } + + wg.Wait() + close(errChan) + + var firstErr error + for err := range errChan { + if firstErr == nil { + firstErr = err + } + log.Printf("ClickHouse batch error: %v", err) + } + + return firstErr +} + +// insertBatch sends a single batch via HTTP JSONEachRow. 
+func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + for _, s := range samples { + // Convert labels map to format ClickHouse Map expects + labelsMap := make(map[string]string, len(s.Labels)) + for k, v := range s.Labels { + labelsMap[k] = v + } + + row := struct { + Metric string `json:"metric"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp string `json:"timestamp"` + }{ + Metric: s.MetricName, + Labels: labelsMap, + Value: s.Value, + Timestamp: s.Timestamp.UTC().Format("2006-01-02 15:04:05.000"), + } + + if err := enc.Encode(row); err != nil { + return fmt.Errorf("encode row: %w", err) + } + } + + query := fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName) + reqURL := c.baseURL + "/?query=" + url.QueryEscape(query) + + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, string(body)) + } + + return nil +} + +func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample { + var batches [][]metrics.Sample + for i := 0; i < len(samples); i += size { + end := i + size + if end > len(samples) { + end = len(samples) + } + batches = append(batches, samples[i:end]) + } + return batches +} |
