summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/parquet/recorder.go7
-rw-r--r--internal/parquet/recorder_test.go21
2 files changed, 27 insertions, 1 deletions
diff --git a/internal/parquet/recorder.go b/internal/parquet/recorder.go
index f489b68..4c201e9 100644
--- a/internal/parquet/recorder.go
+++ b/internal/parquet/recorder.go
@@ -73,6 +73,7 @@ type recordingSession struct {
mu sync.Mutex
accepting bool
stopCause error
+ doneErr error
stopOnce sync.Once
}
@@ -161,7 +162,10 @@ func (r *Recorder) Stop() error {
}
session.stop(nil)
- return <-session.doneC
+ if err := <-session.doneC; err != nil {
+ return err
+ }
+ return session.doneErr
}
// Status returns a snapshot of the recorder state.
@@ -268,6 +272,7 @@ func (r *Recorder) abortSession(session *recordingSession, writer rowWriter, err
func (r *Recorder) completeSession(session *recordingSession, err error) {
r.finishSession(session, err)
+ session.doneErr = err
session.doneC <- err
close(session.doneC)
}
diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go
index d4caa11..66c22a3 100644
--- a/internal/parquet/recorder_test.go
+++ b/internal/parquet/recorder_test.go
@@ -111,6 +111,27 @@ func TestRecorderFailsOnQueueOverflow(t *testing.T) {
}
}
+func TestRecorderStopReturnsTerminalErrorOnRepeatedCalls(t *testing.T) {
+ recorder := NewRecorder(RecorderConfig{})
+ session := newRecordingSession(1)
+ session.doneErr = ErrRecorderQueueFull
+ close(session.doneC)
+
+ recorder.mu.Lock()
+ recorder.active = session
+ recorder.status = Status{
+ Active: true,
+ LastError: ErrRecorderQueueFull,
+ }
+ recorder.mu.Unlock()
+
+ for i := 0; i < 2; i++ {
+ if err := recorder.Stop(); !errors.Is(err, ErrRecorderQueueFull) {
+ t.Fatalf("Stop() call %d error = %v, want %v", i+1, err, ErrRecorderQueueFull)
+ }
+ }
+}
+
func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row {
return streamrow.Row{
Seq: seq,