summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/timeseries.go112
-rw-r--r--internal/statsengine/timeseries_test.go65
2 files changed, 177 insertions, 0 deletions
diff --git a/internal/statsengine/timeseries.go b/internal/statsengine/timeseries.go
new file mode 100644
index 0000000..b2846a4
--- /dev/null
+++ b/internal/statsengine/timeseries.go
@@ -0,0 +1,112 @@
+package statsengine
+
+import "time"
+
+const (
+ timeSeriesSlotsDefault = 120
+)
+
+var timeSeriesSlotWidthDefault = 500 * time.Millisecond
+
+type timeSeriesSlot struct {
+ key int64
+ sum float64
+ count uint64
+}
+
+// ringTimeSeries stores fixed-width time buckets in a circular buffer.
+type ringTimeSeries struct {
+ slots []timeSeriesSlot
+ slotSize time.Duration
+ lastKey int64
+ hasData bool
+}
+
+func newRingTimeSeries() *ringTimeSeries {
+ return newRingTimeSeriesWithConfig(timeSeriesSlotWidthDefault, timeSeriesSlotsDefault)
+}
+
+func newRingTimeSeriesWithConfig(slotSize time.Duration, slots int) *ringTimeSeries {
+ if slotSize <= 0 {
+ slotSize = timeSeriesSlotWidthDefault
+ }
+ if slots <= 0 {
+ slots = timeSeriesSlotsDefault
+ }
+
+ return &ringTimeSeries{
+ slots: make([]timeSeriesSlot, slots),
+ slotSize: slotSize,
+ }
+}
+
+func (r *ringTimeSeries) Add(value float64, t time.Time) {
+ if r == nil {
+ return
+ }
+
+ key := r.slotKey(t)
+ if r.isTooOld(key) {
+ return
+ }
+ if !r.hasData || key > r.lastKey {
+ r.lastKey = key
+ r.hasData = true
+ }
+
+ idx := r.slotIndex(key)
+ r.resetSlotIfNeeded(idx, key)
+ r.slots[idx].sum += value
+ r.slots[idx].count++
+}
+
+func (r *ringTimeSeries) Values() []float64 {
+ if r == nil {
+ return nil
+ }
+
+ result := make([]float64, len(r.slots))
+ if !r.hasData {
+ return result
+ }
+
+ start := r.lastKey - int64(len(r.slots)-1)
+ for i := range result {
+ key := start + int64(i)
+ idx := r.slotIndex(key)
+ slot := r.slots[idx]
+ if slot.key != key || slot.count == 0 {
+ continue
+ }
+ result[i] = slot.sum / float64(slot.count)
+ }
+
+ return result
+}
+
+func (r *ringTimeSeries) slotKey(t time.Time) int64 {
+ return t.UnixNano() / r.slotSize.Nanoseconds()
+}
+
+func (r *ringTimeSeries) isTooOld(key int64) bool {
+ if !r.hasData {
+ return false
+ }
+ minKey := r.lastKey - int64(len(r.slots)-1)
+ return key < minKey
+}
+
+func (r *ringTimeSeries) slotIndex(key int64) int {
+ i := int(key % int64(len(r.slots)))
+ if i < 0 {
+ i += len(r.slots)
+ }
+ return i
+}
+
+func (r *ringTimeSeries) resetSlotIfNeeded(idx int, key int64) {
+ if r.slots[idx].key == key {
+ return
+ }
+ r.slots[idx] = timeSeriesSlot{key: key}
+}
diff --git a/internal/statsengine/timeseries_test.go b/internal/statsengine/timeseries_test.go
new file mode 100644
index 0000000..bb11157
--- /dev/null
+++ b/internal/statsengine/timeseries_test.go
@@ -0,0 +1,65 @@
+package statsengine
+
+import (
+ "reflect"
+ "testing"
+ "time"
+)
+
+func TestRingTimeSeriesAveragesWithinSlot(t *testing.T) {
+ r := newRingTimeSeriesWithConfig(time.Second, 4)
+ base := time.Unix(0, 0)
+
+ r.Add(10, base)
+ r.Add(20, base.Add(400*time.Millisecond))
+
+ got := r.Values()
+ want := []float64{0, 0, 0, 15}
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("unexpected values: got %v want %v", got, want)
+ }
+}
+
+func TestRingTimeSeriesWrapAround(t *testing.T) {
+ r := newRingTimeSeriesWithConfig(time.Second, 4)
+ base := time.Unix(0, 0)
+
+ for i := 0; i < 6; i++ {
+ r.Add(float64(i+1), base.Add(time.Duration(i)*time.Second))
+ }
+
+ got := r.Values()
+ want := []float64{3, 4, 5, 6}
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("unexpected wrap-around values: got %v want %v", got, want)
+ }
+}
+
+func TestRingTimeSeriesGapHandling(t *testing.T) {
+ r := newRingTimeSeriesWithConfig(time.Second, 4)
+ base := time.Unix(0, 0)
+
+ r.Add(10, base)
+ r.Add(40, base.Add(3*time.Second))
+
+ got := r.Values()
+ want := []float64{10, 0, 0, 40}
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("unexpected gap values: got %v want %v", got, want)
+ }
+}
+
+func TestRingTimeSeriesIgnoresTooOldSamples(t *testing.T) {
+ r := newRingTimeSeriesWithConfig(time.Second, 4)
+ base := time.Unix(0, 0)
+
+ r.Add(1, base)
+ r.Add(5, base.Add(5*time.Second))
+ r.Add(99, base)
+
+ got := r.Values()
+ want := []float64{0, 0, 0, 5}
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("unexpected values with old sample: got %v want %v", got, want)
+ }
+}