diff options
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/catprocessor.go | 32 | ||||
| -rw-r--r-- | internal/io/fs/directprocessor.go | 100 | ||||
| -rw-r--r-- | internal/io/fs/grepprocessor.go | 70 | ||||
| -rw-r--r-- | internal/io/fs/mapprocessor.go | 53 |
4 files changed, 135 insertions, 120 deletions
diff --git a/internal/io/fs/catprocessor.go b/internal/io/fs/catprocessor.go index 4430488..b062c7f 100644 --- a/internal/io/fs/catprocessor.go +++ b/internal/io/fs/catprocessor.go @@ -10,18 +10,18 @@ import ( // CatProcessor handles cat-style output type CatProcessor struct { - plain bool - noColor bool - hostname string + plain bool + noColor bool + hostname string isFirstLine bool } // NewCatProcessor creates a new cat processor func NewCatProcessor(plain, noColor bool, hostname string) *CatProcessor { return &CatProcessor{ - plain: plain, - noColor: noColor, - hostname: hostname, + plain: plain, + noColor: noColor, + hostname: hostname, isFirstLine: true, } } @@ -34,12 +34,16 @@ func (cp *CatProcessor) Cleanup() error { return nil } +// ProcessLine processes a single line for cat output. +// In plain mode, it preserves the original line exactly including line endings. +// In non-plain mode, it formats the line according to DTail protocol with optional colorization. +// Returns the formatted line and true (cat always outputs all lines). func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) { // Update stats for matched line (cat always matches all lines) if stats != nil { stats.updateLineMatched() } - + // Format output to match existing behavior if cp.plain { // In plain mode, preserve the original line exactly as it is @@ -48,7 +52,7 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s copy(result, line) return result, true } - + // Format exactly like original basehandler.go for non-plain mode // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}¬ var transmittedPerc int @@ -58,17 +62,17 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s transmittedPerc = 100 count = stats.totalLineCount() } - + // Build the protocol line protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s", protocol.FieldDelimiter, cp.hostname, protocol.FieldDelimiter, transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter, sourceID, protocol.FieldDelimiter, string(line)) - + // Apply ANSI color formatting if not in plain mode and not noColor mode if !cp.plain && !cp.noColor { colorized := brush.Colorfy(protocolLine) - + // Add color reset prefix for all lines except the first var result []byte if cp.isFirstLine { @@ -86,12 +90,12 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s } return result, true } - + // No color formatting result := make([]byte, len(protocolLine)+1) copy(result, protocolLine) result[len(protocolLine)] = '\n' - + return result, true } @@ -102,4 +106,4 @@ func (cp *CatProcessor) Flush() []byte { return []byte("\x1b[39m\x1b[49m\x1b[49m\x1b[39m") } return nil -}
\ No newline at end of file +} diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go index 762fa8f..9c564e7 100644 --- a/internal/io/fs/directprocessor.go +++ b/internal/io/fs/directprocessor.go @@ -63,9 +63,9 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, if catProcessor, ok := dp.processor.(*CatProcessor); ok && catProcessor.plain { return dp.processReaderPreservingLineEndings(ctx, reader, filePath) } - + scanner := bufio.NewScanner(reader) - + // Set buffer size respecting MaxLineLength configuration maxLineLength := config.Server.MaxLineLength initialBufSize := 64 * 1024 @@ -73,7 +73,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, initialBufSize = maxLineLength } scanner.Buffer(make([]byte, initialBufSize), maxLineLength) - + lineNum := 0 for scanner.Scan() { select { @@ -81,35 +81,35 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, return ctx.Err() default: } - + lineNum++ line := scanner.Bytes() - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - - // Process line directly + + // Process line directly if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } } - + // Flush any buffered output if final := dp.processor.Flush(); len(final) > 0 { if _, err := dp.output.Write(final); err != nil { return err } } - + return scanner.Err() } @@ -121,20 +121,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex lineNum := 0 maxLineLength := config.Server.MaxLineLength warnedAboutLongLine := false - - + for { select { case <-ctx.Done(): return ctx.Err() default: } - + n, err := reader.Read(buf) if n > 0 { data := append(remaining, buf[:n]...) remaining = remaining[:0] - + // Process complete lines for { // Find next line ending (LF or CRLF) @@ -153,24 +152,24 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex splitLine := make([]byte, maxLineLength+1) copy(splitLine, data[:maxLineLength]) splitLine[maxLineLength] = '\n' - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process the split line if result, shouldSend := dp.processor.ProcessLine(splitLine, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } - + // Continue with remaining data data = data[maxLineLength:] continue @@ -180,19 +179,20 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex break } } - - line := data[:lfIndex+1] // Include the LF - data = data[lfIndex+1:] // Continue with remaining data - + + // Extract the line including its original line ending (CRLF or LF) + line := data[:lfIndex+1] // Include the LF (and CR if present before it) + data = data[lfIndex+1:] // Continue with remaining data + // Reset warning flag for new line warnedAboutLongLine = false - + // Check if this line exceeds max length and needs to be split if len(line) > maxLineLength { // Split the long line into chunks lineContent := line[:len(line)-1] // Remove the LF lineEnding := line[len(line)-1:] // Keep the LF - + for len(lineContent) > 0 { lineNum++ var chunk []byte @@ -207,18 +207,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex copy(chunk[len(lineContent):], lineEnding) lineContent = nil } - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process the chunk if result, shouldSend := dp.processor.ProcessLine(chunk, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -228,18 +228,18 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } else { // Normal line processing lineNum++ - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + // Process line directly (line includes original line ending) if result, shouldSend := dp.processor.ProcessLine(line, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -248,12 +248,12 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } } } - + if err == io.EOF { // Process any remaining data as the last line, respecting line length limit for len(remaining) > 0 { lineNum++ - + var lineToProcess []byte if len(remaining) > maxLineLength { // Split the remaining data @@ -266,17 +266,17 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex lineToProcess = remaining remaining = nil } - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } - + if result, shouldSend := dp.processor.ProcessLine(lineToProcess, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() @@ -285,19 +285,19 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex } break } - + if err != nil { return err } } - + // Flush any buffered output if final := dp.processor.Flush(); len(final) > 0 { if _, err := dp.output.Write(final); err != nil { return err } } - + return nil } @@ -323,7 +323,7 @@ func (dp *DirectProcessor) ProcessFileWithTailing(ctx context.Context, filePath func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) error { // Track our current position in the file var lastSize int64 - + for { select { case <-ctx.Done(): @@ -334,7 +334,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro if err != nil { continue } - + currentSize := fileInfo.Size() if currentSize > lastSize { // File has new content, read it @@ -342,19 +342,19 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro if err != nil { continue } - + // Seek to where we left off if _, err := file.Seek(lastSize, 0); err != nil { file.Close() continue } - + // Process new content if err := dp.processNewContent(ctx, file, filePath); err != nil { file.Close() continue } - + lastSize = currentSize file.Close() } @@ -365,35 +365,35 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro // processNewContent processes new content that was added to the file func (dp *DirectProcessor) processNewContent(ctx context.Context, file *os.File, filePath string) error { scanner := bufio.NewScanner(file) - + // Start line counting from where we left off (simplified approach) lineNum := 1 - + for scanner.Scan() { select { case <-ctx.Done(): return ctx.Err() default: } - + lineBuf := scanner.Bytes() if result, shouldSend := dp.processor.ProcessLine(lineBuf, lineNum, filePath, dp.stats, dp.sourceID); shouldSend { if _, err := dp.output.Write(result); err != nil { return err } - + // Update transmission stats if dp.stats != nil { dp.stats.updateLineTransmitted() } } lineNum++ - + // Update position stats if dp.stats != nil { dp.stats.updatePosition() } } - + return scanner.Err() -}
\ No newline at end of file +} diff --git a/internal/io/fs/grepprocessor.go b/internal/io/fs/grepprocessor.go index 6b34dc1..ed1c271 100644 --- a/internal/io/fs/grepprocessor.go +++ b/internal/io/fs/grepprocessor.go @@ -15,38 +15,38 @@ type GrepProcessor struct { plain bool noColor bool hostname string - + // Context handling beforeContext int afterContext int maxCount int - + // State for context processing - matchCount int - afterRemaining int - beforeBuffer [][]byte - beforeLineNums []int + matchCount int + afterRemaining int + beforeBuffer [][]byte + beforeLineNums []int } // NewGrepProcessor creates a new grep processor func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, beforeContext, afterContext, maxCount int) *GrepProcessor { gp := &GrepProcessor{ - regex: re, - plain: plain, - noColor: noColor, - hostname: hostname, - beforeContext: beforeContext, - afterContext: afterContext, - maxCount: maxCount, - matchCount: 0, + regex: re, + plain: plain, + noColor: noColor, + hostname: hostname, + beforeContext: beforeContext, + afterContext: afterContext, + maxCount: maxCount, + matchCount: 0, afterRemaining: 0, } - + if beforeContext > 0 { gp.beforeBuffer = make([][]byte, 0, beforeContext) gp.beforeLineNums = make([]int, 0, beforeContext) } - + return gp } @@ -58,10 +58,12 @@ func (gp *GrepProcessor) Cleanup() error { return nil } +// ProcessLine processes a single line for grep filtering with context support. +// Returns formatted output for matching lines and their context, or nil for non-matching lines. +// Handles before/after context lines and respects maxCount limit. func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) { isMatch := gp.regex.Match(line) - - + // Handle lines that don't match the regex if !isMatch { // Handle after context lines (only for non-matching lines) @@ -78,7 +80,7 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string, // Make a copy of the line for buffering lineCopy := make([]byte, len(line)) copy(lineCopy, line) - + // Add to buffer, removing oldest if at capacity if len(gp.beforeBuffer) >= gp.beforeContext { gp.beforeBuffer = gp.beforeBuffer[1:] @@ -89,23 +91,23 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string, } return nil, false } - + // Line matches the regex gp.matchCount++ - + // Check if we've reached maxCount if gp.maxCount > 0 && gp.matchCount > gp.maxCount { return nil, false } - + // Update stats for matched line if stats != nil { stats.updateLineMatched() } - + // Build result with before context, current line, and set up after context var result []byte - + // First, output any before context lines if gp.beforeContext > 0 { for i, beforeLine := range gp.beforeBuffer { @@ -117,16 +119,16 @@ func (gp *GrepProcessor) ProcessLine(line []byte, lineNum int, filePath string, gp.beforeBuffer = gp.beforeBuffer[:0] gp.beforeLineNums = gp.beforeLineNums[:0] } - + // Add the matching line formatted := gp.formatLine(line, lineNum, filePath, stats, sourceID) result = append(result, formatted...) - + // Set up after context (only if we're not already in after context mode) if gp.afterContext > 0 && gp.afterRemaining == 0 { gp.afterRemaining = gp.afterContext } - + return result, true } @@ -143,7 +145,7 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s result[len(line)] = '\n' return result } - + // Format exactly like original basehandler.go for non-plain mode // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}¬ var transmittedPerc int @@ -152,14 +154,14 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s transmittedPerc = stats.transmittedPerc() count = stats.totalLineCount() } - + // Build the protocol line protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s", protocol.FieldDelimiter, gp.hostname, protocol.FieldDelimiter, transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter, sourceID, protocol.FieldDelimiter, string(line)) - - // Apply ANSI color formatting if not in plain mode and not noColor mode + + // Apply ANSI color formatting if not in plain mode and not noColor mode. if !gp.plain && !gp.noColor { colorized := brush.Colorfy(protocolLine) result := make([]byte, len(colorized)+1) @@ -167,11 +169,11 @@ func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, s result[len(colorized)] = '\n' return result } - + // No color formatting result := make([]byte, len(protocolLine)+1) copy(result, protocolLine) result[len(protocolLine)] = '\n' - + return result -}
\ No newline at end of file +} diff --git a/internal/io/fs/mapprocessor.go b/internal/io/fs/mapprocessor.go index b1b2283..a2e051d 100644 --- a/internal/io/fs/mapprocessor.go +++ b/internal/io/fs/mapprocessor.go @@ -16,15 +16,15 @@ import ( // MapProcessor handles MapReduce-style aggregation type MapProcessor struct { - plain bool - hostname string - query *mapr.Query - parser logformat.Parser - groupSet *mapr.GroupSet - buffer []byte - output io.Writer - lastSerialized time.Time - serializeFunc func(groupSet *mapr.GroupSet) + plain bool + hostname string + query *mapr.Query + parser logformat.Parser + groupSet *mapr.GroupSet + buffer []byte + output io.Writer + lastSerialized time.Time + serializeFunc func(groupSet *mapr.GroupSet) } // NewMapProcessor creates a new map processor @@ -63,10 +63,10 @@ func NewMapProcessor(plain bool, hostname string, queryStr string, output io.Wri output: output, lastSerialized: time.Now(), } - + // Set up serialization function mp.serializeFunc = mp.defaultSerializeFunc - + return mp, nil } @@ -83,10 +83,13 @@ func (mp *MapProcessor) Cleanup() error { return nil } +// ProcessLine processes a single line for MapReduce aggregation. +// Parses the line, applies WHERE and SET clauses, aggregates matching fields, +// and handles periodic serialization. Returns nil (no immediate output for MapReduce). func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) { // Convert line to string and parse fields maprLine := strings.TrimSpace(string(line)) - + fields, err := mp.parser.MakeFields(maprLine) if err != nil { // Should fields be ignored anyway? @@ -95,12 +98,12 @@ func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, s } return nil, false } - + // Apply WHERE clause filter if !mp.query.WhereClause(fields) { return nil, false } - + // Apply SET clause (add additional fields) if len(mp.query.Set) > 0 { if err := mp.query.SetClause(fields); err != nil { @@ -108,20 +111,23 @@ func (mp *MapProcessor) ProcessLine(line []byte, lineNum int, filePath string, s return nil, false } } - + // Aggregate the fields mp.aggregateFields(fields) - + // Check if we should serialize results periodically (every 5 seconds by default) now := time.Now() if now.Sub(mp.lastSerialized) >= mp.query.Interval { mp.periodicSerialize() mp.lastSerialized = now } - + return nil, false // No immediate output for MapReduce - output happens periodically } +// aggregateFields groups parsed fields by the GROUP BY clause and aggregates values +// according to the SELECT operations. Creates a group key from GROUP BY fields +// and updates the corresponding aggregation set with SELECT field values. func (mp *MapProcessor) aggregateFields(fields map[string]string) { var sb strings.Builder for i, field := range mp.query.GroupBy { @@ -160,12 +166,15 @@ func (mp *MapProcessor) periodicSerialize() { mp.groupSet = mapr.NewGroupSet() } -// defaultSerializeFunc implements the default serialization behavior +// defaultSerializeFunc implements the default serialization behavior for MapReduce results. +// This function is called periodically to send aggregated data to the client. +// It uses a channel-based approach to serialize the group set and format output +// according to the DTail protocol (A|serialized_data¬) for transmission. func (mp *MapProcessor) defaultSerializeFunc(groupSet *mapr.GroupSet) { // Use a channel to collect serialized data ch := make(chan string, 100) done := make(chan struct{}) - + go func() { defer close(done) for msg := range ch { @@ -175,14 +184,14 @@ func (mp *MapProcessor) defaultSerializeFunc(groupSet *mapr.GroupSet) { output.WriteString(protocol.FieldDelimiter) output.WriteString(msg) output.WriteByte(protocol.MessageDelimiter) - + // Write to output immediately if mp.output != nil { mp.output.Write([]byte(output.String())) } } }() - + // Serialize the group set ctx := context.Background() groupSet.Serialize(ctx, ch) @@ -196,4 +205,4 @@ func (mp *MapProcessor) Flush() []byte { mp.serializeFunc(mp.groupSet) } return nil // Output is handled by serializeFunc -}
\ No newline at end of file +} |
