From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/io/dlog/loggers/file.go | 156 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 internal/io/dlog/loggers/file.go (limited to 'internal/io/dlog/loggers/file.go') diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go new file mode 100644 index 0000000..1c525c9 --- /dev/null +++ b/internal/io/dlog/loggers/file.go @@ -0,0 +1,156 @@ +package loggers + +import ( + "bufio" + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/mimecast/dtail/internal/config" +) + +type fileMessageBuf struct { + now time.Time + message string +} + +type file struct { + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + lastDateStr string + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool +} + +func newFile() *file { + f := file{ + bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + rotateCh: make(chan struct{}), + flushCh: make(chan struct{}), + } + f.getWriter(time.Now().Format("20060102")) + return &f +} + +func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Logger already started from another Goroutine. + if s.started { + wg.Done() + return + } + + pause := func(ctx context.Context) { + select { + case <-s.resumeCh: + return + case <-ctx.Done(): + return + } + } + + go func() { + defer wg.Done() + + for { + select { + case m := <-s.bufferCh: + s.write(m) + case <-s.pauseCh: + pause(ctx) + case <-s.flushCh: + s.flush() + case <-ctx.Done(): + s.flush() + s.fd.Close() + return + } + } + }() + + s.started = true +} + +func (s *file) Log(now time.Time, message string) { + s.bufferCh <- &fileMessageBuf{now, message} +} + +func (s *file) LogWithColors(now time.Time, message, coloredMessage string) { + panic("Colors not supported in file logger") +} + +func (s *file) Pause() { s.pauseCh <- struct{}{} } +func (s *file) Resume() { s.resumeCh <- struct{}{} } +func (s *file) Flush() { s.flushCh <- struct{}{} } + +// TODO: Test that Rotate() actually works. +func (s *file) Rotate() { s.rotateCh <- struct{}{} } +func (file) SupportsColors() bool { return false } + +func (s *file) write(m *fileMessageBuf) { + select { + case <-s.rotateCh: + // Force re-opening the outfile. + s.lastDateStr = "" + default: + } + + writer := s.getWriter(m.now.Format("20060102")) + writer.WriteString(m.message) + writer.WriteByte('\n') +} + +func (s *file) getWriter(dateStr string) *bufio.Writer { + if s.lastDateStr == dateStr { + return s.writer + } + + if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { + if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { + panic(err) + } + } + + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) + newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + panic(err) + } + + // Close old writer. + if s.fd != nil { + s.writer.Flush() + s.fd.Close() + } + + s.fd = newFd + s.writer = bufio.NewWriterSize(s.fd, 1) + s.lastDateStr = dateStr + + return s.writer +} + +func (s *file) flush() { + defer s.writer.Flush() + + for { + select { + case m := <-s.bufferCh: + s.write(m) + default: + return + } + } +} -- cgit v1.2.3 From 609921f9c783941eaa9019a92b78ec45b49d681c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 28 Sep 2021 21:11:50 +0300 Subject: can have daily and normal file log rotation --- internal/io/dlog/loggers/file.go | 117 +++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 53 deletions(-) (limited to 'internal/io/dlog/loggers/file.go') diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 1c525c9..dcdd7d0 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -12,49 +12,54 @@ import ( "github.com/mimecast/dtail/internal/config" ) +type fileWriter struct { +} + type fileMessageBuf struct { now time.Time message string } type file struct { - bufferCh chan *fileMessageBuf - pauseCh chan struct{} - resumeCh chan struct{} - rotateCh chan struct{} - flushCh chan struct{} - lastDateStr string - fd *os.File - writer *bufio.Writer - mutex sync.Mutex - started bool + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool + lastFileName string + strategy Strategy } -func newFile() *file { +func newFile(strategy Strategy) *file { f := file{ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), rotateCh: make(chan struct{}), flushCh: make(chan struct{}), + strategy: strategy, } - f.getWriter(time.Now().Format("20060102")) + return &f } -func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { + f.mutex.Lock() + defer f.mutex.Unlock() // Logger already started from another Goroutine. - if s.started { + if f.started { wg.Done() return } pause := func(ctx context.Context) { select { - case <-s.resumeCh: + case <-f.resumeCh: return case <-ctx.Done(): return @@ -66,55 +71,61 @@ func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) { for { select { - case m := <-s.bufferCh: - s.write(m) - case <-s.pauseCh: + case m := <-f.bufferCh: + f.write(m) + case <-f.pauseCh: pause(ctx) - case <-s.flushCh: - s.flush() + case <-f.flushCh: + f.flush() case <-ctx.Done(): - s.flush() - s.fd.Close() + f.flush() + f.fd.Close() return } } }() - s.started = true + f.started = true } -func (s *file) Log(now time.Time, message string) { - s.bufferCh <- &fileMessageBuf{now, message} +func (f *file) Log(now time.Time, message string) { + f.bufferCh <- &fileMessageBuf{now, message} } -func (s *file) LogWithColors(now time.Time, message, coloredMessage string) { +func (f *file) LogWithColors(now time.Time, message, coloredMessage string) { panic("Colors not supported in file logger") } -func (s *file) Pause() { s.pauseCh <- struct{}{} } -func (s *file) Resume() { s.resumeCh <- struct{}{} } -func (s *file) Flush() { s.flushCh <- struct{}{} } +func (f *file) Pause() { f.pauseCh <- struct{}{} } +func (f *file) Resume() { f.resumeCh <- struct{}{} } +func (f *file) Flush() { f.flushCh <- struct{}{} } // TODO: Test that Rotate() actually works. -func (s *file) Rotate() { s.rotateCh <- struct{}{} } -func (file) SupportsColors() bool { return false } +func (f *file) Rotate() { f.rotateCh <- struct{}{} } +func (*file) SupportsColors() bool { return false } -func (s *file) write(m *fileMessageBuf) { +func (f *file) write(m *fileMessageBuf) { select { - case <-s.rotateCh: - // Force re-opening the outfile. - s.lastDateStr = "" + case <-f.rotateCh: + // Force re-opening the outfile next time in getWriter. + f.lastFileName = "" default: } - writer := s.getWriter(m.now.Format("20060102")) + var writer *bufio.Writer + if f.strategy.Rotation == DailyRotation { + writer = f.getWriter(m.now.Format("20060102")) + } else { + writer = f.getWriter(f.strategy.FileBase) + } + writer.WriteString(m.message) writer.WriteByte('\n') } -func (s *file) getWriter(dateStr string) *bufio.Writer { - if s.lastDateStr == dateStr { - return s.writer +func (f *file) getWriter(name string) *bufio.Writer { + if f.lastFileName == name { + return f.writer } if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { @@ -123,32 +134,32 @@ func (s *file) getWriter(dateStr string) *bufio.Writer { } } - logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name) newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) if err != nil { panic(err) } // Close old writer. - if s.fd != nil { - s.writer.Flush() - s.fd.Close() + if f.fd != nil { + f.writer.Flush() + f.fd.Close() } - s.fd = newFd - s.writer = bufio.NewWriterSize(s.fd, 1) - s.lastDateStr = dateStr + f.fd = newFd + f.writer = bufio.NewWriterSize(f.fd, 1) + f.lastFileName = name - return s.writer + return f.writer } -func (s *file) flush() { - defer s.writer.Flush() +func (f *file) flush() { + defer f.writer.Flush() for { select { - case m := <-s.bufferCh: - s.write(m) + case m := <-f.bufferCh: + f.write(m) default: return } -- cgit v1.2.3 From 764ef99a3d779a0db1fb60679292af52425ba2f6 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 10:46:47 +0300 Subject: add more default fields to MAPREDUCE --- internal/io/dlog/loggers/file.go | 1 - 1 file changed, 1 deletion(-) (limited to 'internal/io/dlog/loggers/file.go') diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index dcdd7d0..6e692a3 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -100,7 +100,6 @@ func (f *file) Pause() { f.pauseCh <- struct{}{} } func (f *file) Resume() { f.resumeCh <- struct{}{} } func (f *file) Flush() { f.flushCh <- struct{}{} } -// TODO: Test that Rotate() actually works. func (f *file) Rotate() { f.rotateCh <- struct{}{} } func (*file) SupportsColors() bool { return false } -- cgit v1.2.3 From 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 16:44:28 +0300 Subject: add dtail health check unit test. --- internal/io/dlog/loggers/file.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'internal/io/dlog/loggers/file.go') diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 6e692a3..87280fd 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -126,7 +126,6 @@ func (f *file) getWriter(name string) *bufio.Writer { if f.lastFileName == name { return f.writer } - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { panic(err) @@ -144,7 +143,7 @@ func (f *file) getWriter(name string) *bufio.Writer { f.writer.Flush() f.fd.Close() } - + // Set new writer. f.fd = newFd f.writer = bufio.NewWriterSize(f.fd, 1) f.lastFileName = name @@ -153,8 +152,11 @@ func (f *file) getWriter(name string) *bufio.Writer { } func (f *file) flush() { - defer f.writer.Flush() - + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() for { select { case m := <-f.bufferCh: -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/io/dlog/loggers/file.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'internal/io/dlog/loggers/file.go') diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 87280fd..94824fe 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -12,8 +12,7 @@ import ( "github.com/mimecast/dtail/internal/config" ) -type fileWriter struct { -} +type fileWriter struct{} type fileMessageBuf struct { now time.Time @@ -35,7 +34,7 @@ type file struct { } func newFile(strategy Strategy) *file { - f := file{ + return &file{ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), @@ -43,16 +42,17 @@ func newFile(strategy Strategy) *file { flushCh: make(chan struct{}), strategy: strategy, } - - return &f } func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { f.mutex.Lock() - defer f.mutex.Unlock() + defer func() { + f.started = true + f.mutex.Unlock() + }() - // Logger already started from another Goroutine. if f.started { + // Logger already started from another Goroutine. wg.Done() return } @@ -68,7 +68,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { go func() { defer wg.Done() - for { select { case m := <-f.bufferCh: @@ -84,8 +83,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { } } }() - - f.started = true } func (f *file) Log(now time.Time, message string) { -- cgit v1.2.3