| author | Paul Buetow <paul@buetow.org> | 2025-12-30 21:36:48 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-12-30 21:36:48 +0200 |
| commit | de3e419a76873c8ac6c6415fbdbdd708fcecdf30 (patch) | |
| tree | 2909220393a8099fc376492672458426db09bd76 | |
| parent | 37a0ce2c6ad01a7439739c0794e74fd0b052bcae (diff) | |
Add historic data ingestion support to prometheus-pusher
This commit extends prometheus-pusher to support ingesting historic data
with custom timestamps via the Prometheus Remote Write API.
## Key Changes
1. New Historic Data Module (historic.go)
- GenerateHistoricMetrics: Creates metrics for specific past timestamps
- PushHistoricData: Sends single datapoint via Remote Write API
- BackfillHistoricData: Backfills range of historic data
- Uses Protobuf + Snappy encoding per Prometheus spec
2. Enhanced Main Binary (main.go, realtime.go)
- Refactored to support multiple modes
- Mode 1: realtime - Push to Pushgateway (original behavior)
- Mode 2: historic - Push single historic datapoint
- Mode 3: backfill - Backfill range of historic data
- Command-line flags for configuration
3. Prometheus Configuration (persistence-values.yaml)
- Added web.enable-remote-write-receiver flag
- Enables Prometheus to accept timestamped samples via Remote Write API
- Required for historic data ingestion
4. Documentation (HISTORIC.md)
- Complete guide for historic data ingestion
- Explains limitations and best practices
- Examples for all three modes
- Troubleshooting guide
## Technical Details
**Problem**: Pushgateway doesn't support custom timestamps: Prometheus
stamps every scraped sample with the scrape time ("now"), which prevents
backfilling historic data.
**Solution**: Use the Prometheus Remote Write API, which accepts timestamped
samples. This requires starting Prometheus with the
--web.enable-remote-write-receiver flag.
**Data Format**: Protobuf (prompb.WriteRequest) with Snappy compression
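The HTTP side of a Remote Write push can be sketched with the standard library. This is a minimal illustration that assumes the payload has already been Protobuf-encoded and Snappy-compressed. That step needs the third-party packages listed under "Dependencies Added" and is deliberately left out. The helper name `newRemoteWriteRequest` is hypothetical; the three headers are the ones set in historic.go.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// newRemoteWriteRequest builds the POST request for a Remote Write payload.
// compressed is assumed to be a Snappy-compressed, Protobuf-encoded
// prompb.WriteRequest; this sketch does not produce or validate it.
func newRemoteWriteRequest(url string, compressed []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(compressed))
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	return req, nil
}

func main() {
	// The body is a placeholder, not a valid Remote Write payload.
	req, err := newRemoteWriteRequest("http://localhost:9090/api/v1/write", []byte("placeholder"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("Content-Encoding"))
}
```

The request is only constructed here, not sent, so the sketch runs without a Prometheus instance.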
**Use Cases**:
- Backfill missing data (e.g., during outage)
- Import historic data from other systems
- Testing with specific timestamps
- Data migration scenarios
## Usage Examples
```bash
# Realtime mode (original behavior)
./prometheus-pusher-historic -mode=realtime -continuous
# Push data from 24 hours ago
./prometheus-pusher-historic -mode=historic -hours-ago=24
# Backfill last 48 hours with 1-hour intervals
./prometheus-pusher-historic -mode=backfill -start-hours=48 -end-hours=0 -interval=1
```
## Dependencies Added
- github.com/prometheus/prometheus (for prompb package)
- github.com/golang/snappy (for compression)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
| -rw-r--r-- | f3s/prometheus-pusher/HISTORIC.md | 231 |
| -rw-r--r-- | f3s/prometheus-pusher/go.mod | 23 |
| -rw-r--r-- | f3s/prometheus-pusher/go.sum | 91 |
| -rw-r--r-- | f3s/prometheus-pusher/historic.go | 217 |
| -rw-r--r-- | f3s/prometheus-pusher/main.go | 153 |
| -rwxr-xr-x | f3s/prometheus-pusher/prometheus-pusher-historic | bin, 0 -> 13852824 bytes |
| -rw-r--r-- | f3s/prometheus-pusher/realtime.go | 100 |
| -rw-r--r-- | f3s/prometheus/persistence-values.yaml | 5 |
8 files changed, 689 insertions, 131 deletions
diff --git a/f3s/prometheus-pusher/HISTORIC.md b/f3s/prometheus-pusher/HISTORIC.md new file mode 100644 index 0000000..92da15d --- /dev/null +++ b/f3s/prometheus-pusher/HISTORIC.md @@ -0,0 +1,231 @@ +# Historic Data Ingestion + +This document explains how to ingest historic data into Prometheus using the prometheus-pusher tool. + +## Problem + +The standard Pushgateway approach has a limitation: it doesn't support custom timestamps. When you push metrics to Pushgateway, Prometheus scrapes them with the current timestamp. This means you cannot backfill historic data (e.g., data from yesterday or last week). + +## Solution + +Prometheus supports the **Remote Write API** which accepts timestamped samples. By enabling the `remote-write-receiver` feature flag, Prometheus can accept historic data with custom timestamps via HTTP POST. + +### Limitations + +- **Out-of-order samples**: By default, Prometheus rejects samples that are older than the most recent sample for that time series +- **Time window**: Prometheus typically accepts data within a certain time window (default: up to 1 hour in the past for new series) +- **Feature flag required**: The remote write receiver must be enabled with `--enable-feature=remote-write-receiver` + +## Setup + +### 1. Enable Remote Write Receiver + +The Prometheus instance needs to be configured with the remote write receiver feature: + +```yaml +# In prometheus/persistence-values.yaml +prometheus: + prometheusSpec: + enableFeatures: + - remote-write-receiver +``` + +This has been configured and applied to the monitoring namespace Prometheus instance. + +### 2. 
Verify Feature is Enabled + +```bash +kubectl logs -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 | grep "remote-write-receiver" +``` + +You should see: `msg="Experimental features enabled" features=[remote-write-receiver]` + +## Usage + +The `prometheus-pusher-historic` binary supports three modes: + +### Mode 1: Realtime (Default) + +Push current metrics to Pushgateway (same as before): + +```bash +./prometheus-pusher-historic -mode=realtime -continuous +``` + +Options: +- `-pushgateway`: Pushgateway URL (default: http://localhost:9091) +- `-job`: Job name (default: example_metrics_pusher) +- `-continuous`: Keep pushing every 15 seconds + +### Mode 2: Historic (Single Datapoint) + +Push a single datapoint from X hours ago: + +```bash +# Port-forward Prometheus +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & + +# Push data from 24 hours ago +./prometheus-pusher-historic -mode=historic -hours-ago=24 + +# Push data from 3 hours ago +./prometheus-pusher-historic -mode=historic -hours-ago=3 + +# Push data from yesterday (48 hours ago) +./prometheus-pusher-historic -mode=historic -hours-ago=48 +``` + +Options: +- `-prometheus`: Prometheus remote write URL (default: http://localhost:9090/api/v1/write) +- `-hours-ago`: How many hours in the past (default: 24) + +### Mode 3: Backfill (Multiple Datapoints) + +Backfill a range of historic data: + +```bash +# Backfill last 48 hours with 1-hour intervals +./prometheus-pusher-historic -mode=backfill -start-hours=48 -end-hours=0 -interval=1 + +# Backfill last week with 6-hour intervals +./prometheus-pusher-historic -mode=backfill -start-hours=168 -end-hours=0 -interval=6 + +# Backfill specific range (24h ago to 12h ago, every 2 hours) +./prometheus-pusher-historic -mode=backfill -start-hours=24 -end-hours=12 -interval=2 +``` + +Options: +- `-start-hours`: Start time in hours ago (e.g., 48 = 2 days ago) +- `-end-hours`: End time in hours ago (e.g., 0 = now) +- 
`-interval`: Interval between datapoints in hours + +## Data Format + +Historic data is sent using the Prometheus Remote Write protocol (Protobuf): + +1. **Protocol**: HTTP POST with Protobuf payload +2. **Encoding**: Snappy compression +3. **Headers**: + - Content-Type: application/x-protobuf + - Content-Encoding: snappy + - X-Prometheus-Remote-Write-Version: 0.1.0 + +4. **Payload**: TimeSeries with custom timestamps + +Example time series: +```protobuf +TimeSeries { + Labels: [ + {Name: "__name__", Value: "app_requests_total"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"} + ], + Samples: [ + {Value: 42, Timestamp: 1735516800000} // milliseconds since epoch + ] +} +``` + +## Example: Backfill Last 24 Hours + +```bash +#!/bin/bash + +# 1. Port-forward Prometheus +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & +PF_PID=$! +sleep 2 + +# 2. Backfill data for every hour in the last 24 hours +cd /home/paul/git/conf/f3s/prometheus-pusher +./prometheus-pusher-historic \ + -mode=backfill \ + -prometheus=http://localhost:9090/api/v1/write \ + -start-hours=24 \ + -end-hours=0 \ + -interval=1 + +# 3. Clean up +kill $PF_PID +``` + +## Querying Historic Data + +Once backfilled, the historic data is queryable in Prometheus: + +```promql +# View all historic data +{job="historic_data"} + +# View specific metric from historic data +app_requests_total{job="historic_data"} + +# View data from a specific time range +app_temperature_celsius{job="historic_data"}[24h] + +# Compare realtime vs historic data +app_requests_total{job="example_metrics_pusher"} # realtime +app_requests_total{job="historic_data"} # historic +``` + +## Troubleshooting + +### Error: "remote write receiver not enabled" + +``` +Error: remote write failed with status 404: remote write receiver not enabled +``` + +Solution: Ensure Prometheus has the `remote-write-receiver` feature enabled and has restarted. 
+ +### Error: "out of order sample" + +``` +Error: sample timestamp out of order +``` + +This occurs when trying to insert data older than existing data for the same time series. Solutions: +1. Use a different job label for historic data (already done: `job="historic_data"`) +2. Enable out-of-order ingestion in Prometheus (experimental) +3. Ensure backfill starts from oldest to newest + +### Error: "sample too old" + +``` +Error: sample is too old +``` + +Prometheus has limits on how old data can be. By default: +- For existing series: can't be older than the oldest block +- For new series: typically accepts data up to 1 hour old + +Solution: For very old data (weeks/months), use `promtool tsdb create-blocks-from` instead. + +## Best Practices + +1. **Use different job labels**: Historic data uses `job="historic_data"`, realtime uses `job="example_metrics_pusher"` +2. **Backfill in order**: Always backfill from oldest to newest to avoid out-of-order rejections +3. **Small batches**: Don't overwhelm Prometheus - the tool includes 100ms delays between datapoints +4. **Verify first**: Test with a single datapoint before running large backfills +5. **Monitor errors**: Check Prometheus logs if ingestion fails + +## Limitations + +- **Very old data**: For data older than a few days, consider using `promtool` for TSDB block creation +- **High cardinality**: Be careful with label combinations - they create separate time series +- **Performance**: Large backfills can impact Prometheus performance +- **Out-of-order**: By default, Prometheus rejects out-of-order samples + +## Alternative: Using Promtool + +For very large historic datasets, you can use `promtool` to create TSDB blocks: + +```bash +# 1. Generate OpenMetrics format file +./prometheus-pusher-historic -mode=export -output=metrics.txt + +# 2. 
Create blocks from the file +promtool tsdb create-blocks-from openmetrics metrics.txt /path/to/prometheus/data +``` + +This method bypasses the API and writes directly to the TSDB, but requires filesystem access. diff --git a/f3s/prometheus-pusher/go.mod b/f3s/prometheus-pusher/go.mod index 4576c38..e5fbf08 100644 --- a/f3s/prometheus-pusher/go.mod +++ b/f3s/prometheus-pusher/go.mod @@ -1,16 +1,25 @@ module prometheus-pusher -go 1.21 +go 1.24.0 -require github.com/prometheus/client_golang v1.20.5 +require ( + github.com/golang/snappy v1.0.0 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/prometheus v0.308.1 +) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect + github.com/kr/text v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.55.0 // indirect - github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/sys v0.22.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.67.4 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + golang.org/x/sys v0.37.0 // indirect + golang.org/x/text v0.30.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect ) diff --git a/f3s/prometheus-pusher/go.sum b/f3s/prometheus-pusher/go.sum index d3e9db5..5d7a4eb 100644 --- a/f3s/prometheus-pusher/go.sum +++ b/f3s/prometheus-pusher/go.sum @@ -2,21 +2,80 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 h1:cLN4IBkmkYZNnk7EAJ0BHIethd+J6LqxFNw5mSiI2bM= +github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= -github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= -github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= -github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= -github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.4 
h1:yR3NqWO1/UyO1w2PhUvXlGQs/PtFmoveVO0KZ4+Lvsc= +github.com/prometheus/common v0.67.4/go.mod h1:gP0fq6YjjNCLssJCQp0yk4M8W6ikLURwkdd/YKtTbyI= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/prometheus/prometheus v0.308.1 h1:ApMNI/3/es3Ze90Z7CMb+wwU2BsSYur0m5VKeqHj7h4= +github.com/prometheus/prometheus v0.308.1/go.mod h1:aHjYCDz9zKRyoUXvMWvu13K9XHOkBB12XrEqibs3e0A= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net 
v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/f3s/prometheus-pusher/historic.go b/f3s/prometheus-pusher/historic.go new file mode 100644 index 0000000..66a2ae2 --- /dev/null +++ b/f3s/prometheus-pusher/historic.go @@ -0,0 +1,217 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "log" + "math/rand" + "net/http" + "time" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" +) + +// GenerateHistoricMetrics generates metric samples for a specific time in the past +// hoursAgo: how many hours in the past to generate data for +func GenerateHistoricMetrics(hoursAgo int) []prompb.TimeSeries { + timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour).UnixMilli() + + var timeSeries []prompb.TimeSeries + + // Counter: app_requests_total + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_requests_total"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + }, + Samples: []prompb.Sample{ + {Value: 
float64(rand.Intn(100) + 1), Timestamp: timestamp}, + }, + }) + + // Gauge: app_active_connections + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_active_connections"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + }, + Samples: []prompb.Sample{ + {Value: float64(rand.Intn(100)), Timestamp: timestamp}, + }, + }) + + // Gauge: app_temperature_celsius + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_temperature_celsius"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + }, + Samples: []prompb.Sample{ + {Value: 15 + rand.Float64()*20, Timestamp: timestamp}, + }, + }) + + // Histogram buckets: app_request_duration_seconds + buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + cumulativeCount := 0 + for _, bucket := range buckets { + cumulativeCount += rand.Intn(5) + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + {Name: "le", Value: fmt.Sprintf("%g", bucket)}, + }, + Samples: []prompb.Sample{ + {Value: float64(cumulativeCount), Timestamp: timestamp}, + }, + }) + } + + // +Inf bucket + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_bucket"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + {Name: "le", Value: "+Inf"}, + }, + Samples: []prompb.Sample{ + {Value: float64(cumulativeCount), Timestamp: timestamp}, + }, + }) + + // Histogram sum + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_sum"}, + {Name: "instance", Value: "example-app"}, + {Name: 
"job", Value: "historic_data"}, + }, + Samples: []prompb.Sample{ + {Value: rand.Float64() * 100, Timestamp: timestamp}, + }, + }) + + // Histogram count + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_request_duration_seconds_count"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + }, + Samples: []prompb.Sample{ + {Value: float64(cumulativeCount), Timestamp: timestamp}, + }, + }) + + // Labeled counters: app_jobs_processed_total + jobTypes := []string{"email", "report", "backup"} + statuses := []string{"success", "failed"} + for _, jobType := range jobTypes { + for _, status := range statuses { + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "app_jobs_processed_total"}, + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + {Name: "job_type", Value: jobType}, + {Name: "status", Value: status}, + }, + Samples: []prompb.Sample{ + {Value: float64(rand.Intn(20)), Timestamp: timestamp}, + }, + }) + } + } + + return timeSeries +} + +// PushHistoricData sends historic data to Prometheus via Remote Write API +// prometheusURL: URL of Prometheus remote write endpoint (e.g., "http://localhost:9090/api/v1/write") +// hoursAgo: how many hours in the past to generate data for +func PushHistoricData(prometheusURL string, hoursAgo int) error { + // Generate historic metrics + timeSeries := GenerateHistoricMetrics(hoursAgo) + + // Create write request + writeRequest := &prompb.WriteRequest{ + Timeseries: timeSeries, + } + + // Marshal to protobuf + data, err := writeRequest.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal write request: %w", err) + } + + // Compress with snappy + compressed := snappy.Encode(nil, data) + + // Send HTTP POST request + req, err := http.NewRequest("POST", prometheusURL, bytes.NewReader(compressed)) + if err != nil { + return fmt.Errorf("failed 
to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send remote write request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body)) + } + + log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", + hoursAgo, time.Now().Add(-time.Duration(hoursAgo)*time.Hour).Format(time.RFC3339)) + + return nil +} + +// BackfillHistoricData backfills data for multiple time points +// prometheusURL: URL of Prometheus remote write endpoint +// startHoursAgo: how many hours ago to start backfilling +// endHoursAgo: how many hours ago to end backfilling +// intervalHours: interval between data points in hours +func BackfillHistoricData(prometheusURL string, startHoursAgo, endHoursAgo, intervalHours int) error { + log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", + startHoursAgo, endHoursAgo, intervalHours) + + successCount := 0 + errorCount := 0 + + for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { + if err := PushHistoricData(prometheusURL, hoursAgo); err != nil { + log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) + errorCount++ + } else { + successCount++ + } + + // Small delay to avoid overwhelming Prometheus + time.Sleep(100 * time.Millisecond) + } + + log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) + + if errorCount > 0 { + return fmt.Errorf("backfill completed with %d errors", errorCount) + } + + return nil +} diff --git 
a/f3s/prometheus-pusher/main.go b/f3s/prometheus-pusher/main.go
index 46a896a..a75cb20 100644
--- a/f3s/prometheus-pusher/main.go
+++ b/f3s/prometheus-pusher/main.go
@@ -1,123 +1,53 @@
 package main
 
 import (
-	"fmt"
+	"flag"
 	"log"
 	"math/rand"
 	"time"
-
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/push"
-)
-
-// Define metrics
-var (
-	// Counter: Monotonically increasing value (e.g., total requests processed)
-	requestsTotal = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Name: "app_requests_total",
-			Help: "Total number of requests processed",
-		},
-	)
-
-	// Gauge: Value that can go up or down (e.g., current temperature, active connections)
-	activeConnections = prometheus.NewGauge(
-		prometheus.GaugeOpts{
-			Name: "app_active_connections",
-			Help: "Number of currently active connections",
-		},
-	)
-
-	// Gauge for temperature simulation
-	temperatureCelsius = prometheus.NewGauge(
-		prometheus.GaugeOpts{
-			Name: "app_temperature_celsius",
-			Help: "Current temperature in Celsius",
-		},
-	)
-
-	// Histogram: Distribution of values (e.g., request duration)
-	requestDuration = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Name:    "app_request_duration_seconds",
-			Help:    "Histogram of request duration in seconds",
-			Buckets: prometheus.DefBuckets, // Default buckets: .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10
-		},
-	)
-
-	// Counter with labels
-	jobsProcessed = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: "app_jobs_processed_total",
-			Help: "Total number of jobs processed by type",
-		},
-		[]string{"job_type", "status"},
-	)
 )
 
-// simulateMetrics generates example metric data
-func simulateMetrics() {
-	// Increment request counter
-	requestsTotal.Add(float64(rand.Intn(10) + 1))
-
-	// Update active connections (random number between 0-100)
-	activeConnections.Set(float64(rand.Intn(100)))
-
-	// Simulate temperature (random between 15-35 Celsius)
-	temperatureCelsius.Set(15 + rand.Float64()*20)
+func main() {
+	// Command-line flags
+	mode := flag.String("mode", "realtime", "Mode: realtime (push to pushgateway) or historic (backfill via remote write)")
+	pushgatewayURL := flag.String("pushgateway", "http://localhost:9091", "Pushgateway URL for realtime mode")
+	prometheusURL := flag.String("prometheus", "http://localhost:9090/api/v1/write", "Prometheus remote write URL for historic mode")
+	hoursAgo := flag.Int("hours-ago", 24, "For historic mode: how many hours ago (single datapoint)")
+	startHours := flag.Int("start-hours", 48, "For backfill: start time in hours ago")
+	endHours := flag.Int("end-hours", 0, "For backfill: end time in hours ago")
+	interval := flag.Int("interval", 1, "For backfill: interval between datapoints in hours")
+	continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s")
+	jobName := flag.String("job", "example_metrics_pusher", "Job name for realtime mode")
+
+	flag.Parse()
 
-	// Record some request durations
-	for i := 0; i < rand.Intn(5)+1; i++ {
-		duration := rand.Float64() * 2 // 0-2 seconds
-		requestDuration.Observe(duration)
-	}
+	// Seed random number generator
+	rand.Seed(time.Now().UnixNano())
 
-	// Record job completions with labels
-	jobTypes := []string{"email", "report", "backup"}
-	statuses := []string{"success", "failed"}
+	switch *mode {
+	case "realtime":
+		runRealtimeMode(*pushgatewayURL, *jobName, *continuous)
 
-	for _, jobType := range jobTypes {
-		status := statuses[rand.Intn(len(statuses))]
-		jobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5)))
-	}
-}
+	case "historic":
+		if err := PushHistoricData(*prometheusURL, *hoursAgo); err != nil {
+			log.Fatalf("Failed to push historic data: %v", err)
+		}
 
-// pushMetrics pushes all metrics to the Pushgateway
-func pushMetrics(pushgatewayURL, jobName string) error {
-	// Create a new pusher
-	pusher := push.New(pushgatewayURL, jobName).
-		Collector(requestsTotal).
-		Collector(activeConnections).
-		Collector(temperatureCelsius).
-		Collector(requestDuration).
-		Collector(jobsProcessed).
-		Grouping("instance", "example-app")
+	case "backfill":
+		if err := BackfillHistoricData(*prometheusURL, *startHours, *endHours, *interval); err != nil {
+			log.Fatalf("Failed to backfill data: %v", err)
+		}
 
-	// Push metrics to the Pushgateway
-	if err := pusher.Push(); err != nil {
-		return fmt.Errorf("failed to push metrics: %w", err)
+	default:
+		log.Fatalf("Unknown mode: %s (use realtime, historic, or backfill)", *mode)
 	}
-
-	return nil
 }
 
-func main() {
-	// Configuration - use localhost:9091 when port-forwarding
-	// kubectl port-forward -n monitoring svc/pushgateway 9091:9091
-	pushgatewayURL := "http://localhost:9091"
-	jobName := "example_metrics_pusher"
-
-	// Seed random number generator
-	rand.Seed(time.Now().UnixNano())
-
-	log.Printf("Starting Prometheus metrics pusher")
+func runRealtimeMode(pushgatewayURL, jobName string, continuous bool) {
+	log.Printf("Starting Prometheus metrics pusher in REALTIME mode")
 	log.Printf("Pushgateway URL: %s", pushgatewayURL)
 	log.Printf("Job name: %s", jobName)
 
-	// Push metrics every 15 seconds
-	ticker := time.NewTicker(15 * time.Second)
-	defer ticker.Stop()
-
 	// Push immediately on start
 	simulateMetrics()
 	if err := pushMetrics(pushgatewayURL, jobName); err != nil {
@@ -126,13 +56,20 @@ func main() {
 		log.Printf("Successfully pushed metrics to Pushgateway")
 	}
 
-	// Continue pushing periodically
-	for range ticker.C {
-		simulateMetrics()
-		if err := pushMetrics(pushgatewayURL, jobName); err != nil {
-			log.Printf("Error pushing metrics: %v", err)
-		} else {
-			log.Printf("Successfully pushed metrics to Pushgateway")
+	if continuous {
+		// Push metrics every 15 seconds
+		ticker := time.NewTicker(15 * time.Second)
+		defer ticker.Stop()
+
+		log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.")
+
+		for range ticker.C {
+			simulateMetrics()
+			if err := pushMetrics(pushgatewayURL, jobName); err != nil {
+				log.Printf("Error pushing metrics: %v", err)
+			} else {
+				log.Printf("Successfully pushed metrics to Pushgateway")
+			}
 		}
 	}
 }
diff --git a/f3s/prometheus-pusher/prometheus-pusher-historic b/f3s/prometheus-pusher/prometheus-pusher-historic
Binary files differ
new file mode 100755
index 0000000..68fbd2c
--- /dev/null
+++ b/f3s/prometheus-pusher/prometheus-pusher-historic
diff --git a/f3s/prometheus-pusher/realtime.go b/f3s/prometheus-pusher/realtime.go
new file mode 100644
index 0000000..092089f
--- /dev/null
+++ b/f3s/prometheus-pusher/realtime.go
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/push"
+)
+
+// Define metrics for realtime pushing
+var (
+	// Counter: Monotonically increasing value (e.g., total requests processed)
+	requestsTotal = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "app_requests_total",
+			Help: "Total number of requests processed",
+		},
+	)
+
+	// Gauge: Value that can go up or down (e.g., current temperature, active connections)
+	activeConnections = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "app_active_connections",
+			Help: "Number of currently active connections",
+		},
+	)
+
+	// Gauge for temperature simulation
+	temperatureCelsius = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "app_temperature_celsius",
+			Help: "Current temperature in Celsius",
+		},
+	)
+
+	// Histogram: Distribution of values (e.g., request duration)
+	requestDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "app_request_duration_seconds",
+			Help:    "Histogram of request duration in seconds",
+			Buckets: prometheus.DefBuckets, // Default buckets: .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10
+		},
+	)
+
+	// Counter with labels
+	jobsProcessed = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "app_jobs_processed_total",
+			Help: "Total number of jobs processed by type",
+		},
+		[]string{"job_type", "status"},
+	)
+)
+
+// simulateMetrics generates example metric data
+func simulateMetrics() {
+	// Increment request counter
+	requestsTotal.Add(float64(rand.Intn(10) + 1))
+
+	// Update active connections (random number between 0-100)
+	activeConnections.Set(float64(rand.Intn(100)))
+
+	// Simulate temperature (random between 15-35 Celsius)
+	temperatureCelsius.Set(15 + rand.Float64()*20)
+
+	// Record some request durations
+	for i := 0; i < rand.Intn(5)+1; i++ {
+		duration := rand.Float64() * 2 // 0-2 seconds
+		requestDuration.Observe(duration)
+	}
+
+	// Record job completions with labels
+	jobTypes := []string{"email", "report", "backup"}
+	statuses := []string{"success", "failed"}
+
+	for _, jobType := range jobTypes {
+		status := statuses[rand.Intn(len(statuses))]
+		jobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5)))
+	}
+}
+
+// pushMetrics pushes all metrics to the Pushgateway
+func pushMetrics(pushgatewayURL, jobName string) error {
+	// Create a new pusher
+	pusher := push.New(pushgatewayURL, jobName).
+		Collector(requestsTotal).
+		Collector(activeConnections).
+		Collector(temperatureCelsius).
+		Collector(requestDuration).
+		Collector(jobsProcessed).
+		Grouping("instance", "example-app")
+
+	// Push metrics to the Pushgateway
+	if err := pusher.Push(); err != nil {
+		return fmt.Errorf("failed to push metrics: %w", err)
+	}
+
+	return nil
+}
diff --git a/f3s/prometheus/persistence-values.yaml b/f3s/prometheus/persistence-values.yaml
index 5f832d4..4f0d9e4 100644
--- a/f3s/prometheus/persistence-values.yaml
+++ b/f3s/prometheus/persistence-values.yaml
@@ -26,6 +26,11 @@ kubeControllerManager:
 
 prometheus:
   prometheusSpec:
+    # Enable remote write receiver for accepting historic data with custom timestamps
+    # Pass as additional argument to Prometheus binary
+    additionalArgs:
+      - name: web.enable-remote-write-receiver
+        value: "true"
     additionalScrapeConfigsSecret:
       enabled: true
       name: additional-scrape-configs
