diff options
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/timeseries.go | 112 | ||||
| -rw-r--r-- | internal/statsengine/timeseries_test.go | 65 |
2 files changed, 177 insertions, 0 deletions
diff --git a/internal/statsengine/timeseries.go b/internal/statsengine/timeseries.go new file mode 100644 index 0000000..b2846a4 --- /dev/null +++ b/internal/statsengine/timeseries.go @@ -0,0 +1,112 @@ +package statsengine + +import "time" + +const ( + timeSeriesSlotsDefault = 120 +) + +var timeSeriesSlotWidthDefault = 500 * time.Millisecond + +type timeSeriesSlot struct { + key int64 + sum float64 + count uint64 +} + +// ringTimeSeries stores fixed-width time buckets in a circular buffer. +type ringTimeSeries struct { + slots []timeSeriesSlot + slotSize time.Duration + lastKey int64 + hasData bool +} + +func newRingTimeSeries() *ringTimeSeries { + return newRingTimeSeriesWithConfig(timeSeriesSlotWidthDefault, timeSeriesSlotsDefault) +} + +func newRingTimeSeriesWithConfig(slotSize time.Duration, slots int) *ringTimeSeries { + if slotSize <= 0 { + slotSize = timeSeriesSlotWidthDefault + } + if slots <= 0 { + slots = timeSeriesSlotsDefault + } + + return &ringTimeSeries{ + slots: make([]timeSeriesSlot, slots), + slotSize: slotSize, + } +} + +func (r *ringTimeSeries) Add(value float64, t time.Time) { + if r == nil { + return + } + + key := r.slotKey(t) + if r.isTooOld(key) { + return + } + if !r.hasData || key > r.lastKey { + r.lastKey = key + r.hasData = true + } + + idx := r.slotIndex(key) + r.resetSlotIfNeeded(idx, key) + r.slots[idx].sum += value + r.slots[idx].count++ +} + +func (r *ringTimeSeries) Values() []float64 { + if r == nil { + return nil + } + + result := make([]float64, len(r.slots)) + if !r.hasData { + return result + } + + start := r.lastKey - int64(len(r.slots)-1) + for i := range result { + key := start + int64(i) + idx := r.slotIndex(key) + slot := r.slots[idx] + if slot.key != key || slot.count == 0 { + continue + } + result[i] = slot.sum / float64(slot.count) + } + + return result +} + +func (r *ringTimeSeries) slotKey(t time.Time) int64 { + return t.UnixNano() / r.slotSize.Nanoseconds() +} + +func (r *ringTimeSeries) isTooOld(key int64) bool { + if !r.hasData { + return false + } + minKey := r.lastKey - int64(len(r.slots)-1) + return key < minKey +} + +func (r *ringTimeSeries) slotIndex(key int64) int { + i := int(key % int64(len(r.slots))) + if i < 0 { + i += len(r.slots) + } + return i +} + +func (r *ringTimeSeries) resetSlotIfNeeded(idx int, key int64) { + if r.slots[idx].key == key { + return + } + r.slots[idx] = timeSeriesSlot{key: key} +} diff --git a/internal/statsengine/timeseries_test.go b/internal/statsengine/timeseries_test.go new file mode 100644 index 0000000..bb11157 --- /dev/null +++ b/internal/statsengine/timeseries_test.go @@ -0,0 +1,65 @@ +package statsengine + +import ( + "reflect" + "testing" + "time" +) + +func TestRingTimeSeriesAveragesWithinSlot(t *testing.T) { + r := newRingTimeSeriesWithConfig(time.Second, 4) + base := time.Unix(0, 0) + + r.Add(10, base) + r.Add(20, base.Add(400*time.Millisecond)) + + got := r.Values() + want := []float64{0, 0, 0, 15} + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected values: got %v want %v", got, want) + } +} + +func TestRingTimeSeriesWrapAround(t *testing.T) { + r := newRingTimeSeriesWithConfig(time.Second, 4) + base := time.Unix(0, 0) + + for i := 0; i < 6; i++ { + r.Add(float64(i+1), base.Add(time.Duration(i)*time.Second)) + } + + got := r.Values() + want := []float64{3, 4, 5, 6} + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected wrap-around values: got %v want %v", got, want) + } +} + +func TestRingTimeSeriesGapHandling(t *testing.T) { + r := newRingTimeSeriesWithConfig(time.Second, 4) + base := time.Unix(0, 0) + + r.Add(10, base) + r.Add(40, base.Add(3*time.Second)) + + got := r.Values() + want := []float64{10, 0, 0, 40} + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected gap values: got %v want %v", got, want) + } +} + +func TestRingTimeSeriesIgnoresTooOldSamples(t *testing.T) { + r := newRingTimeSeriesWithConfig(time.Second, 4) + base := time.Unix(0, 0) + + r.Add(1, base) + r.Add(5, base.Add(5*time.Second)) + r.Add(99, base) + + got := r.Values() + want := []float64{0, 0, 0, 5} + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected values with old sample: got %v want %v", got, want) + } +} |
