1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package fs
import (
"bytes"
"context"
"time"
"github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// AggregateLineProcessor feeds lines to an existing aggregate via channels.
//
// Instead of producing output itself, it copies each line it receives into a
// *line.Line and sends it on linesCh; the aggregate reading from the other end
// of that channel does the parsing, WHERE-clause filtering and serialization.
type AggregateLineProcessor struct {
	// linesCh is the send-only channel the aggregate receives lines from.
	// Flush closes it unless isTailing is set.
	linesCh chan<- *line.Line
	// re is stored by the constructors but is not consulted by ProcessLine,
	// which deliberately leaves filtering to the aggregate.
	re regex.Regex
	// hostname is stored by the constructors; no method in this file reads it.
	hostname string
	// ltx is stored by the constructors; no method in this file reads it.
	ltx lcontext.LContext
	// lineNum counts the lines passed to ProcessLine and is used as the line
	// number of each *line.Line sent to the aggregate.
	lineNum int
	// isTailing marks a processor for a tail operation that should keep
	// running; when true, Flush leaves linesCh open.
	isTailing bool // Whether this is for a tail operation that should keep running
}
// NewAggregateLineProcessor creates a processor that feeds every line it is
// given into an aggregate over linesCh. The returned processor is not in tail
// mode, so its Flush closes linesCh to signal end of input to the aggregate.
func NewAggregateLineProcessor(linesCh chan<- *line.Line, re regex.Regex, hostname string, ltx lcontext.LContext) *AggregateLineProcessor {
	// lineNum and isTailing are left at their zero values (0 and false).
	p := &AggregateLineProcessor{
		linesCh:  linesCh,
		re:       re,
		hostname: hostname,
		ltx:      ltx,
	}
	return p
}
// NewAggregateLineProcessorForTail creates a processor for tail operations
// that feeds lines to an aggregate. It is identical to
// NewAggregateLineProcessor except that the processor is in tail mode: its
// Flush leaves linesCh open so the aggregate keeps receiving lines while the
// file is being followed.
func NewAggregateLineProcessorForTail(linesCh chan<- *line.Line, re regex.Regex, hostname string, ltx lcontext.LContext) *AggregateLineProcessor {
	// Delegate to the batch constructor so the two variants cannot drift
	// apart; only the tailing flag differs.
	p := NewAggregateLineProcessor(linesCh, re, hostname, ltx)
	p.isTailing = true
	return p
}
// ProcessLine hands a private copy of lineBuf to the aggregate via linesCh and
// never produces direct output (it always returns nil, false).
//
// The line number attached to the *line.Line comes from the processor's own
// counter, not from the lineNum argument; filePath and stats are unused.
// The third argument to line.New is the constant 100 — presumably a
// transmitted-percentage value; NOTE(review): confirm against line.New.
func (p *AggregateLineProcessor) ProcessLine(lineBuf []byte, lineNum int, filePath string, stats *stats, sourceID string) (result []byte, shouldSend bool) {
	// No regex filtering happens here: for MapReduce queries the aggregate's
	// log parser and WHERE clause decide which lines matter.
	p.lineNum++

	// The caller may reuse lineBuf for subsequent reads, so the aggregate
	// must receive its own copy of the bytes.
	owned := make([]byte, len(lineBuf))
	copy(owned, lineBuf)

	// Blocking send: back-pressure from the aggregate rather than dropping
	// lines. NOTE(review): this blocks forever if nothing receives anymore,
	// and panics if linesCh has already been closed by Flush.
	p.linesCh <- line.New(bytes.NewBuffer(owned), uint64(p.lineNum), 100, sourceID)

	// Serialization is the aggregate's job, so nothing is sent from here.
	return nil, false
}
// Flush signals end of input to the aggregate and always returns nil, since
// the aggregate handles serialization itself.
//
// In tail mode (isTailing) nothing happens: linesCh stays open so more lines
// can follow. Otherwise Flush waits constants.ProcessorSleepDuration and then
// closes linesCh.
//
// NOTE(review): closing a channel never discards values already buffered in
// it, so the delay is not what guarantees delivery of sent lines. Also, a
// second Flush on a non-tailing processor closes an already-closed channel and
// panics, as does any later ProcessLine send.
func (p *AggregateLineProcessor) Flush() []byte {
	if p.isTailing {
		// Tail operations keep following the input; the channel must stay open.
		return nil
	}

	// Grace period before closing, retained from the original implementation.
	time.Sleep(constants.ProcessorSleepDuration)
	close(p.linesCh)
	return nil
}
// Initialize is a no-op for AggregateLineProcessor: no setup is needed, the
// context is ignored, and nil is always returned.
func (p *AggregateLineProcessor) Initialize(_ context.Context) error {
	return nil
}
// Cleanup is a no-op that always returns nil. Shutting down linesCh is done by
// Flush (for non-tailing processors), not here.
func (p *AggregateLineProcessor) Cleanup() error {
	// Nothing to release.
	return nil
}
|