package ingester import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "strings" "sync" "time" "epimetheus/internal/metrics" ) const ( clickhouseBatchSize = 10000 clickhouseHTTPTimeout = 30 * time.Second defaultTableName = "epimetheus_metrics" ) // ClickHouseIngester ingests metrics into ClickHouse. type ClickHouseIngester struct { client *http.Client baseURL string tableName string } // NewClickHouseIngester creates a new ClickHouse ingester. func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester { if tableName == "" { tableName = defaultTableName } // Ensure URL has scheme and no trailing slash for query params baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/") return ClickHouseIngester{ client: &http.Client{ Timeout: clickhouseHTTPTimeout, }, baseURL: baseURL, tableName: tableName, } } // EnsureTable creates the metrics table if it does not exist. func (c ClickHouseIngester) EnsureTable(ctx context.Context) error { createSQL := fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( metric String, labels Map(String, String), value Float64, timestamp DateTime64(3) ) ENGINE = MergeTree() ORDER BY (metric, timestamp) `, c.tableName) reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL) req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil) if err != nil { return fmt.Errorf("create request: %w", err) } resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("execute create table: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, string(body)) } return nil } // Ingest inserts samples into ClickHouse in batches. func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error { if len(samples) == 0 { return fmt.Errorf("no samples to ingest") } if err := c.EnsureTable(ctx); err != nil { return fmt.Errorf("ensure table: %w", err) } batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize) totalBatches := len(batches) log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches) var wg sync.WaitGroup errChan := make(chan error, totalBatches) for i, batch := range batches { select { case <-ctx.Done(): return ctx.Err() default: } wg.Add(1) go func(idx int, b []metrics.Sample) { defer wg.Done() if err := c.insertBatch(ctx, b); err != nil { errChan <- fmt.Errorf("batch %d: %w", idx+1, err) } }(i, batch) } wg.Wait() close(errChan) var firstErr error for err := range errChan { if firstErr == nil { firstErr = err } log.Printf("ClickHouse batch error: %v", err) } return firstErr } // insertBatch sends a single batch via HTTP JSONEachRow. func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error { var buf bytes.Buffer enc := json.NewEncoder(&buf) for _, s := range samples { // Convert labels map to format ClickHouse Map expects labelsMap := make(map[string]string, len(s.Labels)) for k, v := range s.Labels { labelsMap[k] = v } row := struct { Metric string `json:"metric"` Labels map[string]string `json:"labels"` Value float64 `json:"value"` Timestamp string `json:"timestamp"` }{ Metric: s.MetricName, Labels: labelsMap, Value: s.Value, Timestamp: s.Timestamp.UTC().Format("2006-01-02 15:04:05.000"), } if err := enc.Encode(row); err != nil { return fmt.Errorf("encode row: %w", err) } } query := fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName) reqURL := c.baseURL + "/?query=" + url.QueryEscape(query) req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf) if err != nil { return fmt.Errorf("create request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, string(body)) } return nil } func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample { var batches [][]metrics.Sample for i := 0; i < len(samples); i += size { end := i + size if end > len(samples) { end = len(samples) } batches = append(batches, samples[i:end]) } return batches }