summaryrefslogtreecommitdiff
path: root/internal/mapr/logformat
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
commit13b21feb07c86f65760f7338f284f3b492364cd9 (patch)
treec9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/logformat
parentda8e581617a0240626d2bc922916416440e65bae (diff)
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/logformat')
-rw-r--r--internal/mapr/logformat/csv.go40
-rw-r--r--internal/mapr/logformat/default.go237
-rw-r--r--internal/mapr/logformat/default_benchmark_test.go44
-rw-r--r--internal/mapr/logformat/default_test.go35
-rw-r--r--internal/mapr/logformat/delimited.go12
-rw-r--r--internal/mapr/logformat/generic.go11
-rw-r--r--internal/mapr/logformat/generickv.go34
-rw-r--r--internal/mapr/logformat/parser.go20
8 files changed, 346 insertions, 87 deletions
diff --git a/internal/mapr/logformat/csv.go b/internal/mapr/logformat/csv.go
index b8f565c..ecb1f8b 100644
--- a/internal/mapr/logformat/csv.go
+++ b/internal/mapr/logformat/csv.go
@@ -2,7 +2,6 @@ package logformat
import (
"fmt"
- "strings"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -29,27 +28,38 @@ func (p *csvParser) MakeFields(maprLine string) (map[string]string, error) {
return nil, ErrIgnoreFields
}
- fields := make(map[string]string, 7+len(p.header))
- fields["*"] = "*"
- fields["$hostname"] = p.hostname
- fields["$server"] = p.hostname
- fields["$line"] = maprLine
- fields["$empty"] = ""
- fields["$timezone"] = p.timeZoneName
- fields["$timeoffset"] = p.timeZoneOffset
-
- splitted := strings.Split(maprLine, protocol.CSVDelimiter)
- for i, value := range splitted {
- if i >= len(p.header) {
+ fields := make(map[string]string, p.fieldsCapacity)
+ p.addDefaultFields(fields, maprLine)
+ start := 0
+ column := 0
+ delimiter := protocol.CSVDelimiter[0]
+
+ for {
+ value, next, done := scanDelimitedField(maprLine, start, delimiter)
+ if column >= len(p.header) {
return fields, fmt.Errorf("CSV file seems corrupted, more fields than header values?")
}
- fields[p.header[i]] = value
+ p.addDynamicField(fields, p.header[column], value)
+ column++
+ if done {
+ break
+ }
+ start = next
}
return fields, nil
}
func (p *csvParser) parseHeader(maprLine string) {
- p.header = strings.Split(maprLine, protocol.CSVDelimiter)
+ start := 0
+ delimiter := protocol.CSVDelimiter[0]
+ for {
+ header, next, done := scanDelimitedField(maprLine, start, delimiter)
+ p.header = append(p.header, header)
+ if done {
+ break
+ }
+ start = next
+ }
p.hasHeader = true
}
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index a499bc5..396a589 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
+ "github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -11,62 +12,216 @@ type defaultParser struct {
hostname string
timeZoneName string
timeZoneOffset string
+ fieldsCapacity int
+
+ wantStar bool
+ wantLine bool
+ wantEmpty bool
+ wantHostname bool
+ wantServer bool
+ wantTimezone bool
+ wantTimeOffset bool
+ wantSeverity bool
+ wantLogLevel bool
+ wantTime bool
+ wantDate bool
+ wantHour bool
+ wantMinute bool
+ wantSecond bool
+ wantPID bool
+ wantCaller bool
+ wantCPUs bool
+ wantGoroutines bool
+ wantCGOCalls bool
+ wantLoadAvg bool
+ wantUptime bool
+
+ allDynamicFields bool
+ dynamicFields map[string]struct{}
}
func newDefaultParser(hostname, timeZoneName string, timeZoneOffset int) (*defaultParser, error) {
- return &defaultParser{
+ parser := &defaultParser{
hostname: hostname,
timeZoneName: timeZoneName,
timeZoneOffset: fmt.Sprintf("%d", timeZoneOffset),
- }, nil
+ }
+ parser.configureFieldPlan(mapr.ParserFieldPlan{AllFields: true})
+ return parser, nil
+}
+
+func (p *defaultParser) setQuery(query *mapr.Query) {
+ p.configureFieldPlan(query.ParserFieldPlan())
}
func (p *defaultParser) MakeFields(maprLine string) (map[string]string, error) {
- splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+ fields := make(map[string]string, p.fieldsCapacity)
+ tokenIndex := 0
+ start := 0
+ delimiter := protocol.FieldDelimiter[0]
- if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") ||
- !strings.HasPrefix(splitted[0], "INFO") {
+ for {
+ token, next, done := scanDelimitedField(maprLine, start, delimiter)
+ switch {
+ case tokenIndex == 0:
+ if !strings.HasPrefix(token, "INFO") {
+ return nil, ErrIgnoreFields
+ }
+ p.addDefaultFields(fields, maprLine)
+ if p.wantSeverity {
+ fields["$severity"] = token
+ }
+ if p.wantLogLevel {
+ fields["$loglevel"] = token
+ }
+ case tokenIndex == 1:
+ if p.wantTime {
+ fields["$time"] = token
+ }
+ if len(token) == 15 {
+ // Example: 20211002-071209
+ if p.wantDate {
+ fields["$date"] = token[0:8]
+ }
+ if p.wantHour {
+ fields["$hour"] = token[9:11]
+ }
+ if p.wantMinute {
+ fields["$minute"] = token[11:13]
+ }
+ if p.wantSecond {
+ fields["$second"] = token[13:]
+ }
+ }
+ case tokenIndex == 2:
+ if p.wantPID {
+ fields["$pid"] = token
+ }
+ case tokenIndex == 3:
+ if p.wantCaller {
+ fields["$caller"] = token
+ }
+ case tokenIndex == 4:
+ if p.wantCPUs {
+ fields["$cpus"] = token
+ }
+ case tokenIndex == 5:
+ if p.wantGoroutines {
+ fields["$goroutines"] = token
+ }
+ case tokenIndex == 6:
+ if p.wantCGOCalls {
+ fields["$cgocalls"] = token
+ }
+ case tokenIndex == 7:
+ if p.wantLoadAvg {
+ fields["$loadavg"] = token
+ }
+ case tokenIndex == 8:
+ if p.wantUptime {
+ fields["$uptime"] = token
+ }
+ case tokenIndex == 9:
+ if !strings.HasPrefix(token, "MAPREDUCE:") {
+ return nil, ErrIgnoreFields
+ }
+ default:
+ if err := p.addKeyValueField(fields, token); err != nil {
+ return fields, err
+ }
+ }
+
+ tokenIndex++
+ if done {
+ break
+ }
+ start = next
+ }
+
+ if tokenIndex < 11 {
// Not a DTail mapreduce log line.
return nil, ErrIgnoreFields
}
- fields := make(map[string]string, len(splitted)+8)
-
- fields["*"] = "*"
- fields["$line"] = maprLine
- fields["$empty"] = ""
- fields["$hostname"] = p.hostname
- fields["$server"] = p.hostname
- fields["$timezone"] = p.timeZoneName
- fields["$timeoffset"] = p.timeZoneOffset
-
- fields["$severity"] = splitted[0]
- fields["$loglevel"] = splitted[0]
-
- time := splitted[1]
- fields["$time"] = time
- if len(time) == 15 {
- // Example: 20211002-071209
- fields["$date"] = time[0:8]
- fields["$hour"] = time[9:11]
- fields["$minute"] = time[11:13]
- fields["$second"] = time[13:]
+ return fields, nil
+}
+
+func (p *defaultParser) addDefaultFields(fields map[string]string, maprLine string) {
+ if p.wantStar {
+ fields["*"] = "*"
}
- fields["$pid"] = splitted[2]
- fields["$caller"] = splitted[3]
- fields["$cpus"] = splitted[4]
- fields["$goroutines"] = splitted[5]
- fields["$cgocalls"] = splitted[6]
- fields["$loadavg"] = splitted[7]
- fields["$uptime"] = splitted[8]
-
- for _, kv := range splitted[10:] {
- keyAndValue := strings.SplitN(kv, "=", 2)
- if len(keyAndValue) != 2 {
- return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv)
- }
- fields[keyAndValue[0]] = keyAndValue[1]
+ if p.wantLine {
+ fields["$line"] = maprLine
+ }
+ if p.wantEmpty {
+ fields["$empty"] = ""
+ }
+ if p.wantHostname {
+ fields["$hostname"] = p.hostname
+ }
+ if p.wantServer {
+ fields["$server"] = p.hostname
}
+ if p.wantTimezone {
+ fields["$timezone"] = p.timeZoneName
+ }
+ if p.wantTimeOffset {
+ fields["$timeoffset"] = p.timeZoneOffset
+ }
+}
- return fields, nil
+func (p *defaultParser) addDynamicField(fields map[string]string, key string, value string) {
+ if p.allDynamicFields {
+ fields[key] = value
+ return
+ }
+ if _, ok := p.dynamicFields[key]; ok {
+ fields[key] = value
+ }
+}
+
+func (p *defaultParser) addKeyValueField(fields map[string]string, token string) error {
+ keyAndValueIndex := strings.IndexByte(token, '=')
+ if keyAndValueIndex < 0 {
+ return fmt.Errorf("Unable to parse key-value token '%s'", token)
+ }
+ p.addDynamicField(fields, token[:keyAndValueIndex], token[keyAndValueIndex+1:])
+ return nil
+}
+
+func (p *defaultParser) configureFieldPlan(plan mapr.ParserFieldPlan) {
+ p.fieldsCapacity = plan.Capacity()
+ p.dynamicFields = nil
+ p.allDynamicFields = plan.AllFields
+
+ p.wantStar = plan.Needs("*")
+ p.wantLine = plan.Needs("$line")
+ p.wantEmpty = plan.Needs("$empty")
+ p.wantHostname = plan.Needs("$hostname")
+ p.wantServer = plan.Needs("$server")
+ p.wantTimezone = plan.Needs("$timezone")
+ p.wantTimeOffset = plan.Needs("$timeoffset")
+ p.wantSeverity = plan.Needs("$severity")
+ p.wantLogLevel = plan.Needs("$loglevel")
+ p.wantTime = plan.Needs("$time")
+ p.wantDate = plan.Needs("$date")
+ p.wantHour = plan.Needs("$hour")
+ p.wantMinute = plan.Needs("$minute")
+ p.wantSecond = plan.Needs("$second")
+ p.wantPID = plan.Needs("$pid")
+ p.wantCaller = plan.Needs("$caller")
+ p.wantCPUs = plan.Needs("$cpus")
+ p.wantGoroutines = plan.Needs("$goroutines")
+ p.wantCGOCalls = plan.Needs("$cgocalls")
+ p.wantLoadAvg = plan.Needs("$loadavg")
+ p.wantUptime = plan.Needs("$uptime")
+
+ if plan.AllFields {
+ return
+ }
+
+ p.dynamicFields = make(map[string]struct{}, len(plan.Fields))
+ for field := range plan.Fields {
+ p.dynamicFields[field] = struct{}{}
+ }
}
diff --git a/internal/mapr/logformat/default_benchmark_test.go b/internal/mapr/logformat/default_benchmark_test.go
new file mode 100644
index 0000000..b3ae400
--- /dev/null
+++ b/internal/mapr/logformat/default_benchmark_test.go
@@ -0,0 +1,44 @@
+package logformat
+
+import (
+ "testing"
+
+ "github.com/mimecast/dtail/internal/mapr"
+)
+
+func BenchmarkDefaultParserMakeFields(b *testing.B) {
+ input := "INFO|20211002-072342|1|default_benchmark_test.go:0|8|14|7|0.21|471h0m21s|" +
+ "MAPREDUCE:STATS|foo=bar|bar=baz|qux=quux|alpha=beta|gamma=delta"
+
+ b.Run("all_fields", func(b *testing.B) {
+ parser, err := NewParser("default", nil)
+ if err != nil {
+ b.Fatalf("Unable to create parser: %s", err.Error())
+ }
+
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ if _, err := parser.MakeFields(input); err != nil {
+ b.Fatalf("Unable to parse input: %s", err.Error())
+ }
+ }
+ })
+
+ b.Run("query_specific", func(b *testing.B) {
+ q, err := mapr.NewQuery(`select count(foo) from STATS where bar eq "baz"`)
+ if err != nil {
+ b.Fatalf("Unable to create query: %s", err.Error())
+ }
+ parser, err := NewParser("default", q)
+ if err != nil {
+ b.Fatalf("Unable to create parser: %s", err.Error())
+ }
+
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ if _, err := parser.MakeFields(input); err != nil {
+ b.Fatalf("Unable to parse input: %s", err.Error())
+ }
+ }
+ })
+}
diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go
index edf238f..6417c2f 100644
--- a/internal/mapr/logformat/default_test.go
+++ b/internal/mapr/logformat/default_test.go
@@ -3,6 +3,8 @@ package logformat
import (
"fmt"
"testing"
+
+ "github.com/mimecast/dtail/internal/mapr"
)
func TestDefaultLogFormat(t *testing.T) {
@@ -95,3 +97,36 @@ func TestDefaultLogFormat(t *testing.T) {
t.Errorf("Expected fiending field 'foo', but found it\n")
}
}
+
+func TestDefaultLogFormatQuerySpecificFields(t *testing.T) {
+ q, err := mapr.NewQuery(`select count(foo) from STATS where $hostname eq "testhost"`)
+ if err != nil {
+ t.Fatalf("Unable to create query: %s", err.Error())
+ }
+
+ parser, err := NewParser("default", q)
+ if err != nil {
+ t.Fatalf("Unable to create parser: %s", err.Error())
+ }
+
+ fields, err := parser.MakeFields(
+ "INFO|20211002-072342|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|foo=bar|bar=baz",
+ )
+ if err != nil {
+ t.Fatalf("Parser unable to make fields: %s", err.Error())
+ }
+
+ requiredFields := []string{"foo", "$hostname"}
+ for _, field := range requiredFields {
+ if _, ok := fields[field]; !ok {
+ t.Errorf("Expected query-specific field '%s' to be present", field)
+ }
+ }
+
+ omittedFields := []string{"bar", "$time", "$pid", "$line"}
+ for _, field := range omittedFields {
+ if _, ok := fields[field]; ok {
+ t.Errorf("Expected query-specific field '%s' to be omitted", field)
+ }
+ }
+}
diff --git a/internal/mapr/logformat/delimited.go b/internal/mapr/logformat/delimited.go
new file mode 100644
index 0000000..2fa0639
--- /dev/null
+++ b/internal/mapr/logformat/delimited.go
@@ -0,0 +1,12 @@
+package logformat
+
+import "strings"
+
+func scanDelimitedField(input string, start int, delimiter byte) (token string, next int, done bool) {
+ index := strings.IndexByte(input[start:], delimiter)
+ if index < 0 {
+ return input[start:], len(input), true
+ }
+ end := start + index
+ return input[start:end], end + 1, false
+}
diff --git a/internal/mapr/logformat/generic.go b/internal/mapr/logformat/generic.go
index 1350eff..ecb9e75 100644
--- a/internal/mapr/logformat/generic.go
+++ b/internal/mapr/logformat/generic.go
@@ -15,15 +15,8 @@ func newGenericParser(hostname, timeZoneName string, timeZoneOffset int) (*gener
}
func (p *genericParser) MakeFields(maprLine string) (map[string]string, error) {
- fields := make(map[string]string, 3)
-
- fields["*"] = "*"
- fields["$hostname"] = p.hostname
- fields["$server"] = p.hostname
- fields["$line"] = maprLine
- fields["$empty"] = ""
- fields["$timezone"] = p.timeZoneName
- fields["$timeoffset"] = p.timeZoneOffset
+ fields := make(map[string]string, p.fieldsCapacity)
+ p.addDefaultFields(fields, maprLine)
return fields, nil
}
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
index bd9aad5..b5da8c1 100644
--- a/internal/mapr/logformat/generickv.go
+++ b/internal/mapr/logformat/generickv.go
@@ -1,10 +1,6 @@
package logformat
-import (
- "strings"
-
- "github.com/mimecast/dtail/internal/protocol"
-)
+import "github.com/mimecast/dtail/internal/protocol"
type genericKVParser struct {
defaultParser
@@ -21,24 +17,20 @@ func newGenericKVParser(hostname, timeZoneName string, timeZoneOffset int) (*gen
}
func (p *genericKVParser) MakeFields(maprLine string) (map[string]string, error) {
- splitted := strings.Split(maprLine, protocol.FieldDelimiter)
- fields := make(map[string]string, len(splitted))
-
- fields["*"] = "*"
- fields["$line"] = maprLine
- fields["$empty"] = ""
- fields["$hostname"] = p.hostname
- fields["$server"] = p.hostname
- fields["$timezone"] = p.timeZoneName
- fields["$timeoffset"] = p.timeZoneOffset
-
- for _, kv := range splitted[0:] {
- keyAndValue := strings.SplitN(kv, "=", 2)
- if len(keyAndValue) != 2 {
- //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv)
+ fields := make(map[string]string, p.fieldsCapacity)
+ p.addDefaultFields(fields, maprLine)
+ start := 0
+ delimiter := protocol.FieldDelimiter[0]
+
+ for {
+ token, next, done := scanDelimitedField(maprLine, start, delimiter)
+ if err := p.addKeyValueField(fields, token); err != nil {
continue
}
- fields[keyAndValue[0]] = keyAndValue[1]
+ if done {
+ break
+ }
+ start = next
}
return fields, nil
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index b6ed87d..d7db826 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -20,6 +20,10 @@ type Parser interface {
MakeFields(string) (map[string]string, error)
}
+type queryAwareParser interface {
+ setQuery(*mapr.Query)
+}
+
// ParserFactory builds a Parser for a specific log format.
type ParserFactory func(hostname, timeZoneName string, timeZoneOffset int) (Parser, error)
@@ -86,7 +90,9 @@ func NewParser(logFormatName string, query *mapr.Query) (Parser, error) {
timeZoneName, timeZoneOffset := now.Zone()
if parserFactory, found := getParserFactory(logFormatName); found {
- return parserFactory(hostname, timeZoneName, timeZoneOffset)
+ parser, err := parserFactory(hostname, timeZoneName, timeZoneOffset)
+ configureParserQuery(parser, query)
+ return parser, err
}
defaultFactory, found := getParserFactory("default")
@@ -99,5 +105,17 @@ func NewParser(logFormatName string, query *mapr.Query) (Parser, error) {
return p, fmt.Errorf("No '%s' mapr log format and problem creating default one: %v",
logFormatName, err)
}
+ configureParserQuery(p, query)
return p, fmt.Errorf("No '%s' mapr log format", logFormatName)
}
+
+func configureParserQuery(parser Parser, query *mapr.Query) {
+ if parser == nil {
+ return
+ }
+ queryAware, ok := parser.(queryAwareParser)
+ if !ok {
+ return
+ }
+ queryAware.setQuery(query)
+}