diff options
| -rw-r--r-- | internal/parquet/recorder.go | 7 | ||||
| -rw-r--r-- | internal/parquet/recorder_test.go | 21 |
2 files changed, 27 insertions, 1 deletions
diff --git a/internal/parquet/recorder.go b/internal/parquet/recorder.go index f489b68..4c201e9 100644 --- a/internal/parquet/recorder.go +++ b/internal/parquet/recorder.go @@ -73,6 +73,7 @@ type recordingSession struct { mu sync.Mutex accepting bool stopCause error + doneErr error stopOnce sync.Once } @@ -161,7 +162,10 @@ func (r *Recorder) Stop() error { } session.stop(nil) - return <-session.doneC + if err := <-session.doneC; err != nil { + return err + } + return session.doneErr } // Status returns a snapshot of the recorder state. @@ -268,6 +272,7 @@ func (r *Recorder) abortSession(session *recordingSession, writer rowWriter, err func (r *Recorder) completeSession(session *recordingSession, err error) { r.finishSession(session, err) + session.doneErr = err session.doneC <- err close(session.doneC) } diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go index d4caa11..66c22a3 100644 --- a/internal/parquet/recorder_test.go +++ b/internal/parquet/recorder_test.go @@ -111,6 +111,27 @@ func TestRecorderFailsOnQueueOverflow(t *testing.T) { } } +func TestRecorderStopReturnsTerminalErrorOnRepeatedCalls(t *testing.T) { + recorder := NewRecorder(RecorderConfig{}) + session := newRecordingSession(1) + session.doneErr = ErrRecorderQueueFull + close(session.doneC) + + recorder.mu.Lock() + recorder.active = session + recorder.status = Status{ + Active: true, + LastError: ErrRecorderQueueFull, + } + recorder.mu.Unlock() + + for i := 0; i < 2; i++ { + if err := recorder.Stop(); !errors.Is(err, ErrRecorderQueueFull) { + t.Fatalf("Stop() call %d error = %v, want %v", i+1, err, ErrRecorderQueueFull) + } + } +} + func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row { return streamrow.Row{ Seq: seq, |
