summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <35781042+pbuetow@users.noreply.github.com>2021-03-29 17:49:16 +0100
committerGitHub <noreply@github.com>2021-03-29 17:49:16 +0100
commit9a467da883976c74d231ea9c7773430f583bab98 (patch)
tree4e75a996ef44bc5adc771c318753b0c4ad934269 /internal/server
parente811d1725ee5f931ece6fac01db70227b0fc8a7a (diff)
parent93fce245564ffde20c3e5113757bc65672f69ed5 (diff)
Merge pull request #22 from snonux/develop
Add context awareness to dgrep
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go21
-rw-r--r--internal/server/handlers/serverhandler.go45
-rw-r--r--internal/server/server.go5
3 files changed, 51 insertions, 20 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 5bab26f..b659c06 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -25,7 +26,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) {
+func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) {
re := regex.NewNoop()
if argc >= 4 {
@@ -40,10 +41,10 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie
r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc))
return
}
- r.readGlob(ctx, args[1], re, retries)
+ r.readGlob(ctx, lContext, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) {
+func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) {
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
@@ -67,7 +68,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
continue
}
- r.readFiles(ctx, paths, glob, re, retryInterval)
+ r.readFiles(ctx, lContext, paths, glob, re, retryInterval)
return
}
@@ -75,18 +76,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
-func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
- go r.readFileIfPermissions(ctx, &wg, path, glob, re)
+ go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re)
}
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
globID := r.makeGlobID(path, glob)
@@ -96,10 +97,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr
return
}
- r.readFile(ctx, path, globID, re)
+ r.readFile(ctx, lContext, path, globID, re)
}
-func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) {
+func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) {
logger.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
@@ -120,7 +121,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
for {
- if err := reader.Start(ctx, lines, re); err != nil {
+ if err := reader.Start(ctx, lContext, lines, re); err != nil {
logger.Error(r.server.user, path, globID, err)
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 185e7c2..7da6012 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
user "github.com/mimecast/dtail/internal/user/server"
@@ -240,7 +242,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
- options, err := readOptions(splitted[1:])
+ options, lContext, err := readOptions(splitted[1:])
if err != nil {
h.sendServerMessage(logger.Error(h.user, err))
commandFinished()
@@ -258,7 +260,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.CatClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 1)
+ command.Start(ctx, lContext, argc, args, 1)
readerFinished()
commandFinished()
}()
@@ -267,7 +269,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.TailClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 10)
+ command.Start(ctx, lContext, argc, args, 10)
readerFinished()
commandFinished()
}()
@@ -390,13 +392,16 @@ func (h *ServerHandler) decrementActiveReaders() int32 {
return atomic.LoadInt32(&h.activeReaders)
}
-func readOptions(opts []string) (map[string]string, error) {
+// TODO: All options related code should be in its own package (client + server)
+func readOptions(opts []string) (map[string]string, lcontext.LContext, error) {
options := make(map[string]string, len(opts))
+ // Local search context
+ var lContext lcontext.LContext
for _, o := range opts {
kv := strings.SplitN(o, "=", 2)
if len(kv) != 2 {
- return options, fmt.Errorf("Unable to parse options: %v", kv)
+ continue
}
key := kv[0]
val := kv[1]
@@ -405,13 +410,37 @@ func readOptions(opts []string) (map[string]string, error) {
s := strings.SplitN(val, "%", 2)
decoded, err := base64.StdEncoding.DecodeString(s[1])
if err != nil {
- return options, err
+ return options, lContext, err
}
val = string(decoded)
}
- options[key] = val
+ switch key {
+ case "before":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.BeforeContext = iVal
+ case "after":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.AfterContext = iVal
+ case "max":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.MaxCount = iVal
+ default:
+ options[key] = val
+ }
}
- return options, nil
+ return options, lContext, nil
}
diff --git a/internal/server/server.go b/internal/server/server.go
index a20737e..73822d5 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -141,7 +141,7 @@ func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChann
}
if err := s.handleRequests(ctx, sshConn, requests, channel, user); err != nil {
- logger.Error(user, err)
+ logger.Error(user, "While handling request", err)
sshConn.Close()
}
}
@@ -190,7 +190,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
go func() {
if err := sshConn.Wait(); err != nil && err != io.EOF {
- logger.Error(user, err)
+ // Use of closed network connection.
+ logger.Debug(user, "While waiting for ssh connection", err)
}
s.stats.decrementConnections()
logger.Info(user, "Good bye Mister!")