1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
package fs
import (
"bytes"
"context"
"io"
"time"
"github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
)
// sharedTimer is one package-level timer, created during package
// initialisation and reused via Stop/drain/Reset instead of allocating a new
// timer for every sleep.
//
// NOTE(review): time.Timer must not be stopped, reset or drained concurrently
// from several goroutines. If more than one reader may sleep on this timer at
// the same time, they race on it and one can consume another's expiry; a
// timer owned by each caller avoids that.
var (
	sharedTimer = time.NewTimer(constants.ProcessorSleepDuration)
)
// ChunkedReader turns the output of an io.Reader into lines. Rather than
// pulling a single byte per call, it reads large chunks into a buffer and then
// looks for line boundaries inside each chunk.
type ChunkedReader struct {
	// reader is the data source being split into lines.
	reader io.Reader

	// buffer receives each Read call; remaining holds bytes that have been
	// read but not yet split into lines.
	buffer, remaining []byte

	chunkSize int  // length of buffer, fixed at construction
	eof       bool // set once io.EOF has been seen outside tailing mode

	// lineBuffer and lineLen are scratch space pre-allocated once per reader
	// (see NewChunkedReader) to reduce per-line allocations.
	lineBuffer []byte
	lineLen    int // number of valid bytes at the front of lineBuffer
}
// NewChunkedReader returns a ChunkedReader that pulls data from reader in
// chunks of chunkSize bytes. A chunkSize of zero or less falls back to
// constants.DefaultChunkSize.
func NewChunkedReader(reader io.Reader, chunkSize int) *ChunkedReader {
	size := chunkSize
	if size <= 0 {
		size = constants.DefaultChunkSize
	}

	cr := &ChunkedReader{
		reader:    reader,
		chunkSize: size,
	}
	cr.buffer = make([]byte, size)
	// Empty line buffer with capacity reserved up front, so early lines do
	// not force it to grow.
	cr.lineBuffer = make([]byte, 0, constants.LineBufferInitialCapacity)
	return cr
}
// ProcessLines reads from the underlying reader chunk by chunk, splits the
// data into lines and sends each line to rawLines as a pooled *bytes.Buffer.
// It mimics the original byte-by-byte reader:
//
//   - A line is sent as soon as its '\n' is seen; the '\n' is included.
//   - When a line reaches maxLineLength bytes without a '\n', a '\n' is
//     appended and that piece is sent as a line of its own. One warning per
//     over-long line is sent to serverMessages.
//   - With seekEOF == false, io.EOF ends processing: a trailing partial line
//     (if any) is sent without a '\n' and nil is returned.
//   - With seekEOF == true, io.EOF means "no new data yet": ProcessLines
//     sleeps for constants.ProcessorSleepDuration and reads again, keeping any
//     partial line until the rest of it arrives.
//
// Bytes returned by Read are always processed before the accompanying error
// is acted on, as the io.Reader contract requires (a decompressing reader,
// for example, may return its final bytes together with io.EOF). A non-EOF
// read error is returned once those bytes have been handled; a partial line
// still pending at that point is discarded. ctx.Err() is returned when ctx is
// cancelled while sending or sleeping.
func (cr *ChunkedReader) ProcessLines(ctx context.Context, rawLines chan *bytes.Buffer,
	maxLineLength int, filePath string, serverMessages chan<- string, seekEOF bool) error {

	// newMessage fetches a line buffer from the pool; Reset guarantees the
	// length accounting below starts from an empty line.
	newMessage := func() *bytes.Buffer {
		buf := pool.BytesBuffer.Get().(*bytes.Buffer)
		buf.Reset()
		return buf
	}
	message := newMessage()
	warnedAboutLongLine := false

	// sendLine hands the current line to the consumer and starts a new one.
	sendLine := func() error {
		select {
		case rawLines <- message:
			message = newMessage()
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// The sleep timer belongs to this call rather than to the package, so
	// concurrently tailing readers never Stop/Reset/drain each other's timer.
	// It is only Reset after its channel has been received from, so no
	// stop-and-drain dance is needed.
	var sleepTimer *time.Timer
	defer func() {
		if sleepTimer != nil {
			sleepTimer.Stop()
		}
	}()

	for {
		var readErr error

		if len(cr.remaining) == 0 && !cr.eof {
			n, err := cr.reader.Read(cr.buffer)
			// Keep the n > 0 bytes even when err != nil (io.Reader contract).
			cr.remaining = cr.buffer[:n]

			switch {
			case err == nil:
			case err == io.EOF && !seekEOF:
				// Not tailing: end of file means we are done once the bytes
				// read so far have been turned into lines.
				cr.eof = true
			case err == io.EOF:
				// Tailing: if nothing new arrived, wait briefly and poll again.
				if n == 0 {
					if sleepTimer == nil {
						sleepTimer = time.NewTimer(constants.ProcessorSleepDuration)
					} else {
						sleepTimer.Reset(constants.ProcessorSleepDuration)
					}
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-sleepTimer.C:
						continue
					}
				}
			default:
				// Process what was read first, then report the error.
				readErr = err
			}
		}

		// Nothing left to process and the input is exhausted: flush any
		// trailing partial line and finish.
		if len(cr.remaining) == 0 && cr.eof {
			if message.Len() == 0 {
				return nil
			}
			select {
			case rawLines <- message:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		// Split the unprocessed bytes into lines. A partial line at the end of
		// the chunk stays in message and is continued by the next chunk.
		for len(cr.remaining) > 0 {
			// Bytes the current line may still take before it reaches
			// maxLineLength; at least one so the loop always makes progress.
			room := maxLineLength - message.Len()
			if room < 1 {
				room = 1
			}

			if nl := bytes.IndexByte(cr.remaining, '\n'); nl >= 0 && nl < room {
				// A complete line within the length limit.
				message.Write(cr.remaining[:nl+1])
				cr.remaining = cr.remaining[nl+1:]
				warnedAboutLongLine = false
				if err := sendLine(); err != nil {
					return err
				}
				continue
			}

			if len(cr.remaining) < room {
				// No '\n' yet and still under the limit: keep the partial line.
				message.Write(cr.remaining)
				cr.remaining = nil
				break
			}

			// The line reaches maxLineLength before any '\n': split it here.
			message.Write(cr.remaining[:room])
			cr.remaining = cr.remaining[room:]
			if !warnedAboutLongLine {
				warning := dlog.Common.Warn(filePath,
					"Long log line, splitting into multiple lines") + "\n"
				// Do not block forever if the consumer has gone away.
				select {
				case serverMessages <- warning:
				case <-ctx.Done():
					return ctx.Err()
				}
				warnedAboutLongLine = true
			}
			message.WriteByte('\n')
			if err := sendLine(); err != nil {
				return err
			}
		}

		if readErr != nil {
			return readErr
		}
	}
}
|