summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <35781042+pbuetow@users.noreply.github.com>2021-03-29 17:49:16 +0100
committerGitHub <noreply@github.com>2021-03-29 17:49:16 +0100
commit9a467da883976c74d231ea9c7773430f583bab98 (patch)
tree4e75a996ef44bc5adc771c318753b0c4ad934269
parente811d1725ee5f931ece6fac01db70227b0fc8a7a (diff)
parent93fce245564ffde20c3e5113757bc65672f69ed5 (diff)
Merge pull request #22 from snonux/develop
Add context awareness to dgrep
-rw-r--r--.gitignore14
-rw-r--r--cmd/dgrep/main.go15
-rw-r--r--cmd/dtail/main.go9
-rw-r--r--docker/Dockerfile3
-rw-r--r--docker/Makefile3
-rwxr-xr-xdocker/spindown.sh1
-rwxr-xr-xdocker/spinup.sh3
-rw-r--r--internal/clients/args.go6
-rw-r--r--internal/clients/baseclient.go38
-rw-r--r--internal/clients/catclient.go6
-rw-r--r--internal/clients/grepclient.go6
-rw-r--r--internal/clients/handlers/basehandler.go10
-rw-r--r--internal/clients/maker.go2
-rw-r--r--internal/clients/maprclient.go10
-rw-r--r--internal/clients/tailclient.go6
-rw-r--r--internal/datas/rbuffer.go49
-rw-r--r--internal/datas/rbuffer_test.go106
-rw-r--r--internal/io/fs/filereader.go3
-rw-r--r--internal/io/fs/filter.go167
-rw-r--r--internal/io/fs/readfile.go172
-rw-r--r--internal/io/fs/truncate.go61
-rw-r--r--internal/lcontext/lcontext.go22
-rw-r--r--internal/mapr/groupset.go6
-rw-r--r--internal/options/options.go3
-rw-r--r--internal/server/handlers/readcommand.go21
-rw-r--r--internal/server/handlers/serverhandler.go45
-rw-r--r--internal/server/server.go5
-rw-r--r--internal/ssh/client/authmethods.go9
-rw-r--r--internal/version/version.go2
29 files changed, 604 insertions, 199 deletions
diff --git a/.gitignore b/.gitignore
index c9e8333..bac8dd1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,14 +1,10 @@
-*_proprietary.go
-<<<<<<< HEAD
-cache/
-log/
-tags
-=======
-/cache/
-/log/
+/cache
+/log
+/tags
/dtail
/dgrep
/dcat
/dmap
/dserver
->>>>>>> 7ee0121afed3e7cab6457142f70e411020ab2b21
+/bin
+serverlist.txt
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index 4da1bb3..123d061 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -36,15 +36,24 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
- flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")
+
+ // Line context awareness.
+ flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
+ flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
+ flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
+ flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")
flag.Parse()
+ if grep != "" {
+ args.RegexStr = grep
+ }
+
config.Read(cfgFile, sshPort)
color.Colored = !noColor
@@ -58,10 +67,6 @@ func main() {
Quiet: args.Quiet,
})
- if grep != "" {
- args.RegexStr = grep
- }
-
client, err := clients.NewGrepClient(args)
if err != nil {
panic(err)
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index f2a039f..2639b4b 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -50,14 +50,19 @@ func main() {
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
- flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&cfgFile, "cfg", "", "Config file path")
- flag.StringVar(&grep, "grep", "", "Alias for -regex")
flag.StringVar(&queryStr, "query", "", "Map reduce query")
+ // Line context awareness.
+ flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
+ flag.StringVar(&grep, "grep", "", "Alias for -regex")
+ flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
+ flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
+ flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")
+
flag.Parse()
if grep != "" {
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 8632832..61a1f7d 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -14,5 +14,6 @@ USER dserver
WORKDIR /var/run/dserver
EXPOSE 2222/tcp
+EXPOSE 8080/tcp
-CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"]
+CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"]
diff --git a/docker/Makefile b/docker/Makefile
index 029adf6..f09d9e0 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -3,7 +3,8 @@ all:
docker build . -t dserver:develop
rm ./dserver
run:
- docker run -p 2222:2222 dserver:develop
+ # http://localhost:8080/debug/pprof/goroutines?debug=1
+ docker run -p 2222:2222 -p 8080:8080 dserver:develop
spinup:
./spinup.sh 10
spindown:
diff --git a/docker/spindown.sh b/docker/spindown.sh
index 73ed059..b3d3a8c 100755
--- a/docker/spindown.sh
+++ b/docker/spindown.sh
@@ -6,4 +6,5 @@ declare -i BASE_PORT=2222
for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker stop dserver-serv$i
+ docker rm dserver-serv$i
done
diff --git a/docker/spinup.sh b/docker/spinup.sh
index 3890ce6..0625967 100755
--- a/docker/spinup.sh
+++ b/docker/spinup.sh
@@ -3,7 +3,10 @@
declare -i NUM_INSTANCES=$1
declare -i BASE_PORT=2222
+echo > serverlist.txt
+
for (( i=0; i < $NUM_INSTANCES; i++ )); do
port=$[ BASE_PORT + i + 1 ]
docker run -d --name dserver-serv$i --hostname serv$i -p $port:2222 dserver:develop
+ echo localhost:$port >> serverlist.txt
done
diff --git a/internal/clients/args.go b/internal/clients/args.go
index 7f782f1..684dadd 100644
--- a/internal/clients/args.go
+++ b/internal/clients/args.go
@@ -1,6 +1,7 @@
package clients
import (
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -8,12 +9,13 @@ import (
// Args is a helper struct to summarize common client arguments.
type Args struct {
+ lcontext.LContext
+ RegexStr string
Mode omode.Mode
ServersStr string
UserName string
What string
Arguments []string
- RegexStr string
RegexInvert bool
TrustAllHosts bool
Discovery string
@@ -22,5 +24,5 @@ type Args struct {
SSHAuthMethods []gossh.AuthMethod
SSHHostKeyCallback gossh.HostKeyCallback
PrivateKeyPathFile string
- Quiet bool
+ Quiet bool
}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index f20156f..f83fcfd 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -2,6 +2,8 @@ package clients
import (
"context"
+ "fmt"
+ "strings"
"sync"
"time"
@@ -118,10 +120,44 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
}
}
+func (c *baseClient) makeCommandOptions() map[string]string {
+ options := make(map[string]string)
+
+ if c.Args.Quiet {
+ options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet)
+ }
+ if c.Args.LContext.MaxCount != 0 {
+ options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount)
+ }
+ if c.Args.LContext.BeforeContext != 0 {
+ options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext)
+ }
+ if c.Args.LContext.AfterContext != 0 {
+ options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext)
+ }
+
+ return options
+}
+
+func (c *baseClient) commandOptionsToString(options map[string]string) string {
+ var sb strings.Builder
+
+ count := 0
+ for k, v := range options {
+ if count > 0 {
+ sb.WriteString(":")
+ }
+ sb.WriteString(fmt.Sprintf("%s=%s", k, v))
+ count++
+ }
+
+ return sb.String()
+}
+
func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection {
conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
conn.Handler = c.maker.makeHandler(server)
- conn.Commands = c.maker.makeCommands()
+ conn.Commands = c.maker.makeCommands(c.makeCommandOptions())
return conn
}
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index b7b6131..db892f1 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -41,10 +41,10 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c CatClient) makeCommands() (commands []string) {
- options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
+func (c CatClient) makeCommands(options map[string]string) (commands []string) {
+ optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
return
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 652c31b..567193a 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -40,10 +40,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c GrepClient) makeCommands() (commands []string) {
- options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
+func (c GrepClient) makeCommands(options map[string]string) (commands []string) {
+ optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
return
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index f07fd90..602a7ac 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -101,15 +101,7 @@ func (h *baseHandler) handleMessageType(message string) {
// Handle messages received from server which are not meant to be displayed
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
- switch {
- case strings.HasPrefix(message, ".syn close connection"):
+ if strings.HasPrefix(message, ".syn close connection") {
h.SendMessage(".ack close connection")
- select {
- case <-time.After(time.Second * 5):
- logger.Debug("Shutting down client after timeout and sending ack to server")
- h.Shutdown()
- case <-h.Done():
- return
- }
}
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index d5ffd8b..a1d6864 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -9,5 +9,5 @@ import (
// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
- makeCommands() (commands []string)
+ makeCommands(options map[string]string) (commands []string)
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 1c0c2cc..feb7e47 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -110,27 +110,31 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
-func (c MaprClient) makeCommands() (commands []string) {
+func (c MaprClient) makeCommands(options map[string]string) (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
- options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
}
+ optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
if c.Timeout > 0 {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize()))
}
return
}
func (c *MaprClient) periodicReportResults(ctx context.Context) {
+ rampUpSleep := c.query.Interval / 2
+ logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
+ time.Sleep(rampUpSleep)
+
for {
select {
case <-time.After(c.query.Interval):
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index cefbaa7..853ef1d 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -37,10 +37,10 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c TailClient) makeCommands() (commands []string) {
- options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
+func (c TailClient) makeCommands(options map[string]string) (commands []string) {
+ optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
}
logger.Debug(commands)
diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go
new file mode 100644
index 0000000..df8f622
--- /dev/null
+++ b/internal/datas/rbuffer.go
@@ -0,0 +1,49 @@
+package datas
+
+import "fmt"
+
+// RBuffer is a simple circular string ring buffer data structure.
+type RBuffer struct {
+ Capacity int
+ size int
+ readPos int
+ writePos int
+ data []string
+}
+
+// NewRBuffer creates a new string ring buffer.
+func NewRBuffer(capacity int) (*RBuffer, error) {
+ if capacity < 1 {
+ return nil, fmt.Errorf("RBuffer capacity must not be less than 1")
+ }
+
+ r := RBuffer{
+ Capacity: capacity,
+ size: capacity + 1,
+ data: make([]string, capacity+1),
+ }
+
+ return &r, nil
+}
+
+// Add a value.
+func (r *RBuffer) Add(value string) {
+ r.data[r.writePos] = value
+ r.writePos = (r.writePos + 1) % r.size
+
+ if r.writePos == r.readPos {
+ r.readPos = (r.readPos + 1) % r.size
+ }
+}
+
+// Get a value.
+func (r *RBuffer) Get() (string, bool) {
+ if r.readPos == r.writePos {
+ // RBuffer is empty.
+ return "", false
+ }
+
+ value := r.data[r.readPos]
+ r.readPos = (r.readPos + 1) % r.size
+ return value, true
+}
diff --git a/internal/datas/rbuffer_test.go b/internal/datas/rbuffer_test.go
new file mode 100644
index 0000000..456511a
--- /dev/null
+++ b/internal/datas/rbuffer_test.go
@@ -0,0 +1,106 @@
+package datas
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+)
+
+func TestRBufferOneElement(t *testing.T) {
+ r, err := NewRBuffer(1)
+ if err != nil {
+ t.Errorf("Expected error creating ring buffer with capacity 1")
+ }
+
+ testRBufferValues(t, r, []string{"Hello world"})
+ testRBufferValues(t, r, []string{"Hello world", "Hello universe"})
+}
+
+func TestRBuffer(t *testing.T) {
+ if _, err := NewRBuffer(0); err == nil {
+ t.Errorf("Expected error creating ring buffer with capacity 0")
+ }
+
+ r, err := NewRBuffer(10)
+ if err != nil {
+ t.Errorf("Error creating ring buffer with capacity 10: %v", err)
+ }
+
+ fiveValues := []string{
+ "42 is the answer!",
+ "Scroption: Get over here!",
+ "Have you swiped your nectar card?",
+ "Please mind the gap between the train and the platform!",
+ "Visit DTail at https://dtail.dev",
+ }
+ testRBufferValues(t, r, fiveValues)
+
+ moreFiveValues := []string{
+ "I love Golang",
+ "As a contrast, I also love Perl",
+ "Mimecast: Stop Bad Things From Happening to Good Organizations",
+ "We are the Buetow Brothers",
+ "London is calling",
+ }
+ tenValues := append(fiveValues, moreFiveValues...)
+ testRBufferValues(t, r, tenValues)
+}
+
+func TestRandomRBuffer(t *testing.T) {
+ for i := 0; i < 100; i++ {
+ testRandomRBuffer(t)
+ }
+}
+
+func testRandomRBuffer(t *testing.T) {
+ rand.Seed(time.Now().UnixNano())
+
+ maxCapacity := 1000
+ minCapacity := 1
+ capacity := rand.Intn(maxCapacity-minCapacity) + minCapacity
+ r, err := NewRBuffer(capacity)
+ if err != nil {
+ t.Errorf("Error creating ring buffer with capacity %d: %v", capacity, err)
+ }
+
+ numValues := rand.Intn(capacity * 2)
+ values := make([]string, numValues)
+ for i := 0; i < numValues; i++ {
+ values = append(values, fmt.Sprintf("%d.%d", i, rand.Int()))
+ }
+
+ testRBufferValues(t, r, values)
+}
+
+func testRBufferValues(t *testing.T, r *RBuffer, values []string) {
+ value, ok := r.Get()
+ if ok {
+ t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value)
+ }
+
+ for _, value := range values {
+ r.Add(value)
+ }
+
+ expectedValues := values
+ overCapacity := len(values) - r.Capacity
+ if overCapacity > 0 {
+ expectedValues = values[overCapacity:]
+ }
+
+ for _, expected := range expectedValues {
+ value, ok := r.Get()
+ if !ok {
+ t.Errorf("Expected value '%s' but got nothing", expected)
+ }
+ if value != expected {
+ t.Errorf("Expected value '%s' but got value '%v'", expected, value)
+ }
+ }
+
+ value, ok = r.Get()
+ if ok {
+ t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value)
+ }
+}
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 0774837..efd410e 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -4,12 +4,13 @@ import (
"context"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
type FileReader interface {
- Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go
new file mode 100644
index 0000000..c4f605e
--- /dev/null
+++ b/internal/io/fs/filter.go
@@ -0,0 +1,167 @@
+package fs
+
+import (
+ "context"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if lContext.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, rawLines, lines, re, lContext)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := lContext.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := lContext.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan []byte
+ if processBefore {
+ beforeBuf = make(chan []byte, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := lContext.AfterContext > 0
+
+ for rawLine := range rawLines {
+ // logger.Debug("rawLine", string(rawLine))
+ f.updatePosition()
+
+ if !re.Match(rawLine) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- rawLine:
+ default:
+ <-beforeBuf
+ beforeBuf <- rawLine
+ }
+ }
+ continue
+ }
+
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
+ }
+ after = lContext.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case myRawLine := <-beforeBuf:
+ myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100}
+ i--
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
+ }
+ }
+
+ line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression, there is no local grep context specified.
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+ for {
+ select {
+ case rawLine, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+
+ if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) {
+ continue
+ }
+
+ line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()}
+
+ select {
+ case lines <- line:
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+}
+
+func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool {
+ if !re.Match(rawLine) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ // Regex dosn't match, so not interested in it.
+ return true
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ // Matching, not transmittable
+ return true
+ }
+ f.updateLineTransmitted()
+
+ // Matching, transmittable
+ return false
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 6757bd6..161e3f0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -4,16 +4,15 @@ import (
"bufio"
"compress/gzip"
"context"
- "errors"
"fmt"
"io"
"os"
"strings"
- "sync"
"time"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -38,6 +37,28 @@ type readFile struct {
limiter chan struct{}
}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
// String returns the string representation of the readFile
func (f readFile) String() string {
return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
@@ -59,7 +80,7 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
logger.Debug("readFile", f)
defer func() {
select {
@@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
}
rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ readCtx, readCancel := context.WithCancel(ctx)
+
+ filterDone := make(chan struct{})
+ go func() {
+ f.filter(ctx, rawLines, lines, re, lContext)
+ close(filterDone)
+ // If the filter stopped, make the reader stop too, no need to read
+ // more data if there is nothing more the filter wants to filter for!
+ // E.g. it could be that we only want to filter N matches but not more.
+ readCancel()
+ }()
- err = f.read(ctx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, rawLines)
close(rawLines)
- wg.Wait()
-
- return err
-}
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
+ // Filter may flushes some data still. So wait until it is done here.
+ <-filterDone
- return
+ return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
longLineWarning := false
+ checkTruncate := f.truncateTimer(ctx)
+
for {
select {
case <-ctx.Done():
@@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
select {
- case <-truncate:
+ case <-checkTruncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
@@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
}
}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- for {
- select {
- case line, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
- select {
- case lines <- filteredLine:
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
- var read line.Line
-
- if !re.Match(lineBytes) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineTransmitted()
-
- read = line.Line{
- Content: lineBytes,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}
diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go
new file mode 100644
index 0000000..a8d59ac
--- /dev/null
+++ b/internal/io/fs/truncate.go
@@ -0,0 +1,61 @@
+package fs
+
+import (
+ "context"
+ "errors"
+ "io"
+ "os"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) {
+ checkTruncate = make(chan struct{})
+
+ go func() {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case checkTruncate <- struct{}{}:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ logger.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
+ }
+
+ return false, nil
+}
diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go
new file mode 100644
index 0000000..89cb7c3
--- /dev/null
+++ b/internal/lcontext/lcontext.go
@@ -0,0 +1,22 @@
+package lcontext
+
+// LContext stands for line context and is here to help filtering out only specific lines.
+type LContext struct {
+ AfterContext int
+ BeforeContext int
+ MaxCount int
+}
+
+// Has returns true if it has any parameter set.
+func (c LContext) Has() bool {
+ if c.AfterContext > 0 {
+ return true
+ }
+ if c.BeforeContext > 0 {
+ return true
+ }
+ if c.MaxCount > 0 {
+ return true
+ }
+ return false
+}
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 50155f8..b5c8a48 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -68,15 +68,11 @@ func (g *GroupSet) WriteResult(query *Query) error {
}
// -1: Don't limit the result, include all data sets
- result, count, err := g.limitedResult(query, query.Limit, "", ",", true)
+ result, _, err := g.limitedResult(query, query.Limit, "", ",", true)
if err != nil {
return err
}
- if count == 0 {
- logger.Warn("Not writing outfile this time as empty result set", query.Outfile)
- }
-
logger.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
diff --git a/internal/options/options.go b/internal/options/options.go
new file mode 100644
index 0000000..816ddc9
--- /dev/null
+++ b/internal/options/options.go
@@ -0,0 +1,3 @@
+package options
+
+type Options map[string]string
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 5bab26f..b659c06 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -25,7 +26,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) {
+func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) {
re := regex.NewNoop()
if argc >= 4 {
@@ -40,10 +41,10 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie
r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc))
return
}
- r.readGlob(ctx, args[1], re, retries)
+ r.readGlob(ctx, lContext, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) {
+func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) {
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
@@ -67,7 +68,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
continue
}
- r.readFiles(ctx, paths, glob, re, retryInterval)
+ r.readFiles(ctx, lContext, paths, glob, re, retryInterval)
return
}
@@ -75,18 +76,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
-func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
- go r.readFileIfPermissions(ctx, &wg, path, glob, re)
+ go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re)
}
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
globID := r.makeGlobID(path, glob)
@@ -96,10 +97,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr
return
}
- r.readFile(ctx, path, globID, re)
+ r.readFile(ctx, lContext, path, globID, re)
}
-func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) {
+func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) {
logger.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
@@ -120,7 +121,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
for {
- if err := reader.Start(ctx, lines, re); err != nil {
+ if err := reader.Start(ctx, lContext, lines, re); err != nil {
logger.Error(r.server.user, path, globID, err)
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 185e7c2..7da6012 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
user "github.com/mimecast/dtail/internal/user/server"
@@ -240,7 +242,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
- options, err := readOptions(splitted[1:])
+ options, lContext, err := readOptions(splitted[1:])
if err != nil {
h.sendServerMessage(logger.Error(h.user, err))
commandFinished()
@@ -258,7 +260,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.CatClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 1)
+ command.Start(ctx, lContext, argc, args, 1)
readerFinished()
commandFinished()
}()
@@ -267,7 +269,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.TailClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 10)
+ command.Start(ctx, lContext, argc, args, 10)
readerFinished()
commandFinished()
}()
@@ -390,13 +392,16 @@ func (h *ServerHandler) decrementActiveReaders() int32 {
return atomic.LoadInt32(&h.activeReaders)
}
-func readOptions(opts []string) (map[string]string, error) {
+// TODO: All options related code should be in its own package (client + server)
+func readOptions(opts []string) (map[string]string, lcontext.LContext, error) {
options := make(map[string]string, len(opts))
+ // Local search context
+ var lContext lcontext.LContext
for _, o := range opts {
kv := strings.SplitN(o, "=", 2)
if len(kv) != 2 {
- return options, fmt.Errorf("Unable to parse options: %v", kv)
+ continue
}
key := kv[0]
val := kv[1]
@@ -405,13 +410,37 @@ func readOptions(opts []string) (map[string]string, error) {
s := strings.SplitN(val, "%", 2)
decoded, err := base64.StdEncoding.DecodeString(s[1])
if err != nil {
- return options, err
+ return options, lContext, err
}
val = string(decoded)
}
- options[key] = val
+ switch key {
+ case "before":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.BeforeContext = iVal
+ case "after":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.AfterContext = iVal
+ case "max":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.MaxCount = iVal
+ default:
+ options[key] = val
+ }
}
- return options, nil
+ return options, lContext, nil
}
diff --git a/internal/server/server.go b/internal/server/server.go
index a20737e..73822d5 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -141,7 +141,7 @@ func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChann
}
if err := s.handleRequests(ctx, sshConn, requests, channel, user); err != nil {
- logger.Error(user, err)
+ logger.Error(user, "While handling request", err)
sshConn.Close()
}
}
@@ -190,7 +190,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
go func() {
if err := sshConn.Wait(); err != nil && err != io.EOF {
- logger.Error(user, err)
+ // Use of closed network connection.
+ logger.Debug(user, "While waiting for ssh connection", err)
}
s.stats.decrementConnections()
logger.Info(user, "Good bye Mister!")
diff --git a/internal/ssh/client/authmethods.go b/internal/ssh/client/authmethods.go
index 2ff80b2..bbfb7be 100644
--- a/internal/ssh/client/authmethods.go
+++ b/internal/ssh/client/authmethods.go
@@ -77,6 +77,15 @@ func initKnownHostsAuthMethods(trustAllHosts bool, throttleCh chan struct{}, pri
}
logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
+ privateKeyPath = os.Getenv("HOME") + "/.ssh/id_ecdsa"
+ authMethod, err = ssh.PrivateKey(privateKeyPath)
+ if err == nil {
+ sshAuthMethods = append(sshAuthMethods, authMethod)
+ logger.Debug("initKnownHostsAuthmethods", "Added path to list of auth methods, not adding further methods", privateKeyPath)
+ return sshAuthMethods, knownHostsCallback
+ }
+ logger.Debug("initKnownHostsAuthMethods", "Unable to use private key", privateKeyPath, err)
+
logger.FatalExit("Unable to find private SSH key information")
// Never reach this point.
diff --git a/internal/version/version.go b/internal/version/version.go
index 1d974f7..f9acd56 100644
--- a/internal/version/version.go
+++ b/internal/version/version.go
@@ -11,7 +11,7 @@ const (
// Name of DTail.
Name string = "DTail"
// Version of DTail.
- Version string = "3.2.1"
+ Version string = "3.3.0"
// Additional information for DTail
Additional string = "develop"
// ProtocolCompat -ibility version.