summaryrefslogtreecommitdiff
path: root/internal/io/fs/chunkedreader.go
blob: 7775a582880916a3fc0a240a8082f77004752b80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package fs

import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/mimecast/dtail/internal/constants"
	"github.com/mimecast/dtail/internal/io/dlog"
	"github.com/mimecast/dtail/internal/io/pool"
)

// sharedTimer is a single package-level timer, created once at package
// initialization with constants.ProcessorSleepDuration, so that waiting code
// can re-arm it instead of allocating a new timer for every wait.
//
// NOTE(review): because there is exactly one instance for the whole package,
// a Stop/Reset/receive sequence on it must never be interleaved between
// goroutines; concurrent users would cancel or consume each other's ticks.
var sharedTimer = time.NewTimer(constants.ProcessorSleepDuration)

// ChunkedReader pulls data from an io.Reader one chunk at a time and splits it
// into lines, instead of fetching the source byte by byte.
type ChunkedReader struct {
	// reader is the data source the chunks are read from.
	reader io.Reader

	// chunkSize is the length, in bytes, that buffer is allocated with.
	chunkSize int

	// buffer is the scratch space that each Read call fills.
	buffer []byte

	// remaining holds the data that still has to be split into lines: the
	// most recently read chunk, preceded by any partial line carried over
	// from the chunk before it.
	remaining []byte

	// eof is set once the reader reported io.EOF while not in tailing mode.
	eof bool

	// lineBuffer collects the bytes of the line currently being assembled.
	// It is allocated once and reused from line to line.
	lineBuffer []byte

	// lineLen is the number of bytes at the start of lineBuffer that belong
	// to the line currently being assembled.
	lineLen int
}

// NewChunkedReader returns a ChunkedReader that reads from reader in chunks of
// chunkSize bytes. A chunkSize of zero or less selects
// constants.DefaultChunkSize. The line buffer is pre-allocated with a capacity
// of constants.LineBufferInitialCapacity so that assembling typical lines does
// not need to grow it.
func NewChunkedReader(reader io.Reader, chunkSize int) *ChunkedReader {
	size := chunkSize
	if size < 1 {
		size = constants.DefaultChunkSize
	}

	cr := new(ChunkedReader)
	cr.reader = reader
	cr.chunkSize = size
	cr.buffer = make([]byte, size)
	cr.lineBuffer = make([]byte, 0, constants.LineBufferInitialCapacity)
	return cr
}

// ProcessLines reads from the underlying reader in chunks, splits the data
// into lines and sends every line, including its trailing '\n', to rawLines in
// a *bytes.Buffer taken from pool.BytesBuffer. Once a buffer has been sent this
// function no longer references it.
//
// Parameters:
//   - ctx: cancels the operation. Every blocking send and the tailing wait also
//     select on ctx.Done() and return ctx.Err() when it fires.
//   - rawLines: receives one buffer per line.
//   - maxLineLength: once a line reaches this many bytes without a newline, a
//     '\n' is appended and the piece is sent as a line of its own.
//   - filePath: only used in the long-line warning.
//   - serverMessages: receives the long-line warning, at most once per source
//     line (the flag is cleared when a real newline is seen).
//   - seekEOF: true selects tailing mode, where io.EOF means "wait for
//     constants.ProcessorSleepDuration and read again". When false, io.EOF
//     sends any unterminated final line as is and ends processing.
//
// It returns nil after io.EOF in non-tailing mode, ctx.Err() on cancellation,
// or the first non-EOF error reported by the reader.
//
// NOTE(review): on return paths where message was not sent, the buffer is not
// handed back to pool.BytesBuffer — confirm whether that is intended.
func (cr *ChunkedReader) ProcessLines(ctx context.Context, rawLines chan *bytes.Buffer,
	maxLineLength int, filePath string, serverMessages chan<- string, seekEOF bool) error {

	message := pool.BytesBuffer.Get().(*bytes.Buffer)
	warnedAboutLongLine := false

	// pendingErr keeps an error that Read returned together with data. The
	// io.Reader contract requires the n > 0 bytes to be processed before the
	// error is considered, so the error is handled on the next iteration.
	var pendingErr error

	// idleTimer belongs to this call only. A timer shared between goroutines
	// would let one reader stop, drain or consume the tick another reader is
	// waiting for. It is created lazily because only tailing mode needs it.
	var idleTimer *time.Timer
	defer func() {
		if idleTimer != nil {
			idleTimer.Stop()
		}
	}()

	for {
		// Fetch the next chunk once the previous one is fully processed.
		if len(cr.remaining) == 0 && !cr.eof {
			var n int
			err := pendingErr
			pendingErr = nil
			if err == nil {
				n, err = cr.reader.Read(cr.buffer)
			}
			if n > 0 && err != nil {
				// Data first, error afterwards.
				pendingErr, err = err, nil
			}

			switch {
			case err == io.EOF && !seekEOF:
				// Not tailing: end of input. Flush an unterminated last line.
				cr.eof = true
				if message.Len() > 0 {
					select {
					case rawLines <- message:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
				return nil
			case err == io.EOF:
				// Tailing: pause briefly, then try to read again. Reset is
				// safe here because the previous tick, if any, was always
				// received below before control could come back to this point.
				if idleTimer == nil {
					idleTimer = time.NewTimer(constants.ProcessorSleepDuration)
				} else {
					idleTimer.Reset(constants.ProcessorSleepDuration)
				}
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-idleTimer.C:
					continue
				}
			case err != nil:
				return err
			}

			// Prepend any partial line carried over in message so that the
			// scan below sees the line from its first byte.
			if message.Len() > 0 {
				newData := make([]byte, message.Len()+n)
				copy(newData, message.Bytes())
				copy(newData[message.Len():], cr.buffer[:n])
				cr.remaining = newData
				message.Reset()
			} else {
				cr.remaining = cr.buffer[:n]
			}
		}

		// Nothing left to process and the input is exhausted.
		if len(cr.remaining) == 0 && cr.eof {
			if message.Len() > 0 {
				select {
				case rawLines <- message:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		}

		// Start assembling lines for this chunk from an empty line buffer.
		cr.lineBuffer = cr.lineBuffer[:0]
		cr.lineLen = 0

		for _, b := range cr.remaining {
			// Overwrite slots left behind by earlier lines of this chunk
			// before growing the buffer.
			if cr.lineLen < len(cr.lineBuffer) {
				cr.lineBuffer[cr.lineLen] = b
			} else {
				cr.lineBuffer = append(cr.lineBuffer, b)
			}
			cr.lineLen++

			switch b {
			case '\n':
				// A complete line: copy it into the outgoing buffer in one go.
				message.Write(cr.lineBuffer[:cr.lineLen])
				select {
				case rawLines <- message:
					message = pool.BytesBuffer.Get().(*bytes.Buffer)
					warnedAboutLongLine = false
					cr.lineLen = 0
				case <-ctx.Done():
					return ctx.Err()
				}
			default:
				if cr.lineLen >= maxLineLength {
					if !warnedAboutLongLine {
						warning := dlog.Common.Warn(filePath,
							"Long log line, splitting into multiple lines") + "\n"
						// Stay cancellable even if nobody drains serverMessages.
						select {
						case serverMessages <- warning:
						case <-ctx.Done():
							return ctx.Err()
						}
						warnedAboutLongLine = true
					}
					// Terminate the over-long piece and send it as a line.
					if cr.lineLen < len(cr.lineBuffer) {
						cr.lineBuffer[cr.lineLen] = '\n'
					} else {
						cr.lineBuffer = append(cr.lineBuffer, '\n')
					}
					cr.lineLen++
					message.Write(cr.lineBuffer[:cr.lineLen])
					select {
					case rawLines <- message:
						message = pool.BytesBuffer.Get().(*bytes.Buffer)
						cr.lineLen = 0
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
		}

		// Keep an unterminated tail in message; it is merged with the next
		// chunk or flushed at end of input.
		if cr.lineLen > 0 {
			message.Write(cr.lineBuffer[:cr.lineLen])
		}

		// The chunk is fully consumed; any partial line now lives in message.
		cr.remaining = nil
	}
}