diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/connectors/serverless.go | 1 | ||||
| -rw-r--r-- | internal/config/client.go | 3 | ||||
| -rw-r--r-- | internal/config/env.go | 20 | ||||
| -rw-r--r-- | internal/config/initializer.go | 8 | ||||
| -rw-r--r-- | internal/io/dlog/dlog.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/catfile.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 26 | ||||
| -rw-r--r-- | internal/io/fs/tailfile.go | 5 | ||||
| -rw-r--r-- | internal/mapr/logformat/parser.go | 4 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 96 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 39 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 4 | ||||
| -rw-r--r-- | internal/ssh/client/authmethods.go | 21 | ||||
| -rw-r--r-- | internal/ssh/client/knownhostscallback.go | 6 |
16 files changed, 141 insertions, 104 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 2ff490a..431247a 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -47,7 +47,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() - if err := s.handle(ctx, cancel); err != nil { dlog.Client.Warn(err) } diff --git a/internal/config/client.go b/internal/config/client.go index 86f97f0..9f4df97 100644 --- a/internal/config/client.go +++ b/internal/config/client.go @@ -104,9 +104,6 @@ type termColors struct { type ClientConfig struct { TermColorsEnable bool `json:",omitempty"` TermColors termColors `json:",omitempty"` - // When unit testing in Jenkins you don't want to touch files in ~jenkins - // during integration tests really. - SSHDontAddHostsToKnownHostsFile bool `json:",omitempty"` } // Create a new default client configuration. diff --git a/internal/config/env.go b/internal/config/env.go index 88b831d..804a10a 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -2,6 +2,26 @@ package config import "os" +// Env returns true when a given environment variable is set to "yes". func Env(env string) bool { return "yes" == os.Getenv(env) } + +// Hostname returns the current hostname. It can be overriden with +// DTAIL_HOSTNAME_OVERRIDE environment variable (useful for integration tests). +func Hostname() (string, error) { + hostname := os.Getenv("DTAIL_HOSTNAME_OVERRIDE") + if len(hostname) > 0 { + return hostname, nil + } + return os.Hostname() +} + +// SSHKnownHostsFile returns the known hosts file path (useful for integration tests) +func SSHKnownHostsFile() string { + if len(os.Getenv("DTAIL_SSH_KNOWN_HOSTS_FILE")) > 0 { + return os.Getenv("DTAIL_SSH_KNOWN_HOSTS_FILE") + } else { + return os.Getenv("HOME") + "/.ssh/known_hosts" + } +} diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 4d6a73b..024464e 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -21,7 +21,7 @@ type initializer struct { type transformCb func(*initializer, *Args, []string) error func (in *initializer) parseConfig(args *Args) error { - if strings.ToUpper(args.ConfigFile) == "NONE" { + if strings.ToLower(args.ConfigFile) == "none" { return nil } @@ -82,9 +82,9 @@ func (in *initializer) transformConfig(sourceProcess source.Source, args *Args, // There are some special options which can be set by environment variable. func (in *initializer) readEnvironmentVars() { - if Env("DTAIL_SSH_DONT_ADD_HOSTS_TO_KNOWNHOSTS_FILE") || - Env("DTAIL_JENKINS") { - in.Client.SSHDontAddHostsToKnownHostsFile = true + if Env("DTAIL_RUN_INTEGRATION_TESTS") { + os.Setenv("DTAIL_HOSTNAME_OVERRIDE", "integrationtest") + os.Setenv("DTAIL_SSH_KNOWN_HOSTS_FILE", "./known_hosts") } } diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 5e0c3a1..ff2cef4 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -78,7 +78,7 @@ type DLog struct { // new creates a new DTail logger. func new(sourceProcess, sourcePackage source.Source) *DLog { - hostname, err := os.Hostname() + hostname, err := config.Hostname() if err != nil { panic(err) } diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index 01c15ba..e4676f3 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,9 +6,7 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, - limiter chan struct{}) CatFile { - +func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile { return CatFile{ readFile: readFile{ filePath: filePath, @@ -17,7 +15,6 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string, retry: false, canSkipLines: false, seekEOF: false, - limiter: limiter, }, } } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 28cbe58..5815aa3 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -38,7 +38,6 @@ type readFile struct { canSkipLines bool // Seek to the EOF before processing file? seekEOF bool - limiter chan struct{} } // String returns the string representation of the readFile @@ -66,25 +65,7 @@ func (f readFile) Retry() bool { func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { - dlog.Common.Debug("readFile", f) - defer func() { - select { - case <-f.limiter: - default: - } - }() - - select { - case f.limiter <- struct{}{}: - default: - select { - case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, - "Server limit reached. Queuing file..."): - case <-ctx.Done(): - return nil - } - f.limiter <- struct{}{} - } + dlog.Common.Trace("readFile", f) fd, err := os.Open(f.filePath) if err != nil { @@ -156,7 +137,9 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { return } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, + truncate <-chan struct{}) error { + var offset uint64 reader, err := f.makeReader(fd) if err != nil { @@ -250,6 +233,7 @@ func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *by return } if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + //dlog.Common.Trace("TODO", "lines", lines, len(lines), cap(lines)) select { case lines <- filteredLine: case <-ctx.Done(): diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index b03b45d..7a40ac4 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,9 +6,7 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, - limiter chan struct{}) TailFile { - +func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile { return TailFile{ readFile: readFile{ filePath: filePath, @@ -17,7 +15,6 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string, retry: true, canSkipLines: true, seekEOF: true, - limiter: limiter, }, } } diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index 129081d..d6aac8c 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -3,11 +3,11 @@ package logformat import ( "errors" "fmt" - "os" "reflect" "strings" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/mapr" ) @@ -26,7 +26,7 @@ type Parser struct { // NewParser returns a new log parser. func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { - hostname, err := os.Hostname() + hostname, err := config.Hostname() if err != nil { return nil, err } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 97fee11..4162828 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -2,7 +2,6 @@ package server import ( "context" - "os" "strings" "time" @@ -20,6 +19,7 @@ type Aggregate struct { done *internal.Done // NextLinesCh can be used to use a new line ch. NextLinesCh chan chan line.Line + linesCh chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. @@ -37,7 +37,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return nil, err } - fqdn, err := os.Hostname() + fqdn, err := config.Hostname() if err != nil { dlog.Common.Error(err) } @@ -113,58 +113,84 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } +func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { + + dlog.Common.Trace("nextLine", "entry", line, ok, noMoreChannels) + select { + case line, ok = <-a.linesCh: + if !ok { + // Channel is closed, go to next channel. + select { + case a.linesCh = <-a.NextLinesCh: + default: + noMoreChannels = true + } + } + default: + // No new line from current lines channel. Try next one. + select { + case newLinesCh := <-a.NextLinesCh: + oldLinesCh := a.linesCh + go func() { a.NextLinesCh <- oldLinesCh }() + a.linesCh = newLinesCh + default: + // No new lines channel found. + } + } + dlog.Common.Trace("nextLine", "exit", line, ok, noMoreChannels) + + return +} + func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { fieldsCh := make(chan map[string]string) go func() { defer close(fieldsCh) - var lines chan line.Line // Gather first lines channel (first input file) select { - case lines = <-a.NextLinesCh: + case a.linesCh = <-a.NextLinesCh: case <-ctx.Done(): return } for { select { - case line, ok := <-lines: - if !ok { - select { - case lines = <-a.NextLinesCh: - // Have a new lines channel (e.g. new input file) - case <-ctx.Done(): - default: - // No new lines channel found. - return - } - } + case <-ctx.Done(): + return + default: + } - maprLine := strings.TrimSpace(line.Content.String()) - fields, err := a.parser.MakeFields(maprLine) - // Can't recycle it here yet, as field slices are still - // TODO: Add unit test reading from multiple mapreduce files lines. - // TODO: Add capability to recycle this bytes buffer. - //pool.RecycleBytesBuffer(line.Content) - - if err != nil { - // Should fields be ignored anyway? - if err != logformat.ErrIgnoreFields { - dlog.Common.Error(fields, err) - } - continue - } - if !a.query.WhereClause(fields) { - continue + // Gather first lines channel (first input file) + line, ok, noMoreChannels := a.nextLine() + if !ok { + if noMoreChannels { + break } + time.Sleep(time.Millisecond * 100) + } + + maprLine := strings.TrimSpace(line.Content.String()) + fields, err := a.parser.MakeFields(maprLine) + // Can't recycle it here yet, as field slices are still + // MAYBETODO: Add capability to recycle this bytes buffer. + //pool.RecycleBytesBuffer(line.Content) - select { - case fieldsCh <- fields: - case <-ctx.Done(): + if err != nil { + // Should fields be ignored anyway? + if err != logformat.ErrIgnoreFields { + dlog.Common.Error(fields, err) } + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: case <-ctx.Done(): - return } } }() diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 6d10d17..53bf375 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -114,7 +114,6 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { pool.RecycleBytesBuffer(line.Content) case <-time.After(time.Second): - // Once in a while check whether we are done. select { case <-h.done.Done(): err = io.EOF diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index 6dd9872..e7f7762 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -2,10 +2,10 @@ package handlers import ( "context" - "os" "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" @@ -32,7 +32,7 @@ func NewHealthHandler(user *user.User) *HealthHandler { } h.handleCommandCb = h.handleHealthCommand - fqdn, err := os.Hostname() + fqdn, err := config.Hostname() if err != nil { dlog.Server.FatalPanic(err) } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 4728a55..51077fc 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -109,18 +109,51 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC r.readFile(ctx, ltx, path, globID, re) } +func (*readCommand) limit(ctx context.Context, limiter chan struct{}, message string) { + select { + case <-ctx.Done(): + return + } +} + func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex) { dlog.Server.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader + var limiter chan struct{} + switch r.mode { case omode.TailClient: - reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter) + reader = fs.NewTailFile(path, globID, r.server.serverMessages) + limiter = r.server.tailLimiter case omode.GrepClient, omode.CatClient: - reader = fs.NewCatFile(path, globID, r.server.serverMessages, r.server.catLimiter) + reader = fs.NewCatFile(path, globID, r.server.serverMessages) + limiter = r.server.catLimiter default: - reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter) + reader = fs.NewTailFile(path, globID, r.server.serverMessages) + limiter = r.server.tailLimiter + } + + defer func() { + select { + case <-limiter: + default: + } + }() + + select { + case limiter <- struct{}{}: + case <-ctx.Done(): + return + default: + dlog.Server.Info("Server limit hit, queueing file", len(limiter), path) + select { + case limiter <- struct{}{}: + dlog.Server.Info("Server limit OK now, processing file", len(limiter), path) + case <-ctx.Done(): + return + } } lines := r.server.lines diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 36574a9..75a8acc 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -2,10 +2,10 @@ package handlers import ( "context" - "os" "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" @@ -43,7 +43,7 @@ func NewServerHandler(user *user.User, catLimiter, } h.handleCommandCb = h.handleUserCommand - fqdn, err := os.Hostname() + fqdn, err := config.Hostname() if err != nil { dlog.Server.FatalPanic(err) } diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go index 37f8382..2ee32ad 100644 --- a/internal/ssh/client/authmethods.go +++ b/internal/ssh/client/authmethods.go @@ -29,19 +29,14 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) { var sshAuthMethods []gossh.AuthMethod - knownHostsPath := os.Getenv("HOME") + "/.ssh/known_hosts" - knownHostsCallback, err := NewKnownHostsCallback(knownHostsPath, trustAllHosts, + knownHostsFile := config.SSHKnownHostsFile() + knownHostsCallback, err := NewKnownHostsCallback(knownHostsFile, trustAllHosts, throttleCh) if err != nil { - dlog.Client.FatalPanic(knownHostsPath, err) + dlog.Client.FatalPanic(knownHostsFile, err) } - dlog.Client.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsPath) - /* - if config.Client.ExperimentalFeaturesEnable { - sshAuthMethods = append(sshAuthMethods, gossh.Password("experimental feature test")) - dlog.Client.Debug("initKnownHostsAuthMethods", "Added experimental method to list of auth methods") - } - */ + + dlog.Client.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsFile) // First try to read custom private key path. if privateKeyPath != "" { @@ -100,11 +95,7 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, dlog.Client.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err) - // This is only a panic when we expect to do something about it. - if !config.Client.SSHDontAddHostsToKnownHostsFile { - dlog.Client.FatalPanic("Unable to find private SSH key information") - } - + dlog.Client.FatalPanic("Unable to find private SSH key information") // Never reach this point. return sshAuthMethods, knownHostsCallback } diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go index 2aa0168..dd58925 100644 --- a/internal/ssh/client/knownhostscallback.go +++ b/internal/ssh/client/knownhostscallback.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/prompt" @@ -216,11 +215,6 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) { func (c KnownHostsCallback) trustHosts(hosts []unknownHost) { tmpKnownHostsPath := fmt.Sprintf("%s.tmp", c.knownHostsPath) - if config.Client.SSHDontAddHostsToKnownHostsFile { - dlog.Common.Verbose("Not adding hosts to known hosts file, as disabled by config") - return - } - newFd, err := os.OpenFile(tmpKnownHostsPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) if err != nil { panic(fmt.Sprintf("%s: %s", tmpKnownHostsPath, err.Error())) |
