1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package clients
import (
"dtail/clients/remote"
"dtail/discovery"
"dtail/logger"
"dtail/omode"
"dtail/ssh/client"
"regexp"
"sync"
"time"
gossh "golang.org/x/crypto/ssh"
)
// baseClient is the main client data structure. It holds the shared state
// of a dtail client: the remote connections, the SSH credentials used to
// establish them, and the channels used to stop the client and to signal
// that it has stopped.
type baseClient struct {
// Args is embedded; the methods in this file read TrustAllHosts, Discovery,
// ServersStr, Regex and Mode from it.
Args
// To display client side stats. Created by init and logged periodically
// from a background goroutine.
stats *stats
// List of remote servers to connect to.
// NOTE(review): not assigned by any method visible in this file; confirm
// whether it is still in use.
servers []string
// We have one connection per remote server. Filled by init; Start
// replaces an entry with a fresh connection when it retries.
connections []*remote.Connection
// SSH auth methods to use to connect to the remote servers.
sshAuthMethods []gossh.AuthMethod
// To deal with SSH host keys
hostKeyCallback *client.HostKeyCallback
// To stop the client. Closed by Stop.
stop chan struct{}
// To indicate that the client has stopped. Closed when waitUntilDone returns.
stopped chan struct{}
// Throttle how fast we initiate SSH connections concurrently
throttleCh chan struct{}
// Retry connection upon failure? When set, Start creates a new connection
// each time a connection's Start call returns.
retry bool
// Connection helper. Set by init and used to create every connection.
maker connectionMaker
}
// init prepares the base client for use: it stores the connection maker,
// sets up the SSH auth methods and host key callback, creates one
// connection per discovered server, verifies that the configured regex
// compiles and launches the background helper goroutines.
func (c *baseClient) init(maker connectionMaker) {
	logger.Info("Initiating base client")
	c.maker = maker

	c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.TrustAllHosts, c.throttleCh)

	// Ask the discovery service for the remote dtail servers; passing true
	// requests a shuffled list.
	const shuffle = true
	serverList := discovery.New(c.Discovery, c.ServersStr, shuffle).ServerList()
	for _, addr := range serverList {
		conn := c.maker.makeConnection(addr, c.sshAuthMethods, c.hostKeyCallback)
		c.connections = append(c.connections, conn)
	}

	// Only a test compile: the compiled expression itself is discarded.
	_, compileErr := regexp.Compile(c.Regex)
	if compileErr != nil {
		logger.FatalExit(c.Regex, "Can't test compile regex", compileErr)
	}

	// Background task: periodically check for unknown hosts and ask the
	// user whether to trust them or not.
	go c.hostKeyCallback.PromptAddHosts(c.stop)

	// Background task: periodically print connection stats to the client.
	c.stats = newTailStats(len(c.connections))
	go c.stats.periodicLogStats(c.throttleCh, c.stop)
}
// Start runs every connection in its own goroutine and blocks until the
// client is done (see waitUntilDone). If wg is non-nil, wg.Done is called
// when Start returns. The returned status is never set in this method and
// is therefore always 0.
func (c *baseClient) Start(wg *sync.WaitGroup) (status int) {
	if wg != nil {
		defer wg.Done()
	}

	// Every running connection goroutine holds one token in active; the
	// number of buffered tokens is what waitUntilZero polls.
	active := make(chan struct{}, len(c.connections))

	// launched is released once every goroutine has deposited its token, so
	// that waitUntilDone cannot observe an empty active channel before the
	// connections have been accounted for.
	var launched sync.WaitGroup
	launched.Add(len(c.connections))

	for i, conn := range c.connections {
		go func(i int, conn *remote.Connection) {
			active <- struct{}{}
			defer func() {
				logger.Debug(conn.Server, "Disconnected completely...")
				<-active
			}()
			launched.Done()

			for {
				conn.Start(c.throttleCh, c.stats.connectionsEstCh)
				if !c.retry {
					return
				}

				// Back off before reconnecting, but give up as soon as the
				// client is stopped. Without this check a retrying
				// goroutine would reconnect forever, never release its
				// token and make Stop block indefinitely. A nil c.stop is
				// never ready, so the timer case still fires.
				select {
				case <-c.stop:
					return
				case <-time.After(2 * time.Second):
				}

				logger.Debug(conn.Server, "Reconnecting")
				conn = c.maker.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback)
				// NOTE(review): this write is not synchronized with
				// waitUntilDone, which iterates c.connections to stop them.
				c.connections[i] = conn
			}
		}(i, conn)
	}

	launched.Wait()
	c.waitUntilDone(active)
	return status
}
// waitUntilDone blocks until the client has finished and closes c.stopped
// on return.
//
// In tail mode it waits for c.stop to be signalled, stops every connection
// and then waits for the active count to reach zero. In every other mode
// it only waits for the active count to reach zero.
func (c *baseClient) waitUntilDone(active chan struct{}) {
	defer close(c.stopped)

	if c.Mode == omode.TailClient {
		<-c.stop
		logger.Info("Stopping client")
		conns := c.connections
		for idx := range conns {
			conns[idx].Stop()
		}
		c.waitUntilZero(active)
		return
	}

	c.waitUntilZero(active)
	logger.Info("All connections stopped")
}
// waitUntilZero polls the number of tokens buffered in active once per
// second, logging the count each time, and returns as soon as it is zero.
func (c *baseClient) waitUntilZero(active chan struct{}) {
	// The post statement provides the one second pause between two polls.
	for ; ; time.Sleep(time.Second) {
		logger.Debug("Active connections", len(active))
		if len(active) > 0 {
			continue
		}
		return
	}
}
// Stop signals the client to shut down by closing c.stop and then blocks
// until the channel returned by WaitC is closed. It must be called at most
// once, because closing an already closed channel panics.
func (c *baseClient) Stop() {
	close(c.stop)

	done := c.WaitC()
	<-done
}
// WaitC returns a receive-only view of c.stopped, the channel that
// waitUntilDone closes when it returns.
func (c *baseClient) WaitC() <-chan struct{} {
	var done <-chan struct{} = c.stopped
	return done
}
|