// Source path: root/internal/clients/maprclient.go
// Blob: e6cc47130006f08797a070878ea0d1978b367ee2

package clients

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"strings"
	"time"

	"github.com/mimecast/dtail/internal/clients/handlers"
	"github.com/mimecast/dtail/internal/color"
	"github.com/mimecast/dtail/internal/config"
	"github.com/mimecast/dtail/internal/constants"
	"github.com/mimecast/dtail/internal/io/dlog"
	"github.com/mimecast/dtail/internal/mapr"
	"github.com/mimecast/dtail/internal/omode"
)

// MaprClientMode selects how the client treats MapReduce results between
// two periodic reports: either they keep building up over time or they
// start afresh for every interval.
type MaprClientMode int

// Accumulation modes accepted by NewMaprClient.
const (
	// DefaultMode lets NewMaprClient decide: results are cumulative when the
	// client runs in MapClient mode or the query has an outfile, and
	// non-cumulative otherwise.
	DefaultMode MaprClientMode = iota

	// CumulativeMode keeps adding new results to the previous totals across
	// reporting intervals.
	CumulativeMode

	// NonCumulativeMode starts from a fresh result set for every reporting
	// interval, so each report covers only that interval.
	NonCumulativeMode
)

// MaprClient is the client used to run a mapr (MapReduce) query against the
// configured servers and to report the merged result.
//
// What the client does, as far as this file shows:
// - Parses the query string once at construction time (see NewMaprClient)
// - Sends a "map" command plus one cat/tail command per file to the servers
// - Hands every per-server handler the same global group set for merging
// - Reports results periodically, and once more at the end in cumulative mode
// - Writes results to the query's outfile, or prints them to the terminal
//
// MaprClient embeds baseClient for the connection handling and provides the
// makeHandler and makeCommands methods that baseClient's connection setup uses.
type MaprClient struct {
	baseClient
	
	// globalGroup is the group set shared with every handler created by
	// makeHandler; results are read from it (or swapped out of it) when
	// reporting.
	globalGroup *mapr.GlobalGroupSet
	
	// query is the parsed form of the query string passed in via
	// config.Args.QueryStr.
	query *mapr.Query
	
	// cumulative selects whether reports read the global group set in place
	// (true) or swap it out so that each report starts from a fresh set (false).
	cumulative bool
	
	// lastResult holds the result string seen by the most recent printResults
	// call; an identical result is not printed again.
	lastResult string
}

// NewMaprClient creates a MaprClient for the mapr query given in args.QueryStr.
//
// Parameters:
//   args: Client configuration; QueryStr, Mode and ConnectionsPerCPU are read here
//   maprClientMode: How results are accumulated between reporting intervals
//
// Returns:
//   *MaprClient: The initialised client (nil on error)
//   error: Non-nil when no query was given or the query could not be parsed
//
// Configuration process:
// - Rejects an empty query string
// - Parses the query; a parse failure is returned to the caller as a wrapped
//   error instead of panicking inside the constructor
// - Enables retry only for tail mode without an outfile
// - Derives the cumulative flag from maprClientMode
// - Derives the line regex from the query's table
// - Creates the global group set, then initialises the base client and its
//   connections
func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) {
	if args.QueryStr == "" {
		return nil, errors.New("No mapreduce query specified, use '-query' flag")
	}

	query, err := mapr.NewQuery(args.QueryStr)
	if err != nil {
		// The function already has an error result, so let the caller decide
		// how to react to an invalid query.
		return nil, fmt.Errorf("can't parse mapr query %q: %w", args.QueryStr, err)
	}

	// Retry is enabled only when tailing and the query has no outfile; in
	// every other combination it stays off.
	retry := args.Mode == omode.TailClient && !query.HasOutfile()

	var cumulative bool
	switch maprClientMode {
	case CumulativeMode:
		cumulative = true
	case NonCumulativeMode:
		cumulative = false
	default:
		// Result is cumulative if we are in MapClient mode or with outfile.
		cumulative = args.Mode == omode.MapClient || query.HasOutfile()
	}

	dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)

	c := MaprClient{
		baseClient: baseClient{
			Args:       args,
			throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
			retry:      retry,
		},
		query:      query,
		cumulative: cumulative,
	}

	switch c.query.Table {
	case "", ".":
		// No table restriction: match every line.
		c.RegexStr = "."
	case "*":
		// Constant pattern, no formatting needed.
		c.RegexStr = "\\|MAPREDUCE:\\|"
	default:
		c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
	}

	c.globalGroup = mapr.NewGlobalGroupSet()
	c.baseClient.init()
	// c is handed over by value here; makeHandler and makeCommands use value
	// receivers, so the value itself provides those methods.
	c.baseClient.makeConnections(c)

	return &c, nil
}

// Start runs the mapr client until the base client has finished.
//
// Parameters:
//   ctx: Context handed to the reporting goroutine and to the base client
//   statsCh: Stats channel passed through to the base client
//
// Returns:
//   int: The status returned by baseClient.Start
//
// Operation flow:
// 1. Spawns periodicReportResults in its own goroutine
// 2. Blocks in baseClient.Start
// 3. In cumulative mode, reports the results one final time
func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
	// Interim reports are produced concurrently with the base client.
	go c.periodicReportResults(ctx)

	status = c.baseClient.Start(ctx, statsCh)
	if !c.cumulative {
		return status
	}

	// Cumulative results are reported once more after the base client is done.
	dlog.Client.Debug("Received final mapreduce result")
	c.reportResults()
	return status
}

// makeHandler builds the handler used for the connection to the given server.
//
// Parameters:
//   server: The server this handler is created for
//
// Returns:
//   handlers.Handler: Result of handlers.NewMaprHandler for this server
//
// Every handler receives the client's parsed query and the same global group
// set, so all servers feed into one shared result.
func (c MaprClient) makeHandler(server string) handlers.Handler {
	handler := handlers.NewMaprHandler(server, c.query, c.globalGroup)
	return handler
}

// makeCommands builds the list of commands sent to each server for this query.
//
// Returns:
//   []string: The "map" command followed by one command per requested file
//
// Command generation process:
// 1. Emits "map <raw query>"
// 2. Picks "tail" in TailClient mode and "cat" otherwise
// 3. Splits c.What on commas and emits one command per entry:
//    - with a positive Timeout: "timeout <secs> <mode> <file> <regex>"
//    - otherwise: "<mode>:<options> <file> <regex>"
//
// Note that the timeout form carries no serialized options; this matches the
// original command layout and is kept as is.
//
// The serialized regex and the command prefix do not depend on the file, so
// they are computed once up front rather than once per file.
func (c MaprClient) makeCommands() (commands []string) {
	regex, err := c.Regex.Serialize()
	if err != nil {
		dlog.Client.FatalPanic(err)
	}

	modeStr := "cat"
	if c.Mode == omode.TailClient {
		modeStr = "tail"
	}

	// Everything in front of "<file> <regex>" is the same for all files.
	var prefix string
	if c.Timeout > 0 {
		prefix = fmt.Sprintf("timeout %d %s", c.Timeout, modeStr)
	} else {
		prefix = fmt.Sprintf("%s:%s", modeStr, c.Args.SerializeOptions())
	}

	// strings.Split yields at least one element, even for an empty c.What.
	files := strings.Split(c.What, ",")
	commands = make([]string, 0, len(files)+1)
	commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
	for _, file := range files {
		commands = append(commands, fmt.Sprintf("%s %s %s", prefix, file, regex))
	}
	return commands
}

// periodicReportResults reports interim results in a loop until ctx is
// cancelled. Start runs it in its own goroutine.
//
// Parameters:
//   ctx: Context whose cancellation ends the loop
//
// Operation flow:
// 1. Waits for a ramp-up period of half the query interval
// 2. Then waits one full query interval before each report
// 3. Returns as soon as ctx is cancelled, including during the ramp-up
//
// The ramp-up wait is done with a timer inside a select so that a cancelled
// context is honoured immediately instead of only after the sleep has elapsed.
func (c *MaprClient) periodicReportResults(ctx context.Context) {
	rampUpSleep := c.query.Interval / 2
	dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)

	rampUp := time.NewTimer(rampUpSleep)
	select {
	case <-rampUp.C:
	case <-ctx.Done():
		rampUp.Stop()
		return
	}

	for {
		select {
		case <-time.After(c.query.Interval):
			dlog.Client.Debug("Gathering interim mapreduce result")
			c.reportResults()
		case <-ctx.Done():
			return
		}
	}
}

// reportResults routes the current results to their destination.
//
// Output routing:
// - Query without an outfile: results are printed via printResults
// - Query with an outfile: results are written via writeResultsToOutfile
//
// It is called for every interim report and, in cumulative mode, once more
// from Start after the base client has finished.
func (c *MaprClient) reportResults() {
	if !c.query.HasOutfile() {
		c.printResults()
		return
	}
	c.writeResultsToOutfile()
}

// printResults prints the current results through the client logger. It is
// used when the query has no outfile.
//
// Behaviour:
// - Without a limit in the query, output is capped at
//   constants.DefaultMapReduceRowsLimit rows
// - In cumulative mode the global group set is read in place; otherwise it
//   is swapped out first and the swapped-out set is read
// - A result identical to the previous one is not printed again
// - An empty result (zero rows) is not printed
// - The raw query is printed above the result, painted when terminal
//   colors are enabled
// - A warning is logged when there are more rows than the applied limit
func (c *MaprClient) printResults() {
	rowsLimit := constants.MapReduceUnlimited
	if c.query.Limit == constants.MapReduceUnlimited {
		// No limit clause in the query: fall back to the default cap for
		// terminal output.
		rowsLimit = constants.DefaultMapReduceRowsLimit
	}

	var (
		result  string
		numRows int
		err     error
	)
	switch {
	case c.cumulative:
		result, numRows, err = c.globalGroup.Result(c.query, rowsLimit)
	default:
		result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit)
	}
	if err != nil {
		dlog.Client.FatalPanic(err)
	}

	if c.lastResult == result {
		dlog.Client.Debug("Result hasn't changed compared to last time...")
		return
	}
	// Remember the result before the empty check, so the comparison above
	// always runs against the most recently seen result.
	c.lastResult = result

	if numRows == 0 {
		dlog.Client.Debug("Empty result set this time...")
		return
	}

	rawQuery := c.query.RawQuery
	if config.Client.TermColorsEnable {
		table := config.Client.TermColors.MaprTable
		rawQuery = color.PaintStrWithAttr(rawQuery,
			table.RawQueryFg, table.RawQueryBg, table.RawQueryAttr)
	}
	dlog.Client.Raw(fmt.Sprintf("%s\n", rawQuery))

	if numRows > rowsLimit && rowsLimit > 0 {
		dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+
			"to %d rows! Use 'limit' clause to override!", numRows, rowsLimit))
	}
	dlog.Client.Raw(fmt.Sprintf("%s\n", result))
}

// writeResultsToOutfile writes the current results via WriteResult. It is
// used when the query has an outfile.
//
// File output behavior:
// - Cumulative mode: WriteResult is called on the global group set itself
// - Non-cumulative mode: the global group set is swapped out first and
//   WriteResult is called on the swapped-out set
// - Any write error is passed to dlog.Client.FatalPanic
func (c *MaprClient) writeResultsToOutfile() {
	var err error
	if c.cumulative {
		err = c.globalGroup.WriteResult(c.query)
	} else {
		err = c.globalGroup.SwapOut().WriteResult(c.query)
	}
	if err != nil {
		dlog.Client.FatalPanic(err)
	}
}