summaryrefslogtreecommitdiff
path: root/internal/mapr/logformat/default.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
commit13b21feb07c86f65760f7338f284f3b492364cd9 (patch)
treec9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/logformat/default.go
parentda8e581617a0240626d2bc922916416440e65bae (diff)
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/logformat/default.go')
-rw-r--r--internal/mapr/logformat/default.go237
1 files changed, 196 insertions, 41 deletions
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index a499bc5..396a589 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
+ "github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -11,62 +12,216 @@ type defaultParser struct {
hostname string
timeZoneName string
timeZoneOffset string
+ fieldsCapacity int
+
+ wantStar bool
+ wantLine bool
+ wantEmpty bool
+ wantHostname bool
+ wantServer bool
+ wantTimezone bool
+ wantTimeOffset bool
+ wantSeverity bool
+ wantLogLevel bool
+ wantTime bool
+ wantDate bool
+ wantHour bool
+ wantMinute bool
+ wantSecond bool
+ wantPID bool
+ wantCaller bool
+ wantCPUs bool
+ wantGoroutines bool
+ wantCGOCalls bool
+ wantLoadAvg bool
+ wantUptime bool
+
+ allDynamicFields bool
+ dynamicFields map[string]struct{}
}
func newDefaultParser(hostname, timeZoneName string, timeZoneOffset int) (*defaultParser, error) {
- return &defaultParser{
+ parser := &defaultParser{
hostname: hostname,
timeZoneName: timeZoneName,
timeZoneOffset: fmt.Sprintf("%d", timeZoneOffset),
- }, nil
+ }
+ parser.configureFieldPlan(mapr.ParserFieldPlan{AllFields: true})
+ return parser, nil
+}
+
+func (p *defaultParser) setQuery(query *mapr.Query) {
+ p.configureFieldPlan(query.ParserFieldPlan())
}
func (p *defaultParser) MakeFields(maprLine string) (map[string]string, error) {
- splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+ fields := make(map[string]string, p.fieldsCapacity)
+ tokenIndex := 0
+ start := 0
+ delimiter := protocol.FieldDelimiter[0]
- if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") ||
- !strings.HasPrefix(splitted[0], "INFO") {
+ for {
+ token, next, done := scanDelimitedField(maprLine, start, delimiter)
+ switch {
+ case tokenIndex == 0:
+ if !strings.HasPrefix(token, "INFO") {
+ return nil, ErrIgnoreFields
+ }
+ p.addDefaultFields(fields, maprLine)
+ if p.wantSeverity {
+ fields["$severity"] = token
+ }
+ if p.wantLogLevel {
+ fields["$loglevel"] = token
+ }
+ case tokenIndex == 1:
+ if p.wantTime {
+ fields["$time"] = token
+ }
+ if len(token) == 15 {
+ // Example: 20211002-071209
+ if p.wantDate {
+ fields["$date"] = token[0:8]
+ }
+ if p.wantHour {
+ fields["$hour"] = token[9:11]
+ }
+ if p.wantMinute {
+ fields["$minute"] = token[11:13]
+ }
+ if p.wantSecond {
+ fields["$second"] = token[13:]
+ }
+ }
+ case tokenIndex == 2:
+ if p.wantPID {
+ fields["$pid"] = token
+ }
+ case tokenIndex == 3:
+ if p.wantCaller {
+ fields["$caller"] = token
+ }
+ case tokenIndex == 4:
+ if p.wantCPUs {
+ fields["$cpus"] = token
+ }
+ case tokenIndex == 5:
+ if p.wantGoroutines {
+ fields["$goroutines"] = token
+ }
+ case tokenIndex == 6:
+ if p.wantCGOCalls {
+ fields["$cgocalls"] = token
+ }
+ case tokenIndex == 7:
+ if p.wantLoadAvg {
+ fields["$loadavg"] = token
+ }
+ case tokenIndex == 8:
+ if p.wantUptime {
+ fields["$uptime"] = token
+ }
+ case tokenIndex == 9:
+ if !strings.HasPrefix(token, "MAPREDUCE:") {
+ return nil, ErrIgnoreFields
+ }
+ default:
+ if err := p.addKeyValueField(fields, token); err != nil {
+ return fields, err
+ }
+ }
+
+ tokenIndex++
+ if done {
+ break
+ }
+ start = next
+ }
+
+ if tokenIndex < 11 {
// Not a DTail mapreduce log line.
return nil, ErrIgnoreFields
}
- fields := make(map[string]string, len(splitted)+8)
-
- fields["*"] = "*"
- fields["$line"] = maprLine
- fields["$empty"] = ""
- fields["$hostname"] = p.hostname
- fields["$server"] = p.hostname
- fields["$timezone"] = p.timeZoneName
- fields["$timeoffset"] = p.timeZoneOffset
-
- fields["$severity"] = splitted[0]
- fields["$loglevel"] = splitted[0]
-
- time := splitted[1]
- fields["$time"] = time
- if len(time) == 15 {
- // Example: 20211002-071209
- fields["$date"] = time[0:8]
- fields["$hour"] = time[9:11]
- fields["$minute"] = time[11:13]
- fields["$second"] = time[13:]
+ return fields, nil
+}
+
+func (p *defaultParser) addDefaultFields(fields map[string]string, maprLine string) {
+ if p.wantStar {
+ fields["*"] = "*"
}
- fields["$pid"] = splitted[2]
- fields["$caller"] = splitted[3]
- fields["$cpus"] = splitted[4]
- fields["$goroutines"] = splitted[5]
- fields["$cgocalls"] = splitted[6]
- fields["$loadavg"] = splitted[7]
- fields["$uptime"] = splitted[8]
-
- for _, kv := range splitted[10:] {
- keyAndValue := strings.SplitN(kv, "=", 2)
- if len(keyAndValue) != 2 {
- return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv)
- }
- fields[keyAndValue[0]] = keyAndValue[1]
+ if p.wantLine {
+ fields["$line"] = maprLine
+ }
+ if p.wantEmpty {
+ fields["$empty"] = ""
+ }
+ if p.wantHostname {
+ fields["$hostname"] = p.hostname
+ }
+ if p.wantServer {
+ fields["$server"] = p.hostname
}
+ if p.wantTimezone {
+ fields["$timezone"] = p.timeZoneName
+ }
+ if p.wantTimeOffset {
+ fields["$timeoffset"] = p.timeZoneOffset
+ }
+}
- return fields, nil
+func (p *defaultParser) addDynamicField(fields map[string]string, key string, value string) {
+ if p.allDynamicFields {
+ fields[key] = value
+ return
+ }
+ if _, ok := p.dynamicFields[key]; ok {
+ fields[key] = value
+ }
+}
+
+func (p *defaultParser) addKeyValueField(fields map[string]string, token string) error {
+ keyAndValueIndex := strings.IndexByte(token, '=')
+ if keyAndValueIndex < 0 {
+ return fmt.Errorf("Unable to parse key-value token '%s'", token)
+ }
+ p.addDynamicField(fields, token[:keyAndValueIndex], token[keyAndValueIndex+1:])
+ return nil
+}
+
+func (p *defaultParser) configureFieldPlan(plan mapr.ParserFieldPlan) {
+ p.fieldsCapacity = plan.Capacity()
+ p.dynamicFields = nil
+ p.allDynamicFields = plan.AllFields
+
+ p.wantStar = plan.Needs("*")
+ p.wantLine = plan.Needs("$line")
+ p.wantEmpty = plan.Needs("$empty")
+ p.wantHostname = plan.Needs("$hostname")
+ p.wantServer = plan.Needs("$server")
+ p.wantTimezone = plan.Needs("$timezone")
+ p.wantTimeOffset = plan.Needs("$timeoffset")
+ p.wantSeverity = plan.Needs("$severity")
+ p.wantLogLevel = plan.Needs("$loglevel")
+ p.wantTime = plan.Needs("$time")
+ p.wantDate = plan.Needs("$date")
+ p.wantHour = plan.Needs("$hour")
+ p.wantMinute = plan.Needs("$minute")
+ p.wantSecond = plan.Needs("$second")
+ p.wantPID = plan.Needs("$pid")
+ p.wantCaller = plan.Needs("$caller")
+ p.wantCPUs = plan.Needs("$cpus")
+ p.wantGoroutines = plan.Needs("$goroutines")
+ p.wantCGOCalls = plan.Needs("$cgocalls")
+ p.wantLoadAvg = plan.Needs("$loadavg")
+ p.wantUptime = plan.Needs("$uptime")
+
+ if plan.AllFields {
+ return
+ }
+
+ p.dynamicFields = make(map[string]struct{}, len(plan.Fields))
+ for field := range plan.Fields {
+ p.dynamicFields[field] = struct{}{}
+ }
}