summaryrefslogtreecommitdiff
path: root/mapr/globalgroupset.go
diff options
context:
space:
mode:
Diffstat (limited to 'mapr/globalgroupset.go')
-rw-r--r--mapr/globalgroupset.go100
1 files changed, 100 insertions, 0 deletions
diff --git a/mapr/globalgroupset.go b/mapr/globalgroupset.go
new file mode 100644
index 0000000..cfab506
--- /dev/null
+++ b/mapr/globalgroupset.go
@@ -0,0 +1,100 @@
+package mapr
+
+import (
+ "fmt"
+)
+
+// GlobalGroupSet is used on the dtail client to merge multiple group sets
+// (one group set per remote server) to one single global group set.
+type GlobalGroupSet struct {
+ GroupSet
+ semaphore chan struct{}
+}
+
+// NewGlobalGroupSet creates a new empty global group set.
+func NewGlobalGroupSet() *GlobalGroupSet {
+ g := GlobalGroupSet{
+ semaphore: make(chan struct{}, 1),
+ }
+ g.InitSet()
+
+ return &g
+}
+
+// String representation of the global group set.
+func (g *GlobalGroupSet) String() string {
+ return fmt.Sprintf("GlobalGroupSet(%s)", g.GroupSet.String())
+}
+
+// Merge (blocking) a group set into the global group set.
+func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
+ g.semaphore <- struct{}{}
+ defer func() { <-g.semaphore }()
+
+ return g.merge(query, group)
+}
+
+// MergeNoblock merges (non-blocking) a group set into the global group set.
+func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) {
+ select {
+ case g.semaphore <- struct{}{}:
+ err := g.merge(query, group)
+ <-g.semaphore
+ return true, err
+ default:
+ return false, nil
+ }
+}
+
+// Merge a group set into the global group set.
+func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
+ for groupKey, set := range group.sets {
+ s := g.GetSet(groupKey)
+ if err := s.Merge(query, set); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// IsEmpty determines whether the global group set has any data in it.
+func (g *GlobalGroupSet) IsEmpty() bool {
+ return g.NumSets() == 0
+}
+
+// NumSets determines the number of sets.
+func (g *GlobalGroupSet) NumSets() int {
+ g.semaphore <- struct{}{}
+ defer func() { <-g.semaphore }()
+
+ return len(g.sets)
+}
+
+// SwapOut teturn the underlying group set and create a new empty one, so
+// that the global group set is empty again and can aggregate new data.
+func (g *GlobalGroupSet) SwapOut() *GroupSet {
+ g.semaphore <- struct{}{}
+ defer func() { <-g.semaphore }()
+
+ set := &GroupSet{sets: g.sets}
+ g.InitSet()
+
+ return set
+}
+
+// WriteResult writes the result of a mapreduce aggregation to an outfile.
+func (g *GlobalGroupSet) WriteResult(query *Query) error {
+ g.semaphore <- struct{}{}
+ defer func() { <-g.semaphore }()
+
+ return g.GroupSet.WriteResult(query)
+}
+
+// Result returns the result of the mapreduce aggregation as a string.
+func (g *GlobalGroupSet) Result(query *Query) (string, int, error) {
+ g.semaphore <- struct{}{}
+ defer func() { <-g.semaphore }()
+
+ return g.GroupSet.Result(query)
+}