summaryrefslogtreecommitdiff
path: root/internal/mapr/client
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-26 11:26:53 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-07 13:31:15 +0000
commit0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
treef06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/mapr/client
parent2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/mapr/client')
-rw-r--r--internal/mapr/client/aggregate.go25
1 files changed, 3 insertions, 22 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 3f2b7a5..1272a19 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,10 +1,11 @@
package client
import (
- "github.com/mimecast/dtail/internal/logger"
- "github.com/mimecast/dtail/internal/mapr"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/mapr"
)
// Aggregate mapreduce data on the DTail client side.
@@ -15,7 +16,6 @@ type Aggregate struct {
group *mapr.GroupSet
// This represents the merged aggregated data of all servers.
globalGroup *mapr.GlobalGroupSet
- stop chan struct{}
// The server we aggregate the data for (logging and debugging purposes only)
server string
}
@@ -26,20 +26,12 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou
query: query,
group: mapr.NewGroupSet(),
globalGroup: globalGroup,
- stop: make(chan struct{}),
server: server,
}
}
// Aggregate data from mapr log line into local (and global) group sets.
func (a *Aggregate) Aggregate(parts []string) {
- select {
- case <-a.stop:
- logger.Error("Client aggregator stopped for server, not processing new data", a.server)
- return
- default:
- }
-
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
@@ -87,14 +79,3 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
return fields
}
-
-// Stop the client side mapreduce aggregator.
-func (a *Aggregate) Stop() {
- logger.Debug("Stopping client mapreduce aggregator")
- close(a.stop)
-
- err := a.globalGroup.Merge(a.query, a.group)
- if err != nil {
- panic(err)
- }
-}