From 0945da8dfefcbb723eecea0e5f4eafff63398253 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20B=C3=BCtow?= Date: Sun, 26 Jan 2020 11:26:53 +0000 Subject: Introduce drun command, refactor code to use context package --- internal/mapr/aggregateset.go | 5 +- internal/mapr/client/aggregate.go | 25 +------ internal/mapr/groupset.go | 5 +- internal/mapr/logformat/parser.go | 2 +- internal/mapr/query.go | 2 +- internal/mapr/server/aggregate.go | 141 ++++++++++++++++++++++---------------- internal/mapr/wherecondition.go | 2 +- 7 files changed, 95 insertions(+), 87 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 2096c3c..7fb4c17 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -1,6 +1,7 @@ package mapr import ( + "context" "fmt" "strconv" "strings" @@ -64,7 +65,7 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { } // Serialize the aggregate set so it can be sent over the wire. -func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) { +func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { //logger.Trace("Serialising mapr.AggregateSet", s) var sb strings.Builder @@ -87,7 +88,7 @@ func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan st select { case ch <- sb.String(): - case <-stop: + case <-ctx.Done(): } } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 3f2b7a5..1272a19 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,10 +1,11 @@ package client import ( - "github.com/mimecast/dtail/internal/logger" - "github.com/mimecast/dtail/internal/mapr" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/mapr" ) // Aggregate mapreduce data on the DTail client side. @@ -15,7 +16,6 @@ type Aggregate struct { group *mapr.GroupSet // This represents the merged aggregated data of all servers. globalGroup *mapr.GlobalGroupSet - stop chan struct{} // The server we aggregate the data for (logging and debugging purposes only) server string } @@ -26,20 +26,12 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou query: query, group: mapr.NewGroupSet(), globalGroup: globalGroup, - stop: make(chan struct{}), server: server, } } // Aggregate data from mapr log line into local (and global) group sets. func (a *Aggregate) Aggregate(parts []string) { - select { - case <-a.stop: - logger.Error("Client aggregator stopped for server, not processing new data", a.server) - return - default: - } - groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { @@ -87,14 +79,3 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { return fields } - -// Stop the client side mapreduce aggregator. -func (a *Aggregate) Stop() { - logger.Debug("Stopping client mapreduce aggregator") - close(a.stop) - - err := a.globalGroup.Merge(a.query, a.group) - if err != nil { - panic(err) - } -} diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index d8f9379..e9e0d37 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -1,6 +1,7 @@ package mapr import ( + "context" "errors" "fmt" "io/ioutil" @@ -46,9 +47,9 @@ func (g *GroupSet) GetSet(groupKey string) *AggregateSet { } // Serialize the group set (e.g. to send it over the wire). -func (g *GroupSet) Serialize(ch chan<- string, stop chan struct{}) { +func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) { for groupKey, set := range g.sets { - set.Serialize(groupKey, ch, stop) + set.Serialize(ctx, groupKey, ch) } } diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index 5730d29..09c706b 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -1,9 +1,9 @@ package logformat import ( - "github.com/mimecast/dtail/internal/logger" "errors" "fmt" + "github.com/mimecast/dtail/internal/io/logger" "os" "reflect" "strings" diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 3805d15..0127be3 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -1,9 +1,9 @@ package mapr import ( - "github.com/mimecast/dtail/internal/logger" "errors" "fmt" + "github.com/mimecast/dtail/internal/io/logger" "strconv" "strings" "time" diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 900756e..922dcbd 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -1,26 +1,28 @@ package server import ( - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/fs" - "github.com/mimecast/dtail/internal/logger" - "github.com/mimecast/dtail/internal/mapr" - "github.com/mimecast/dtail/internal/mapr/logformat" + "context" "os" "strings" "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/mapr/logformat" ) // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { // Log lines to process (parsing MAPREDUCE lines). - Lines chan fs.LineRead + Lines chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string - // Signals to exit goroutine. - stop chan struct{} // Signals to serialize data. serialize chan struct{} + // Signals to flush data. + flush chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -28,7 +30,7 @@ type Aggregate struct { } // NewAggregate return a new server side aggregator. -func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) { +func NewAggregate(queryStr string) (*Aggregate, error) { query, err := mapr.NewQuery(queryStr) if err != nil { return nil, err @@ -47,76 +49,98 @@ func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) } a := Aggregate{ - Lines: make(chan fs.LineRead, 100), - stop: make(chan struct{}), + Lines: make(chan line.Line, 100), serialize: make(chan struct{}), + flush: make(chan struct{}), hostname: s[0], query: query, parser: logParser, } - go a.periodicAggregateTimer() - - fieldsCh := make(chan map[string]string) - go a.readFields(fieldsCh, maprLines) - go a.readLines(fieldsCh) - return &a, nil } -func (a *Aggregate) periodicAggregateTimer() { +// Start an aggregation run. +func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { + fieldsCh := a.linesToFields(ctx) + go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) + a.periodicAggregateTimer(ctx) +} + +func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { for { select { case <-time.After(a.query.Interval): - a.Serialize() - case <-a.stop: + a.Serialize(ctx) + case <-ctx.Done(): return } } } -func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) - for { - select { - case fields := <-fieldsCh: - a.aggregate(group, fields) - case <-a.serialize: - logger.Info("Serializing mapreduce result") - group.Serialize(maprLines, a.stop) - logger.Info("Done serializing mapreduce result") - group = mapr.NewGroupSet() - case <-a.stop: - return + go func() { + defer close(fieldsCh) + + for { + select { + case line, ok := <-a.Lines: + if !ok { + return + } + + maprLine := strings.TrimSpace(string(line.Content)) + fields, err := a.parser.MakeFields(maprLine) + + if err != nil { + logger.Error(err) + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } } - } + }() + + return fieldsCh } -func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) { +func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { + group := mapr.NewGroupSet() + for { select { - case line, ok := <-a.Lines: + case fields, ok := <-fieldsCh: if !ok { + logger.Info("Serializing mapreduce result (final)") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + logger.Info("Done serializing mapreduce result (final)") return } - - maprLine := strings.TrimSpace(string(line.Content)) - fields, err := a.parser.MakeFields(maprLine) - - if err != nil { - logger.Error(err) - continue - } - if !a.query.WhereClause(fields) { - continue - } - - select { - case fieldsCh <- fields: - case <-a.stop: - } - case <-a.stop: + a.aggregate(group, fields) + case <-a.serialize: + logger.Info("Serializing mapreduce result") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + logger.Info("Done serializing mapreduce result") + case <-a.flush: + logger.Info("Flushing mapreduce result") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + a.flush <- struct{}{} + logger.Info("Done flushing mapreduce result") + case <-ctx.Done(): return } } @@ -157,14 +181,15 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { } // Serialize all the aggregated data. -func (a *Aggregate) Serialize() { +func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: - case <-a.stop: + case <-ctx.Done(): } } -// Close the aggregator. -func (a *Aggregate) Close() { - close(a.stop) +// Flush all data. +func (a *Aggregate) Flush() { + a.flush <- struct{}{} + <-a.flush } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index e1f4e5b..ab46bed 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -1,9 +1,9 @@ package mapr import ( - "github.com/mimecast/dtail/internal/logger" "errors" "fmt" + "github.com/mimecast/dtail/internal/io/logger" "strconv" "strings" ) -- cgit v1.2.3