blob: 10372da8f16c8696f8664248d968cc4e2242dde4 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package handlers
import (
	"context"
	"errors"
	"strings"

	"github.com/mimecast/dtail/internal/mapr/server"
)
// mapCommand implements the server side of the mapreduce command.
type mapCommand struct {
// aggregate is the aggregate that newMapCommand creates from the query
// string; Start delegates to it.
aggregate *server.Aggregate
// server is the handler that was passed to newMapCommand.
server *ServerHandler
}
// newMapCommand builds the server side mapreduce command for serverHandler.
//
// args[0] is skipped (presumably the command name; confirm against the
// caller) and the remaining elements are joined with single spaces to form
// the query string handed to server.NewAggregate. argc is kept for signature
// compatibility but is not consulted; len(args) is used instead.
//
// It returns the command together with the aggregate it wraps. If args is
// empty or server.NewAggregate fails, the returned aggregate is nil, the
// error is non-nil and the returned command has no aggregate set.
func newMapCommand(serverHandler *ServerHandler, argc int, args []string) (mapCommand, *server.Aggregate, error) {
	// Named cmd rather than mapCommand so the local does not shadow the type.
	cmd := mapCommand{server: serverHandler}

	// Guard the args[1:] slice expression below: it panics on an empty slice.
	if len(args) == 0 {
		return cmd, nil, errors.New("map command: missing arguments")
	}

	queryStr := strings.Join(args[1:], " ")
	aggregate, err := server.NewAggregate(queryStr)
	if err != nil {
		return cmd, nil, err
	}

	cmd.aggregate = aggregate
	return cmd, aggregate, nil
}
// Start runs the command's aggregate, handing it ctx and the
// aggregatedMessages channel.
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
	agg := m.aggregate
	agg.Start(ctx, aggregatedMessages)
}
|