summaryrefslogtreecommitdiff
path: root/internal/io/dlog
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-19 13:22:59 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commitfe3e68afd99d8ea246be52893730f987e138ec24 (patch)
tree726e0914730912e0a3b223f7b37facc05ba31140 /internal/io/dlog
parentabeac87aec44249bf67f1b0eca471a31086265ca (diff)
move args to config package
logger package rewrite as dlog
Diffstat (limited to 'internal/io/dlog')
-rw-r--r--internal/io/dlog/dlog.go206
-rw-r--r--internal/io/dlog/level.go89
-rw-r--r--internal/io/dlog/loggers/factory.go60
-rw-r--r--internal/io/dlog/loggers/file.go156
-rw-r--r--internal/io/dlog/loggers/fout.go46
-rw-r--r--internal/io/dlog/loggers/logger.go18
-rw-r--r--internal/io/dlog/loggers/none.go21
-rw-r--r--internal/io/dlog/loggers/stdout.go73
-rw-r--r--internal/io/dlog/rotation.go27
-rw-r--r--internal/io/dlog/source.go19
-rw-r--r--internal/io/dlog/strategy.go22
11 files changed, 737 insertions, 0 deletions
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
new file mode 100644
index 0000000..7282741
--- /dev/null
+++ b/internal/io/dlog/dlog.go
@@ -0,0 +1,206 @@
+package dlog
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color/brush"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// Client is the log handler for the client packages.
+var Client *DLog
+
+// Server is the log handler for the server packages.
+var Server *DLog
+
+// Common is the log handler for all other packages.
+// TODO: Rename Common to Common
+var Common *DLog
+
+var mutex sync.Mutex
+var started bool
+
+// Start logger(s).
+func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source, logLevel string) {
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ if started {
+ Common.FatalPanic("Logger already started")
+ }
+
+ level := newLevel(logLevel)
+ switch sourceProcess {
+ case CLIENT:
+ // This is a DTail client process running.
+ impl := loggers.FOUT
+ Client = New(CLIENT, CLIENT, impl, level)
+ Server = New(CLIENT, SERVER, impl, level)
+ Common = Client
+ case SERVER:
+ // This is a DTail server process running.
+ impl := loggers.FILE
+ Client = New(SERVER, CLIENT, impl, level)
+ Server = New(SERVER, SERVER, impl, level)
+ Common = Server
+ }
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ Client.start(ctx, &wg2)
+ Server.start(ctx, &wg2)
+ started = true
+
+ go rotation(ctx)
+ go func() {
+ wg2.Wait()
+ wg.Done()
+ }()
+}
+
+// DLog is the DTail logger.
+type DLog struct {
+ logger loggers.Logger
+ // Is this a DTail server or client process logging?
+ sourceProcess source
+ // Is this a DTail server or client package logging? In serverless mode
+ // the client can also execute code from the server package.
+ sourcePackage source
+ // Max log level to log.
+ maxLevel level
+}
+
+// New creates a new DTail logger.
+func New(sourceProcess, sourcePackage source, impl loggers.Impl, maxLevel level) *DLog {
+ return &DLog{
+ logger: loggers.Factory(sourceProcess.String(), impl),
+ sourceProcess: sourceProcess,
+ sourcePackage: sourcePackage,
+ maxLevel: maxLevel,
+ }
+}
+
+func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) {
+ go func() {
+ defer wg.Done()
+ var wg2 sync.WaitGroup
+ wg2.Add(1)
+ d.logger.Start(ctx, &wg2)
+ <-ctx.Done()
+ wg2.Wait()
+ }()
+}
+
+func (d *DLog) log(level level, args []interface{}) string {
+ if d.maxLevel < level {
+ return ""
+ }
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+ now := time.Now()
+
+ sb.WriteString(d.sourcePackage.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(now.Format("20060102-150405"))
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(level.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ d.writeArgStrings(sb, args)
+
+ message := sb.String()
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(now, message)
+ return message
+ }
+
+ d.logger.LogWithColors(now, message, brush.Colorfy(message))
+ return message
+}
+
+func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) {
+ for i, arg := range args {
+ if i > 0 {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ switch v := arg.(type) {
+ case string:
+ sb.WriteString(v)
+ case error:
+ sb.WriteString(v.Error())
+ default:
+ sb.WriteString(fmt.Sprintf("%v", v))
+ }
+ }
+}
+
+func (d *DLog) FatalPanic(args ...interface{}) {
+ d.log(FATAL, args)
+ d.logger.Flush()
+ panic("Not recovering from this fatal error...")
+}
+
+func (d *DLog) Fatal(args ...interface{}) string {
+ return d.log(FATAL, args)
+}
+
+func (d *DLog) Error(args ...interface{}) string {
+ return d.log(ERROR, args)
+}
+
+func (d *DLog) Warn(args ...interface{}) string {
+ return d.log(WARN, args)
+}
+
+func (d *DLog) Info(args ...interface{}) string {
+ return d.log(INFO, args)
+}
+
+func (d *DLog) Verbose(args ...interface{}) string {
+ return d.log(VERBOSE, args)
+}
+
+func (d *DLog) Debug(args ...interface{}) string {
+ return d.log(DEBUG, args)
+}
+
+func (d *DLog) Trace(args ...interface{}) string {
+ return d.log(TRACE, args)
+}
+
+func (d *DLog) Devel(args ...interface{}) string {
+ return d.log(DEVEL, args)
+}
+
+func (d *DLog) Raw(message string) string {
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(time.Now(), message)
+ return message
+ }
+
+ d.logger.Log(time.Now(), brush.Colorfy(message))
+ return message
+}
+
+func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
+ args := make([]interface{}, len(data)+1)
+ args[0] = fmt.Sprintf("%s:%s", "MAPREDUCE", strings.ToUpper(table))
+
+ i := 1
+ for k, v := range data {
+ args[i] = fmt.Sprintf("%s=%v", k, v)
+ i++
+ }
+
+ return d.log(INFO, args)
+}
+
+func (d *DLog) Flush() { d.logger.Flush() }
+func (d *DLog) Pause() { d.logger.Pause() }
+func (d *DLog) Resume() { d.logger.Resume() }
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
new file mode 100644
index 0000000..84550f0
--- /dev/null
+++ b/internal/io/dlog/level.go
@@ -0,0 +1,89 @@
+package dlog
+
+import (
+ "fmt"
+ "strings"
+)
+
+type level int
+
+const (
+ FATAL level = iota
+ ERROR level = iota
+ WARN level = iota
+ INFO level = iota
+ DEFAULT level = iota
+ VERBOSE level = iota
+ DEBUG level = iota
+ DEVEL level = iota
+ TRACE level = iota
+ ALL level = iota
+)
+
+var allLevels = []level{
+ FATAL,
+ ERROR,
+ WARN,
+ INFO,
+ DEFAULT,
+ VERBOSE,
+ DEBUG,
+ DEVEL,
+ TRACE,
+ ALL,
+}
+
+func newLevel(l string) level {
+ switch strings.ToUpper(l) {
+ case "FATAL":
+ return FATAL
+ case "ERROR":
+ return ERROR
+ case "WARN":
+ return WARN
+ case "INFO":
+ return INFO
+ case "":
+ fallthrough
+ case "DEFAULT":
+ return DEFAULT
+ case "VERBOSE":
+ return VERBOSE
+ case "DEBUG":
+ return DEBUG
+ case "DEVEL":
+ return DEVEL
+ case "TRACE":
+ return TRACE
+ case "ALL":
+ return ALL
+ }
+ panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels))
+}
+
+func (l level) String() string {
+ switch l {
+ case FATAL:
+ return "FATAL"
+ case ERROR:
+ return "ERROR"
+ case WARN:
+ return "WARN"
+ case INFO:
+ return "INFO"
+ case DEFAULT:
+ return "DEFAULT"
+ case VERBOSE:
+ return "VERBOSE"
+ case DEBUG:
+ return "DEBUG"
+ case DEVEL:
+ return "DEVEL"
+ case TRACE:
+ return "TRACE"
+ case ALL:
+ return "ALL"
+ }
+
+ panic("Unknown log level " + fmt.Sprintf("%d", l))
+}
diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go
new file mode 100644
index 0000000..3eb29c5
--- /dev/null
+++ b/internal/io/dlog/loggers/factory.go
@@ -0,0 +1,60 @@
+package loggers
+
+import (
+ "fmt"
+ "sync"
+)
+
+type Impl int
+
+const (
+ NONE Impl = iota
+ STDOUT Impl = iota
+ FILE Impl = iota
+ FOUT Impl = iota
+)
+
+var factoryMap map[string]Logger
+var factoryMutex sync.Mutex
+
+func Factory(name string, impl Impl) Logger {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+
+ id := fmt.Sprintf("name:%s,impl:%v", name, impl)
+
+ if factoryMap == nil {
+ factoryMap = make(map[string]Logger)
+ }
+
+ singleton, ok := factoryMap[id]
+ if !ok {
+ switch impl {
+ case NONE:
+ singleton = none{}
+ case STDOUT:
+ singleton = newStdout()
+ factoryMap[id] = singleton
+ case FILE:
+ singleton = newFile()
+ factoryMap[id] = singleton
+ case FOUT:
+ singleton = newFout()
+ factoryMap[id] = singleton
+ }
+ }
+
+ return singleton
+}
+
+func FactoryRotate() {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+ if factoryMap == nil {
+ return
+ }
+
+ for _, impl := range factoryMap {
+ impl.Rotate()
+ }
+}
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
new file mode 100644
index 0000000..1c525c9
--- /dev/null
+++ b/internal/io/dlog/loggers/file.go
@@ -0,0 +1,156 @@
+package loggers
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+)
+
+type fileMessageBuf struct {
+ now time.Time
+ message string
+}
+
+type file struct {
+ bufferCh chan *fileMessageBuf
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+ rotateCh chan struct{}
+ flushCh chan struct{}
+ lastDateStr string
+ fd *os.File
+ writer *bufio.Writer
+ mutex sync.Mutex
+ started bool
+}
+
+func newFile() *file {
+ f := file{
+ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100),
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ rotateCh: make(chan struct{}),
+ flushCh: make(chan struct{}),
+ }
+ f.getWriter(time.Now().Format("20060102"))
+ return &f
+}
+
+func (s *file) Start(ctx context.Context, wg *sync.WaitGroup) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ // Logger already started from another Goroutine.
+ if s.started {
+ wg.Done()
+ return
+ }
+
+ pause := func(ctx context.Context) {
+ select {
+ case <-s.resumeCh:
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ go func() {
+ defer wg.Done()
+
+ for {
+ select {
+ case m := <-s.bufferCh:
+ s.write(m)
+ case <-s.pauseCh:
+ pause(ctx)
+ case <-s.flushCh:
+ s.flush()
+ case <-ctx.Done():
+ s.flush()
+ s.fd.Close()
+ return
+ }
+ }
+ }()
+
+ s.started = true
+}
+
+func (s *file) Log(now time.Time, message string) {
+ s.bufferCh <- &fileMessageBuf{now, message}
+}
+
+func (s *file) LogWithColors(now time.Time, message, coloredMessage string) {
+ panic("Colors not supported in file logger")
+}
+
+func (s *file) Pause() { s.pauseCh <- struct{}{} }
+func (s *file) Resume() { s.resumeCh <- struct{}{} }
+func (s *file) Flush() { s.flushCh <- struct{}{} }
+
+// TODO: Test that Rotate() actually works.
+func (s *file) Rotate() { s.rotateCh <- struct{}{} }
+func (file) SupportsColors() bool { return false }
+
+func (s *file) write(m *fileMessageBuf) {
+ select {
+ case <-s.rotateCh:
+ // Force re-opening the outfile.
+ s.lastDateStr = ""
+ default:
+ }
+
+ writer := s.getWriter(m.now.Format("20060102"))
+ writer.WriteString(m.message)
+ writer.WriteByte('\n')
+}
+
+func (s *file) getWriter(dateStr string) *bufio.Writer {
+ if s.lastDateStr == dateStr {
+ return s.writer
+ }
+
+ if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
+ if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
+ panic(err)
+ }
+ }
+
+ logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr)
+ newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ panic(err)
+ }
+
+ // Close old writer.
+ if s.fd != nil {
+ s.writer.Flush()
+ s.fd.Close()
+ }
+
+ s.fd = newFd
+ s.writer = bufio.NewWriterSize(s.fd, 1)
+ s.lastDateStr = dateStr
+
+ return s.writer
+}
+
+func (s *file) flush() {
+ defer s.writer.Flush()
+
+ for {
+ select {
+ case m := <-s.bufferCh:
+ s.write(m)
+ default:
+ return
+ }
+ }
+}
diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go
new file mode 100644
index 0000000..603dbe9
--- /dev/null
+++ b/internal/io/dlog/loggers/fout.go
@@ -0,0 +1,46 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+type fout struct {
+ file *file
+ stdout *stdout
+}
+
+// Logs to both, a file and stdout
+func newFout() *fout {
+ return &fout{file: newFile(), stdout: newStdout()}
+}
+
+func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ go func() {
+ defer wg.Done()
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ f.file.Start(ctx, &wg2)
+ f.stdout.Start(ctx, &wg2)
+ wg2.Wait()
+ }()
+}
+
+func (f *fout) Log(now time.Time, message string) {
+ f.stdout.Log(now, message)
+ f.file.Log(now, message)
+}
+
+func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) {
+ f.stdout.LogWithColors(now, "", coloredMessage)
+ f.file.Log(now, message)
+}
+
+func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() }
+func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() }
+func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() }
+func (f *fout) Rotate() { f.file.Rotate() }
+
+func (fout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go
new file mode 100644
index 0000000..c88900d
--- /dev/null
+++ b/internal/io/dlog/loggers/logger.go
@@ -0,0 +1,18 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+type Logger interface {
+ Log(now time.Time, message string)
+ LogWithColors(now time.Time, message, messageWithColors string)
+ Start(ctx context.Context, wg *sync.WaitGroup)
+ Flush()
+ Pause()
+ Resume()
+ Rotate()
+ SupportsColors() bool
+}
diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go
new file mode 100644
index 0000000..270027f
--- /dev/null
+++ b/internal/io/dlog/loggers/none.go
@@ -0,0 +1,21 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+// don't log anything
+type none struct{}
+
+func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }
+func (none) Log(now time.Time, message string) {}
+
+func (none) LogWithColors(now time.Time, message, coloredMessage string) {}
+
+func (none) Flush() {}
+func (none) Pause() {}
+func (none) Resume() {}
+func (none) Rotate() {}
+func (none) SupportsColors() bool { return false }
diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go
new file mode 100644
index 0000000..9738323
--- /dev/null
+++ b/internal/io/dlog/loggers/stdout.go
@@ -0,0 +1,73 @@
+package loggers
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+)
+
+type stdout struct {
+ bufferCh chan string
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+}
+
+func newStdout() *stdout {
+ return &stdout{
+ bufferCh: make(chan string, 100),
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ }
+}
+
+func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ pause := func(ctx context.Context) {
+ select {
+ case <-s.resumeCh:
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ go func() {
+ defer wg.Done()
+
+ for {
+ select {
+ case message := <-s.bufferCh:
+ fmt.Println(message)
+ case <-s.pauseCh:
+ pause(ctx)
+ case <-ctx.Done():
+ s.Flush()
+ return
+ }
+ }
+ }()
+}
+
+func (s *stdout) Log(now time.Time, message string) {
+ s.bufferCh <- message
+}
+
+func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) {
+ s.bufferCh <- coloredMessage
+}
+
+func (s *stdout) Flush() {
+ for {
+ select {
+ case message := <-s.bufferCh:
+ fmt.Println(message)
+ default:
+ return
+ }
+ }
+}
+
+func (s *stdout) Pause() { s.pauseCh <- struct{}{} }
+func (s *stdout) Resume() { s.resumeCh <- struct{}{} }
+func (s *stdout) Rotate() {}
+func (stdout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go
new file mode 100644
index 0000000..15ce1fd
--- /dev/null
+++ b/internal/io/dlog/rotation.go
@@ -0,0 +1,27 @@
+package dlog
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+)
+
+func rotation(ctx context.Context) {
+ rotateCh := make(chan os.Signal, 1)
+ signal.Notify(rotateCh, syscall.SIGHUP)
+ go func() {
+ for {
+ select {
+ case <-rotateCh:
+ Common.Debug("Invoking log rotation")
+ loggers.FactoryRotate()
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
diff --git a/internal/io/dlog/source.go b/internal/io/dlog/source.go
new file mode 100644
index 0000000..265885e
--- /dev/null
+++ b/internal/io/dlog/source.go
@@ -0,0 +1,19 @@
+package dlog
+
+type source int
+
+const (
+ CLIENT source = iota
+ SERVER source = iota
+)
+
+func (s source) String() string {
+ switch s {
+ case CLIENT:
+ return "CLIENT"
+ case SERVER:
+ return "SERVER"
+ }
+
+ panic("Unknown log source type")
+}
diff --git a/internal/io/dlog/strategy.go b/internal/io/dlog/strategy.go
new file mode 100644
index 0000000..32d8298
--- /dev/null
+++ b/internal/io/dlog/strategy.go
@@ -0,0 +1,22 @@
+package dlog
+
+import "github.com/mimecast/dtail/internal/config"
+
+// Strategy allows to specify a log rotation strategy.
+type Strategy int
+
+// Possible log strategies.
+const (
+ NormalStrategy Strategy = iota
+ DailyStrategy Strategy = iota
+ StdoutStrategy Strategy = iota
+)
+
+func logStrategy() Strategy {
+ switch config.Common.LogStrategy {
+ case "daily":
+ return DailyStrategy
+ default:
+ }
+ return StdoutStrategy
+}