1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
package handlers
import (
"bytes"
"context"
"strings"
"testing"
"time"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/line"
userserver "github.com/mimecast/dtail/internal/user/server"
)
func TestDecodeGeneratedMessage(t *testing.T) {
generation, message := decodeGeneratedMessage(encodeGeneratedMessage(7, "hello"))
if generation != 7 {
t.Fatalf("unexpected generation: %d", generation)
}
if message != "hello" {
t.Fatalf("unexpected message: %q", message)
}
}
func TestBaseHandlerReadDropsStaleServerMessage(t *testing.T) {
handler := newGenerationTestHandler(2)
handler.serverMessages <- encodeGeneratedMessage(1, "stale\n")
handler.serverMessages <- encodeGeneratedMessage(2, "fresh\n")
got := readHandlerOutput(t, &handler)
if strings.Contains(got, "stale") {
t.Fatalf("unexpected stale output: %q", got)
}
if !strings.Contains(got, "fresh") {
t.Fatalf("expected current output, got %q", got)
}
}
func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) {
handler := newGenerationTestHandler(3)
handler.maprMessages <- encodeGeneratedMessage(2, "old aggregate")
handler.maprMessages <- encodeGeneratedMessage(3, "new aggregate")
got := readHandlerOutput(t, &handler)
if strings.Contains(got, "old aggregate") {
t.Fatalf("unexpected stale aggregate output: %q", got)
}
if !strings.Contains(got, "new aggregate") {
t.Fatalf("expected current aggregate output, got %q", got)
}
}
func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) {
handler := &ServerHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
maprMessages: make(chan string),
},
}
generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7)
generated <- "final aggregate"
closed := make(chan struct{})
go func() {
closeGenerated()
close(closed)
}()
select {
case <-closed:
t.Fatal("closeGenerated returned before mapreduce payload was forwarded")
case <-time.After(20 * time.Millisecond):
}
select {
case message := <-handler.maprMessages:
generation, payload := decodeGeneratedMessage(message)
if generation != 7 {
t.Fatalf("unexpected generation: %d", generation)
}
if payload != "final aggregate" {
t.Fatalf("unexpected payload: %q", payload)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for forwarded mapreduce payload")
}
select {
case <-closed:
case <-time.After(time.Second):
t.Fatal("timed out waiting for closeGenerated to finish")
}
}
func TestBaseHandlerReadDropsStaleLine(t *testing.T) {
handler := newGenerationTestHandler(4)
staleLine := line.New(bytes.NewBufferString("stale line"), 1, 100, "app.log")
staleLine.Generation = 3
currentLine := line.New(bytes.NewBufferString("fresh line"), 2, 100, "app.log")
currentLine.Generation = 4
handler.lines <- staleLine
handler.lines <- currentLine
got := readHandlerOutput(t, &handler)
if strings.Contains(got, "stale line") {
t.Fatalf("unexpected stale line output: %q", got)
}
if !strings.Contains(got, "fresh line") {
t.Fatalf("expected current line output, got %q", got)
}
}
func TestTurboManagerTryReadDropsStaleGeneration(t *testing.T) {
resetServerLogger(t)
manager := turboManager{
mode: true,
lines: make(chan []byte, 2),
}
manager.lines <- encodeGeneratedBytes(1, []byte("stale"))
manager.lines <- encodeGeneratedBytes(2, []byte("fresh"))
buf := make([]byte, 32)
n, handled := manager.tryRead(buf, &userserver.User{Name: "turbo-test"}, func(generation uint64) bool {
return generation != 0 && generation != 2
})
if !handled {
t.Fatalf("expected turbo read to be handled")
}
if got := string(buf[:n]); got != "fresh" {
t.Fatalf("unexpected turbo output: %q", got)
}
}
func newGenerationTestHandler(activeGeneration uint64) baseHandler {
return baseHandler{
done: internal.NewDone(),
lines: make(chan *line.Line, 2),
serverMessages: make(chan string, 2),
maprMessages: make(chan string, 2),
hostname: "testhost",
activeGeneration: func() uint64 {
return activeGeneration
},
}
}
func readHandlerOutput(t *testing.T, handler *baseHandler) string {
t.Helper()
buf := make([]byte, 256)
n, err := handler.Read(buf)
if err != nil {
t.Fatalf("Read() error = %v", err)
}
return string(buf[:n])
}
|