summary | refs | log | tree | commit | diff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r-- internal/eventloop_test.go 166
-rw-r--r-- internal/stats/patterns.go 142
-rw-r--r-- internal/stats/stats.go 170
3 files changed, 478 insertions, 0 deletions
diff --git a/internal/eventloop_test.go b/internal/eventloop_test.go
index 71ef352..a07c511 100644
--- a/internal/eventloop_test.go
+++ b/internal/eventloop_test.go
@@ -26,6 +26,12 @@ func TestEventloop(t *testing.T) {
"OpenEventTest1": makeOpenEventTestData1(t),
"OpenEventTest2": makeOpenEventTestData2(t),
"OpenEventTest3": makeOpenEventTestData3(t),
+ // FdEvent tests
+ "ReadEventTest": makeReadEventTestData(t),
+ "WriteEventTest": makeWriteEventTestData(t),
+ "CloseEventTest": makeCloseEventTestData(t),
+ // PathEvent tests
+ "MkdirEventTest": makeMkdirEventTestData(t),
}
ctx, cancel := context.WithCancel(context.Background())
@@ -173,3 +179,163 @@ func makeExitOpenEvent(t *testing.T, time uint64, pid, tid uint32) (types.RetEve
}
return ev, bytes
}
+
+// Helper functions for FdEvent
+func makeEnterFdEvent(t *testing.T, time uint64, pid, tid uint32, fd int32, traceId types.TraceId) (types.FdEvent, []byte) {
+ ev := types.FdEvent{
+ EventType: types.ENTER_FD_EVENT,
+ TraceId: traceId,
+ Time: time,
+ Pid: pid,
+ Tid: tid,
+ Fd: fd,
+ }
+
+ bytes, err := ev.Bytes()
+ if err != nil {
+ t.Error(err)
+ }
+ return ev, bytes
+}
+
+func makeExitFdEvent(t *testing.T, time uint64, pid, tid uint32, fd int32, traceId types.TraceId) (types.FdEvent, []byte) {
+ ev := types.FdEvent{
+ EventType: types.EXIT_FD_EVENT,
+ TraceId: traceId,
+ Time: time,
+ Pid: pid,
+ Tid: tid,
+ Fd: fd,
+ }
+
+ bytes, err := ev.Bytes()
+ if err != nil {
+ t.Error(err)
+ }
+ return ev, bytes
+}
+
+// Helper function to create exit RetEvent
+func makeExitRetEvent(t *testing.T, time uint64, pid, tid uint32, traceId types.TraceId, ret int64) (types.RetEvent, []byte) {
+ ev := types.RetEvent{
+ EventType: types.EXIT_RET_EVENT,
+ TraceId: traceId,
+ Time: time,
+ Ret: ret,
+ Pid: pid,
+ Tid: tid,
+ }
+
+ bytes, err := ev.Bytes()
+ if err != nil {
+ t.Error(err)
+ }
+ return ev, bytes
+}
+
+// Test data functions for FdEvent syscalls
+func makeReadEventTestData(t *testing.T) (td testData) {
+ fd := int32(42) // Assume file descriptor 42
+ enterEv, enterEvBytes := makeEnterFdEvent(t, defaulTime, defaultPid, defaultTid, fd, types.SYS_ENTER_READ)
+ td.rawTracepoints = append(td.rawTracepoints, enterEvBytes)
+
+ exitEv, exitEvBytes := makeExitFdEvent(t, defaulTime+100, defaultPid, defaultTid, fd, types.SYS_EXIT_READ)
+ td.rawTracepoints = append(td.rawTracepoints, exitEvBytes)
+
+ td.validates = append(td.validates, func(t *testing.T, el *eventLoop, ep *event.Pair) {
+ if !enterEv.Equals(ep.EnterEv) {
+ t.Errorf("Expected '%v' but got '%v'", enterEv, ep.EnterEv)
+ }
+ if !exitEv.Equals(ep.ExitEv) {
+ t.Errorf("Expected '%v' but got '%v'", exitEv, ep.ExitEv)
+ }
+ })
+
+ return td
+}
+
+func makeWriteEventTestData(t *testing.T) (td testData) {
+ fd := int32(43)
+ enterEv, enterEvBytes := makeEnterFdEvent(t, defaulTime, defaultPid, defaultTid, fd, types.SYS_ENTER_WRITE)
+ td.rawTracepoints = append(td.rawTracepoints, enterEvBytes)
+
+ exitEv, exitEvBytes := makeExitFdEvent(t, defaulTime+100, defaultPid, defaultTid, fd, types.SYS_EXIT_WRITE)
+ td.rawTracepoints = append(td.rawTracepoints, exitEvBytes)
+
+ td.validates = append(td.validates, func(t *testing.T, el *eventLoop, ep *event.Pair) {
+ if !enterEv.Equals(ep.EnterEv) {
+ t.Errorf("Expected '%v' but got '%v'", enterEv, ep.EnterEv)
+ }
+ if !exitEv.Equals(ep.ExitEv) {
+ t.Errorf("Expected '%v' but got '%v'", exitEv, ep.ExitEv)
+ }
+ })
+
+ return td
+}
+
+func makeCloseEventTestData(t *testing.T) (td testData) {
+ fd := int32(44)
+ enterEv, enterEvBytes := makeEnterFdEvent(t, defaulTime, defaultPid, defaultTid, fd, types.SYS_ENTER_CLOSE)
+ td.rawTracepoints = append(td.rawTracepoints, enterEvBytes)
+
+ exitEv, exitEvBytes := makeExitFdEvent(t, defaulTime+100, defaultPid, defaultTid, fd, types.SYS_EXIT_CLOSE)
+ td.rawTracepoints = append(td.rawTracepoints, exitEvBytes)
+
+ td.validates = append(td.validates, func(t *testing.T, el *eventLoop, ep *event.Pair) {
+ if !enterEv.Equals(ep.EnterEv) {
+ t.Errorf("Expected '%v' but got '%v'", enterEv, ep.EnterEv)
+ }
+ if !exitEv.Equals(ep.ExitEv) {
+ t.Errorf("Expected '%v' but got '%v'", exitEv, ep.ExitEv)
+ }
+ })
+
+ return td
+}
+
+// Helper functions for PathEvent
+func makeEnterPathEvent(t *testing.T, time uint64, pid, tid uint32, pathname string, traceId types.TraceId) (types.PathEvent, []byte) {
+ ev := types.PathEvent{
+ EventType: types.ENTER_PATH_EVENT,
+ TraceId: traceId,
+ Time: time,
+ Pid: pid,
+ Tid: tid,
+ Pathname: [types.MAX_FILENAME_LENGTH]byte{},
+ }
+ copy(ev.Pathname[:], pathname)
+
+ bytes, err := ev.Bytes()
+ if err != nil {
+ t.Error(err)
+ }
+ return ev, bytes
+}
+
+// Test data functions for PathEvent syscalls
+func makeMkdirEventTestData(t *testing.T) (td testData) {
+ pathname := "/tmp/testdir"
+ enterEv, enterEvBytes := makeEnterPathEvent(t, defaulTime, defaultPid, defaultTid, pathname, types.SYS_ENTER_MKDIR)
+ td.rawTracepoints = append(td.rawTracepoints, enterEvBytes)
+
+ exitEv, exitEvBytes := makeExitRetEvent(t, defaulTime+100, defaultPid, defaultTid, types.SYS_EXIT_MKDIR, 0)
+ td.rawTracepoints = append(td.rawTracepoints, exitEvBytes)
+
+ td.validates = append(td.validates, func(t *testing.T, el *eventLoop, ep *event.Pair) {
+ if !enterEv.Equals(ep.EnterEv) {
+ t.Errorf("Expected '%v' but got '%v'", enterEv, ep.EnterEv)
+ }
+ if !exitEv.Equals(ep.ExitEv) {
+ t.Errorf("Expected '%v' but got '%v'", exitEv, ep.ExitEv)
+ }
+ filenameA := types.StringValue(enterEv.Pathname[:])
+ if ep.File == nil {
+ t.Errorf("Expected file to be set")
+ } else if ep.File.Name() != filenameA {
+ t.Errorf("Expected file name '%v' but got '%v'", filenameA, ep.File.Name())
+ }
+ })
+
+ return td
+}
diff --git a/internal/stats/patterns.go b/internal/stats/patterns.go
new file mode 100644
index 0000000..06e906f
--- /dev/null
+++ b/internal/stats/patterns.go
@@ -0,0 +1,142 @@
+package stats
+
+import (
+ "math"
+ "time"
+)
+
+// PatternAnalysis holds the burst, periodicity and trend results that IOStats.AnalyzePatterns derives from the recorded history.
+type PatternAnalysis struct {
+ // Burst detection (filled in by analyzeBursts)
+ BurstCount int // number of bursts counted
+ BurstDuration time.Duration // summed duration of all counted bursts
+ BurstThreshold int64 // a sample belongs to a burst when its byte count is strictly greater than this; AnalyzePatterns sets it to 1000
+
+ // Periodicity detection (filled in by analyzePeriodicity)
+ IsPeriodic bool // true when the best lag correlation found exceeds 0.7
+ PeriodDuration time.Duration // time between the first sample and the sample at the best lag; left at zero when not set
+
+ // Trend analysis (filled in by analyzeTrend)
+ TrendSlope float64 // least-squares slope of byte count over timestamp; the time axis is in nanoseconds
+ TrendDirection string // "increasing", "decreasing", "stable"; "stable" means |TrendSlope| < 0.1
+}
+
+// AnalyzePatterns runs burst, periodicity and trend detection, in that order,
+// over the recorded byte-count history and its timestamps, and returns the
+// findings in a freshly allocated PatternAnalysis. The read lock is held for
+// the whole computation. The burst threshold is currently fixed at 1000;
+// there is no way to configure it from outside.
+func (s *IOStats) AnalyzePatterns() *PatternAnalysis {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	result := new(PatternAnalysis)
+	result.BurstThreshold = 1000
+
+	sizes, stamps := s.bytesHistory, s.timestamps
+	result.analyzeBursts(sizes, stamps)
+	result.analyzePeriodicity(sizes, stamps)
+	result.analyzeTrend(sizes, stamps)
+
+	return result
+}
+
+// analyzeBursts walks the samples in order and records every run of
+// consecutive samples whose byte count is strictly greater than
+// a.BurstThreshold. Each run adds one to a.BurstCount and its length to
+// a.BurstDuration.
+//
+// A run that is ended by a sample at or below the threshold is measured from
+// its first sample to that terminating sample. A run that is still open when
+// the data ends is counted as well and measured up to the last sample;
+// previously such a trailing burst was silently dropped, so a history that
+// stayed above the threshold throughout reported no bursts at all.
+//
+// Only indices present in both slices are examined, and nothing is recorded
+// when fewer than two such samples exist.
+func (a *PatternAnalysis) analyzeBursts(bytes []int64, timestamps []time.Time) {
+	// Guard against a timestamps slice shorter than bytes so indexing below
+	// cannot run out of range.
+	n := len(bytes)
+	if len(timestamps) < n {
+		n = len(timestamps)
+	}
+	if n < 2 {
+		return
+	}
+
+	var burstStart time.Time
+	inBurst := false
+
+	for i := 0; i < n; i++ {
+		above := bytes[i] > a.BurstThreshold
+		switch {
+		case above && !inBurst:
+			// First sample of a new burst.
+			burstStart = timestamps[i]
+			inBurst = true
+		case !above && inBurst:
+			// First quiet sample after a burst closes it.
+			a.BurstCount++
+			a.BurstDuration += timestamps[i].Sub(burstStart)
+			inBurst = false
+		}
+	}
+
+	// Close a burst that runs to the end of the data.
+	if inBurst {
+		a.BurstCount++
+		a.BurstDuration += timestamps[n-1].Sub(burstStart)
+	}
+}
+
+// analyzePeriodicity looks for a repeating pattern in the byte counts using
+// the sample autocorrelation. For every lag from 1 up to (but excluding) half
+// the number of samples it computes the mean product of the mean-centred
+// series with its lagged copy and divides it by the variance of the series.
+// The result is a dimensionless coefficient of roughly -1..1 that does not
+// depend on how large the byte counts are. The lag with the highest
+// coefficient wins.
+//
+// If that coefficient exceeds 0.7, a.IsPeriodic is set and a.PeriodDuration
+// becomes the time between the first sample and the sample at the winning
+// lag. Nothing is set when there are fewer than four samples or when all
+// samples are equal (zero variance, so correlation is undefined).
+//
+// The earlier version compared the raw, unnormalised autocovariance with 0.7,
+// which made the outcome depend on the magnitude of the byte counts.
+func (a *PatternAnalysis) analyzePeriodicity(bytes []int64, timestamps []time.Time) {
+	n := len(bytes)
+	if n < 4 {
+		return
+	}
+
+	mean := 0.0
+	for _, b := range bytes {
+		mean += float64(b)
+	}
+	mean /= float64(n)
+
+	// Variance of the series; used to scale each lag's covariance.
+	variance := 0.0
+	for _, b := range bytes {
+		d := float64(b) - mean
+		variance += d * d
+	}
+	variance /= float64(n)
+	if variance == 0 {
+		// A constant series has no defined correlation and no period.
+		return
+	}
+
+	var maxCorrelation float64
+	var bestPeriod int
+
+	// Check lags up to half the data length; period < n/2 guarantees that at
+	// least one pair exists for every lag.
+	for period := 1; period < n/2; period++ {
+		pairs := n - period
+		covariance := 0.0
+		for i := 0; i < pairs; i++ {
+			covariance += (float64(bytes[i]) - mean) * (float64(bytes[i+period]) - mean)
+		}
+
+		correlation := covariance / float64(pairs) / variance
+		if correlation > maxCorrelation {
+			maxCorrelation = correlation
+			bestPeriod = period
+		}
+	}
+
+	// A strong positive correlation at some lag is taken as periodicity.
+	if maxCorrelation > 0.7 {
+		a.IsPeriodic = true
+		if bestPeriod > 0 && bestPeriod < len(timestamps) {
+			a.PeriodDuration = timestamps[bestPeriod].Sub(timestamps[0])
+		}
+	}
+}
+
+// analyzeTrend fits a least-squares line through (timestamp, byte count) and
+// stores its slope in a.TrendSlope, in bytes per nanosecond. It then sets
+// a.TrendDirection to "stable" when the absolute slope is below 0.1, and
+// otherwise to "increasing" or "decreasing" according to the sign.
+//
+// Times are measured relative to the first sample and the sums are taken
+// around the means. Feeding absolute UnixNano values (about 1e18) into the
+// textbook formula squares them to about 1e36, which is far beyond float64
+// precision, so the subtraction in the denominator cancelled
+// catastrophically for any realistically short history.
+//
+// When all timestamps coincide the slope is undefined; it is reported as 0
+// and "stable" instead of dividing by zero (which produced NaN and the label
+// "decreasing"). Only indices present in both slices are used, and nothing
+// is set when fewer than two such samples exist.
+func (a *PatternAnalysis) analyzeTrend(bytes []int64, timestamps []time.Time) {
+	n := len(bytes)
+	if len(timestamps) < n {
+		n = len(timestamps)
+	}
+	if n < 2 {
+		return
+	}
+
+	// First pass: means of x (nanoseconds since the first sample) and y.
+	origin := timestamps[0]
+	var sumX, sumY float64
+	for i := 0; i < n; i++ {
+		sumX += float64(timestamps[i].Sub(origin))
+		sumY += float64(bytes[i])
+	}
+	meanX := sumX / float64(n)
+	meanY := sumY / float64(n)
+
+	// Second pass: centred cross- and auto-products.
+	var sxy, sxx float64
+	for i := 0; i < n; i++ {
+		dx := float64(timestamps[i].Sub(origin)) - meanX
+		dy := float64(bytes[i]) - meanY
+		sxy += dx * dy
+		sxx += dx * dx
+	}
+
+	if sxx == 0 {
+		// No spread on the time axis: no trend can be determined.
+		a.TrendSlope = 0
+		a.TrendDirection = "stable"
+		return
+	}
+	a.TrendSlope = sxy / sxx
+
+	switch {
+	case math.Abs(a.TrendSlope) < 0.1:
+		a.TrendDirection = "stable"
+	case a.TrendSlope > 0:
+		a.TrendDirection = "increasing"
+	default:
+		a.TrendDirection = "decreasing"
+	}
+}
diff --git a/internal/stats/stats.go b/internal/stats/stats.go
new file mode 100644
index 0000000..cd850a3
--- /dev/null
+++ b/internal/stats/stats.go
@@ -0,0 +1,170 @@
+package stats
+
+import (
+ "math"
+ "sort"
+ "sync"
+ "time"
+)
+
+// IOStats accumulates counters, latency and byte-size statistics, and a per-operation history for I/O operations reported through AddOperation.
+type IOStats struct {
+ mu sync.RWMutex // write-locked by AddOperation; read-locked by GetStats, GetTimeSeriesData and AnalyzePatterns
+
+ // Basic metrics (updated on every AddOperation call)
+ Count int64 // number of operations added
+ TotalBytes int64 // sum of the bytes arguments
+ TotalLatency time.Duration // sum of the latency arguments
+
+ // Latency statistics (mean, median and percentiles are recomputed by calculatePercentiles)
+ MinLatency time.Duration // smallest latency seen; NewIOStats seeds it with math.MaxInt64
+ MaxLatency time.Duration // largest latency seen
+ MeanLatency time.Duration // sum of the latency history divided by its length (integer division)
+ MedianLatency time.Duration // sorted latency history at index len/2
+ P95Latency time.Duration // sorted latency history at index int(len*0.95)
+ P99Latency time.Duration // sorted latency history at index int(len*0.99)
+
+ // Throughput statistics (same definitions as above, applied to byte counts)
+ MinBytes int64 // smallest byte count seen; NewIOStats seeds it with math.MaxInt64
+ MaxBytes int64 // largest byte count seen
+ MeanBytes float64 // sum of the byte history divided by its length
+ MedianBytes int64 // sorted byte history at index len/2
+ P95Bytes int64 // sorted byte history at index int(len*0.95)
+ P99Bytes int64 // sorted byte history at index int(len*0.99)
+
+ // Time series data for trend analysis; AddOperation appends one entry to each slice per call
+ latencyHistory []time.Duration // latency of each operation, in arrival order
+ bytesHistory []int64 // byte count of each operation, in arrival order
+ timestamps []time.Time // time.Now() taken inside AddOperation for each operation
+}
+
+// NewIOStats returns an empty IOStats whose MinLatency and MinBytes start at
+// math.MaxInt64, so that the first recorded operation replaces them. All
+// other fields are left at their zero values.
+func NewIOStats() *IOStats {
+	st := new(IOStats)
+	st.MinLatency = time.Duration(math.MaxInt64)
+	st.MinBytes = math.MaxInt64
+	return st
+}
+
+// AddOperation records one I/O operation of the given size and latency. It
+// updates the counters and totals, the running minima and maxima, appends
+// the sample (stamped with time.Now()) to the history, and recomputes the
+// mean, median and percentile fields. The write lock is held throughout.
+//
+// The first sample ever recorded initialises both the minimum and the
+// maximum directly. This keeps the extremes correct for an IOStats that was
+// not built by NewIOStats (whose zero minima would otherwise never be
+// replaced) and for samples that are all negative (which would otherwise
+// leave the maxima at 0).
+//
+// Note that every call copies and sorts the complete history inside
+// calculatePercentiles, and that the history grows without bound.
+func (s *IOStats) AddOperation(bytes int64, latency time.Duration) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	now := time.Now()
+	// Decide from the history, not from the exported Count, whether this is
+	// the first sample: Count can be modified by callers.
+	first := len(s.latencyHistory) == 0
+
+	s.Count++
+	s.TotalBytes += bytes
+	s.TotalLatency += latency
+
+	// Update latency extremes.
+	if first || latency < s.MinLatency {
+		s.MinLatency = latency
+	}
+	if first || latency > s.MaxLatency {
+		s.MaxLatency = latency
+	}
+
+	// Update byte-count extremes.
+	if first || bytes < s.MinBytes {
+		s.MinBytes = bytes
+	}
+	if first || bytes > s.MaxBytes {
+		s.MaxBytes = bytes
+	}
+
+	// Store history; the three slices always grow together.
+	s.latencyHistory = append(s.latencyHistory, latency)
+	s.bytesHistory = append(s.bytesHistory, bytes)
+	s.timestamps = append(s.timestamps, now)
+
+	// The history is non-empty at this point, so the derived statistics can
+	// always be refreshed.
+	s.calculatePercentiles()
+}
+
+// calculatePercentiles refreshes the mean, median, P95 and P99 fields for
+// both latency and byte count from sorted copies of the histories; the
+// histories themselves keep their arrival order. The median is the element
+// at index len/2 and a percentile p is the element at index int(len*p). It
+// does no locking of its own, and it expects a non-empty history with
+// latency and byte slices of equal length.
+func (s *IOStats) calculatePercentiles() {
+	sortedLat := make([]time.Duration, len(s.latencyHistory))
+	copy(sortedLat, s.latencyHistory)
+	sort.Slice(sortedLat, func(a, b int) bool { return sortedLat[a] < sortedLat[b] })
+
+	sortedBytes := make([]int64, len(s.bytesHistory))
+	copy(sortedBytes, s.bytesHistory)
+	sort.Slice(sortedBytes, func(a, b int) bool { return sortedBytes[a] < sortedBytes[b] })
+
+	n := len(sortedLat)
+
+	// Means.
+	var latSum time.Duration
+	var byteSum int64
+	for i, l := range sortedLat {
+		latSum += l
+		byteSum += sortedBytes[i]
+	}
+	s.MeanLatency = latSum / time.Duration(n)
+	s.MeanBytes = float64(byteSum) / float64(len(sortedBytes))
+
+	// Median: the middle element, or the upper of the two middle elements
+	// when the count is even.
+	half := n / 2
+	s.MedianLatency = sortedLat[half]
+	s.MedianBytes = sortedBytes[half]
+
+	// Percentiles.
+	if i95 := int(float64(n) * 0.95); i95 < n {
+		s.P95Latency = sortedLat[i95]
+		s.P95Bytes = sortedBytes[i95]
+	}
+	if i99 := int(float64(n) * 0.99); i99 < n {
+		s.P99Latency = sortedLat[i99]
+		s.P99Bytes = sortedBytes[i99]
+	}
+}
+
+// GetStats returns a snapshot of the current statistics as a nested map. The
+// top level holds "count", "total_bytes" and "total_latency", plus the
+// sub-maps "latency" and "bytes", each with the keys "min", "max", "mean",
+// "median", "p95" and "p99". The read lock is held while the snapshot is
+// taken.
+func (s *IOStats) GetStats() map[string]interface{} {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	latency := map[string]interface{}{
+		"min":    s.MinLatency,
+		"max":    s.MaxLatency,
+		"mean":   s.MeanLatency,
+		"median": s.MedianLatency,
+		"p95":    s.P95Latency,
+		"p99":    s.P99Latency,
+	}
+	sizes := map[string]interface{}{
+		"min":    s.MinBytes,
+		"max":    s.MaxBytes,
+		"mean":   s.MeanBytes,
+		"median": s.MedianBytes,
+		"p95":    s.P95Bytes,
+		"p99":    s.P99Bytes,
+	}
+
+	out := make(map[string]interface{}, 5)
+	out["count"] = s.Count
+	out["total_bytes"] = s.TotalBytes
+	out["total_latency"] = s.TotalLatency
+	out["latency"] = latency
+	out["bytes"] = sizes
+	return out
+}
+
+// GetTimeSeriesData returns copies of the recorded timestamps, latencies and
+// byte counts, in that order and in arrival order. The returned slices are
+// independent of the internal history, so callers may modify them. The read
+// lock is held while copying.
+func (s *IOStats) GetTimeSeriesData() ([]time.Time, []time.Duration, []int64) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	stamps := make([]time.Time, len(s.timestamps))
+	copy(stamps, s.timestamps)
+
+	lats := make([]time.Duration, len(s.latencyHistory))
+	copy(lats, s.latencyHistory)
+
+	sizes := make([]int64, len(s.bytesHistory))
+	copy(sizes, s.bytesHistory)
+
+	return stamps, lats, sizes
+}