diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-26 11:26:53 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-07 13:31:15 +0000 |
| commit | 0945da8dfefcbb723eecea0e5f4eafff63398253 (patch) | |
| tree | f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/mapr/client | |
| parent | 2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff) | |
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/mapr/client')
| -rw-r--r-- | internal/mapr/client/aggregate.go | 25 |
1 files changed, 3 insertions, 22 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 3f2b7a5..1272a19 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,10 +1,11 @@ package client import ( - "github.com/mimecast/dtail/internal/logger" - "github.com/mimecast/dtail/internal/mapr" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/mapr" ) // Aggregate mapreduce data on the DTail client side. @@ -15,7 +16,6 @@ type Aggregate struct { group *mapr.GroupSet // This represents the merged aggregated data of all servers. globalGroup *mapr.GlobalGroupSet - stop chan struct{} // The server we aggregate the data for (logging and debugging purposes only) server string } @@ -26,20 +26,12 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou query: query, group: mapr.NewGroupSet(), globalGroup: globalGroup, - stop: make(chan struct{}), server: server, } } // Aggregate data from mapr log line into local (and global) group sets. func (a *Aggregate) Aggregate(parts []string) { - select { - case <-a.stop: - logger.Error("Client aggregator stopped for server, not processing new data", a.server) - return - default: - } - groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { @@ -87,14 +79,3 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { return fields } - -// Stop the client side mapreduce aggregator. -func (a *Aggregate) Stop() { - logger.Debug("Stopping client mapreduce aggregator") - close(a.stop) - - err := a.globalGroup.Merge(a.query, a.group) - if err != nil { - panic(err) - } -} |
