1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package handlers
import (
"context"
"strings"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
// mapCommand implements the mapreduce command server side.
//
// newMapCommand populates exactly one of aggregate or turboAggregate; Start
// dispatches to whichever one is non-nil.
type mapCommand struct {
// aggregate is the regular aggregate; set by newMapCommand when turbo mode is not used.
aggregate *server.Aggregate
// turboAggregate is set by newMapCommand when turbo mode is enabled and the handler is not serverless.
turboAggregate *server.TurboAggregate
// server is the handler that was passed to newMapCommand.
server *ServerHandler
}
// newMapCommand returns a new server side mapreduce command.
//
// The query string is built by joining args[1:] with single spaces; args[0] is
// skipped (presumably the command name; confirm against the caller). When args
// has fewer than two elements the query is the empty string, so an empty args
// slice is handled like a command without a query instead of panicking on the
// slice expression. argc is accepted for signature compatibility and not used.
//
// Exactly one aggregate is created: a TurboAggregate when
// config.Server.TurboModeEnable is set and the handler is not serverless,
// otherwise a regular Aggregate. The created aggregate is stored in the
// returned mapCommand and is also returned directly, with the other aggregate
// result left nil. If the aggregate constructor fails, both aggregate results
// are nil and its error is returned unchanged.
func newMapCommand(serverHandler *ServerHandler, argc int,
	args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {

	m := mapCommand{server: serverHandler}

	// Guard the slice expression: args[1:] panics when args is empty.
	var queryStr string
	if len(args) > 1 {
		queryStr = strings.Join(args[1:], " ")
	}

	// If turbo mode is enabled AND we're in server mode (not serverless), create a TurboAggregate.
	// Turbo mode is a server-side optimization and should not be used in serverless mode.
	dlog.Server.Debug("MapReduce mode check", "turboModeEnable", config.Server.TurboModeEnable, "serverless", serverHandler.serverless)
	if config.Server.TurboModeEnable && !serverHandler.serverless {
		dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
		turboAggregate, err := server.NewTurboAggregate(queryStr)
		if err != nil {
			return m, nil, nil, err
		}
		m.turboAggregate = turboAggregate
		return m, nil, turboAggregate, nil
	}

	// Otherwise, create a regular Aggregate.
	aggregate, err := server.NewAggregate(queryStr)
	if err != nil {
		return m, nil, nil, err
	}
	m.aggregate = aggregate
	return m, aggregate, nil, nil
}
// Start starts whichever aggregate this command holds, passing ctx and the
// aggregatedMessages channel through unchanged. The turbo aggregate takes
// precedence when it is set; otherwise the regular aggregate is started.
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
	if turbo := m.turboAggregate; turbo != nil {
		turbo.Start(ctx, aggregatedMessages)
		return
	}
	m.aggregate.Start(ctx, aggregatedMessages)
}
|