summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/connectors/serverless.go1
-rw-r--r--internal/config/client.go3
-rw-r--r--internal/config/env.go20
-rw-r--r--internal/config/initializer.go8
-rw-r--r--internal/io/dlog/dlog.go2
-rw-r--r--internal/io/fs/catfile.go5
-rw-r--r--internal/io/fs/readfile.go26
-rw-r--r--internal/io/fs/tailfile.go5
-rw-r--r--internal/mapr/logformat/parser.go4
-rw-r--r--internal/mapr/server/aggregate.go96
-rw-r--r--internal/server/handlers/basehandler.go1
-rw-r--r--internal/server/handlers/healthhandler.go4
-rw-r--r--internal/server/handlers/readcommand.go39
-rw-r--r--internal/server/handlers/serverhandler.go4
-rw-r--r--internal/ssh/client/authmethods.go21
-rw-r--r--internal/ssh/client/knownhostscallback.go6
16 files changed, 141 insertions, 104 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 2ff490a..431247a 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -47,7 +47,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc,
dlog.Client.Debug("Starting serverless connector")
go func() {
defer cancel()
-
if err := s.handle(ctx, cancel); err != nil {
dlog.Client.Warn(err)
}
diff --git a/internal/config/client.go b/internal/config/client.go
index 86f97f0..9f4df97 100644
--- a/internal/config/client.go
+++ b/internal/config/client.go
@@ -104,9 +104,6 @@ type termColors struct {
type ClientConfig struct {
TermColorsEnable bool `json:",omitempty"`
TermColors termColors `json:",omitempty"`
- // When unit testing in Jenkins you don't want to touch files in ~jenkins
- // during integration tests really.
- SSHDontAddHostsToKnownHostsFile bool `json:",omitempty"`
}
// Create a new default client configuration.
diff --git a/internal/config/env.go b/internal/config/env.go
index 88b831d..804a10a 100644
--- a/internal/config/env.go
+++ b/internal/config/env.go
@@ -2,6 +2,26 @@ package config
import "os"
+// Env returns true when a given environment variable is set to "yes".
func Env(env string) bool {
return "yes" == os.Getenv(env)
}
+
+// Hostname returns the current hostname. It can be overriden with
+// DTAIL_HOSTNAME_OVERRIDE environment variable (useful for integration tests).
+func Hostname() (string, error) {
+ hostname := os.Getenv("DTAIL_HOSTNAME_OVERRIDE")
+ if len(hostname) > 0 {
+ return hostname, nil
+ }
+ return os.Hostname()
+}
+
+// SSHKnownHostsFile returns the known hosts file path (useful for integration tests)
+func SSHKnownHostsFile() string {
+ if len(os.Getenv("DTAIL_SSH_KNOWN_HOSTS_FILE")) > 0 {
+ return os.Getenv("DTAIL_SSH_KNOWN_HOSTS_FILE")
+ } else {
+ return os.Getenv("HOME") + "/.ssh/known_hosts"
+ }
+}
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index 4d6a73b..024464e 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -21,7 +21,7 @@ type initializer struct {
type transformCb func(*initializer, *Args, []string) error
func (in *initializer) parseConfig(args *Args) error {
- if strings.ToUpper(args.ConfigFile) == "NONE" {
+ if strings.ToLower(args.ConfigFile) == "none" {
return nil
}
@@ -82,9 +82,9 @@ func (in *initializer) transformConfig(sourceProcess source.Source, args *Args,
// There are some special options which can be set by environment variable.
func (in *initializer) readEnvironmentVars() {
- if Env("DTAIL_SSH_DONT_ADD_HOSTS_TO_KNOWNHOSTS_FILE") ||
- Env("DTAIL_JENKINS") {
- in.Client.SSHDontAddHostsToKnownHostsFile = true
+ if Env("DTAIL_RUN_INTEGRATION_TESTS") {
+ os.Setenv("DTAIL_HOSTNAME_OVERRIDE", "integrationtest")
+ os.Setenv("DTAIL_SSH_KNOWN_HOSTS_FILE", "./known_hosts")
}
}
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index 5e0c3a1..ff2cef4 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -78,7 +78,7 @@ type DLog struct {
// new creates a new DTail logger.
func new(sourceProcess, sourcePackage source.Source) *DLog {
- hostname, err := os.Hostname()
+ hostname, err := config.Hostname()
if err != nil {
panic(err)
}
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index 01c15ba..e4676f3 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,9 +6,7 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string,
- limiter chan struct{}) CatFile {
-
+func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile {
return CatFile{
readFile: readFile{
filePath: filePath,
@@ -17,7 +15,6 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string,
retry: false,
canSkipLines: false,
seekEOF: false,
- limiter: limiter,
},
}
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 28cbe58..5815aa3 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -38,7 +38,6 @@ type readFile struct {
canSkipLines bool
// Seek to the EOF before processing file?
seekEOF bool
- limiter chan struct{}
}
// String returns the string representation of the readFile
@@ -66,25 +65,7 @@ func (f readFile) Retry() bool {
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
lines chan<- line.Line, re regex.Regex) error {
- dlog.Common.Debug("readFile", f)
- defer func() {
- select {
- case <-f.limiter:
- default:
- }
- }()
-
- select {
- case f.limiter <- struct{}{}:
- default:
- select {
- case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID,
- "Server limit reached. Queuing file..."):
- case <-ctx.Done():
- return nil
- }
- f.limiter <- struct{}{}
- }
+ dlog.Common.Trace("readFile", f)
fd, err := os.Open(f.filePath)
if err != nil {
@@ -156,7 +137,9 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
return
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer,
+ truncate <-chan struct{}) error {
+
var offset uint64
reader, err := f.makeReader(fd)
if err != nil {
@@ -250,6 +233,7 @@ func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *by
return
}
if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+ //dlog.Common.Trace("TODO", "lines", lines, len(lines), cap(lines))
select {
case lines <- filteredLine:
case <-ctx.Done():
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index b03b45d..7a40ac4 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,9 +6,7 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string,
- limiter chan struct{}) TailFile {
-
+func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile {
return TailFile{
readFile: readFile{
filePath: filePath,
@@ -17,7 +15,6 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string,
retry: true,
canSkipLines: true,
seekEOF: true,
- limiter: limiter,
},
}
}
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index 129081d..d6aac8c 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -3,11 +3,11 @@ package logformat
import (
"errors"
"fmt"
- "os"
"reflect"
"strings"
"time"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/mapr"
)
@@ -26,7 +26,7 @@ type Parser struct {
// NewParser returns a new log parser.
func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) {
- hostname, err := os.Hostname()
+ hostname, err := config.Hostname()
if err != nil {
return nil, err
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 97fee11..4162828 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -2,7 +2,6 @@ package server
import (
"context"
- "os"
"strings"
"time"
@@ -20,6 +19,7 @@ type Aggregate struct {
done *internal.Done
// NextLinesCh can be used to use a new line ch.
NextLinesCh chan chan line.Line
+ linesCh chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
@@ -37,7 +37,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return nil, err
}
- fqdn, err := os.Hostname()
+ fqdn, err := config.Hostname()
if err != nil {
dlog.Common.Error(err)
}
@@ -113,58 +113,84 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
+func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
+
+ dlog.Common.Trace("nextLine", "entry", line, ok, noMoreChannels)
+ select {
+ case line, ok = <-a.linesCh:
+ if !ok {
+ // Channel is closed, go to next channel.
+ select {
+ case a.linesCh = <-a.NextLinesCh:
+ default:
+ noMoreChannels = true
+ }
+ }
+ default:
+ // No new line from current lines channel. Try next one.
+ select {
+ case newLinesCh := <-a.NextLinesCh:
+ oldLinesCh := a.linesCh
+ go func() { a.NextLinesCh <- oldLinesCh }()
+ a.linesCh = newLinesCh
+ default:
+ // No new lines channel found.
+ }
+ }
+ dlog.Common.Trace("nextLine", "exit", line, ok, noMoreChannels)
+
+ return
+}
+
func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string {
fieldsCh := make(chan map[string]string)
go func() {
defer close(fieldsCh)
- var lines chan line.Line
// Gather first lines channel (first input file)
select {
- case lines = <-a.NextLinesCh:
+ case a.linesCh = <-a.NextLinesCh:
case <-ctx.Done():
return
}
for {
select {
- case line, ok := <-lines:
- if !ok {
- select {
- case lines = <-a.NextLinesCh:
- // Have a new lines channel (e.g. new input file)
- case <-ctx.Done():
- default:
- // No new lines channel found.
- return
- }
- }
+ case <-ctx.Done():
+ return
+ default:
+ }
- maprLine := strings.TrimSpace(line.Content.String())
- fields, err := a.parser.MakeFields(maprLine)
- // Can't recycle it here yet, as field slices are still
- // TODO: Add unit test reading from multiple mapreduce files lines.
- // TODO: Add capability to recycle this bytes buffer.
- //pool.RecycleBytesBuffer(line.Content)
-
- if err != nil {
- // Should fields be ignored anyway?
- if err != logformat.ErrIgnoreFields {
- dlog.Common.Error(fields, err)
- }
- continue
- }
- if !a.query.WhereClause(fields) {
- continue
+ // Gather first lines channel (first input file)
+ line, ok, noMoreChannels := a.nextLine()
+ if !ok {
+ if noMoreChannels {
+ break
}
+ time.Sleep(time.Millisecond * 100)
+ }
+
+ maprLine := strings.TrimSpace(line.Content.String())
+ fields, err := a.parser.MakeFields(maprLine)
+ // Can't recycle it here yet, as field slices are still
+ // MAYBETODO: Add capability to recycle this bytes buffer.
+ //pool.RecycleBytesBuffer(line.Content)
- select {
- case fieldsCh <- fields:
- case <-ctx.Done():
+ if err != nil {
+ // Should fields be ignored anyway?
+ if err != logformat.ErrIgnoreFields {
+ dlog.Common.Error(fields, err)
}
+ continue
+ }
+ if !a.query.WhereClause(fields) {
+ continue
+ }
+
+ select {
+ case fieldsCh <- fields:
case <-ctx.Done():
- return
}
}
}()
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 6d10d17..53bf375 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -114,7 +114,6 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
pool.RecycleBytesBuffer(line.Content)
case <-time.After(time.Second):
- // Once in a while check whether we are done.
select {
case <-h.done.Done():
err = io.EOF
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 6dd9872..e7f7762 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -2,10 +2,10 @@ package handlers
import (
"context"
- "os"
"strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
@@ -32,7 +32,7 @@ func NewHealthHandler(user *user.User) *HealthHandler {
}
h.handleCommandCb = h.handleHealthCommand
- fqdn, err := os.Hostname()
+ fqdn, err := config.Hostname()
if err != nil {
dlog.Server.FatalPanic(err)
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 4728a55..51077fc 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -109,18 +109,51 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
r.readFile(ctx, ltx, path, globID, re)
}
+func (*readCommand) limit(ctx context.Context, limiter chan struct{}, message string) {
+ select {
+ case <-ctx.Done():
+ return
+ }
+}
+
func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
dlog.Server.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
+ var limiter chan struct{}
+
switch r.mode {
case omode.TailClient:
- reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter)
+ reader = fs.NewTailFile(path, globID, r.server.serverMessages)
+ limiter = r.server.tailLimiter
case omode.GrepClient, omode.CatClient:
- reader = fs.NewCatFile(path, globID, r.server.serverMessages, r.server.catLimiter)
+ reader = fs.NewCatFile(path, globID, r.server.serverMessages)
+ limiter = r.server.catLimiter
default:
- reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter)
+ reader = fs.NewTailFile(path, globID, r.server.serverMessages)
+ limiter = r.server.tailLimiter
+ }
+
+ defer func() {
+ select {
+ case <-limiter:
+ default:
+ }
+ }()
+
+ select {
+ case limiter <- struct{}{}:
+ case <-ctx.Done():
+ return
+ default:
+ dlog.Server.Info("Server limit hit, queueing file", len(limiter), path)
+ select {
+ case limiter <- struct{}{}:
+ dlog.Server.Info("Server limit OK now, processing file", len(limiter), path)
+ case <-ctx.Done():
+ return
+ }
}
lines := r.server.lines
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 36574a9..75a8acc 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -2,10 +2,10 @@ package handlers
import (
"context"
- "os"
"strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
@@ -43,7 +43,7 @@ func NewServerHandler(user *user.User, catLimiter,
}
h.handleCommandCb = h.handleUserCommand
- fqdn, err := os.Hostname()
+ fqdn, err := config.Hostname()
if err != nil {
dlog.Server.FatalPanic(err)
}
diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go
index 37f8382..2ee32ad 100644
--- a/internal/ssh/client/authmethods.go
+++ b/internal/ssh/client/authmethods.go
@@ -29,19 +29,14 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{},
privateKeyPath string) ([]gossh.AuthMethod, HostKeyCallback) {
var sshAuthMethods []gossh.AuthMethod
- knownHostsPath := os.Getenv("HOME") + "/.ssh/known_hosts"
- knownHostsCallback, err := NewKnownHostsCallback(knownHostsPath, trustAllHosts,
+ knownHostsFile := config.SSHKnownHostsFile()
+ knownHostsCallback, err := NewKnownHostsCallback(knownHostsFile, trustAllHosts,
throttleCh)
if err != nil {
- dlog.Client.FatalPanic(knownHostsPath, err)
+ dlog.Client.FatalPanic(knownHostsFile, err)
}
- dlog.Client.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsPath)
- /*
- if config.Client.ExperimentalFeaturesEnable {
- sshAuthMethods = append(sshAuthMethods, gossh.Password("experimental feature test"))
- dlog.Client.Debug("initKnownHostsAuthMethods", "Added experimental method to list of auth methods")
- }
- */
+
+ dlog.Client.Debug("initKnownHostsAuthMethods", "Added known hosts file path", knownHostsFile)
// First try to read custom private key path.
if privateKeyPath != "" {
@@ -100,11 +95,7 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{},
dlog.Client.Debug("initKnownHostsAuthMethods", "Unable to use private key",
privateKeyPath, err)
- // This is only a panic when we expect to do something about it.
- if !config.Client.SSHDontAddHostsToKnownHostsFile {
- dlog.Client.FatalPanic("Unable to find private SSH key information")
- }
-
+ dlog.Client.FatalPanic("Unable to find private SSH key information")
// Never reach this point.
return sshAuthMethods, knownHostsCallback
}
diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go
index 2aa0168..dd58925 100644
--- a/internal/ssh/client/knownhostscallback.go
+++ b/internal/ssh/client/knownhostscallback.go
@@ -10,7 +10,6 @@ import (
"sync"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/prompt"
@@ -216,11 +215,6 @@ func (c KnownHostsCallback) promptAddHosts(hosts []unknownHost) {
func (c KnownHostsCallback) trustHosts(hosts []unknownHost) {
tmpKnownHostsPath := fmt.Sprintf("%s.tmp", c.knownHostsPath)
- if config.Client.SSHDontAddHostsToKnownHostsFile {
- dlog.Common.Verbose("Not adding hosts to known hosts file, as disabled by config")
- return
- }
-
newFd, err := os.OpenFile(tmpKnownHostsPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
if err != nil {
panic(fmt.Sprintf("%s: %s", tmpKnownHostsPath, err.Error()))