diff options
| field | value | date |
|---|---|---|
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/logformat | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/logformat')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | internal/mapr/logformat/csv.go | 40 |
| -rw-r--r-- | internal/mapr/logformat/default.go | 237 |
| -rw-r--r-- | internal/mapr/logformat/default_benchmark_test.go | 44 |
| -rw-r--r-- | internal/mapr/logformat/default_test.go | 35 |
| -rw-r--r-- | internal/mapr/logformat/delimited.go | 12 |
| -rw-r--r-- | internal/mapr/logformat/generic.go | 11 |
| -rw-r--r-- | internal/mapr/logformat/generickv.go | 34 |
| -rw-r--r-- | internal/mapr/logformat/parser.go | 20 |
8 files changed, 346 insertions, 87 deletions
diff --git a/internal/mapr/logformat/csv.go b/internal/mapr/logformat/csv.go index b8f565c..ecb1f8b 100644 --- a/internal/mapr/logformat/csv.go +++ b/internal/mapr/logformat/csv.go @@ -2,7 +2,6 @@ package logformat import ( "fmt" - "strings" "github.com/mimecast/dtail/internal/protocol" ) @@ -29,27 +28,38 @@ func (p *csvParser) MakeFields(maprLine string) (map[string]string, error) { return nil, ErrIgnoreFields } - fields := make(map[string]string, 7+len(p.header)) - fields["*"] = "*" - fields["$hostname"] = p.hostname - fields["$server"] = p.hostname - fields["$line"] = maprLine - fields["$empty"] = "" - fields["$timezone"] = p.timeZoneName - fields["$timeoffset"] = p.timeZoneOffset - - splitted := strings.Split(maprLine, protocol.CSVDelimiter) - for i, value := range splitted { - if i >= len(p.header) { + fields := make(map[string]string, p.fieldsCapacity) + p.addDefaultFields(fields, maprLine) + start := 0 + column := 0 + delimiter := protocol.CSVDelimiter[0] + + for { + value, next, done := scanDelimitedField(maprLine, start, delimiter) + if column >= len(p.header) { return fields, fmt.Errorf("CSV file seems corrupted, more fields than header values?") } - fields[p.header[i]] = value + p.addDynamicField(fields, p.header[column], value) + column++ + if done { + break + } + start = next } return fields, nil } func (p *csvParser) parseHeader(maprLine string) { - p.header = strings.Split(maprLine, protocol.CSVDelimiter) + start := 0 + delimiter := protocol.CSVDelimiter[0] + for { + header, next, done := scanDelimitedField(maprLine, start, delimiter) + p.header = append(p.header, header) + if done { + break + } + start = next + } p.hasHeader = true } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index a499bc5..396a589 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/mimecast/dtail/internal/mapr" 
"github.com/mimecast/dtail/internal/protocol" ) @@ -11,62 +12,216 @@ type defaultParser struct { hostname string timeZoneName string timeZoneOffset string + fieldsCapacity int + + wantStar bool + wantLine bool + wantEmpty bool + wantHostname bool + wantServer bool + wantTimezone bool + wantTimeOffset bool + wantSeverity bool + wantLogLevel bool + wantTime bool + wantDate bool + wantHour bool + wantMinute bool + wantSecond bool + wantPID bool + wantCaller bool + wantCPUs bool + wantGoroutines bool + wantCGOCalls bool + wantLoadAvg bool + wantUptime bool + + allDynamicFields bool + dynamicFields map[string]struct{} } func newDefaultParser(hostname, timeZoneName string, timeZoneOffset int) (*defaultParser, error) { - return &defaultParser{ + parser := &defaultParser{ hostname: hostname, timeZoneName: timeZoneName, timeZoneOffset: fmt.Sprintf("%d", timeZoneOffset), - }, nil + } + parser.configureFieldPlan(mapr.ParserFieldPlan{AllFields: true}) + return parser, nil +} + +func (p *defaultParser) setQuery(query *mapr.Query) { + p.configureFieldPlan(query.ParserFieldPlan()) } func (p *defaultParser) MakeFields(maprLine string) (map[string]string, error) { - splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, p.fieldsCapacity) + tokenIndex := 0 + start := 0 + delimiter := protocol.FieldDelimiter[0] - if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || - !strings.HasPrefix(splitted[0], "INFO") { + for { + token, next, done := scanDelimitedField(maprLine, start, delimiter) + switch { + case tokenIndex == 0: + if !strings.HasPrefix(token, "INFO") { + return nil, ErrIgnoreFields + } + p.addDefaultFields(fields, maprLine) + if p.wantSeverity { + fields["$severity"] = token + } + if p.wantLogLevel { + fields["$loglevel"] = token + } + case tokenIndex == 1: + if p.wantTime { + fields["$time"] = token + } + if len(token) == 15 { + // Example: 20211002-071209 + if p.wantDate { + fields["$date"] = token[0:8] + } + if 
p.wantHour { + fields["$hour"] = token[9:11] + } + if p.wantMinute { + fields["$minute"] = token[11:13] + } + if p.wantSecond { + fields["$second"] = token[13:] + } + } + case tokenIndex == 2: + if p.wantPID { + fields["$pid"] = token + } + case tokenIndex == 3: + if p.wantCaller { + fields["$caller"] = token + } + case tokenIndex == 4: + if p.wantCPUs { + fields["$cpus"] = token + } + case tokenIndex == 5: + if p.wantGoroutines { + fields["$goroutines"] = token + } + case tokenIndex == 6: + if p.wantCGOCalls { + fields["$cgocalls"] = token + } + case tokenIndex == 7: + if p.wantLoadAvg { + fields["$loadavg"] = token + } + case tokenIndex == 8: + if p.wantUptime { + fields["$uptime"] = token + } + case tokenIndex == 9: + if !strings.HasPrefix(token, "MAPREDUCE:") { + return nil, ErrIgnoreFields + } + default: + if err := p.addKeyValueField(fields, token); err != nil { + return fields, err + } + } + + tokenIndex++ + if done { + break + } + start = next + } + + if tokenIndex < 11 { // Not a DTail mapreduce log line. 
return nil, ErrIgnoreFields } - fields := make(map[string]string, len(splitted)+8) - - fields["*"] = "*" - fields["$line"] = maprLine - fields["$empty"] = "" - fields["$hostname"] = p.hostname - fields["$server"] = p.hostname - fields["$timezone"] = p.timeZoneName - fields["$timeoffset"] = p.timeZoneOffset - - fields["$severity"] = splitted[0] - fields["$loglevel"] = splitted[0] - - time := splitted[1] - fields["$time"] = time - if len(time) == 15 { - // Example: 20211002-071209 - fields["$date"] = time[0:8] - fields["$hour"] = time[9:11] - fields["$minute"] = time[11:13] - fields["$second"] = time[13:] + return fields, nil +} + +func (p *defaultParser) addDefaultFields(fields map[string]string, maprLine string) { + if p.wantStar { + fields["*"] = "*" } - fields["$pid"] = splitted[2] - fields["$caller"] = splitted[3] - fields["$cpus"] = splitted[4] - fields["$goroutines"] = splitted[5] - fields["$cgocalls"] = splitted[6] - fields["$loadavg"] = splitted[7] - fields["$uptime"] = splitted[8] - - for _, kv := range splitted[10:] { - keyAndValue := strings.SplitN(kv, "=", 2) - if len(keyAndValue) != 2 { - return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) - } - fields[keyAndValue[0]] = keyAndValue[1] + if p.wantLine { + fields["$line"] = maprLine + } + if p.wantEmpty { + fields["$empty"] = "" + } + if p.wantHostname { + fields["$hostname"] = p.hostname + } + if p.wantServer { + fields["$server"] = p.hostname } + if p.wantTimezone { + fields["$timezone"] = p.timeZoneName + } + if p.wantTimeOffset { + fields["$timeoffset"] = p.timeZoneOffset + } +} - return fields, nil +func (p *defaultParser) addDynamicField(fields map[string]string, key string, value string) { + if p.allDynamicFields { + fields[key] = value + return + } + if _, ok := p.dynamicFields[key]; ok { + fields[key] = value + } +} + +func (p *defaultParser) addKeyValueField(fields map[string]string, token string) error { + keyAndValueIndex := strings.IndexByte(token, '=') + if keyAndValueIndex 
< 0 { + return fmt.Errorf("Unable to parse key-value token '%s'", token) + } + p.addDynamicField(fields, token[:keyAndValueIndex], token[keyAndValueIndex+1:]) + return nil +} + +func (p *defaultParser) configureFieldPlan(plan mapr.ParserFieldPlan) { + p.fieldsCapacity = plan.Capacity() + p.dynamicFields = nil + p.allDynamicFields = plan.AllFields + + p.wantStar = plan.Needs("*") + p.wantLine = plan.Needs("$line") + p.wantEmpty = plan.Needs("$empty") + p.wantHostname = plan.Needs("$hostname") + p.wantServer = plan.Needs("$server") + p.wantTimezone = plan.Needs("$timezone") + p.wantTimeOffset = plan.Needs("$timeoffset") + p.wantSeverity = plan.Needs("$severity") + p.wantLogLevel = plan.Needs("$loglevel") + p.wantTime = plan.Needs("$time") + p.wantDate = plan.Needs("$date") + p.wantHour = plan.Needs("$hour") + p.wantMinute = plan.Needs("$minute") + p.wantSecond = plan.Needs("$second") + p.wantPID = plan.Needs("$pid") + p.wantCaller = plan.Needs("$caller") + p.wantCPUs = plan.Needs("$cpus") + p.wantGoroutines = plan.Needs("$goroutines") + p.wantCGOCalls = plan.Needs("$cgocalls") + p.wantLoadAvg = plan.Needs("$loadavg") + p.wantUptime = plan.Needs("$uptime") + + if plan.AllFields { + return + } + + p.dynamicFields = make(map[string]struct{}, len(plan.Fields)) + for field := range plan.Fields { + p.dynamicFields[field] = struct{}{} + } } diff --git a/internal/mapr/logformat/default_benchmark_test.go b/internal/mapr/logformat/default_benchmark_test.go new file mode 100644 index 0000000..b3ae400 --- /dev/null +++ b/internal/mapr/logformat/default_benchmark_test.go @@ -0,0 +1,44 @@ +package logformat + +import ( + "testing" + + "github.com/mimecast/dtail/internal/mapr" +) + +func BenchmarkDefaultParserMakeFields(b *testing.B) { + input := "INFO|20211002-072342|1|default_benchmark_test.go:0|8|14|7|0.21|471h0m21s|" + + "MAPREDUCE:STATS|foo=bar|bar=baz|qux=quux|alpha=beta|gamma=delta" + + b.Run("all_fields", func(b *testing.B) { + parser, err := NewParser("default", nil) + if 
err != nil { + b.Fatalf("Unable to create parser: %s", err.Error()) + } + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if _, err := parser.MakeFields(input); err != nil { + b.Fatalf("Unable to parse input: %s", err.Error()) + } + } + }) + + b.Run("query_specific", func(b *testing.B) { + q, err := mapr.NewQuery(`select count(foo) from STATS where bar eq "baz"`) + if err != nil { + b.Fatalf("Unable to create query: %s", err.Error()) + } + parser, err := NewParser("default", q) + if err != nil { + b.Fatalf("Unable to create parser: %s", err.Error()) + } + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if _, err := parser.MakeFields(input); err != nil { + b.Fatalf("Unable to parse input: %s", err.Error()) + } + } + }) +} diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index edf238f..6417c2f 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -3,6 +3,8 @@ package logformat import ( "fmt" "testing" + + "github.com/mimecast/dtail/internal/mapr" ) func TestDefaultLogFormat(t *testing.T) { @@ -95,3 +97,36 @@ func TestDefaultLogFormat(t *testing.T) { t.Errorf("Expected fiending field 'foo', but found it\n") } } + +func TestDefaultLogFormatQuerySpecificFields(t *testing.T) { + q, err := mapr.NewQuery(`select count(foo) from STATS where $hostname eq "testhost"`) + if err != nil { + t.Fatalf("Unable to create query: %s", err.Error()) + } + + parser, err := NewParser("default", q) + if err != nil { + t.Fatalf("Unable to create parser: %s", err.Error()) + } + + fields, err := parser.MakeFields( + "INFO|20211002-072342|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|foo=bar|bar=baz", + ) + if err != nil { + t.Fatalf("Parser unable to make fields: %s", err.Error()) + } + + requiredFields := []string{"foo", "$hostname"} + for _, field := range requiredFields { + if _, ok := fields[field]; !ok { + t.Errorf("Expected query-specific field '%s' to be present", field) + } + 
} + + omittedFields := []string{"bar", "$time", "$pid", "$line"} + for _, field := range omittedFields { + if _, ok := fields[field]; ok { + t.Errorf("Expected query-specific field '%s' to be omitted", field) + } + } +} diff --git a/internal/mapr/logformat/delimited.go b/internal/mapr/logformat/delimited.go new file mode 100644 index 0000000..2fa0639 --- /dev/null +++ b/internal/mapr/logformat/delimited.go @@ -0,0 +1,12 @@ +package logformat + +import "strings" + +func scanDelimitedField(input string, start int, delimiter byte) (token string, next int, done bool) { + index := strings.IndexByte(input[start:], delimiter) + if index < 0 { + return input[start:], len(input), true + } + end := start + index + return input[start:end], end + 1, false +} diff --git a/internal/mapr/logformat/generic.go b/internal/mapr/logformat/generic.go index 1350eff..ecb9e75 100644 --- a/internal/mapr/logformat/generic.go +++ b/internal/mapr/logformat/generic.go @@ -15,15 +15,8 @@ func newGenericParser(hostname, timeZoneName string, timeZoneOffset int) (*gener } func (p *genericParser) MakeFields(maprLine string) (map[string]string, error) { - fields := make(map[string]string, 3) - - fields["*"] = "*" - fields["$hostname"] = p.hostname - fields["$server"] = p.hostname - fields["$line"] = maprLine - fields["$empty"] = "" - fields["$timezone"] = p.timeZoneName - fields["$timeoffset"] = p.timeZoneOffset + fields := make(map[string]string, p.fieldsCapacity) + p.addDefaultFields(fields, maprLine) return fields, nil } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go index bd9aad5..b5da8c1 100644 --- a/internal/mapr/logformat/generickv.go +++ b/internal/mapr/logformat/generickv.go @@ -1,10 +1,6 @@ package logformat -import ( - "strings" - - "github.com/mimecast/dtail/internal/protocol" -) +import "github.com/mimecast/dtail/internal/protocol" type genericKVParser struct { defaultParser @@ -21,24 +17,20 @@ func newGenericKVParser(hostname, timeZoneName 
string, timeZoneOffset int) (*gen } func (p *genericKVParser) MakeFields(maprLine string) (map[string]string, error) { - splitted := strings.Split(maprLine, protocol.FieldDelimiter) - fields := make(map[string]string, len(splitted)) - - fields["*"] = "*" - fields["$line"] = maprLine - fields["$empty"] = "" - fields["$hostname"] = p.hostname - fields["$server"] = p.hostname - fields["$timezone"] = p.timeZoneName - fields["$timeoffset"] = p.timeZoneOffset - - for _, kv := range splitted[0:] { - keyAndValue := strings.SplitN(kv, "=", 2) - if len(keyAndValue) != 2 { - //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv) + fields := make(map[string]string, p.fieldsCapacity) + p.addDefaultFields(fields, maprLine) + start := 0 + delimiter := protocol.FieldDelimiter[0] + + for { + token, next, done := scanDelimitedField(maprLine, start, delimiter) + if err := p.addKeyValueField(fields, token); err != nil { continue } - fields[keyAndValue[0]] = keyAndValue[1] + if done { + break + } + start = next } return fields, nil diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index b6ed87d..d7db826 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -20,6 +20,10 @@ type Parser interface { MakeFields(string) (map[string]string, error) } +type queryAwareParser interface { + setQuery(*mapr.Query) +} + // ParserFactory builds a Parser for a specific log format. 
type ParserFactory func(hostname, timeZoneName string, timeZoneOffset int) (Parser, error) @@ -86,7 +90,9 @@ func NewParser(logFormatName string, query *mapr.Query) (Parser, error) { timeZoneName, timeZoneOffset := now.Zone() if parserFactory, found := getParserFactory(logFormatName); found { - return parserFactory(hostname, timeZoneName, timeZoneOffset) + parser, err := parserFactory(hostname, timeZoneName, timeZoneOffset) + configureParserQuery(parser, query) + return parser, err } defaultFactory, found := getParserFactory("default") @@ -99,5 +105,17 @@ func NewParser(logFormatName string, query *mapr.Query) (Parser, error) { return p, fmt.Errorf("No '%s' mapr log format and problem creating default one: %v", logFormatName, err) } + configureParserQuery(p, query) return p, fmt.Errorf("No '%s' mapr log format", logFormatName) } + +func configureParserQuery(parser Parser, query *mapr.Query) { + if parser == nil { + return + } + queryAware, ok := parser.(queryAwareParser) + if !ok { + return + } + queryAware.setQuery(query) +} |
