summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-26 20:57:53 +0300
committerPaul Buetow <paul@buetow.org>2025-06-26 20:57:53 +0300
commit4a657e44e7111d7d3b9a9ba5e453901e19af2ecb (patch)
tree5cc8571e00a29ab7656633984fb9893ca369ccec /internal/io
parentee5250441a2d241dc1a980dfd051a12f2db898cf (diff)
fix: resolve package conflicts and update documentation
- Move main package files to benchmarks/cmd/ to fix test failures - Update CLAUDE.md with comprehensive benchmarking and profiling instructions - Fix unused imports in serverless.go - Remove experimental buffered pipe/copy implementations - Remove outdated documentation files All integration tests now pass successfully. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/bufferedcopy/bufferedcopy.go82
-rw-r--r--internal/io/bufferedpipe/bufferedpipe.go204
-rw-r--r--internal/io/bufferedpipe/bufferedpipe_test.go223
3 files changed, 0 insertions, 509 deletions
diff --git a/internal/io/bufferedcopy/bufferedcopy.go b/internal/io/bufferedcopy/bufferedcopy.go
deleted file mode 100644
index a139942..0000000
--- a/internal/io/bufferedcopy/bufferedcopy.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Package bufferedcopy provides a safe bidirectional copy operation that prevents deadlocks
-package bufferedcopy
-
-import (
- "context"
- "io"
- "sync"
-)
-
-// BidirectionalCopy performs bidirectional copying between two io.ReadWriter interfaces
-// using goroutines and channels to prevent deadlocks that occur with direct io.Copy
-func BidirectionalCopy(ctx context.Context, a, b io.ReadWriter) error {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- errChan := make(chan error, 2)
- var wg sync.WaitGroup
- wg.Add(2)
-
- // Copy from A to B
- go func() {
- defer wg.Done()
- err := copyWithContext(ctx, b, a)
- if err != nil && err != context.Canceled {
- errChan <- err
- cancel()
- }
- }()
-
- // Copy from B to A
- go func() {
- defer wg.Done()
- err := copyWithContext(ctx, a, b)
- if err != nil && err != context.Canceled {
- errChan <- err
- cancel()
- }
- }()
-
- // Wait for completion
- wg.Wait()
- close(errChan)
-
- // Return first error if any
- for err := range errChan {
- return err
- }
-
- return nil
-}
-
-// copyWithContext performs io.Copy with context cancellation support
-func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) error {
- // Use a reasonable buffer size
- buf := make([]byte, 32*1024)
-
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- nr, readErr := src.Read(buf)
- if nr > 0 {
- nw, writeErr := dst.Write(buf[:nr])
- if writeErr != nil {
- return writeErr
- }
- if nw != nr {
- return io.ErrShortWrite
- }
- }
-
- if readErr != nil {
- if readErr == io.EOF {
- return nil
- }
- return readErr
- }
- }
-} \ No newline at end of file
diff --git a/internal/io/bufferedpipe/bufferedpipe.go b/internal/io/bufferedpipe/bufferedpipe.go
deleted file mode 100644
index 3207778..0000000
--- a/internal/io/bufferedpipe/bufferedpipe.go
+++ /dev/null
@@ -1,204 +0,0 @@
-// Package bufferedpipe provides a bidirectional pipe with buffering to prevent deadlocks
-package bufferedpipe
-
-import (
- "io"
- "sync"
-)
-
-// BufferedPipe provides bidirectional data transfer with buffering to prevent deadlocks
-// that can occur with direct io.Copy operations in opposite directions.
-type BufferedPipe struct {
- bufferSize int
- done chan struct{}
- once sync.Once
-}
-
-// New creates a new BufferedPipe with the specified buffer size for each direction
-func New(bufferSize int) *BufferedPipe {
- return &BufferedPipe{
- bufferSize: bufferSize,
- done: make(chan struct{}),
- }
-}
-
-// ConnectBidirectional connects two io.ReadWriter interfaces bidirectionally
-// It returns when either side closes or an error occurs
-func (bp *BufferedPipe) ConnectBidirectional(a, b io.ReadWriter) error {
- var wg sync.WaitGroup
- wg.Add(2)
-
- errChan := make(chan error, 2)
-
- // Create buffered channels for data transfer
- aToB := make(chan []byte, 10)
- bToA := make(chan []byte, 10)
-
- // Goroutine to handle shutdown
- shutdown := make(chan struct{})
- go func() {
- select {
- case <-bp.done:
- case <-shutdown:
- }
- close(aToB)
- close(bToA)
- }()
-
- // Copy from A to B with buffering
- go func() {
- defer wg.Done()
- defer close(shutdown)
-
- buffer := make([]byte, bp.bufferSize)
- for {
- select {
- case <-bp.done:
- return
- default:
- }
-
- n, err := a.Read(buffer)
- if err != nil {
- if err != io.EOF {
- errChan <- err
- }
- return
- }
-
- if n > 0 {
- data := make([]byte, n)
- copy(data, buffer[:n])
-
- select {
- case aToB <- data:
- case <-bp.done:
- return
- }
- }
- }
- }()
-
- // Copy from B to A with buffering
- go func() {
- defer wg.Done()
-
- buffer := make([]byte, bp.bufferSize)
- for {
- select {
- case <-bp.done:
- return
- default:
- }
-
- n, err := b.Read(buffer)
- if err != nil {
- if err != io.EOF {
- errChan <- err
- }
- return
- }
-
- if n > 0 {
- data := make([]byte, n)
- copy(data, buffer[:n])
-
- select {
- case bToA <- data:
- case <-bp.done:
- return
- }
- }
- }
- }()
-
- // Writer goroutines
- go func() {
- for data := range aToB {
- _, err := b.Write(data)
- if err != nil {
- errChan <- err
- return
- }
- }
- }()
-
- go func() {
- for data := range bToA {
- _, err := a.Write(data)
- if err != nil {
- errChan <- err
- return
- }
- }
- }()
-
- // Wait for completion
- go func() {
- wg.Wait()
- close(errChan)
- }()
-
- // Return first error if any
- for err := range errChan {
- bp.Close()
- return err
- }
-
- return nil
-}
-
-// Close closes the BufferedPipe
-func (bp *BufferedPipe) Close() error {
- bp.once.Do(func() {
- close(bp.done)
- })
- return nil
-}
-
-// CopyBuffered performs a single direction copy with buffering to prevent deadlocks
-// This is a simpler alternative when only one direction is needed
-func CopyBuffered(dst io.Writer, src io.Reader, bufferSize int) (int64, error) {
- // Use a goroutine with a channel to buffer the data
- dataChan := make(chan []byte, 10)
- errChan := make(chan error, 1)
-
- // Reader goroutine
- go func() {
- defer close(dataChan)
- buffer := make([]byte, bufferSize)
-
- for {
- n, err := src.Read(buffer)
- if n > 0 {
- data := make([]byte, n)
- copy(data, buffer[:n])
- dataChan <- data
- }
- if err != nil {
- if err != io.EOF {
- errChan <- err
- }
- return
- }
- }
- }()
-
- // Writer (main goroutine)
- var written int64
- for data := range dataChan {
- n, err := dst.Write(data)
- written += int64(n)
- if err != nil {
- return written, err
- }
- }
-
- // Check for read errors
- select {
- case err := <-errChan:
- return written, err
- default:
- return written, nil
- }
-} \ No newline at end of file
diff --git a/internal/io/bufferedpipe/bufferedpipe_test.go b/internal/io/bufferedpipe/bufferedpipe_test.go
deleted file mode 100644
index 4ab1d5c..0000000
--- a/internal/io/bufferedpipe/bufferedpipe_test.go
+++ /dev/null
@@ -1,223 +0,0 @@
-package bufferedpipe
-
-import (
- "bytes"
- "fmt"
- "io"
- "sync"
- "testing"
- "time"
-)
-
-// TestBufferedPipeNoDeadlock verifies that BufferedPipe prevents deadlocks
-func TestBufferedPipeNoDeadlock(t *testing.T) {
- tests := []struct {
- name string
- dataSize int
- bufferSize int
- }{
- {
- name: "small_data",
- dataSize: 1024,
- bufferSize: 4096,
- },
- {
- name: "exact_buffer",
- dataSize: 4096,
- bufferSize: 4096,
- },
- {
- name: "large_data",
- dataSize: 1024 * 1024, // 1MB
- bufferSize: 64 * 1024, // 64KB buffer
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Create test data
- testData := bytes.Repeat([]byte("x"), tt.dataSize)
-
- // Create two buffers to act as endpoints
- var bufA, bufB bytes.Buffer
- bufA.Write(testData)
-
- // Create buffered pipe
- bp := New(tt.bufferSize)
-
- // Set up completion tracking
- done := make(chan error, 1)
-
- go func() {
- err := bp.ConnectBidirectional(&bufA, &bufB)
- done <- err
- }()
-
- // Wait for completion or timeout
- select {
- case err := <-done:
- if err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- // Verify data was transferred
- if bufB.Len() != tt.dataSize {
- t.Errorf("Data size mismatch: got %d, want %d", bufB.Len(), tt.dataSize)
- }
- case <-time.After(5 * time.Second):
- t.Fatal("Operation timed out - possible deadlock")
- }
- })
- }
-}
-
-// TestBidirectionalTransfer tests simultaneous bidirectional data transfer
-func TestBidirectionalTransfer(t *testing.T) {
- dataA := bytes.Repeat([]byte("A"), 100*1024) // 100KB from A
- dataB := bytes.Repeat([]byte("B"), 100*1024) // 100KB from B
-
- // Create endpoints
- endpointA := &mockEndpoint{
- toSend: dataA,
- received: new(bytes.Buffer),
- }
- endpointB := &mockEndpoint{
- toSend: dataB,
- received: new(bytes.Buffer),
- }
-
- // Create buffered pipe
- bp := New(32 * 1024) // 32KB buffer
-
- // Connect bidirectionally
- done := make(chan error, 1)
- go func() {
- err := bp.ConnectBidirectional(endpointA, endpointB)
- done <- err
- }()
-
- // Wait for completion
- select {
- case err := <-done:
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- case <-time.After(5 * time.Second):
- t.Fatal("Bidirectional transfer timed out")
- }
-
- // Verify data was exchanged correctly
- if !bytes.Equal(endpointA.received.Bytes(), dataB) {
- t.Error("Endpoint A did not receive correct data from B")
- }
- if !bytes.Equal(endpointB.received.Bytes(), dataA) {
- t.Error("Endpoint B did not receive correct data from A")
- }
-}
-
-// mockEndpoint simulates a bidirectional endpoint
-type mockEndpoint struct {
- toSend []byte
- sendPos int
- received *bytes.Buffer
- mu sync.Mutex
-}
-
-func (m *mockEndpoint) Read(p []byte) (int, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- if m.sendPos >= len(m.toSend) {
- return 0, io.EOF
- }
-
- n := copy(p, m.toSend[m.sendPos:])
- m.sendPos += n
- return n, nil
-}
-
-func (m *mockEndpoint) Write(p []byte) (int, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
- return m.received.Write(p)
-}
-
-// BenchmarkBufferedPipe benchmarks the buffered pipe performance
-func BenchmarkBufferedPipe(b *testing.B) {
- sizes := []int{
- 1024, // 1KB
- 64 * 1024, // 64KB
- 1024 * 1024, // 1MB
- }
-
- for _, size := range sizes {
- b.Run(fmt.Sprintf("size_%dKB", size/1024), func(b *testing.B) {
- testData := bytes.Repeat([]byte("x"), size)
-
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- var bufA, bufB bytes.Buffer
- bufA.Write(testData)
-
- bp := New(64 * 1024) // 64KB buffer
-
- done := make(chan error, 1)
- go func() {
- err := bp.ConnectBidirectional(&bufA, &bufB)
- done <- err
- }()
-
- select {
- case <-done:
- // Success
- case <-time.After(1 * time.Second):
- b.Fatal("Transfer timed out")
- }
- }
- })
- }
-}
-
-// TestPipeBufferOperations tests the internal pipe buffer operations
-func TestPipeBufferOperations(t *testing.T) {
- p := newPipe(10) // Small buffer for testing
-
- // Test write and read
- data := []byte("hello")
- n, err := p.write(data)
- if err != nil || n != len(data) {
- t.Fatalf("Write failed: %v, wrote %d bytes", err, n)
- }
-
- buf := make([]byte, 10)
- n, err = p.read(buf)
- if err != nil || n != len(data) {
- t.Fatalf("Read failed: %v, read %d bytes", err, n)
- }
-
- if !bytes.Equal(buf[:n], data) {
- t.Errorf("Data mismatch: got %s, want %s", buf[:n], data)
- }
-
- // Test wrap-around
- data2 := []byte("world123") // This will wrap around
- n, err = p.write(data2)
- if err != nil || n != len(data2) {
- t.Fatalf("Write wrap-around failed: %v, wrote %d bytes", err, n)
- }
-
- n, err = p.read(buf)
- if err != nil || n != len(data2) {
- t.Fatalf("Read wrap-around failed: %v, read %d bytes", err, n)
- }
-
- if !bytes.Equal(buf[:n], data2) {
- t.Errorf("Wrap-around data mismatch: got %s, want %s", buf[:n], data2)
- }
-
- // Test close
- p.close()
- _, err = p.read(buf)
- if err != io.EOF {
- t.Errorf("Expected EOF after close, got %v", err)
- }
-} \ No newline at end of file