From f46abf36e80a67d13d12dbe4ba8c899026e53961 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 31 Jul 2021 18:20:03 +0300 Subject: more on configurable colors --- internal/io/logger/logger.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'internal/io') diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 4254eef..bef5293 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -12,7 +12,7 @@ import ( "syscall" "time" - "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/color/brush" "github.com/mimecast/dtail/internal/config" ) @@ -207,8 +207,8 @@ func write(what, severity, message string) { if Mode.logToStdout { line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message) - if color.Colored { - line = color.Colorfy(line) + if config.Client.TermColorsEnabled { + line = brush.Colorfy(line) } stdoutBufCh <- line @@ -262,8 +262,8 @@ func Raw(message string) { } if Mode.logToStdout { - if color.Colored { - message = color.Colorfy(message) + if config.Client.TermColorsEnabled { + message = brush.Colorfy(message) } stdoutBufCh <- message } -- cgit v1.2.3 From 3cc8887885f24a3f0d607af24197bc364ab16b8d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 12 Aug 2021 10:56:36 +0300 Subject: add missing brush and also add color client configs plus jsonschema --- internal/io/logger/logger.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'internal/io') diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index bef5293..bb9dc02 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -207,7 +207,7 @@ func write(what, severity, message string) { if Mode.logToStdout { line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message) - if config.Client.TermColorsEnabled { + if config.Client.TermColorsEnable { line = brush.Colorfy(line) } @@ -262,7 +262,7 @@ func Raw(message string) { } if Mode.logToStdout { - if config.Client.TermColorsEnabled { + if config.Client.TermColorsEnable { message = brush.Colorfy(message) } stdoutBufCh <- message -- cgit v1.2.3 From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/io/fs/readfile.go | 71 +++++++++++++++++--------------------------- internal/io/logger/logger.go | 7 +++-- 2 files changed, 33 insertions(+), 45 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..8a365a1 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -14,6 +14,7 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -148,80 +149,64 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t if err != nil { return err } - rawLine := make([]byte, 0, 512) lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false + warnedAboutLongLine := false + message := make([]byte, 0, 512) for { select { case <-ctx.Done(): return nil - default: - } - - select { case <-truncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } - logger.Info(f.filePath, "Current offset", offset) default: } - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() + b, err := reader.ReadByte() if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. if err != io.EOF { return err } if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") + logger.Info(f.FilePath(), "End of file reached") return nil } time.Sleep(time.Millisecond * 100) continue } + offset++ - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-ctx.Done(): - return nil + switch b { + case '\n': + if len(message) == 0 { + time.Sleep(time.Millisecond * 100) + continue } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false - } - continue - } - - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true - } - rawLine = append(rawLine, '\n') select { - case rawLines <- rawLine: + case rawLines <- append(message, protocol.MessageDelimiter): + message = make([]byte, 0, 512) + warnedAboutLongLine = false case <-ctx.Done(): return nil } - rawLine = make([]byte, 0, 512) + default: + if len(message) >= lineLengthThreshold { + if !warnedAboutLongLine { + f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") + warnedAboutLongLine = true + } + select { + case <-ctx.Done(): + return nil + case rawLines <- append(message, protocol.MessageDelimiter): + message = make([]byte, 0, 512) + } + } + message = append(message, b) } } } diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index bb9dc02..3a3935d 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -205,7 +205,7 @@ func Trace(args ...interface{}) string { // Write log line to buffer and/or log file. func write(what, severity, message string) { if Mode.logToStdout { - line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message) + line := fmt.Sprintf("%s|%s|%s|%s", what, hostname, severity, message) if config.Client.TermColorsEnable { line = brush.Colorfy(line) @@ -219,7 +219,7 @@ func write(what, severity, message string) { timeStr := t.Format("20060102-150405") fileLogBufCh <- buf{ time: t, - message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message), + message: fmt.Sprintf("%s|%s|%s|%s", severity, timeStr, what, message), } } } @@ -326,6 +326,7 @@ func Flush() { select { case message := <-stdoutBufCh: stdoutWriter.WriteString(message) + stdoutWriter.WriteString("\n") default: stdoutWriter.Flush() return @@ -338,6 +339,7 @@ func writeToStdout(ctx context.Context) { select { case message := <-stdoutBufCh: stdoutWriter.WriteString(message) + stdoutWriter.WriteString("\n") case <-time.After(time.Millisecond * 100): stdoutWriter.Flush() case <-pauseCh: @@ -365,6 +367,7 @@ func writeToFile(ctx context.Context) { dateStr := buf.time.Format("20060102") w := fileWriter(dateStr) w.WriteString(buf.message) + w.WriteString("\n") case <-pauseCh: PAUSE: for { -- cgit v1.2.3 From 6d727b9bdbc387c8a5c34406a2c4de9140face38 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 19:36:46 +0100 Subject: use a byte.Buffer in the file reader --- internal/io/fs/readfile.go | 32 +++++++++++++++++++------------- internal/io/line/line.go | 5 +++-- internal/io/pool/bytesbuffer.go | 19 +++++++++++++++++++ 3 files changed, 41 insertions(+), 15 deletions(-) create mode 100644 internal/io/pool/bytesbuffer.go (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 8a365a1..e44f30e 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -2,6 +2,7 @@ package fs import ( "bufio" + "bytes" "compress/gzip" "context" "errors" @@ -14,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" @@ -90,7 +92,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re fd.Seek(0, io.SeekEnd) } - rawLines := make(chan []byte, 100) + rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) var wg sync.WaitGroup @@ -142,7 +144,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { return } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 reader, err := f.makeReader(fd) @@ -152,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb warnedAboutLongLine := false - message := make([]byte, 0, 512) + message := pool.BytesBuffer.Get().(*bytes.Buffer) for { select { @@ -182,37 +184,41 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t switch b { case '\n': - if len(message) == 0 { + if message.Len() == 0 { time.Sleep(time.Millisecond * 100) continue } + message.WriteByte(protocol.MessageDelimiter) select { - case rawLines <- append(message, protocol.MessageDelimiter): - message = make([]byte, 0, 512) + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) warnedAboutLongLine = false case <-ctx.Done(): return nil } default: - if len(message) >= lineLengthThreshold { + if message.Len() >= lineLengthThreshold { if !warnedAboutLongLine { f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") warnedAboutLongLine = true } + message.WriteByte(protocol.MessageDelimiter) select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) case <-ctx.Done(): return nil - case rawLines <- append(message, protocol.MessageDelimiter): - message = make([]byte, 0, 512) } } - message = append(message, b) + message.WriteByte(b) } } } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { defer wg.Done() for { @@ -233,10 +239,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha } } -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { +func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !re.Match(lineBytes) { + if !re.Match(lineBytes.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 715be34..d306c88 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -1,13 +1,14 @@ package line import ( + "bytes" "fmt" ) // Line represents a read log line. type Line struct { // The content of the log line. - Content []byte + Content *bytes.Buffer // Until now, how many log lines were processed? Count uint64 // Sometimes we produce too many log lines so that the client @@ -25,7 +26,7 @@ type Line struct { // Return a human readable representation of the followed line. func (l Line) String() string { return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)", - string(l.Content), + l.Content.String(), l.TransmittedPerc, l.Count, l.SourceID) diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go new file mode 100644 index 0000000..0a159f5 --- /dev/null +++ b/internal/io/pool/bytesbuffer.go @@ -0,0 +1,19 @@ +package pool + +import ( + "bytes" + "sync" +) + +var BytesBuffer = sync.Pool{ + New: func() interface{} { + b := bytes.Buffer{} + b.Grow(128) + return &b + }, +} + +func RecycleBytesBuffer(b *bytes.Buffer) { + b.Reset() + BytesBuffer.Put(b) +} -- cgit v1.2.3 From 23982f331c2154a66b86d596226c24454fd06be5 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 20:26:32 +0100 Subject: 1. Major performance gain by not checking for file truncation aftter each bytes read. 2. Introduce field separator to the protocol package. --- internal/io/fs/readfile.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index e44f30e..f2f672a 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -157,22 +157,21 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu message := pool.BytesBuffer.Get().(*bytes.Buffer) for { - select { - case <-ctx.Done(): - return nil - case <-truncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - default: - } - b, err := reader.ReadByte() if err != nil { if err != io.EOF { return err } + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + case <-ctx.Done(): + return nil + default: + } if !f.seekEOF { logger.Info(f.FilePath(), "End of file reached") return nil @@ -207,7 +206,6 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) - //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) case <-ctx.Done(): return nil } -- cgit v1.2.3 From f74a9e4b35feb8c07d8a70b5a581088a0a59889d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 7 Sep 2021 10:01:32 +0300 Subject: Produce MAPREDUCE lines, can aggregate these via default log format --- internal/io/logger/logger.go | 40 ++++++++++++++++++++++++++++++++-------- internal/io/pool/builder.go | 18 ++++++++++++++++++ 2 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 internal/io/pool/builder.go (limited to 'internal/io') diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 3a3935d..6890201 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -14,6 +14,8 @@ import ( "github.com/mimecast/dtail/internal/color/brush" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) const ( @@ -132,6 +134,24 @@ func Info(args ...interface{}) string { return log(clientStr, infoStr, args) } +// Mapreduce message logging. +func Mapreduce(table string, data map[string]interface{}) string { + args := make([]interface{}, len(data)+1) + + args[0] = fmt.Sprintf("MAPREDUCE:%s", strings.ToUpper(table)) + i := 1 + for k, v := range data { + args[i] = fmt.Sprintf("%s=%v", k, v) + i++ + } + + if Mode.Server { + return log(serverStr, infoStr, args) + } + + return log(clientStr, infoStr, args) +} + // Warn message logging. func Warn(args ...interface{}) string { if !Mode.Quiet { @@ -230,24 +250,28 @@ func log(what string, severity string, args []interface{}) string { return "" } - messages := []string{} + sb := pool.BuilderBuffer.Get().(*strings.Builder) + + for i, arg := range args { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } - for _, arg := range args { switch v := arg.(type) { case string: - messages = append(messages, v) + sb.WriteString(v) case int: - messages = append(messages, fmt.Sprintf("%d", v)) + sb.WriteString(fmt.Sprintf("%d", v)) case error: - messages = append(messages, v.Error()) + sb.WriteString(v.Error()) default: - messages = append(messages, fmt.Sprintf("%v", v)) + sb.WriteString(fmt.Sprintf("%v", v)) } } - message := strings.Join(messages, "|") + message := sb.String() + pool.RecycleBuilderBuffer(sb) write(what, severity, message) - return fmt.Sprintf("%s|%s", severity, message) } diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go new file mode 100644 index 0000000..c9dc221 --- /dev/null +++ b/internal/io/pool/builder.go @@ -0,0 +1,18 @@ +package pool + +import ( + "strings" + "sync" +) + +var BuilderBuffer = sync.Pool{ + New: func() interface{} { + sb := strings.Builder{} + return &sb + }, +} + +func RecycleBuilderBuffer(sb *strings.Builder) { + sb.Reset() + BuilderBuffer.Put(sb) +} -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/io/fs/readfile.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index f2f672a..c0d44dd 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -16,7 +16,6 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -187,7 +186,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu time.Sleep(time.Millisecond * 100) continue } - message.WriteByte(protocol.MessageDelimiter) + message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) @@ -202,7 +201,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") warnedAboutLongLine = true } - message.WriteByte(protocol.MessageDelimiter) + message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) -- cgit v1.2.3 From 6506e20f6c80f4acb7434eb9dd14f784a67189cd Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 14:41:25 +0300 Subject: add spartan mode --- internal/io/fs/readfile.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index c0d44dd..ec33c60 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -182,11 +182,14 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu switch b { case '\n': - if message.Len() == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - message.WriteString("\n") + /* + // dcat/dgrep should actually transfer empty lines + if message.Len() == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + */ + //message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/io/dlog/dlog.go | 206 +++++++++++++++++++++++++++++++ internal/io/dlog/level.go | 89 +++++++++++++ internal/io/dlog/loggers/factory.go | 60 +++++++++ internal/io/dlog/loggers/file.go | 156 +++++++++++++++++++++++ internal/io/dlog/loggers/fout.go | 46 +++++++ internal/io/dlog/loggers/logger.go | 18 +++ internal/io/dlog/loggers/none.go | 21 ++++ internal/io/dlog/loggers/stdout.go | 73 +++++++++++ internal/io/dlog/rotation.go | 27 ++++ internal/io/dlog/source.go | 19 +++ internal/io/dlog/strategy.go | 22 ++++ internal/io/fs/permissions/permission.go | 4 +- internal/io/fs/readfile.go | 16 +-- internal/io/logger/logger.go | 23 +++- internal/io/logger/modes.go | 9 +- internal/io/prompt/prompt.go | 6 +- 16 files changed, 774 insertions(+), 21 deletions(-) create mode 100644 internal/io/dlog/dlog.go create mode 100644 internal/io/dlog/level.go create mode 100644 internal/io/dlog/loggers/factory.go create mode 100644 internal/io/dlog/loggers/file.go create mode 100644 internal/io/dlog/loggers/fout.go create mode 100644 internal/io/dlog/loggers/logger.go create mode 100644 internal/io/dlog/loggers/none.go create mode 100644 internal/io/dlog/loggers/stdout.go create mode 100644 internal/io/dlog/rotation.go create mode 100644 internal/io/dlog/source.go create mode 100644 internal/io/dlog/strategy.go (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go new file mode 100644 index 0000000..7282741 --- /dev/null +++ b/internal/io/dlog/dlog.go @@ -0,0 +1,206 @@ +package dlog + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/mimecast/dtail/internal/color/brush" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog/loggers" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" +) + +// Client is the log handler for the client packages. +var Client *DLog + +// Server is the log handler for the server packages. +var Server *DLog + +// Common is the log handler for all other packages. +// TODO: Rename Common to Common +var Common *DLog + +var mutex sync.Mutex +var started bool + +// Start logger(s). +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLevel string) { + mutex.Lock() + defer mutex.Unlock() + + if started { + Common.FatalPanic("Logger already started") + } + + level := newLevel(logLevel) + switch sourceProcess { + case CLIENT: + // This is a DTail client process running. + impl := loggers.FOUT + Client = New(CLIENT, CLIENT, impl, level) + Server = New(CLIENT, SERVER, impl, level) + Common = Client + case SERVER: + // This is a DTail server process running. + impl := loggers.FILE + Client = New(SERVER, CLIENT, impl, level) + Server = New(SERVER, SERVER, impl, level) + Common = Server + } + + var wg2 sync.WaitGroup + wg2.Add(2) + Client.start(ctx, &wg2) + Server.start(ctx, &wg2) + started = true + + go rotation(ctx) + go func() { + wg2.Wait() + wg.Done() + }() +} + +// DLog is the DTail logger. +type DLog struct { + logger loggers.Logger + // Is this a DTail server or client process logging? + sourceProcess source + // Is this a DTail server or client package logging? In serverless mode + // the client can also execute code from the server package. + sourcePackage source + // Max log level to log. + maxLevel level +} + +// New creates a new DTail logger. +func New(sourceProcess, sourcePackage source, impl loggers.Impl, maxLevel level) *DLog { + return &DLog{ + logger: loggers.Factory(sourceProcess.String(), impl), + sourceProcess: sourceProcess, + sourcePackage: sourcePackage, + maxLevel: maxLevel, + } +} + +func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { + go func() { + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() + }() +} + +func (d *DLog) log(level level, args []interface{}) string { + if d.maxLevel < level { + return "" + } + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + now := time.Now() + + sb.WriteString(d.sourcePackage.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(now.Format("20060102-150405")) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(level.String()) + sb.WriteString(protocol.FieldDelimiter) + d.writeArgStrings(sb, args) + + message := sb.String() + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(now, message) + return message + } + + d.logger.LogWithColors(now, message, brush.Colorfy(message)) + return message +} + +func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { + for i, arg := range args { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } + switch v := arg.(type) { + case string: + sb.WriteString(v) + case error: + sb.WriteString(v.Error()) + default: + sb.WriteString(fmt.Sprintf("%v", v)) + } + } +} + +func (d *DLog) FatalPanic(args ...interface{}) { + d.log(FATAL, args) + d.logger.Flush() + panic("Not recovering from this fatal error...") +} + +func (d *DLog) Fatal(args ...interface{}) string { + return d.log(FATAL, args) +} + +func (d *DLog) Error(args ...interface{}) string { + return d.log(ERROR, args) +} + +func (d *DLog) Warn(args ...interface{}) string { + return d.log(WARN, args) +} + +func (d *DLog) Info(args ...interface{}) string { + return d.log(INFO, args) +} + +func (d *DLog) Verbose(args ...interface{}) string { + return d.log(VERBOSE, args) +} + +func (d *DLog) Debug(args ...interface{}) string { + return d.log(DEBUG, args) +} + +func (d *DLog) Trace(args ...interface{}) string { + return d.log(TRACE, args) +} + +func (d *DLog) Devel(args ...interface{}) string { + return d.log(DEVEL, args) +} + +func (d *DLog) Raw(message string) string { + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(time.Now(), message) + return message + } + + d.logger.Log(time.Now(), brush.Colorfy(message)) + return message +} + +func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { + args := make([]interface{}, len(data)+1) + args[0] = fmt.Sprintf("%s:%s", "MAPREDUCE", strings.ToUpper(table)) + + i := 1 + for k, v := range data { + args[i] = fmt.Sprintf("%s=%v", k, v) + i++ + } + + return d.log(INFO, args) +} + +func (d *DLog) Flush() { d.logger.Flush() } +func (d *DLog) Pause() { d.logger.Pause() } +func (d *DLog) Resume() { d.logger.Resume() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go new file mode 100644 index 0000000..84550f0 --- /dev/null +++ b/internal/io/dlog/level.go @@ -0,0 +1,89 @@ +package dlog + +import ( + "fmt" + "strings" +) + +type level int + +const ( + FATAL level = iota + ERROR level = iota + WARN level = iota + INFO level = iota + DEFAULT level = iota + VERBOSE level = iota + DEBUG level = iota + DEVEL level = iota + TRACE level = iota + ALL level = iota +) + +var allLevels = []level{ + FATAL, + ERROR, + WARN, + INFO, + DEFAULT, + VERBOSE, + DEBUG, + DEVEL, + TRACE, + ALL, +} + +func newLevel(l string) level { + switch strings.ToUpper(l) { + case "FATAL": + return FATAL + case "ERROR": + return ERROR + case "WARN": + return WARN + case "INFO": + return INFO + case "": + fallthrough + case "DEFAULT": + return DEFAULT + case "VERBOSE": + return VERBOSE + case "DEBUG": + return DEBUG + case "DEVEL": + return DEVEL + case "TRACE": + return TRACE + case "ALL": + return ALL + } + panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels)) +} + +func (l level) String() string { + switch l { + case FATAL: + return "FATAL" + case ERROR: + return "ERROR" + case WARN: + return "WARN" + case INFO: + return "INFO" + case DEFAULT: + return "DEFAULT" + case VERBOSE: + return "VERBOSE" + case DEBUG: + return "DEBUG" + case DEVEL: + return "DEVEL" + case TRACE: + return "TRACE" + case ALL: + return "ALL" + } + + panic("Unknown log level " + fmt.Sprintf("%d", l)) +} diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go new file mode 100644 index 0000000..3eb29c5 --- /dev/null +++ b/internal/io/dlog/loggers/factory.go @@ -0,0 +1,60 @@ +package loggers + +import ( + "fmt" + "sync" +) + +type Impl int + +const ( + NONE Impl = iota + STDOUT Impl = iota + FILE Impl = iota + FOUT Impl = iota +) + +var factoryMap map[string]Logger +var factoryMutex sync.Mutex + +func Factory(name string, impl Impl) Logger { + factoryMutex.Lock() + defer factoryMutex.Unlock() + + id := fmt.Sprintf("name:%s,impl:%v", name, impl) + + if factoryMap == nil { + factoryMap = make(map[string]Logger) + } + + singleton, ok := factoryMap[id] + if !ok { + switch impl { + case NONE: + singleton = none{} + case STDOUT: + singleton = newStdout() + factoryMap[id] = singleton + case FILE: + singleton = newFile() + factoryMap[id] = singleton + case FOUT: + singleton = newFout() + factoryMap[id] = singleton + } + } + + return singleton +} + +func FactoryRotate() { + factoryMutex.Lock() + defer factoryMutex.Unlock() + if factoryMap == nil { + return + } + + for _, impl := range factoryMap { + impl.Rotate() + } +} diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go new file mode 100644 index 0000000..1c525c9 --- /dev/null +++ b/internal/io/dlog/loggers/file.go @@ -0,0 +1,156 @@ +package loggers + +import ( + "bufio" + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/mimecast/dtail/internal/config" +) + +type fileMessageBuf struct { + now time.Time + message string +} + +type file struct { + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + lastDateStr string + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool +} + +func newFile() *file { + f := file{ + bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + rotateCh: make(chan struct{}), + flushCh: make(chan struct{}), + } + f.getWriter(time.Now().Format("20060102")) + return &f +} + +func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Logger already started from another Goroutine. + if s.started { + wg.Done() + return + } + + pause := func(ctx context.Context) { + select { + case <-s.resumeCh: + return + case <-ctx.Done(): + return + } + } + + go func() { + defer wg.Done() + + for { + select { + case m := <-s.bufferCh: + s.write(m) + case <-s.pauseCh: + pause(ctx) + case <-s.flushCh: + s.flush() + case <-ctx.Done(): + s.flush() + s.fd.Close() + return + } + } + }() + + s.started = true +} + +func (s *file) Log(now time.Time, message string) { + s.bufferCh <- &fileMessageBuf{now, message} +} + +func (s *file) LogWithColors(now time.Time, message, coloredMessage string) { + panic("Colors not supported in file logger") +} + +func (s *file) Pause() { s.pauseCh <- struct{}{} } +func (s *file) Resume() { s.resumeCh <- struct{}{} } +func (s *file) Flush() { s.flushCh <- struct{}{} } + +// TODO: Test that Rotate() actually works. +func (s *file) Rotate() { s.rotateCh <- struct{}{} } +func (file) SupportsColors() bool { return false } + +func (s *file) write(m *fileMessageBuf) { + select { + case <-s.rotateCh: + // Force re-opening the outfile. + s.lastDateStr = "" + default: + } + + writer := s.getWriter(m.now.Format("20060102")) + writer.WriteString(m.message) + writer.WriteByte('\n') +} + +func (s *file) getWriter(dateStr string) *bufio.Writer { + if s.lastDateStr == dateStr { + return s.writer + } + + if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { + if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { + panic(err) + } + } + + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) + newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + panic(err) + } + + // Close old writer. + if s.fd != nil { + s.writer.Flush() + s.fd.Close() + } + + s.fd = newFd + s.writer = bufio.NewWriterSize(s.fd, 1) + s.lastDateStr = dateStr + + return s.writer +} + +func (s *file) flush() { + defer s.writer.Flush() + + for { + select { + case m := <-s.bufferCh: + s.write(m) + default: + return + } + } +} diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go new file mode 100644 index 0000000..603dbe9 --- /dev/null +++ b/internal/io/dlog/loggers/fout.go @@ -0,0 +1,46 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +type fout struct { + file *file + stdout *stdout +} + +// Logs to both, a file and stdout +func newFout() *fout { + return &fout{file: newFile(), stdout: newStdout()} +} + +func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) { + go func() { + defer wg.Done() + + var wg2 sync.WaitGroup + wg2.Add(2) + f.file.Start(ctx, &wg2) + f.stdout.Start(ctx, &wg2) + wg2.Wait() + }() +} + +func (f *fout) Log(now time.Time, message string) { + f.stdout.Log(now, message) + f.file.Log(now, message) +} + +func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) { + f.stdout.LogWithColors(now, "", coloredMessage) + f.file.Log(now, message) +} + +func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() } +func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() } +func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() } +func (f *fout) Rotate() { f.file.Rotate() } + +func (fout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go new file mode 100644 index 0000000..c88900d --- /dev/null +++ b/internal/io/dlog/loggers/logger.go @@ -0,0 +1,18 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +type Logger interface { + Log(now time.Time, message string) + LogWithColors(now time.Time, message, messageWithColors string) + Start(ctx context.Context, wg *sync.WaitGroup) + Flush() + Pause() + Resume() + Rotate() + SupportsColors() bool +} diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go new file mode 100644 index 0000000..270027f --- /dev/null +++ b/internal/io/dlog/loggers/none.go @@ -0,0 +1,21 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +// don't log anything +type none struct{} + +func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() } +func (none) Log(now time.Time, message string) {} + +func (none) LogWithColors(now time.Time, message, coloredMessage string) {} + +func (none) Flush() {} +func (none) Pause() {} +func (none) Resume() {} +func (none) Rotate() {} +func (none) SupportsColors() bool { return false } diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go new file mode 100644 index 0000000..9738323 --- /dev/null +++ b/internal/io/dlog/loggers/stdout.go @@ -0,0 +1,73 @@ +package loggers + +import ( + "context" + "fmt" + "sync" + "time" +) + +type stdout struct { + bufferCh chan string + pauseCh chan struct{} + resumeCh chan struct{} +} + +func newStdout() *stdout { + return &stdout{ + bufferCh: make(chan string, 100), + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + } +} + +func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) { + pause := func(ctx context.Context) { + select { + case <-s.resumeCh: + return + case <-ctx.Done(): + return + } + } + + go func() { + defer wg.Done() + + for { + select { + case message := <-s.bufferCh: + fmt.Println(message) + case <-s.pauseCh: + pause(ctx) + case <-ctx.Done(): + s.Flush() + return + } + } + }() +} + +func (s *stdout) Log(now time.Time, message string) { + s.bufferCh <- message +} + +func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) { + s.bufferCh <- coloredMessage +} + +func (s *stdout) Flush() { + for { + select { + case message := <-s.bufferCh: + fmt.Println(message) + default: + return + } + } +} + +func (s *stdout) Pause() { s.pauseCh <- struct{}{} } +func (s *stdout) Resume() { s.resumeCh <- struct{}{} } +func (s *stdout) Rotate() {} +func (stdout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go new file mode 100644 index 0000000..15ce1fd --- /dev/null +++ b/internal/io/dlog/rotation.go @@ -0,0 +1,27 @@ +package dlog + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/mimecast/dtail/internal/io/dlog/loggers" +) + +func rotation(ctx context.Context) { + rotateCh := make(chan os.Signal, 1) + signal.Notify(rotateCh, syscall.SIGHUP) + go func() { + for { + select { + case <-rotateCh: + Common.Debug("Invoking log rotation") + loggers.FactoryRotate() + return + case <-ctx.Done(): + return + } + } + }() +} diff --git a/internal/io/dlog/source.go b/internal/io/dlog/source.go new file mode 100644 index 0000000..265885e --- /dev/null +++ b/internal/io/dlog/source.go @@ -0,0 +1,19 @@ +package dlog + +type source int + +const ( + CLIENT source = iota + SERVER source = iota +) + +func (s source) String() string { + switch s { + case CLIENT: + return "CLIENT" + case SERVER: + return "SERVER" + } + + panic("Unknown log source type") +} diff --git a/internal/io/dlog/strategy.go b/internal/io/dlog/strategy.go new file mode 100644 index 0000000..32d8298 --- /dev/null +++ b/internal/io/dlog/strategy.go @@ -0,0 +1,22 @@ +package dlog + +import "github.com/mimecast/dtail/internal/config" + +// Strategy allows to specify a log rotation strategy. +type Strategy int + +// Possible log strategies. +const ( + NormalStrategy Strategy = iota + DailyStrategy Strategy = iota + StdoutStrategy Strategy = iota +) + +func logStrategy() Strategy { + switch config.Common.LogStrategy { + case "daily": + return DailyStrategy + default: + } + return StdoutStrategy +} diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index cc5dd9b..bbcb74e 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -3,12 +3,12 @@ package permissions import ( - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + dlog.Common.Warn(user, filePath, "Not performing ACL check, not supported on this platform") return true, nil } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index ec33c60..07486a1 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -14,7 +14,7 @@ import ( "time" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/regex" @@ -62,7 +62,7 @@ func (f readFile) Retry() bool { // Start tailing a log file. func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { - logger.Debug("readFile", f) + dlog.Common.Debug("readFile", f) defer func() { select { case <-f.limiter: @@ -74,7 +74,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re case f.limiter <- struct{}{}: default: select { - case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): case <-ctx.Done(): return nil } @@ -126,7 +126,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") + dlog.Common.Info(f.FilePath(), "Detected gzip compression format") var gzipReader *gzip.Reader gzipReader, err = gzip.NewReader(fd) if err != nil { @@ -134,7 +134,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { } reader = bufio.NewReader(gzipReader) case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") + dlog.Common.Info(f.FilePath(), "Detected zstd compression format") reader = bufio.NewReader(zstd.NewReader(fd)) default: reader = bufio.NewReader(fd) @@ -172,7 +172,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu default: } if !f.seekEOF { - logger.Info(f.FilePath(), "End of file reached") + dlog.Common.Info(f.FilePath(), "End of file reached") return nil } time.Sleep(time.Millisecond * 100) @@ -201,7 +201,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu default: if message.Len() >= lineLengthThreshold { if !warnedAboutLongLine { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") + f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") warnedAboutLongLine = true } message.WriteString("\n") @@ -268,7 +268,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, r // Check wether log file is truncated. Returns nil if not. func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") + dlog.Common.Debug(f.filePath, "File truncation check") // Can not seek currently open FD. curPos, err := fd.Seek(0, os.SEEK_CUR) diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 6890201..6a6b5ec 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -1,5 +1,7 @@ package logger +// TODO: Rewrite this logger + import ( "bufio" "context" @@ -64,12 +66,25 @@ var resumeCh chan struct{} // Tell the logger about logrotation var rotateCh chan os.Signal +// Override the logger with a custom callack (e.g. for the t.Log for unit tests) +type unitTestCallback func(message string) + +var unitTestOkCb unitTestCallback +var unitTestErrorCb unitTestCallback + // Helper type to make logging non-blocking. type buf struct { time time.Time message string } +// StartUnitTests enables to log all messages to the unit tests. +func StartUnitTests(ctx context.Context, okCb, errCb unitTestCallback) { + unitTestOkCb = okCb + unitTestErrorCb = errCb + Start(ctx, Modes{UnitTest: true}) +} + // Start logging. func Start(ctx context.Context, mode Modes) { Mode = mode @@ -91,12 +106,12 @@ func Start(ctx context.Context, mode Modes) { switch strategy { case DailyStrategy: _, err := os.Stat(config.Common.LogDir) - Mode.logToFile = !os.IsNotExist(err) + Mode.logToFile = !os.IsNotExist(err) && !Mode.UnitTest Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet case StdoutStrategy: fallthrough default: - Mode.logToFile = !Mode.Server + Mode.logToFile = !Mode.Server && !Mode.UnitTest Mode.logToStdout = true } @@ -182,8 +197,8 @@ func Fatal(args ...interface{}) string { return log(clientStr, fatalStr, args) } -// FatalExit logs an error and exists the process. -func FatalExit(args ...interface{}) { +// FatalPanic logs an error and exists the process. +func FatalPanic(args ...interface{}) { what := clientStr if Mode.Server { what = serverStr diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go index 8864179..85f90a5 100644 --- a/internal/io/logger/modes.go +++ b/internal/io/logger/modes.go @@ -2,11 +2,12 @@ package logger // Modes specifies the logging mode. type Modes struct { - Server bool - Trace bool Debug bool + logToFile bool + logToStdout bool Nothing bool Quiet bool - logToStdout bool - logToFile bool + Server bool + Trace bool + UnitTest bool } diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index 36ebdb5..7c3cdb5 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -6,7 +6,7 @@ import ( "os" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // Answer is a user input of a prompt question. @@ -58,7 +58,7 @@ func (p *Prompt) Add(answer Answer) { // Ask a question. func (p *Prompt) Ask() { reader := bufio.NewReader(os.Stdin) - logger.Pause() + dlog.Common.Pause() for { fmt.Print(p.askString()) @@ -70,7 +70,7 @@ func (p *Prompt) Ask() { } if !a.AskAgain { - logger.Resume() + dlog.Common.Resume() if a.EndCallback != nil { a.EndCallback() } -- cgit v1.2.3 From fcaa94c7453efa0d74e330128c0f5c2cde8f11b3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 26 Sep 2021 16:42:47 +0300 Subject: refactor config reader - also looks in additional search paths for config file unless NONE is specified --- internal/io/dlog/dlog.go | 37 ++++++++++++++++++++++++++++--------- internal/io/logger/logger.go | 2 -- 2 files changed, 28 insertions(+), 11 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 7282741..49b405d 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -3,6 +3,7 @@ package dlog import ( "context" "fmt" + "os" "strings" "sync" "time" @@ -21,7 +22,6 @@ var Client *DLog var Server *DLog // Common is the log handler for all other packages. -// TODO: Rename Common to Common var Common *DLog var mutex sync.Mutex @@ -75,15 +75,22 @@ type DLog struct { sourcePackage source // Max log level to log. maxLevel level + // Current hostname. + hostname string } // New creates a new DTail logger. func New(sourceProcess, sourcePackage source, impl loggers.Impl, maxLevel level) *DLog { + hostname, err := os.Hostname() + if err != nil { + panic(err) + } return &DLog{ logger: loggers.Factory(sourceProcess.String(), impl), sourceProcess: sourceProcess, sourcePackage: sourcePackage, maxLevel: maxLevel, + hostname: hostname, } } @@ -106,11 +113,18 @@ func (d *DLog) log(level level, args []interface{}) string { defer pool.RecycleBuilderBuffer(sb) now := time.Now() - sb.WriteString(d.sourcePackage.String()) - sb.WriteString(protocol.FieldDelimiter) - sb.WriteString(now.Format("20060102-150405")) - sb.WriteString(protocol.FieldDelimiter) - sb.WriteString(level.String()) + switch d.sourceProcess { + case CLIENT: + sb.WriteString(d.sourcePackage.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(d.hostname) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(level.String()) + default: + sb.WriteString(level.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(now.Format("20060102-150405")) + } sb.WriteString(protocol.FieldDelimiter) d.writeArgStrings(sb, args) @@ -159,6 +173,11 @@ func (d *DLog) Warn(args ...interface{}) string { } func (d *DLog) Info(args ...interface{}) string { + if d.sourcePackage == SERVER && d.sourceProcess != CLIENT { + // This can be dtail client in serverless mode. In this case log all + // info server messages as verbose. + return d.log(VERBOSE, args) + } return d.log(INFO, args) } @@ -183,13 +202,14 @@ func (d *DLog) Raw(message string) string { d.logger.Log(time.Now(), message) return message } - - d.logger.Log(time.Now(), brush.Colorfy(message)) + d.logger.LogWithColors(time.Now(), message, brush.Colorfy(message)) return message } func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args := make([]interface{}, len(data)+1) + + // TODO: mC compatible SERVER mapreduce fields, no MAPREDUCE keyword in CLIENT mode args[0] = fmt.Sprintf("%s:%s", "MAPREDUCE", strings.ToUpper(table)) i := 1 @@ -197,7 +217,6 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args[i] = fmt.Sprintf("%s=%v", k, v) i++ } - return d.log(INFO, args) } diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index 6a6b5ec..905d1cf 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -1,7 +1,5 @@ package logger -// TODO: Rewrite this logger - import ( "bufio" "context" -- cgit v1.2.3 From 609921f9c783941eaa9019a92b78ec45b49d681c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 28 Sep 2021 21:11:50 +0300 Subject: can have daily and normal file log rotation --- internal/io/dlog/dlog.go | 14 +- internal/io/dlog/loggers/factory.go | 8 +- internal/io/dlog/loggers/file.go | 117 +++++----- internal/io/dlog/loggers/fout.go | 4 +- internal/io/dlog/strategy.go | 22 -- internal/io/logger/logger.go | 443 ------------------------------------ internal/io/logger/modes.go | 13 -- internal/io/logger/strategy.go | 22 -- 8 files changed, 78 insertions(+), 565 deletions(-) delete mode 100644 internal/io/dlog/strategy.go delete mode 100644 internal/io/logger/logger.go delete mode 100644 internal/io/logger/modes.go delete mode 100644 internal/io/logger/strategy.go (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 49b405d..49533a5 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -36,19 +36,21 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLev Common.FatalPanic("Logger already started") } + strategy := loggers.GetStrategy(config.Common.LogStrategy) level := newLevel(logLevel) + switch sourceProcess { case CLIENT: // This is a DTail client process running. impl := loggers.FOUT - Client = New(CLIENT, CLIENT, impl, level) - Server = New(CLIENT, SERVER, impl, level) + Client = New(CLIENT, CLIENT, level, impl, strategy) + Server = New(CLIENT, SERVER, level, impl, strategy) Common = Client case SERVER: // This is a DTail server process running. impl := loggers.FILE - Client = New(SERVER, CLIENT, impl, level) - Server = New(SERVER, SERVER, impl, level) + Client = New(SERVER, CLIENT, level, impl, strategy) + Server = New(SERVER, SERVER, level, impl, strategy) Common = Server } @@ -80,13 +82,13 @@ type DLog struct { } // New creates a new DTail logger. -func New(sourceProcess, sourcePackage source, impl loggers.Impl, maxLevel level) *DLog { +func New(sourceProcess, sourcePackage source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) } return &DLog{ - logger: loggers.Factory(sourceProcess.String(), impl), + logger: loggers.Factory(sourceProcess.String(), impl, strategy), sourceProcess: sourceProcess, sourcePackage: sourcePackage, maxLevel: maxLevel, diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index 3eb29c5..8697dc4 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -17,11 +17,11 @@ const ( var factoryMap map[string]Logger var factoryMutex sync.Mutex -func Factory(name string, impl Impl) Logger { +func Factory(name string, impl Impl, strategy Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() - id := fmt.Sprintf("name:%s,impl:%v", name, impl) + id := fmt.Sprintf("name:%s,fileBase:%s,impl:%v", name, strategy.FileBase, impl) if factoryMap == nil { factoryMap = make(map[string]Logger) @@ -36,10 +36,10 @@ func Factory(name string, impl Impl) Logger { singleton = newStdout() factoryMap[id] = singleton case FILE: - singleton = newFile() + singleton = newFile(strategy) factoryMap[id] = singleton case FOUT: - singleton = newFout() + singleton = newFout(strategy) factoryMap[id] = singleton } } diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 1c525c9..dcdd7d0 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -12,49 +12,54 @@ import ( "github.com/mimecast/dtail/internal/config" ) +type fileWriter struct { +} + type fileMessageBuf struct { now time.Time message string } type file struct { - bufferCh chan *fileMessageBuf - pauseCh chan struct{} - resumeCh chan struct{} - rotateCh chan struct{} - flushCh chan struct{} - lastDateStr string - fd *os.File - writer *bufio.Writer - mutex sync.Mutex - started bool + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool + lastFileName string + strategy Strategy } -func newFile() *file { +func newFile(strategy Strategy) *file { f := file{ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), rotateCh: make(chan struct{}), flushCh: make(chan struct{}), + strategy: strategy, } - f.getWriter(time.Now().Format("20060102")) + return &f } -func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { + f.mutex.Lock() + defer f.mutex.Unlock() // Logger already started from another Goroutine. - if s.started { + if f.started { wg.Done() return } pause := func(ctx context.Context) { select { - case <-s.resumeCh: + case <-f.resumeCh: return case <-ctx.Done(): return @@ -66,55 +71,61 @@ func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { for { select { - case m := <-s.bufferCh: - s.write(m) - case <-s.pauseCh: + case m := <-f.bufferCh: + f.write(m) + case <-f.pauseCh: pause(ctx) - case <-s.flushCh: - s.flush() + case <-f.flushCh: + f.flush() case <-ctx.Done(): - s.flush() - s.fd.Close() + f.flush() + f.fd.Close() return } } }() - s.started = true + f.started = true } -func (s *file) Log(now time.Time, message string) { - s.bufferCh <- &fileMessageBuf{now, message} +func (f *file) Log(now time.Time, message string) { + f.bufferCh <- &fileMessageBuf{now, message} } -func (s *file) LogWithColors(now time.Time, message, coloredMessage string) { +func (f *file) LogWithColors(now time.Time, message, coloredMessage string) { panic("Colors not supported in file logger") } -func (s *file) Pause() { s.pauseCh <- struct{}{} } -func (s *file) Resume() { s.resumeCh <- struct{}{} } -func (s *file) Flush() { s.flushCh <- struct{}{} } +func (f *file) Pause() { f.pauseCh <- struct{}{} } +func (f *file) Resume() { f.resumeCh <- struct{}{} } +func (f *file) Flush() { f.flushCh <- struct{}{} } // TODO: Test that Rotate() actually works. -func (s *file) Rotate() { s.rotateCh <- struct{}{} } -func (file) SupportsColors() bool { return false } +func (f *file) Rotate() { f.rotateCh <- struct{}{} } +func (*file) SupportsColors() bool { return false } -func (s *file) write(m *fileMessageBuf) { +func (f *file) write(m *fileMessageBuf) { select { - case <-s.rotateCh: - // Force re-opening the outfile. - s.lastDateStr = "" + case <-f.rotateCh: + // Force re-opening the outfile next time in getWriter. + f.lastFileName = "" default: } - writer := s.getWriter(m.now.Format("20060102")) + var writer *bufio.Writer + if f.strategy.Rotation == DailyRotation { + writer = f.getWriter(m.now.Format("20060102")) + } else { + writer = f.getWriter(f.strategy.FileBase) + } + writer.WriteString(m.message) writer.WriteByte('\n') } -func (s *file) getWriter(dateStr string) *bufio.Writer { - if s.lastDateStr == dateStr { - return s.writer +func (f *file) getWriter(name string) *bufio.Writer { + if f.lastFileName == name { + return f.writer } if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { @@ -123,32 +134,32 @@ func (s *file) getWriter(dateStr string) *bufio.Writer { } } - logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name) newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) if err != nil { panic(err) } // Close old writer. - if s.fd != nil { - s.writer.Flush() - s.fd.Close() + if f.fd != nil { + f.writer.Flush() + f.fd.Close() } - s.fd = newFd - s.writer = bufio.NewWriterSize(s.fd, 1) - s.lastDateStr = dateStr + f.fd = newFd + f.writer = bufio.NewWriterSize(f.fd, 1) + f.lastFileName = name - return s.writer + return f.writer } -func (s *file) flush() { - defer s.writer.Flush() +func (f *file) flush() { + defer f.writer.Flush() for { select { - case m := <-s.bufferCh: - s.write(m) + case m := <-f.bufferCh: + f.write(m) default: return } diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go index 603dbe9..60c318d 100644 --- a/internal/io/dlog/loggers/fout.go +++ b/internal/io/dlog/loggers/fout.go @@ -12,8 +12,8 @@ type fout struct { } // Logs to both, a file and stdout -func newFout() *fout { - return &fout{file: newFile(), stdout: newStdout()} +func newFout(strategy Strategy) *fout { + return &fout{file: newFile(strategy), stdout: newStdout()} } func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) { diff --git a/internal/io/dlog/strategy.go b/internal/io/dlog/strategy.go deleted file mode 100644 index 32d8298..0000000 --- a/internal/io/dlog/strategy.go +++ /dev/null @@ -1,22 +0,0 @@ -package dlog - -import "github.com/mimecast/dtail/internal/config" - -// Strategy allows to specify a log rotation strategy. -type Strategy int - -// Possible log strategies. -const ( - NormalStrategy Strategy = iota - DailyStrategy Strategy = iota - StdoutStrategy Strategy = iota -) - -func logStrategy() Strategy { - switch config.Common.LogStrategy { - case "daily": - return DailyStrategy - default: - } - return StdoutStrategy -} diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go deleted file mode 100644 index 905d1cf..0000000 --- a/internal/io/logger/logger.go +++ /dev/null @@ -1,443 +0,0 @@ -package logger - -import ( - "bufio" - "context" - "fmt" - "os" - "os/signal" - "runtime" - "strings" - "sync" - "syscall" - "time" - - "github.com/mimecast/dtail/internal/color/brush" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/protocol" -) - -const ( - clientStr string = "CLIENT" - serverStr string = "SERVER" - infoStr string = "INFO" - warnStr string = "WARN" - errorStr string = "ERROR" - fatalStr string = "FATAL" - debugStr string = "DEBUG" - traceStr string = "TRACE" -) - -// Mode specifies the configured logging mode(s) -var Mode Modes - -// Strategy is the current log strattegy used. -var strategy Strategy - -// Synchronise access to logging. -var mutex sync.Mutex - -// File descriptor of log file when Mode.logToFile enabled. -var fd *os.File - -// File write buffer of log file when Mode.logToFile enabled. -var writer *bufio.Writer - -// File write buffer of stdout when Mode.logToStdout enabled. -var stdoutWriter *bufio.Writer - -// Current hostname. -var hostname string - -// Used to detect change of day (create one log file per day0 -var lastDateStr string - -// Used to make logging non-blocking. -var fileLogBufCh chan buf -var stdoutBufCh chan string - -// Stdout channel, required to pause output -var pauseCh chan struct{} -var resumeCh chan struct{} - -// Tell the logger about logrotation -var rotateCh chan os.Signal - -// Override the logger with a custom callack (e.g. for the t.Log for unit tests) -type unitTestCallback func(message string) - -var unitTestOkCb unitTestCallback -var unitTestErrorCb unitTestCallback - -// Helper type to make logging non-blocking. -type buf struct { - time time.Time - message string -} - -// StartUnitTests enables to log all messages to the unit tests. -func StartUnitTests(ctx context.Context, okCb, errCb unitTestCallback) { - unitTestOkCb = okCb - unitTestErrorCb = errCb - Start(ctx, Modes{UnitTest: true}) -} - -// Start logging. -func Start(ctx context.Context, mode Modes) { - Mode = mode - - switch { - case Mode.Nothing: - return - case Mode.Quiet: - Mode.Trace = false - Mode.Debug = false - case Mode.Trace: - Mode.Debug = true - default: - } - - strategy := logStrategy() - stdoutWriter = bufio.NewWriter(os.Stdout) - - switch strategy { - case DailyStrategy: - _, err := os.Stat(config.Common.LogDir) - Mode.logToFile = !os.IsNotExist(err) && !Mode.UnitTest - Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet - case StdoutStrategy: - fallthrough - default: - Mode.logToFile = !Mode.Server && !Mode.UnitTest - Mode.logToStdout = true - } - - fqdn, err := os.Hostname() - if err != nil { - panic(err) - } - s := strings.Split(fqdn, ".") - hostname = s[0] - - pauseCh = make(chan struct{}) - resumeCh = make(chan struct{}) - - // Setup logrotation - rotateCh = make(chan os.Signal, 1) - signal.Notify(rotateCh, syscall.SIGHUP) - - if Mode.logToStdout { - stdoutBufCh = make(chan string, runtime.NumCPU()*100) - go writeToStdout(ctx) - } - - if Mode.logToFile { - fileLogBufCh = make(chan buf, runtime.NumCPU()*100) - go writeToFile(ctx) - } -} - -// Info message logging. -func Info(args ...interface{}) string { - if Mode.Server { - return log(serverStr, infoStr, args) - } - - return log(clientStr, infoStr, args) -} - -// Mapreduce message logging. -func Mapreduce(table string, data map[string]interface{}) string { - args := make([]interface{}, len(data)+1) - - args[0] = fmt.Sprintf("MAPREDUCE:%s", strings.ToUpper(table)) - i := 1 - for k, v := range data { - args[i] = fmt.Sprintf("%s=%v", k, v) - i++ - } - - if Mode.Server { - return log(serverStr, infoStr, args) - } - - return log(clientStr, infoStr, args) -} - -// Warn message logging. -func Warn(args ...interface{}) string { - if !Mode.Quiet { - if Mode.Server { - return log(serverStr, warnStr, args) - } - return log(clientStr, warnStr, args) - } - - return "" -} - -// Error message logging. -func Error(args ...interface{}) string { - if Mode.Server { - return log(serverStr, errorStr, args) - } - - return log(clientStr, errorStr, args) -} - -// Fatal message logging. -func Fatal(args ...interface{}) string { - if Mode.Server { - return log(serverStr, fatalStr, args) - } - - return log(clientStr, fatalStr, args) -} - -// FatalPanic logs an error and exists the process. -func FatalPanic(args ...interface{}) { - what := clientStr - if Mode.Server { - what = serverStr - } - log(what, fatalStr, args) - - time.Sleep(time.Second) - mutex.Lock() - defer mutex.Unlock() - - closeWriter() - os.Exit(3) -} - -// Debug message logging. -func Debug(args ...interface{}) string { - if Mode.Debug { - if Mode.Server { - return log(serverStr, debugStr, args) - } - return log(clientStr, debugStr, args) - } - - return "" -} - -// Trace message logging. -func Trace(args ...interface{}) string { - if Mode.Trace { - if Mode.Server { - return log(serverStr, traceStr, args) - } - return log(clientStr, traceStr, args) - } - - return "" -} - -// Write log line to buffer and/or log file. -func write(what, severity, message string) { - if Mode.logToStdout { - line := fmt.Sprintf("%s|%s|%s|%s", what, hostname, severity, message) - - if config.Client.TermColorsEnable { - line = brush.Colorfy(line) - } - - stdoutBufCh <- line - } - - if Mode.logToFile { - t := time.Now() - timeStr := t.Format("20060102-150405") - fileLogBufCh <- buf{ - time: t, - message: fmt.Sprintf("%s|%s|%s|%s", severity, timeStr, what, message), - } - } -} - -// Generig log message. -func log(what string, severity string, args []interface{}) string { - if Mode.Nothing { - return "" - } - - sb := pool.BuilderBuffer.Get().(*strings.Builder) - - for i, arg := range args { - if i > 0 { - sb.WriteString(protocol.FieldDelimiter) - } - - switch v := arg.(type) { - case string: - sb.WriteString(v) - case int: - sb.WriteString(fmt.Sprintf("%d", v)) - case error: - sb.WriteString(v.Error()) - default: - sb.WriteString(fmt.Sprintf("%v", v)) - } - } - - message := sb.String() - pool.RecycleBuilderBuffer(sb) - write(what, severity, message) - return fmt.Sprintf("%s|%s", severity, message) -} - -// Raw message logging. -func Raw(message string) { - if Mode.Nothing { - return - } - - if Mode.logToFile { - fileLogBufCh <- buf{time.Now(), message} - } - - if Mode.logToStdout { - if config.Client.TermColorsEnable { - message = brush.Colorfy(message) - } - stdoutBufCh <- message - } -} - -// Close log writer (e.g. on change of day). -func closeWriter() { - if writer != nil { - writer.Flush() - fd.Close() - } -} - -// Return the correct log file writer -func fileWriter(dateStr string) *bufio.Writer { - if dateStr != lastDateStr { - return updateFileWriter(dateStr) - } - - // Check for log rotation signal - select { - case <-rotateCh: - stdoutWriter.WriteString("Received signal for logrotation\n") - return updateFileWriter(dateStr) - default: - } - - return writer -} - -// Update log file writer -func updateFileWriter(dateStr string) *bufio.Writer { - // Detected change of day. Close current writer and create a new one. - mutex.Lock() - defer mutex.Unlock() - closeWriter() - - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { - if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { - panic(err) - } - } - - logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) - newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - panic(err) - } - - fd = newFd - writer = bufio.NewWriterSize(fd, 1) - lastDateStr = dateStr - - return writer -} - -// Flush all outstanding lines. -func Flush() { - for { - select { - case message := <-stdoutBufCh: - stdoutWriter.WriteString(message) - stdoutWriter.WriteString("\n") - default: - stdoutWriter.Flush() - return - } - } -} - -func writeToStdout(ctx context.Context) { - for { - select { - case message := <-stdoutBufCh: - stdoutWriter.WriteString(message) - stdoutWriter.WriteString("\n") - case <-time.After(time.Millisecond * 100): - stdoutWriter.Flush() - case <-pauseCh: - PAUSE: - for { - select { - case <-stdoutBufCh: - case <-resumeCh: - break PAUSE - case <-ctx.Done(): - return - } - } - case <-ctx.Done(): - Flush() - return - } - } -} - -func writeToFile(ctx context.Context) { - for { - select { - case buf := <-fileLogBufCh: - dateStr := buf.time.Format("20060102") - w := fileWriter(dateStr) - w.WriteString(buf.message) - w.WriteString("\n") - case <-pauseCh: - PAUSE: - for { - select { - case <-stdoutBufCh: - case <-resumeCh: - break PAUSE - case <-ctx.Done(): - return - } - } - case <-ctx.Done(): - return - } - } -} - -// Pause logging. -func Pause() { - if Mode.logToStdout { - pauseCh <- struct{}{} - } - if Mode.logToFile { - pauseCh <- struct{}{} - } -} - -// Resume logging (after pausing). -func Resume() { - if Mode.logToStdout { - resumeCh <- struct{}{} - } - if Mode.logToFile { - resumeCh <- struct{}{} - } -} diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go deleted file mode 100644 index 85f90a5..0000000 --- a/internal/io/logger/modes.go +++ /dev/null @@ -1,13 +0,0 @@ -package logger - -// Modes specifies the logging mode. -type Modes struct { - Debug bool - logToFile bool - logToStdout bool - Nothing bool - Quiet bool - Server bool - Trace bool - UnitTest bool -} diff --git a/internal/io/logger/strategy.go b/internal/io/logger/strategy.go deleted file mode 100644 index 44bf393..0000000 --- a/internal/io/logger/strategy.go +++ /dev/null @@ -1,22 +0,0 @@ -package logger - -import "github.com/mimecast/dtail/internal/config" - -// Strategy allows to specify a log rotation strategy. -type Strategy int - -// Possible log strategies. -const ( - NormalStrategy Strategy = iota - DailyStrategy Strategy = iota - StdoutStrategy Strategy = iota -) - -func logStrategy() Strategy { - switch config.Common.LogStrategy { - case "daily": - return DailyStrategy - default: - } - return StdoutStrategy -} -- cgit v1.2.3 From 764ef99a3d779a0db1fb60679292af52425ba2f6 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 10:46:47 +0300 Subject: add more default fields to MAPREDUCE --- internal/io/dlog/dlog.go | 38 ++++++++++++++++++++++++++++++++++++-- internal/io/dlog/loggers/file.go | 1 - 2 files changed, 36 insertions(+), 3 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 49533a5..bc9b2f8 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -3,7 +3,11 @@ package dlog import ( "context" "fmt" + "io/ioutil" "os" + "path/filepath" + "runtime" + "strconv" "strings" "sync" "time" @@ -211,8 +215,38 @@ func (d *DLog) Raw(message string) string { func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args := make([]interface{}, len(data)+1) - // TODO: mC compatible SERVER mapreduce fields, no MAPREDUCE keyword in CLIENT mode - args[0] = fmt.Sprintf("%s:%s", "MAPREDUCE", strings.ToUpper(table)) + if d.sourceProcess == SERVER { + // level|date-time|process|caller|cpus|goroutines|cgocalls|loadavg|uptime|MAPREDUCE:TABLE|key=value|... + + var loadAvg string + if loadAvgBytes, err := ioutil.ReadFile("/proc/loadavg"); err == nil { + tmp := string(loadAvgBytes) + s := strings.SplitN(tmp, " ", 2) + loadAvg = s[0] + } + + var uptime string + if uptimeBytes, err := ioutil.ReadFile("/proc/uptime"); err == nil { + tmp := string(uptimeBytes) + s := strings.SplitN(tmp, ".", 2) + i, _ := strconv.ParseInt(s[0], 10, 64) + t := time.Duration(i) * time.Second + uptime = fmt.Sprintf("%v", t) + } + + _, file, line, _ := runtime.Caller(1) + args[0] = fmt.Sprintf("%d|%s:%d|%d|%d|%d|%s|%s|MAPREDUCE:%s", + os.Getpid(), + filepath.Base(file), line, + runtime.NumCPU(), + runtime.NumGoroutine(), + runtime.NumCgoCall(), + loadAvg, + uptime, + strings.ToUpper(table)) + } else { + args[0] = fmt.Sprintf("STATS:%s", strings.ToUpper(table)) + } i := 1 for k, v := range data { diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index dcdd7d0..6e692a3 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -100,7 +100,6 @@ func (f *file) Pause() { f.pauseCh <- struct{}{} } func (f *file) Resume() { f.resumeCh <- struct{}{} } func (f *file) Flush() { f.flushCh <- struct{}{} } -// TODO: Test that Rotate() actually works. func (f *file) Rotate() { f.rotateCh <- struct{}{} } func (*file) SupportsColors() bool { return false } -- cgit v1.2.3 From 6e1af993924bc7bebe898b403962db5a6b3505d1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 11:23:08 +0300 Subject: Client default log dir is ~/log --- internal/io/dlog/dlog.go | 27 ++++++++++++++------------- internal/io/dlog/loggers/strategy.go | 27 +++++++++++++++++++++++++++ internal/io/dlog/source.go | 19 ------------------- 3 files changed, 41 insertions(+), 32 deletions(-) create mode 100644 internal/io/dlog/loggers/strategy.go delete mode 100644 internal/io/dlog/source.go (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index bc9b2f8..3de3120 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -17,6 +17,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog/loggers" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" + "github.com/mimecast/dtail/internal/source" ) // Client is the log handler for the client packages. @@ -32,7 +33,7 @@ var mutex sync.Mutex var started bool // Start logger(s). -func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLevel string) { +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, logLevel string) { mutex.Lock() defer mutex.Unlock() @@ -44,17 +45,17 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLev level := newLevel(logLevel) switch sourceProcess { - case CLIENT: + case source.Client: // This is a DTail client process running. impl := loggers.FOUT - Client = New(CLIENT, CLIENT, level, impl, strategy) - Server = New(CLIENT, SERVER, level, impl, strategy) + Client = New(source.Client, source.Client, level, impl, strategy) + Server = New(source.Client, source.Server, level, impl, strategy) Common = Client - case SERVER: + case source.Server: // This is a DTail server process running. impl := loggers.FILE - Client = New(SERVER, CLIENT, level, impl, strategy) - Server = New(SERVER, SERVER, level, impl, strategy) + Client = New(source.Server, source.Client, level, impl, strategy) + Server = New(source.Server, source.Server, level, impl, strategy) Common = Server } @@ -75,10 +76,10 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLev type DLog struct { logger loggers.Logger // Is this a DTail server or client process logging? - sourceProcess source + sourceProcess source.Source // Is this a DTail server or client package logging? In serverless mode // the client can also execute code from the server package. - sourcePackage source + sourcePackage source.Source // Max log level to log. maxLevel level // Current hostname. @@ -86,7 +87,7 @@ type DLog struct { } // New creates a new DTail logger. -func New(sourceProcess, sourcePackage source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog { +func New(sourceProcess, sourcePackage source.Source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) @@ -120,7 +121,7 @@ func (d *DLog) log(level level, args []interface{}) string { now := time.Now() switch d.sourceProcess { - case CLIENT: + case source.Client: sb.WriteString(d.sourcePackage.String()) sb.WriteString(protocol.FieldDelimiter) sb.WriteString(d.hostname) @@ -179,7 +180,7 @@ func (d *DLog) Warn(args ...interface{}) string { } func (d *DLog) Info(args ...interface{}) string { - if d.sourcePackage == SERVER && d.sourceProcess != CLIENT { + if d.sourcePackage == source.Server && d.sourceProcess != source.Client { // This can be dtail client in serverless mode. In this case log all // info server messages as verbose. return d.log(VERBOSE, args) @@ -215,7 +216,7 @@ func (d *DLog) Raw(message string) string { func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args := make([]interface{}, len(data)+1) - if d.sourceProcess == SERVER { + if d.sourceProcess == source.Server { // level|date-time|process|caller|cpus|goroutines|cgocalls|loadavg|uptime|MAPREDUCE:TABLE|key=value|... var loadAvg string diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go new file mode 100644 index 0000000..a1b9355 --- /dev/null +++ b/internal/io/dlog/loggers/strategy.go @@ -0,0 +1,27 @@ +package loggers + +import ( + "os" + "path/filepath" +) + +type Rotation int + +const ( + DailyRotation Rotation = iota + SignalRotation Rotation = iota +) + +type Strategy struct { + Rotation Rotation + FileBase string +} + +func GetStrategy(name string) Strategy { + switch name { + case "daily": + return Strategy{DailyRotation, ""} + default: + return Strategy{SignalRotation, filepath.Base(os.Args[0])} + } +} diff --git a/internal/io/dlog/source.go b/internal/io/dlog/source.go deleted file mode 100644 index 265885e..0000000 --- a/internal/io/dlog/source.go +++ /dev/null @@ -1,19 +0,0 @@ -package dlog - -type source int - -const ( - CLIENT source = iota - SERVER source = iota -) - -func (s source) String() string { - switch s { - case CLIENT: - return "CLIENT" - case SERVER: - return "SERVER" - } - - panic("Unknown log source type") -} -- cgit v1.2.3 From 12c79f68bb5bda6673819d7b754820ecfe6d08ff Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 11:54:07 +0300 Subject: reduce logging in serverless mode --- internal/io/dlog/dlog.go | 5 ----- 1 file changed, 5 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 3de3120..6cacfe2 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -180,11 +180,6 @@ func (d *DLog) Warn(args ...interface{}) string { } func (d *DLog) Info(args ...interface{}) string { - if d.sourcePackage == source.Server && d.sourceProcess != source.Client { - // This can be dtail client in serverless mode. In this case log all - // info server messages as verbose. - return d.log(VERBOSE, args) - } return d.log(INFO, args) } -- cgit v1.2.3 From 86ec83754e0ee7153ad55091f7b6da448bc529c5 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 13:44:27 +0300 Subject: add dcat test --- internal/io/dlog/loggers/stdout.go | 59 +++++++++++++------------------------- internal/io/fs/readfile.go | 10 +------ 2 files changed, 21 insertions(+), 48 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go index 9738323..05485c6 100644 --- a/internal/io/dlog/loggers/stdout.go +++ b/internal/io/dlog/loggers/stdout.go @@ -8,66 +8,47 @@ import ( ) type stdout struct { - bufferCh chan string pauseCh chan struct{} resumeCh chan struct{} + mutex sync.Mutex } func newStdout() *stdout { return &stdout{ - bufferCh: make(chan string, 100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), } } func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) { - pause := func(ctx context.Context) { - select { - case <-s.resumeCh: - return - case <-ctx.Done(): - return - } - } - - go func() { - defer wg.Done() - - for { - select { - case message := <-s.bufferCh: - fmt.Println(message) - case <-s.pauseCh: - pause(ctx) - case <-ctx.Done(): - s.Flush() - return - } - } - }() + wg.Done() } func (s *stdout) Log(now time.Time, message string) { - s.bufferCh <- message + s.log(message) } func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) { - s.bufferCh <- coloredMessage + s.log(coloredMessage) } -func (s *stdout) Flush() { - for { - select { - case message := <-s.bufferCh: - fmt.Println(message) - default: - return - } +func (s *stdout) log(message string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + select { + case <-s.pauseCh: + // Pause until resumed. + <-s.resumeCh + default: } + + fmt.Println(message) } -func (s *stdout) Pause() { s.pauseCh <- struct{}{} } -func (s *stdout) Resume() { s.resumeCh <- struct{}{} } -func (s *stdout) Rotate() {} +func (s *stdout) Pause() { s.pauseCh <- struct{}{} } +func (s *stdout) Resume() { s.resumeCh <- struct{}{} } +func (s *stdout) Flush() {} +func (s *stdout) Rotate() {} + func (stdout) SupportsColors() bool { return true } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 07486a1..f128c07 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -13,8 +13,8 @@ import ( "sync" "time" - "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/regex" @@ -182,14 +182,6 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu switch b { case '\n': - /* - // dcat/dgrep should actually transfer empty lines - if message.Len() == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - */ - //message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) -- cgit v1.2.3 From 07e1470892beacf0722276f94bacbd822b002540 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 3 Oct 2021 13:09:32 +0300 Subject: add dmap tests --- internal/io/dlog/dlog.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 6cacfe2..2beda75 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -163,8 +163,11 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { func (d *DLog) FatalPanic(args ...interface{}) { d.log(FATAL, args) - d.logger.Flush() - panic("Not recovering from this fatal error...") + d.Flush() + + var sb strings.Builder + d.writeArgStrings(&sb, args) + panic(sb.String()) } func (d *DLog) Fatal(args ...interface{}) string { -- cgit v1.2.3 From f70622f307629a2542ea5eb128dea8c1043d3a40 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 10:00:38 +0300 Subject: more on this --- internal/io/dlog/dlog.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 2beda75..db99307 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -57,6 +57,12 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, Client = New(source.Server, source.Client, level, impl, strategy) Server = New(source.Server, source.Server, level, impl, strategy) Common = Server + case source.HealthCheck: + // Health check isn't logging anything. + impl := loggers.STDOUT + Client = New(source.HealthCheck, source.Client, level, impl, strategy) + Server = New(source.HealthCheck, source.Server, level, impl, strategy) + Common = Client } var wg2 sync.WaitGroup -- cgit v1.2.3 From fab5dc3e70434ea0abc7a0976487a1973b662331 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 09:50:41 +0300 Subject: enable faster shutdown - useful for dgrep/dmap and dcat commands --- internal/io/dlog/dlog.go | 2 ++ internal/io/fs/permissions/permission.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index db99307..a17d6e9 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -201,6 +201,8 @@ func (d *DLog) Debug(args ...interface{}) string { } func (d *DLog) Trace(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(TRACE, args) } diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index bbcb74e..e80dbb2 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -9,6 +9,6 @@ import ( // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - dlog.Common.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in") return true, nil } -- cgit v1.2.3 From 7306afe9ab073c424ddca0ddc57950f237948118 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 10:55:50 +0300 Subject: move health check to separate client binary --- internal/io/dlog/dlog.go | 2 +- internal/io/signal/signal.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index a17d6e9..2ae3b04 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -59,7 +59,7 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, Common = Server case source.HealthCheck: // Health check isn't logging anything. - impl := loggers.STDOUT + impl := loggers.NONE Client = New(source.HealthCheck, source.Client, level, impl, strategy) Server = New(source.HealthCheck, source.Server, level, impl, strategy) Common = Client diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index 500c530..14056c4 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -44,3 +44,8 @@ func InterruptCh(ctx context.Context) <-chan string { return statsCh } + +// NoCh doesn't listen on a signal. +func NoCh(ctx context.Context) <-chan string { + return make(chan string) +} -- cgit v1.2.3 From 2d7ddbeae8286373ac19787dc7dde598a7cb0598 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 8 Oct 2021 11:43:43 +0300 Subject: refactor --- internal/io/dlog/dlog.go | 53 ++++++++++----------- internal/io/dlog/level.go | 95 ++++++++++++++++--------------------- internal/io/dlog/loggers/factory.go | 36 ++++++-------- 3 files changed, 80 insertions(+), 104 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 2ae3b04..f3774ba 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -33,7 +33,7 @@ var mutex sync.Mutex var started bool // Start logger(s). -func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, logLevel string) { +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) { mutex.Lock() defer mutex.Unlock() @@ -41,27 +41,18 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, Common.FatalPanic("Logger already started") } - strategy := loggers.GetStrategy(config.Common.LogStrategy) - level := newLevel(logLevel) - switch sourceProcess { case source.Client: - // This is a DTail client process running. - impl := loggers.FOUT - Client = New(source.Client, source.Client, level, impl, strategy) - Server = New(source.Client, source.Server, level, impl, strategy) + Client = New(source.Client, source.Client) + Server = New(source.Client, source.Server) Common = Client case source.Server: - // This is a DTail server process running. - impl := loggers.FILE - Client = New(source.Server, source.Client, level, impl, strategy) - Server = New(source.Server, source.Server, level, impl, strategy) + Client = New(source.Server, source.Client) + Server = New(source.Server, source.Server) Common = Server case source.HealthCheck: - // Health check isn't logging anything. - impl := loggers.NONE - Client = New(source.HealthCheck, source.Client, level, impl, strategy) - Server = New(source.HealthCheck, source.Server, level, impl, strategy) + Client = New(source.HealthCheck, source.Client) + Server = New(source.HealthCheck, source.Server) Common = Client } @@ -93,16 +84,20 @@ type DLog struct { } // New creates a new DTail logger. -func New(sourceProcess, sourcePackage source.Source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog { +func New(sourceProcess, sourcePackage source.Source) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) } + strategy := loggers.GetStrategy(config.Common.LogStrategy) + loggerName := config.Common.Logger + level := newLevel(config.Common.LogLevel) + return &DLog{ - logger: loggers.Factory(sourceProcess.String(), impl, strategy), + logger: loggers.Factory(sourceProcess.String(), loggerName, strategy), sourceProcess: sourceProcess, sourcePackage: sourcePackage, - maxLevel: maxLevel, + maxLevel: level, hostname: hostname, } } @@ -168,7 +163,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { } func (d *DLog) FatalPanic(args ...interface{}) { - d.log(FATAL, args) + d.log(Fatal, args) d.Flush() var sb strings.Builder @@ -177,37 +172,37 @@ func (d *DLog) FatalPanic(args ...interface{}) { } func (d *DLog) Fatal(args ...interface{}) string { - return d.log(FATAL, args) + return d.log(Fatal, args) } func (d *DLog) Error(args ...interface{}) string { - return d.log(ERROR, args) + return d.log(Error, args) } func (d *DLog) Warn(args ...interface{}) string { - return d.log(WARN, args) + return d.log(Warn, args) } func (d *DLog) Info(args ...interface{}) string { - return d.log(INFO, args) + return d.log(Info, args) } func (d *DLog) Verbose(args ...interface{}) string { - return d.log(VERBOSE, args) + return d.log(Verbose, args) } func (d *DLog) Debug(args ...interface{}) string { - return d.log(DEBUG, args) + return d.log(Debug, args) } func (d *DLog) Trace(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) - return d.log(TRACE, args) + return d.log(Trace, args) } func (d *DLog) Devel(args ...interface{}) string { - return d.log(DEVEL, args) + return d.log(Devel, args) } func (d *DLog) Raw(message string) string { @@ -260,7 +255,7 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args[i] = fmt.Sprintf("%s=%v", k, v) i++ } - return d.log(INFO, args) + return d.log(Info, args) } func (d *DLog) Flush() { d.logger.Flush() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 84550f0..248ad83 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -8,80 +8,69 @@ import ( type level int const ( - FATAL level = iota - ERROR level = iota - WARN level = iota - INFO level = iota - DEFAULT level = iota - VERBOSE level = iota - DEBUG level = iota - DEVEL level = iota - TRACE level = iota - ALL level = iota + Fatal level = iota + Error level = iota + Warn level = iota + Info level = iota + Default level = iota + Verbose level = iota + Debug level = iota + Devel level = iota + Trace level = iota + All level = iota ) -var allLevels = []level{ - FATAL, - ERROR, - WARN, - INFO, - DEFAULT, - VERBOSE, - DEBUG, - DEVEL, - TRACE, - ALL, -} +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All} func newLevel(l string) level { - switch strings.ToUpper(l) { - case "FATAL": - return FATAL - case "ERROR": - return ERROR - case "WARN": - return WARN - case "INFO": - return INFO + switch strings.ToLower(l) { + case "fatal": + return Fatal + case "error": + return Error + case "warn": + return Warn + case "info": + return Info case "": fallthrough - case "DEFAULT": - return DEFAULT - case "VERBOSE": - return VERBOSE - case "DEBUG": - return DEBUG - case "DEVEL": - return DEVEL - case "TRACE": - return TRACE - case "ALL": - return ALL + case "default": + return Default + case "verbose": + return Verbose + case "debug": + return Debug + case "devel": + return Devel + case "trace": + return Trace + case "all": + return All } panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels)) } func (l level) String() string { switch l { - case FATAL: + case Fatal: return "FATAL" - case ERROR: + case Error: return "ERROR" - case WARN: + case Warn: return "WARN" - case INFO: + case Info: return "INFO" - case DEFAULT: + case Default: return "DEFAULT" - case VERBOSE: + case Verbose: return "VERBOSE" - case DEBUG: + case Debug: return "DEBUG" - case DEVEL: + case Devel: return "DEVEL" - case TRACE: + case Trace: return "TRACE" - case ALL: + case All: return "ALL" } diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index 8697dc4..dda3ee6 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -2,45 +2,38 @@ package loggers import ( "fmt" + "strings" "sync" ) -type Impl int - -const ( - NONE Impl = iota - STDOUT Impl = iota - FILE Impl = iota - FOUT Impl = iota -) - var factoryMap map[string]Logger var factoryMutex sync.Mutex -func Factory(name string, impl Impl, strategy Strategy) Logger { +func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() - id := fmt.Sprintf("name:%s,fileBase:%s,impl:%v", name, strategy.FileBase, impl) - + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName) if factoryMap == nil { factoryMap = make(map[string]Logger) } singleton, ok := factoryMap[id] if !ok { - switch impl { - case NONE: + switch strings.ToLower(loggerName) { + case "none": singleton = none{} - case STDOUT: + case "stdout": singleton = newStdout() factoryMap[id] = singleton - case FILE: - singleton = newFile(strategy) + case "file": + singleton = newFile(rotationStrategy) factoryMap[id] = singleton - case FOUT: - singleton = newFout(strategy) + case "fout": + singleton = newFout(rotationStrategy) factoryMap[id] = singleton + default: + panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) } } @@ -53,8 +46,7 @@ func FactoryRotate() { if factoryMap == nil { return } - - for _, impl := range factoryMap { - impl.Rotate() + for _, logger := range factoryMap { + logger.Rotate() } } -- cgit v1.2.3 From 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 16:44:28 +0300 Subject: add dtail health check unit test. --- internal/io/dlog/dlog.go | 43 +++++++++++++++++----------------------- internal/io/dlog/loggers/file.go | 10 ++++++---- 2 files changed, 24 insertions(+), 29 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index f3774ba..28e6882 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -41,32 +41,25 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) Common.FatalPanic("Logger already started") } - switch sourceProcess { - case source.Client: - Client = New(source.Client, source.Client) - Server = New(source.Client, source.Server) - Common = Client - case source.Server: - Client = New(source.Server, source.Client) - Server = New(source.Server, source.Server) + Client = new(sourceProcess, source.Client) + Server = new(sourceProcess, source.Server) + Common = Client + if sourceProcess == source.Server { Common = Server - case source.HealthCheck: - Client = New(source.HealthCheck, source.Client) - Server = New(source.HealthCheck, source.Server) - Common = Client } var wg2 sync.WaitGroup wg2.Add(2) - Client.start(ctx, &wg2) - Server.start(ctx, &wg2) - started = true + go Client.start(ctx, &wg2) + go Server.start(ctx, &wg2) go rotation(ctx) go func() { wg2.Wait() wg.Done() }() + + started = true } // DLog is the DTail logger. @@ -83,8 +76,8 @@ type DLog struct { hostname string } -// New creates a new DTail logger. -func New(sourceProcess, sourcePackage source.Source) *DLog { +// new creates a new DTail logger. +func new(sourceProcess, sourcePackage source.Source) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) @@ -103,14 +96,12 @@ func New(sourceProcess, sourcePackage source.Source) *DLog { } func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { - go func() { - defer wg.Done() - var wg2 sync.WaitGroup - wg2.Add(1) - d.logger.Start(ctx, &wg2) - <-ctx.Done() - wg2.Wait() - }() + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() } func (d *DLog) log(level level, args []interface{}) string { @@ -202,6 +193,8 @@ func (d *DLog) Trace(args ...interface{}) string { } func (d *DLog) Devel(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Devel, args) } diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 6e692a3..87280fd 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer { if f.lastFileName == name { return f.writer } - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { panic(err) @@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer { f.writer.Flush() f.fd.Close() } - + // Set new writer. f.fd = newFd f.writer = bufio.NewWriterSize(f.fd, 1) f.lastFileName = name @@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer { } func (f *file) flush() { - defer f.writer.Flush() - + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() for { select { case m := <-f.bufferCh: -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/io/dlog/dlog.go | 22 ++++++++++++++--- internal/io/dlog/level.go | 5 ++-- internal/io/dlog/loggers/factory.go | 6 +++-- internal/io/dlog/loggers/file.go | 17 ++++++------- internal/io/dlog/loggers/logger.go | 1 + internal/io/dlog/loggers/strategy.go | 11 +++++++-- internal/io/fs/catfile.go | 4 +++- internal/io/fs/filereader.go | 3 ++- internal/io/fs/permissions/permission_linuxacl.go | 2 +- internal/io/fs/readfile.go | 29 ++++++++++++----------- internal/io/fs/tailfile.go | 4 +++- internal/io/pool/builder.go | 3 +++ internal/io/pool/bytesbuffer.go | 3 +++ internal/io/prompt/prompt.go | 7 ++---- internal/io/signal/signal.go | 3 --- 15 files changed, 75 insertions(+), 45 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 28e6882..da67585 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -82,7 +82,7 @@ func new(sourceProcess, sourcePackage source.Source) *DLog { if err != nil { panic(err) } - strategy := loggers.GetStrategy(config.Common.LogStrategy) + strategy := loggers.NewStrategy(config.Common.LogStrategy) loggerName := config.Common.Logger level := newLevel(config.Common.LogLevel) @@ -153,6 +153,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { } } +// FatalPanic terminates the process with a fatal error. func (d *DLog) FatalPanic(args ...interface{}) { d.log(Fatal, args) d.Flush() @@ -162,42 +163,51 @@ func (d *DLog) FatalPanic(args ...interface{}) { panic(sb.String()) } +// Fatal logs a fatal error. func (d *DLog) Fatal(args ...interface{}) string { return d.log(Fatal, args) } +// Error logging. func (d *DLog) Error(args ...interface{}) string { return d.log(Error, args) } +// Warn logs a warning message. func (d *DLog) Warn(args ...interface{}) string { return d.log(Warn, args) } +// Info logging. func (d *DLog) Info(args ...interface{}) string { return d.log(Info, args) } +// Verbose logging. func (d *DLog) Verbose(args ...interface{}) string { return d.log(Verbose, args) } +// Debug logging. func (d *DLog) Debug(args ...interface{}) string { return d.log(Debug, args) } +// Trace logging. func (d *DLog) Trace(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Trace, args) } +// Devel used for development purpose only logging (e.g. "print" debugging). func (d *DLog) Devel(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(Devel, args) } +// Raw message logging. func (d *DLog) Raw(message string) string { if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { d.logger.Log(time.Now(), message) @@ -207,6 +217,7 @@ func (d *DLog) Raw(message string) string { return message } +// Mapreduce logging. func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args := make([]interface{}, len(data)+1) @@ -251,6 +262,11 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { return d.log(Info, args) } -func (d *DLog) Flush() { d.logger.Flush() } -func (d *DLog) Pause() { d.logger.Pause() } +// Flush the log buffers. +func (d *DLog) Flush() { d.logger.Flush() } + +// Pause the logging. +func (d *DLog) Pause() { d.logger.Pause() } + +// Resume the logging. func (d *DLog) Resume() { d.logger.Resume() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 248ad83..0971094 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -7,6 +7,7 @@ import ( type level int +// Available log levels. const ( Fatal level = iota Error level = iota @@ -20,7 +21,8 @@ const ( All level = iota ) -var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All} +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, + Devel, Trace, All} func newLevel(l string) level { switch strings.ToLower(l) { @@ -73,6 +75,5 @@ func (l level) String() string { case All: return "ALL" } - panic("Unknown log level " + fmt.Sprintf("%d", l)) } diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index dda3ee6..415d7fb 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -9,11 +9,13 @@ import ( var factoryMap map[string]Logger var factoryMutex sync.Mutex +// Factory is there to retrieve a logger based on various settings. func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() - id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName) + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, + rotationStrategy.FileBase, loggerName) if factoryMap == nil { factoryMap = make(map[string]Logger) } @@ -36,10 +38,10 @@ func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) } } - return singleton } +// FactoryRotate invokes a log rotation of all loggers. func FactoryRotate() { factoryMutex.Lock() defer factoryMutex.Unlock() diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 87280fd..94824fe 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -12,8 +12,7 @@ import ( "github.com/mimecast/dtail/internal/config" ) -type fileWriter struct { -} +type fileWriter struct{} type fileMessageBuf struct { now time.Time @@ -35,7 +34,7 @@ type file struct { } func newFile(strategy Strategy) *file { - f := file{ + return &file{ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), @@ -43,16 +42,17 @@ func newFile(strategy Strategy) *file { flushCh: make(chan struct{}), strategy: strategy, } - - return &f } func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { f.mutex.Lock() - defer f.mutex.Unlock() + defer func() { + f.started = true + f.mutex.Unlock() + }() - // Logger already started from another Goroutine. if f.started { + // Logger already started from another Goroutine. wg.Done() return } @@ -68,7 +68,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { go func() { defer wg.Done() - for { select { case m := <-f.bufferCh: @@ -84,8 +83,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { } } }() - - f.started = true } func (f *file) Log(now time.Time, message string) { diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go index c88900d..d4e85de 100644 --- a/internal/io/dlog/loggers/logger.go +++ b/internal/io/dlog/loggers/logger.go @@ -6,6 +6,7 @@ import ( "time" ) +// Logger is there to plug in your own log implementation. type Logger interface { Log(now time.Time, message string) LogWithColors(now time.Time, message, messageWithColors string) diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go index a1b9355..25d10f0 100644 --- a/internal/io/dlog/loggers/strategy.go +++ b/internal/io/dlog/loggers/strategy.go @@ -5,19 +5,26 @@ import ( "path/filepath" ) +// Rotation is the actual strategy used for log rotation.. type Rotation int const ( - DailyRotation Rotation = iota + // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP. + DailyRotation Rotation = iota + // SignalRotation tells DTail to rotate its logs only on SIGHUP. SignalRotation Rotation = iota ) +// Strategy is a pair of the rotation and the file base. type Strategy struct { + // Rotation is the actual rotation strategy used. Rotation Rotation + // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used. FileBase string } -func GetStrategy(name string) Strategy { +// NewStrategy returns the stratey based on its name. +func NewStrategy(name string) Strategy { switch name { case "daily": return Strategy{DailyRotation, ""} diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index 7f387bc..01c15ba 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 0774837..7773142 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -7,7 +7,8 @@ import ( "github.com/mimecast/dtail/internal/regex" ) -// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. +// FileReader is the interface used on the dtail server to read/cat/grep/mapr... +// a file. type FileReader interface { Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error FilePath() string diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go index 7d2d7ca..904b90f 100644 --- a/internal/io/fs/permissions/permission_linuxacl.go +++ b/internal/io/fs/permissions/permission_linuxacl.go @@ -13,7 +13,7 @@ import ( "unsafe" ) -// ToRead checks whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index f128c07..92f85b6 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -42,7 +42,8 @@ type readFile struct { // String returns the string representation of the readFile func (f readFile) String() string { - return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", + return fmt.Sprintf( + "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", f.filePath, f.globID, f.retry, @@ -61,7 +62,9 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lines chan<- line.Line, + re regex.Regex) error { + dlog.Common.Debug("readFile", f) defer func() { select { @@ -74,7 +77,8 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re case f.limiter <- struct{}{}: default: select { - case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, + "Server limit reached. Queuing file..."): case <-ctx.Done(): return nil } @@ -139,13 +143,11 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { default: reader = bufio.NewReader(fd) } - return } func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 - reader, err := f.makeReader(fd) if err != nil { return err @@ -193,7 +195,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu default: if message.Len() >= lineLengthThreshold { if !warnedAboutLongLine { - f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") warnedAboutLongLine = true } message.WriteString("\n") @@ -210,9 +213,10 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() +func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + defer wg.Done() for { select { case line, ok := <-rawLines: @@ -231,9 +235,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha } } -func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { - var read line.Line +func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, + re regex.Regex) (line.Line, bool) { + var read line.Line if !re.Match(lineBytes.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() @@ -254,7 +259,6 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, r Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), } - return read, true } @@ -267,7 +271,6 @@ func (f readFile) truncated(fd *os.File) (bool, error) { if err != nil { return true, err } - // Can not open file at original path. pathFd, err := os.Open(f.filePath) if err != nil { @@ -280,10 +283,8 @@ func (f readFile) truncated(fd *os.File) (bool, error) { if err != nil { return true, err } - if curPos > pathPos { return true, errors.New("File got truncated") } - return false, nil } diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 14994e5..b03b45d 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go index c9dc221..89fcf81 100644 --- a/internal/io/pool/builder.go +++ b/internal/io/pool/builder.go @@ -5,6 +5,8 @@ import ( "sync" ) +// BuilderBuffer is there to optimize memory allocations (DTail allocates a lot +// of memory while reading log data otherwise) var BuilderBuffer = sync.Pool{ New: func() interface{} { sb := strings.Builder{} @@ -12,6 +14,7 @@ var BuilderBuffer = sync.Pool{ }, } +// RecycleBuilderBuffer recycles the buffer again. func RecycleBuilderBuffer(sb *strings.Builder) { sb.Reset() BuilderBuffer.Put(sb) diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go index 0a159f5..3d48f2c 100644 --- a/internal/io/pool/bytesbuffer.go +++ b/internal/io/pool/bytesbuffer.go @@ -5,6 +5,8 @@ import ( "sync" ) +// BytesBuffer is there to optimize memory allocations. DTail otherwise allocates +// a lot of memory while reading logs. var BytesBuffer = sync.Pool{ New: func() interface{} { b := bytes.Buffer{} @@ -13,6 +15,7 @@ var BytesBuffer = sync.Pool{ }, } +// RecycleBytesBuffer recycles the buffer again. func RecycleBytesBuffer(b *bytes.Buffer) { b.Reset() BytesBuffer.Put(b) diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index 7c3cdb5..e82132d 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -19,7 +19,8 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - AskAgain bool + // AskAgain can be used to not to ask again about the question. + AskAgain bool } // Prompt used for interactive user input. @@ -30,7 +31,6 @@ type Prompt struct { func (p *Prompt) askString() string { var sb strings.Builder - sb.WriteString(p.question) sb.WriteString("? (") @@ -41,7 +41,6 @@ func (p *Prompt) askString() string { sb.WriteString(strings.Join(ax, ",")) sb.WriteString("): ") - return sb.String() } @@ -68,7 +67,6 @@ func (p *Prompt) Ask() { if a.Callback != nil { a.Callback() } - if !a.AskAgain { dlog.Common.Resume() if a.EndCallback != nil { @@ -90,6 +88,5 @@ func (p *Prompt) answer(answerStr string) (*Answer, bool) { default: } } - return nil, false } diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index 14056c4..584b59c 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -14,10 +14,8 @@ import ( func InterruptCh(ctx context.Context) <-chan string { sigIntCh := make(chan os.Signal) gosignal.Notify(sigIntCh, os.Interrupt) - sigOtherCh := make(chan os.Signal) gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) - statsCh := make(chan string) go func() { @@ -41,7 +39,6 @@ func InterruptCh(ctx context.Context) <-chan string { } } }() - return statsCh } -- cgit v1.2.3 From f44792c9102488774c9993b080f35c65287a64b1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 10 Oct 2021 14:02:12 +0300 Subject: add another dmap test - reading 100 source files at once fix a data race when reading multiple files on one server from the same session at once --- internal/io/dlog/level.go | 5 +++++ internal/io/fs/permissions/permission.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'internal/io') diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 0971094..05d9ed9 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -9,6 +9,7 @@ type level int // Available log levels. const ( + None level = iota Fatal level = iota Error level = iota Warn level = iota @@ -26,6 +27,8 @@ var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, func newLevel(l string) level { switch strings.ToLower(l) { + case "none": + return None case "fatal": return Fatal case "error": @@ -54,6 +57,8 @@ func newLevel(l string) level { func (l level) String() string { switch l { + case None: + return "NONE" case Fatal: return "FATAL" case Error: diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index e80dbb2..d621c09 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -9,6 +9,6 @@ import ( // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in") + dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in") return true, nil } -- cgit v1.2.3 From a6098084f7150df34edecf1519386bd28a527361 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 11 Oct 2021 17:42:37 +0300 Subject: Update JSON-schema to reflect all recent config file changes. --- internal/io/dlog/dlog.go | 4 ++-- internal/io/dlog/loggers/factory.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'internal/io') diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index da67585..5e0c3a1 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -82,12 +82,12 @@ func new(sourceProcess, sourcePackage source.Source) *DLog { if err != nil { panic(err) } - strategy := loggers.NewStrategy(config.Common.LogStrategy) + logRotation := loggers.NewStrategy(config.Common.LogRotation) loggerName := config.Common.Logger level := newLevel(config.Common.LogLevel) return &DLog{ - logger: loggers.Factory(sourceProcess.String(), loggerName, strategy), + logger: loggers.Factory(sourceProcess.String(), loggerName, logRotation), sourceProcess: sourceProcess, sourcePackage: sourcePackage, maxLevel: level, diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index 415d7fb..a5cc7cf 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -10,12 +10,12 @@ var factoryMap map[string]Logger var factoryMutex sync.Mutex // Factory is there to retrieve a logger based on various settings. -func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { +func Factory(sourceName, loggerName string, logRotation Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, - rotationStrategy.FileBase, loggerName) + logRotation.FileBase, loggerName) if factoryMap == nil { factoryMap = make(map[string]Logger) } @@ -29,10 +29,10 @@ func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { singleton = newStdout() factoryMap[id] = singleton case "file": - singleton = newFile(rotationStrategy) + singleton = newFile(logRotation) factoryMap[id] = singleton case "fout": - singleton = newFout(rotationStrategy) + singleton = newFout(logRotation) factoryMap[id] = singleton default: panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) -- cgit v1.2.3 From 1dead22129a26e4f532e68c2c63fe4122b519506 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 13 Oct 2021 21:10:28 +0300 Subject: Merging grep context from master --- internal/io/fs/filereader.go | 4 +- internal/io/fs/readfile.go | 149 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 144 insertions(+), 9 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 7773142..b05fd39 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,13 +4,15 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... // a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 92f85b6..88d467e 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -16,6 +16,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -62,8 +63,8 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, - re regex.Regex) error { +func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, + lines chan<- line.Line, re regex.Regex) error { dlog.Common.Debug("readFile", f) defer func() { @@ -102,7 +103,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, wg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + go f.filter(ctx, ltx, &wg, rawLines, lines, re) err = f.read(ctx, fd, rawLines, truncate) close(rawLines) @@ -213,10 +214,27 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, + wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, + re regex.Regex) { defer wg.Done() + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if ltx.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, ltx, rawLines, lines, re) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, + lines chan<- line.Line, re regex.Regex) { + for { select { case line, ok := <-rawLines: @@ -235,11 +253,126 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, } } -func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Scenario 1: Finish once maxCount hits found + maxCount := ltx.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := ltx.BeforeContext + processBefore := before > 0 + var beforeBuf chan *bytes.Buffer + if processBefore { + beforeBuf = make(chan *bytes.Buffer, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := ltx.AfterContext > 0 + + for lineBytesBuffer := range rawLines { + f.updatePosition() + + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- lineBytesBuffer: + default: + pool.RecycleBytesBuffer(<-beforeBuf) + beforeBuf <- lineBytesBuffer + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = ltx.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case lineBytesBuffer := <-beforeBuf: + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount() - i, + TransmittedPerc: 100, + } + i-- + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !re.Match(lineBytes.Bytes()) { + if !re.Match(lineBytesBuffer.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false @@ -254,7 +387,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, f.updateLineTransmitted() read = line.Line{ - Content: lineBytes, + Content: lineBytesBuffer, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), -- cgit v1.2.3 From 06ece112c0dd20c0c211c538216fe64ebe4045c9 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 14 Oct 2021 20:10:55 +0300 Subject: add dgrep context integration tests --- internal/io/fs/readfile.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'internal/io') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 88d467e..28cbe58 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -99,15 +99,24 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) + readCtx, readCancel := context.WithCancel(ctx) + var filterWg sync.WaitGroup + filterWg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, ltx, &wg, rawLines, lines, re) + go func() { + f.filter(ctx, ltx, rawLines, lines, re) + filterWg.Done() + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines, truncate) close(rawLines) - wg.Wait() + // Filter may sends some data still. So wait until it is done here. + filterWg.Wait() return err } @@ -215,10 +224,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu // Filter log lines matching a given regular expression. func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, - wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, - re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() // Do we have any kind of local context settings? If so then run the more complex // filterWithLContext method. if ltx.Has() { -- cgit v1.2.3 From ddfdb8e01aea4f8b6127834a14b74c5c534820d1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 19 Oct 2021 19:50:38 +0300 Subject: lowercase log rotation comparison --- internal/io/dlog/loggers/strategy.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'internal/io') diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go index 25d10f0..48e7d44 100644 --- a/internal/io/dlog/loggers/strategy.go +++ b/internal/io/dlog/loggers/strategy.go @@ -3,6 +3,7 @@ package loggers import ( "os" "path/filepath" + "strings" ) // Rotation is the actual strategy used for log rotation.. @@ -25,7 +26,7 @@ type Strategy struct { // NewStrategy returns the stratey based on its name. func NewStrategy(name string) Strategy { - switch name { + switch strings.ToLower(name) { case "daily": return Strategy{DailyRotation, ""} default: -- cgit v1.2.3