diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/logformat/default.go | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/logformat/default.go')
| -rw-r--r-- | internal/mapr/logformat/default.go | 237 |
1 files changed, 196 insertions, 41 deletions
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index a499bc5..396a589 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/protocol" ) @@ -11,62 +12,216 @@ type defaultParser struct { hostname string timeZoneName string timeZoneOffset string + fieldsCapacity int + + wantStar bool + wantLine bool + wantEmpty bool + wantHostname bool + wantServer bool + wantTimezone bool + wantTimeOffset bool + wantSeverity bool + wantLogLevel bool + wantTime bool + wantDate bool + wantHour bool + wantMinute bool + wantSecond bool + wantPID bool + wantCaller bool + wantCPUs bool + wantGoroutines bool + wantCGOCalls bool + wantLoadAvg bool + wantUptime bool + + allDynamicFields bool + dynamicFields map[string]struct{} } func newDefaultParser(hostname, timeZoneName string, timeZoneOffset int) (*defaultParser, error) { - return &defaultParser{ + parser := &defaultParser{ hostname: hostname, timeZoneName: timeZoneName, timeZoneOffset: fmt.Sprintf("%d", timeZoneOffset), - }, nil + } + parser.configureFieldPlan(mapr.ParserFieldPlan{AllFields: true}) + return parser, nil +} + +func (p *defaultParser) setQuery(query *mapr.Query) { + p.configureFieldPlan(query.ParserFieldPlan()) } func (p *defaultParser) MakeFields(maprLine string) (map[string]string, error) { - splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, p.fieldsCapacity) + tokenIndex := 0 + start := 0 + delimiter := protocol.FieldDelimiter[0] - if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || - !strings.HasPrefix(splitted[0], "INFO") { + for { + token, next, done := scanDelimitedField(maprLine, start, delimiter) + switch { + case tokenIndex == 0: + if !strings.HasPrefix(token, "INFO") { + return nil, ErrIgnoreFields + } + p.addDefaultFields(fields, maprLine) + if p.wantSeverity { + fields["$severity"] = token + } + if p.wantLogLevel { + fields["$loglevel"] = token + } + case tokenIndex == 1: + if p.wantTime { + fields["$time"] = token + } + if len(token) == 15 { + // Example: 20211002-071209 + if p.wantDate { + fields["$date"] = token[0:8] + } + if p.wantHour { + fields["$hour"] = token[9:11] + } + if p.wantMinute { + fields["$minute"] = token[11:13] + } + if p.wantSecond { + fields["$second"] = token[13:] + } + } + case tokenIndex == 2: + if p.wantPID { + fields["$pid"] = token + } + case tokenIndex == 3: + if p.wantCaller { + fields["$caller"] = token + } + case tokenIndex == 4: + if p.wantCPUs { + fields["$cpus"] = token + } + case tokenIndex == 5: + if p.wantGoroutines { + fields["$goroutines"] = token + } + case tokenIndex == 6: + if p.wantCGOCalls { + fields["$cgocalls"] = token + } + case tokenIndex == 7: + if p.wantLoadAvg { + fields["$loadavg"] = token + } + case tokenIndex == 8: + if p.wantUptime { + fields["$uptime"] = token + } + case tokenIndex == 9: + if !strings.HasPrefix(token, "MAPREDUCE:") { + return nil, ErrIgnoreFields + } + default: + if err := p.addKeyValueField(fields, token); err != nil { + return fields, err + } + } + + tokenIndex++ + if done { + break + } + start = next + } + + if tokenIndex < 11 { // Not a DTail mapreduce log line. return nil, ErrIgnoreFields } - fields := make(map[string]string, len(splitted)+8) - - fields["*"] = "*" - fields["$line"] = maprLine - fields["$empty"] = "" - fields["$hostname"] = p.hostname - fields["$server"] = p.hostname - fields["$timezone"] = p.timeZoneName - fields["$timeoffset"] = p.timeZoneOffset - - fields["$severity"] = splitted[0] - fields["$loglevel"] = splitted[0] - - time := splitted[1] - fields["$time"] = time - if len(time) == 15 { - // Example: 20211002-071209 - fields["$date"] = time[0:8] - fields["$hour"] = time[9:11] - fields["$minute"] = time[11:13] - fields["$second"] = time[13:] + return fields, nil +} + +func (p *defaultParser) addDefaultFields(fields map[string]string, maprLine string) { + if p.wantStar { + fields["*"] = "*" } - fields["$pid"] = splitted[2] - fields["$caller"] = splitted[3] - fields["$cpus"] = splitted[4] - fields["$goroutines"] = splitted[5] - fields["$cgocalls"] = splitted[6] - fields["$loadavg"] = splitted[7] - fields["$uptime"] = splitted[8] - - for _, kv := range splitted[10:] { - keyAndValue := strings.SplitN(kv, "=", 2) - if len(keyAndValue) != 2 { - return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) - } - fields[keyAndValue[0]] = keyAndValue[1] + if p.wantLine { + fields["$line"] = maprLine + } + if p.wantEmpty { + fields["$empty"] = "" + } + if p.wantHostname { + fields["$hostname"] = p.hostname + } + if p.wantServer { + fields["$server"] = p.hostname } + if p.wantTimezone { + fields["$timezone"] = p.timeZoneName + } + if p.wantTimeOffset { + fields["$timeoffset"] = p.timeZoneOffset + } +} - return fields, nil +func (p *defaultParser) addDynamicField(fields map[string]string, key string, value string) { + if p.allDynamicFields { + fields[key] = value + return + } + if _, ok := p.dynamicFields[key]; ok { + fields[key] = value + } +} + +func (p *defaultParser) addKeyValueField(fields map[string]string, token string) error { + keyAndValueIndex := strings.IndexByte(token, '=') + if keyAndValueIndex < 0 { + return fmt.Errorf("Unable to parse key-value token '%s'", token) + } + p.addDynamicField(fields, token[:keyAndValueIndex], token[keyAndValueIndex+1:]) + return nil +} + +func (p *defaultParser) configureFieldPlan(plan mapr.ParserFieldPlan) { + p.fieldsCapacity = plan.Capacity() + p.dynamicFields = nil + p.allDynamicFields = plan.AllFields + + p.wantStar = plan.Needs("*") + p.wantLine = plan.Needs("$line") + p.wantEmpty = plan.Needs("$empty") + p.wantHostname = plan.Needs("$hostname") + p.wantServer = plan.Needs("$server") + p.wantTimezone = plan.Needs("$timezone") + p.wantTimeOffset = plan.Needs("$timeoffset") + p.wantSeverity = plan.Needs("$severity") + p.wantLogLevel = plan.Needs("$loglevel") + p.wantTime = plan.Needs("$time") + p.wantDate = plan.Needs("$date") + p.wantHour = plan.Needs("$hour") + p.wantMinute = plan.Needs("$minute") + p.wantSecond = plan.Needs("$second") + p.wantPID = plan.Needs("$pid") + p.wantCaller = plan.Needs("$caller") + p.wantCPUs = plan.Needs("$cpus") + p.wantGoroutines = plan.Needs("$goroutines") + p.wantCGOCalls = plan.Needs("$cgocalls") + p.wantLoadAvg = plan.Needs("$loadavg") + p.wantUptime = plan.Needs("$uptime") + + if plan.AllFields { + return + } + + p.dynamicFields = make(map[string]struct{}, len(plan.Fields)) + for field := range plan.Fields { + p.dynamicFields[field] = struct{}{} + } } |
