summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/fs/catprocessor.go32
-rw-r--r--internal/io/fs/directprocessor.go100
-rw-r--r--internal/io/fs/grepprocessor.go70
-rw-r--r--internal/io/fs/mapprocessor.go53
4 files changed, 135 insertions, 120 deletions
diff --git a/internal/io/fs/catprocessor.go b/internal/io/fs/catprocessor.go
index 4430488..b062c7f 100644
--- a/internal/io/fs/catprocessor.go
+++ b/internal/io/fs/catprocessor.go
@@ -10,18 +10,18 @@ import (
// CatProcessor handles cat-style output
type CatProcessor struct {
- plain bool
- noColor bool
- hostname string
+ plain bool
+ noColor bool
+ hostname string
isFirstLine bool
}
// NewCatProcessor creates a new cat processor
func NewCatProcessor(plain, noColor bool, hostname string) *CatProcessor {
return &CatProcessor{
- plain: plain,
- noColor: noColor,
- hostname: hostname,
+ plain: plain,
+ noColor: noColor,
+ hostname: hostname,
isFirstLine: true,
}
}
@@ -34,12 +34,16 @@ func (cp *CatProcessor) Cleanup() error {
return nil
}
+// ProcessLine processes a single line for cat output.
+// In plain mode, it preserves the original line exactly including line endings.
+// In non-plain mode, it formats the line according to DTail protocol with optional colorization.
+// Returns the formatted line and true (cat always outputs all lines).
func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) {
// Update stats for matched line (cat always matches all lines)
if stats != nil {
stats.updateLineMatched()
}
-
+
// Format output to match existing behavior
if cp.plain {
// In plain mode, preserve the original line exactly as it is
@@ -48,7 +52,7 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
copy(result, line)
return result, true
}
-
+
// Format exactly like original basehandler.go for non-plain mode
// REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}¬
var transmittedPerc int
@@ -58,17 +62,17 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
transmittedPerc = 100
count = stats.totalLineCount()
}
-
+
// Build the protocol line
protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
protocol.FieldDelimiter, cp.hostname, protocol.FieldDelimiter,
transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
sourceID, protocol.FieldDelimiter, string(line))
-
+
// Apply ANSI color formatting if not in plain mode and not noColor mode
if !cp.plain && !cp.noColor {
colorized := brush.Colorfy(protocolLine)
-
+
// Add color reset prefix for all lines except the first
var result []byte
if cp.isFirstLine {
@@ -86,12 +90,12 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
}
return result, true
}
-
+
// No color formatting
result := make([]byte, len(protocolLine)+1)
copy(result, protocolLine)
result[len(protocolLine)] = '\n'
-
+
return result, true
}
@@ -102,4 +106,4 @@ func (cp *CatProcessor) Flush() []byte {
return []byte("\x1b[39m\x1b[49m\x1b[49m\x1b[39m")
}
return nil
-} \ No newline at end of file
+}
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index 762fa8f..9c564e7 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -63,9 +63,9 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
if catProcessor, ok := dp.processor.(*CatProcessor); ok && catProcessor.plain {
return dp.processReaderPreservingLineEndings(ctx, reader, filePath)
}
-
+
scanner := bufio.NewScanner(reader)
-
+
// Set buffer size respecting MaxLineLength configuration
maxLineLength := config.Server.MaxLineLength
initialBufSize := 64 * 1024
@@ -73,7 +73,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
initialBufSize = maxLineLength
}
scanner.Buffer(make([]byte, initialBufSize), maxLineLength)
-
+
lineNum := 0
for scanner.Scan() {
select {
@@ -81,35 +81,35 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
return ctx.Err()
default:
}
-
+
lineNum++
line := scanner.Bytes()
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
- // Process line directly
+
+ // Process line directly
if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
}
-
+
// Flush any buffered output
if final := dp.processor.Flush(); len(final) > 0 {
if _, err := dp.output.Write(final); err != nil {
return err
}
}
-
+
return scanner.Err()
}
@@ -121,20 +121,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
lineNum := 0
maxLineLength := config.Server.MaxLineLength
warnedAboutLongLine := false
-
-
+
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-
+
n, err := reader.Read(buf)
if n > 0 {
data := append(remaining, buf[:n]...)
remaining = remaining[:0]
-
+
// Process complete lines
for {
// Find next line ending (LF or CRLF)
@@ -153,24 +152,24 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
splitLine := make([]byte, maxLineLength+1)
copy(splitLine, data[:maxLineLength])
splitLine[maxLineLength] = '\n'
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process the split line
if result, shouldSend := dp.processor.ProcessLine(splitLine, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
-
+
// Continue with remaining data
data = data[maxLineLength:]
continue
@@ -180,19 +179,20 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
break
}
}
-
- line := data[:lfIndex+1] // Include the LF
- data = data[lfIndex+1:] // Continue with remaining data
-
+
+ // Extract the line including its original line ending (CRLF or LF)
+ line := data[:lfIndex+1] // Include the LF (and CR if present before it)
+ data = data[lfIndex+1:] // Continue with remaining data
+
// Reset warning flag for new line
warnedAboutLongLine = false
-
+
// Check if this line exceeds max length and needs to be split
if len(line) > maxLineLength {
// Split the long line into chunks
lineContent := line[:len(line)-1] // Remove the LF
lineEnding := line[len(line)-1:] // Keep the LF
-
+
for len(lineContent) > 0 {
lineNum++
var chunk []byte
@@ -207,18 +207,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
copy(chunk[len(lineContent):], lineEnding)
lineContent = nil
}
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process the chunk
if result, shouldSend := dp.processor.ProcessLine(chunk, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -228,18 +228,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
} else {
// Normal line processing
lineNum++
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
// Process line directly (line includes original line ending)
if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -248,12 +248,12 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
}
}
}
-
+
if err == io.EOF {
// Process any remaining data as the last line, respecting line length limit
for len(remaining) > 0 {
lineNum++
-
+
var lineToProcess []byte
if len(remaining) > maxLineLength {
// Split the remaining data
@@ -266,17 +266,17 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
lineToProcess = remaining
remaining = nil
}
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
-
+
if result, shouldSend := dp.processor.ProcessLine(lineToProcess, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
@@ -285,19 +285,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
}
break
}
-
+
if err != nil {
return err
}
}
-
+
// Flush any buffered output
if final := dp.processor.Flush(); len(final) > 0 {
if _, err := dp.output.Write(final); err != nil {
return err
}
}
-
+
return nil
}
@@ -323,7 +323,7 @@ func (dp *DirectProcessor) ProcessFileWithTailing(ctx context.Context, filePath
func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) error {
// Track our current position in the file
var lastSize int64
-
+
for {
select {
case <-ctx.Done():
@@ -334,7 +334,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
if err != nil {
continue
}
-
+
currentSize := fileInfo.Size()
if currentSize > lastSize {
// File has new content, read it
@@ -342,19 +342,19 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
if err != nil {
continue
}
-
+
// Seek to where we left off
if _, err := file.Seek(lastSize, 0); err != nil {
file.Close()
continue
}
-
+
// Process new content
if err := dp.processNewContent(ctx, file, filePath); err != nil {
file.Close()
continue
}
-
+
lastSize = currentSize
file.Close()
}
@@ -365,35 +365,35 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
// processNewContent processes new content that was added to the file
func (dp *DirectProcessor) processNewContent(ctx context.Context, file *os.File, filePath string) error {
scanner := bufio.NewScanner(file)
-
+
// Start line counting from where we left off (simplified approach)
lineNum := 1
-
+
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-
+
lineBuf := scanner.Bytes()
if result, shouldSend := dp.processor.ProcessLine(lineBuf, lineNum, filePath, dp.stats, dp.sourceID); shouldSend {
if _, err := dp.output.Write(result); err != nil {
return err
}
-
+
// Update transmission stats
if dp.stats != nil {
dp.stats.updateLineTransmitted()
}
}
lineNum++
-
+
// Update position stats
if dp.stats != nil {
dp.stats.updatePosition()
}
}
-
+
return scanner.Err()
-} \ No newline at end of file
+}
diff --git a/internal/io/fs/grepprocessor.go b/internal/io/fs/grepprocessor.go
index 6b34dc1..ed1c271 100644
--- a/internal/io/fs/grepprocessor.go
+++ b/internal/io/fs/grepprocessor.go
@@ -15,38 +15,38 @@ type GrepProcessor struct {
plain bool
noColor bool
hostname string
-
+
// Context handling
beforeContext int
afterContext int
maxCount int
-
+
// State for context processing
- matchCount int
- afterRemaining int
- beforeBuffer [][]byte
- beforeLineNums []int
+ matchCount int
+ afterRemaining int
+ beforeBuffer [][]byte
+ beforeLineNums []int
}
// NewGrepProcessor creates a new grep processor
func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, beforeContext, afterContext, maxCount int) *GrepProcessor {
gp := &GrepProcessor{
- regex: re,
- plain: plain,
- noColor: noColor,
- hostname: hostname,
- beforeContext: beforeContext,
- afterContext: afterContext,
- maxCount: maxCount,
- matchCount: 0,
+ regex: re,
+ plain: plain,
+ noColor: noColor,
+ hostname: hostname,
+ beforeContext: beforeContext,
+ afterContext: afterContext,
+ maxCount: maxCount,
+ matchCount: 0,
afterRemaining: 0,
}
-
+
if beforeContext > 0 {
gp.beforeBuffer = make([][]byte, 0, beforeContext)
gp.beforeLineNums = make([]int, 0, beforeContext)
}
-
+
return gp
}
@@ -58,10 +58,12 @@ func (gp *GrepProcessor) Cleanup() error {
return nil
}
+// ProcessLine processes a single line for grep filtering with context support.
+// Returns formatted output for matching lines and their context, or nil for non-matching lines.
+// Handles before/after context lines and respects maxCount limit.
func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) {
isMatch := gp.regex.Match(line)
-
-
+
// Handle lines that don't match the regex
if !isMatch {
// Handle after context lines (only for non-matching lines)
@@ -78,7 +80,7 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string,
// Make a copy of the line for buffering
lineCopy := make([]byte, len(line))
copy(lineCopy, line)
-
+
// Add to buffer, removing oldest if at capacity
if len(gp.beforeBuffer) >= gp.beforeContext {
gp.beforeBuffer = gp.beforeBuffer[1:]
@@ -89,23 +91,23 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string,
}
return nil, false
}
-
+
// Line matches the regex
gp.matchCount++
-
+
// Check if we've reached maxCount
if gp.maxCount > 0 && gp.matchCount > gp.maxCount {
return nil, false
}
-
+
// Update stats for matched line
if stats != nil {
stats.updateLineMatched()
}
-
+
// Build result with before context, current line, and set up after context
var result []byte
-
+
// First, output any before context lines
if gp.beforeContext > 0 {
for i, beforeLine := range gp.beforeBuffer {
@@ -117,16 +119,16 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string,
gp.beforeBuffer = gp.beforeBuffer[:0]
gp.beforeLineNums = gp.beforeLineNums[:0]
}
-
+
// Add the matching line
formatted := gp.formatLine(line, lineNum, filePath, stats, sourceID)
result = append(result, formatted...)
-
+
// Set up after context (only if we're not already in after context mode)
if gp.afterContext > 0 && gp.afterRemaining == 0 {
gp.afterRemaining = gp.afterContext
}
-
+
return result, true
}
@@ -143,7 +145,7 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s
result[len(line)] = '\n'
return result
}
-
+
// Format exactly like original basehandler.go for non-plain mode
// REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}¬
var transmittedPerc int
@@ -152,14 +154,14 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s
transmittedPerc = stats.transmittedPerc()
count = stats.totalLineCount()
}
-
+
// Build the protocol line
protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
protocol.FieldDelimiter, gp.hostname, protocol.FieldDelimiter,
transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
sourceID, protocol.FieldDelimiter, string(line))
-
- // Apply ANSI color formatting if not in plain mode and not noColor mode
+
+ // Apply ANSI color formatting if not in plain mode and not noColor mode.
if !gp.plain && !gp.noColor {
colorized := brush.Colorfy(protocolLine)
result := make([]byte, len(colorized)+1)
@@ -167,11 +169,11 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s
result[len(colorized)] = '\n'
return result
}
-
+
// No color formatting
result := make([]byte, len(protocolLine)+1)
copy(result, protocolLine)
result[len(protocolLine)] = '\n'
-
+
return result
-} \ No newline at end of file
+}
diff --git a/internal/io/fs/mapprocessor.go b/internal/io/fs/mapprocessor.go
index b1b2283..a2e051d 100644
--- a/internal/io/fs/mapprocessor.go
+++ b/internal/io/fs/mapprocessor.go
@@ -16,15 +16,15 @@ import (
// MapProcessor handles MapReduce-style aggregation
type MapProcessor struct {
- plain bool
- hostname string
- query *mapr.Query
- parser logformat.Parser
- groupSet *mapr.GroupSet
- buffer []byte
- output io.Writer
- lastSerialized time.Time
- serializeFunc func(groupSet *mapr.GroupSet)
+ plain bool
+ hostname string
+ query *mapr.Query
+ parser logformat.Parser
+ groupSet *mapr.GroupSet
+ buffer []byte
+ output io.Writer
+ lastSerialized time.Time
+ serializeFunc func(groupSet *mapr.GroupSet)
}
// NewMapProcessor creates a new map processor
@@ -63,10 +63,10 @@ func NewMapProcessor(plain bool, hostname string, queryStr string, output io.Wri
output: output,
lastSerialized: time.Now(),
}
-
+
// Set up serialization function
mp.serializeFunc = mp.defaultSerializeFunc
-
+
return mp, nil
}
@@ -83,10 +83,13 @@ func (mp *MapProcessor) Cleanup() error {
return nil
}
+// ProcessLine processes a single line for MapReduce aggregation.
+// Parses the line, applies WHERE and SET clauses, aggregates matching fields,
+// and handles periodic serialization. Returns nil (no immediate output for MapReduce).
func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) {
// Convert line to string and parse fields
maprLine := strings.TrimSpace(string(line))
-
+
fields, err := mp.parser.MakeFields(maprLine)
if err != nil {
// Should fields be ignored anyway?
@@ -95,12 +98,12 @@ func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
}
return nil, false
}
-
+
// Apply WHERE clause filter
if !mp.query.WhereClause(fields) {
return nil, false
}
-
+
// Apply SET clause (add additional fields)
if len(mp.query.Set) > 0 {
if err := mp.query.SetClause(fields); err != nil {
@@ -108,20 +111,23 @@ func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
return nil, false
}
}
-
+
// Aggregate the fields
mp.aggregateFields(fields)
-
+
// Check if we should serialize results periodically (every 5 seconds by default)
now := time.Now()
if now.Sub(mp.lastSerialized) >= mp.query.Interval {
mp.periodicSerialize()
mp.lastSerialized = now
}
-
+
return nil, false // No immediate output for MapReduce - output happens periodically
}
+// aggregateFields groups parsed fields by the GROUP BY clause and aggregates values
+// according to the SELECT operations. Creates a group key from GROUP BY fields
+// and updates the corresponding aggregation set with SELECT field values.
func (mp *MapProcessor) aggregateFields(fields map[string]string) {
var sb strings.Builder
for i, field := range mp.query.GroupBy {
@@ -160,12 +166,15 @@ func (mp *MapProcessor) periodicSerialize() {
mp.groupSet = mapr.NewGroupSet()
}
-// defaultSerializeFunc implements the default serialization behavior
+// defaultSerializeFunc implements the default serialization behavior for MapReduce results.
+// This function is called periodically to send aggregated data to the client.
+// It uses a channel-based approach to serialize the group set and format output
+// according to the DTail protocol (A|serialized_data¬) for transmission.
func (mp *MapProcessor) defaultSerializeFunc(groupSet *mapr.GroupSet) {
// Use a channel to collect serialized data
ch := make(chan string, 100)
done := make(chan struct{})
-
+
go func() {
defer close(done)
for msg := range ch {
@@ -175,14 +184,14 @@ func (mp *MapProcessor) defaultSerializeFunc(groupSet *mapr.GroupSet) {
output.WriteString(protocol.FieldDelimiter)
output.WriteString(msg)
output.WriteByte(protocol.MessageDelimiter)
-
+
// Write to output immediately
if mp.output != nil {
mp.output.Write([]byte(output.String()))
}
}
}()
-
+
// Serialize the group set
ctx := context.Background()
groupSet.Serialize(ctx, ch)
@@ -196,4 +205,4 @@ func (mp *MapProcessor) Flush() []byte {
mp.serializeFunc(mp.groupSet)
}
return nil // Output is handled by serializeFunc
-} \ No newline at end of file
+}