// path: root/internal/server/handlers/mapcommand.go
// blob: c3e600e299c1315518b42718c2e93ed56561757f

package handlers

import (
	"context"
	"strings"

	"github.com/mimecast/dtail/internal/mapr/server"
)

// mapCommand implements the mapreduce command server side.
//
// It pairs the aggregate built from the client's query with the server
// handler the command was created for.
type mapCommand struct {
	// aggregate is the value returned by server.NewAggregate for the query;
	// Start delegates to it. It stays nil if construction failed.
	aggregate *server.Aggregate
	// server is the handler passed to newMapCommand.
	server    *ServerHandler
}

// newMapCommand returns a new server side mapreduce command together with
// the aggregate that backs it.
//
// args[0] is skipped (presumably the command name; confirm against the
// caller) and the remaining elements are joined with single spaces to form
// the query string handed to server.NewAggregate. argc is accepted but not
// used here.
//
// When server.NewAggregate fails, the returned mapCommand has only its server
// field set, the returned aggregate is nil and the error is passed through
// unchanged.
func newMapCommand(serverHandler *ServerHandler, argc int, args []string) (mapCommand, *server.Aggregate, error) {
	m := mapCommand{server: serverHandler}

	// Slicing args[1:] on an empty or nil slice panics, so only slice when
	// there is something after the first element. Otherwise the empty query
	// is passed on and server.NewAggregate decides how to handle it.
	var queryStr string
	if len(args) > 1 {
		queryStr = strings.Join(args[1:], " ")
	}

	aggregate, err := server.NewAggregate(queryStr)
	if err != nil {
		return m, nil, err
	}

	m.aggregate = aggregate
	return m, aggregate, nil
}

// Start hands ctx and the aggregatedMessages channel to the command's
// aggregate by calling its Start method.
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
	agg := m.aggregate
	agg.Start(ctx, aggregatedMessages)
}