summaryrefslogtreecommitdiff
path: root/internal/ingester/clickhouse.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingester/clickhouse.go')
-rw-r--r--internal/ingester/clickhouse.go191
1 files changed, 191 insertions, 0 deletions
diff --git a/internal/ingester/clickhouse.go b/internal/ingester/clickhouse.go
new file mode 100644
index 0000000..2439d4e
--- /dev/null
+++ b/internal/ingester/clickhouse.go
@@ -0,0 +1,191 @@
+package ingester
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
+const (
+ clickhouseBatchSize = 10000
+ clickhouseHTTPTimeout = 30 * time.Second
+ defaultTableName = "epimetheus_metrics"
+)
+
+// ClickHouseIngester ingests metrics into ClickHouse.
+type ClickHouseIngester struct {
+ client *http.Client
+ baseURL string
+ tableName string
+}
+
+// NewClickHouseIngester creates a new ClickHouse ingester.
+func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester {
+ if tableName == "" {
+ tableName = defaultTableName
+ }
+ // Ensure URL has scheme and no trailing slash for query params
+ baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/")
+
+ return ClickHouseIngester{
+ client: &http.Client{
+ Timeout: clickhouseHTTPTimeout,
+ },
+ baseURL: baseURL,
+ tableName: tableName,
+ }
+}
+
+// EnsureTable creates the metrics table if it does not exist.
+func (c ClickHouseIngester) EnsureTable(ctx context.Context) error {
+ createSQL := fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s (
+ metric String,
+ labels Map(String, String),
+ value Float64,
+ timestamp DateTime64(3)
+ ) ENGINE = MergeTree()
+ ORDER BY (metric, timestamp)
+ `, c.tableName)
+
+ reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL)
+ req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
+ if err != nil {
+ return fmt.Errorf("create request: %w", err)
+ }
+
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("execute create table: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+// Ingest inserts samples into ClickHouse in batches.
+func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ if err := c.EnsureTable(ctx); err != nil {
+ return fmt.Errorf("ensure table: %w", err)
+ }
+
+ batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize)
+ totalBatches := len(batches)
+
+ log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches)
+
+ var wg sync.WaitGroup
+ errChan := make(chan error, totalBatches)
+
+ for i, batch := range batches {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ wg.Add(1)
+ go func(idx int, b []metrics.Sample) {
+ defer wg.Done()
+ if err := c.insertBatch(ctx, b); err != nil {
+ errChan <- fmt.Errorf("batch %d: %w", idx+1, err)
+ }
+ }(i, batch)
+ }
+
+ wg.Wait()
+ close(errChan)
+
+ var firstErr error
+ for err := range errChan {
+ if firstErr == nil {
+ firstErr = err
+ }
+ log.Printf("ClickHouse batch error: %v", err)
+ }
+
+ return firstErr
+}
+
+// insertBatch sends a single batch via HTTP JSONEachRow.
+func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error {
+ var buf bytes.Buffer
+ enc := json.NewEncoder(&buf)
+
+ for _, s := range samples {
+ // Convert labels map to format ClickHouse Map expects
+ labelsMap := make(map[string]string, len(s.Labels))
+ for k, v := range s.Labels {
+ labelsMap[k] = v
+ }
+
+ row := struct {
+ Metric string `json:"metric"`
+ Labels map[string]string `json:"labels"`
+ Value float64 `json:"value"`
+ Timestamp string `json:"timestamp"`
+ }{
+ Metric: s.MetricName,
+ Labels: labelsMap,
+ Value: s.Value,
+ Timestamp: s.Timestamp.UTC().Format("2006-01-02 15:04:05.000"),
+ }
+
+ if err := enc.Encode(row); err != nil {
+ return fmt.Errorf("encode row: %w", err)
+ }
+ }
+
+ query := fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName)
+ reqURL := c.baseURL + "/?query=" + url.QueryEscape(query)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf)
+ if err != nil {
+ return fmt.Errorf("create request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("send request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample {
+ var batches [][]metrics.Sample
+ for i := 0; i < len(samples); i += size {
+ end := i + size
+ if end > len(samples) {
+ end = len(samples)
+ }
+ batches = append(batches, samples[i:end])
+ }
+ return batches
+}