diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/io/dlog | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/io/dlog')
| -rw-r--r-- | internal/io/dlog/dlog.go | 272 | ||||
| -rw-r--r-- | internal/io/dlog/level.go | 84 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/factory.go | 54 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/file.go | 165 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/fout.go | 46 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/logger.go | 19 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/none.go | 21 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/stdout.go | 54 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/strategy.go | 35 | ||||
| -rw-r--r-- | internal/io/dlog/rotation.go | 27 |
10 files changed, 777 insertions, 0 deletions
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go new file mode 100644 index 0000000..5e0c3a1 --- /dev/null +++ b/internal/io/dlog/dlog.go @@ -0,0 +1,272 @@ +package dlog + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/mimecast/dtail/internal/color/brush" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog/loggers" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" + "github.com/mimecast/dtail/internal/source" +) + +// Client is the log handler for the client packages. +var Client *DLog + +// Server is the log handler for the server packages. +var Server *DLog + +// Common is the log handler for all other packages. +var Common *DLog + +var mutex sync.Mutex +var started bool + +// Start logger(s). +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) { + mutex.Lock() + defer mutex.Unlock() + + if started { + Common.FatalPanic("Logger already started") + } + + Client = new(sourceProcess, source.Client) + Server = new(sourceProcess, source.Server) + Common = Client + if sourceProcess == source.Server { + Common = Server + } + + var wg2 sync.WaitGroup + wg2.Add(2) + go Client.start(ctx, &wg2) + go Server.start(ctx, &wg2) + + go rotation(ctx) + go func() { + wg2.Wait() + wg.Done() + }() + + started = true +} + +// DLog is the DTail logger. +type DLog struct { + logger loggers.Logger + // Is this a DTail server or client process logging? + sourceProcess source.Source + // Is this a DTail server or client package logging? In serverless mode + // the client can also execute code from the server package. + sourcePackage source.Source + // Max log level to log. + maxLevel level + // Current hostname. + hostname string +} + +// new creates a new DTail logger. +func new(sourceProcess, sourcePackage source.Source) *DLog { + hostname, err := os.Hostname() + if err != nil { + panic(err) + } + logRotation := loggers.NewStrategy(config.Common.LogRotation) + loggerName := config.Common.Logger + level := newLevel(config.Common.LogLevel) + + return &DLog{ + logger: loggers.Factory(sourceProcess.String(), loggerName, logRotation), + sourceProcess: sourceProcess, + sourcePackage: sourcePackage, + maxLevel: level, + hostname: hostname, + } +} + +func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() +} + +func (d *DLog) log(level level, args []interface{}) string { + if d.maxLevel < level { + return "" + } + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + now := time.Now() + + switch d.sourceProcess { + case source.Client: + sb.WriteString(d.sourcePackage.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(d.hostname) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(level.String()) + default: + sb.WriteString(level.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(now.Format("20060102-150405")) + } + sb.WriteString(protocol.FieldDelimiter) + d.writeArgStrings(sb, args) + + message := sb.String() + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(now, message) + return message + } + + d.logger.LogWithColors(now, message, brush.Colorfy(message)) + return message +} + +func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { + for i, arg := range args { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } + switch v := arg.(type) { + case string: + sb.WriteString(v) + case error: + sb.WriteString(v.Error()) + default: + sb.WriteString(fmt.Sprintf("%v", v)) + } + } +} + +// FatalPanic terminates the process with a fatal error. +func (d *DLog) FatalPanic(args ...interface{}) { + d.log(Fatal, args) + d.Flush() + + var sb strings.Builder + d.writeArgStrings(&sb, args) + panic(sb.String()) +} + +// Fatal logs a fatal error. +func (d *DLog) Fatal(args ...interface{}) string { + return d.log(Fatal, args) +} + +// Error logging. +func (d *DLog) Error(args ...interface{}) string { + return d.log(Error, args) +} + +// Warn logs a warning message. +func (d *DLog) Warn(args ...interface{}) string { + return d.log(Warn, args) +} + +// Info logging. +func (d *DLog) Info(args ...interface{}) string { + return d.log(Info, args) +} + +// Verbose logging. +func (d *DLog) Verbose(args ...interface{}) string { + return d.log(Verbose, args) +} + +// Debug logging. +func (d *DLog) Debug(args ...interface{}) string { + return d.log(Debug, args) +} + +// Trace logging. +func (d *DLog) Trace(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) + return d.log(Trace, args) +} + +// Devel used for development purpose only logging (e.g. "print" debugging). +func (d *DLog) Devel(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) + return d.log(Devel, args) +} + +// Raw message logging. +func (d *DLog) Raw(message string) string { + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(time.Now(), message) + return message + } + d.logger.LogWithColors(time.Now(), message, brush.Colorfy(message)) + return message +} + +// Mapreduce logging. +func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { + args := make([]interface{}, len(data)+1) + + if d.sourceProcess == source.Server { + // level|date-time|process|caller|cpus|goroutines|cgocalls|loadavg|uptime|MAPREDUCE:TABLE|key=value|... + + var loadAvg string + if loadAvgBytes, err := ioutil.ReadFile("/proc/loadavg"); err == nil { + tmp := string(loadAvgBytes) + s := strings.SplitN(tmp, " ", 2) + loadAvg = s[0] + } + + var uptime string + if uptimeBytes, err := ioutil.ReadFile("/proc/uptime"); err == nil { + tmp := string(uptimeBytes) + s := strings.SplitN(tmp, ".", 2) + i, _ := strconv.ParseInt(s[0], 10, 64) + t := time.Duration(i) * time.Second + uptime = fmt.Sprintf("%v", t) + } + + _, file, line, _ := runtime.Caller(1) + args[0] = fmt.Sprintf("%d|%s:%d|%d|%d|%d|%s|%s|MAPREDUCE:%s", + os.Getpid(), + filepath.Base(file), line, + runtime.NumCPU(), + runtime.NumGoroutine(), + runtime.NumCgoCall(), + loadAvg, + uptime, + strings.ToUpper(table)) + } else { + args[0] = fmt.Sprintf("STATS:%s", strings.ToUpper(table)) + } + + i := 1 + for k, v := range data { + args[i] = fmt.Sprintf("%s=%v", k, v) + i++ + } + return d.log(Info, args) +} + +// Flush the log buffers. +func (d *DLog) Flush() { d.logger.Flush() } + +// Pause the logging. +func (d *DLog) Pause() { d.logger.Pause() } + +// Resume the logging. +func (d *DLog) Resume() { d.logger.Resume() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go new file mode 100644 index 0000000..05d9ed9 --- /dev/null +++ b/internal/io/dlog/level.go @@ -0,0 +1,84 @@ +package dlog + +import ( + "fmt" + "strings" +) + +type level int + +// Available log levels. +const ( + None level = iota + Fatal level = iota + Error level = iota + Warn level = iota + Info level = iota + Default level = iota + Verbose level = iota + Debug level = iota + Devel level = iota + Trace level = iota + All level = iota +) + +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, + Devel, Trace, All} + +func newLevel(l string) level { + switch strings.ToLower(l) { + case "none": + return None + case "fatal": + return Fatal + case "error": + return Error + case "warn": + return Warn + case "info": + return Info + case "": + fallthrough + case "default": + return Default + case "verbose": + return Verbose + case "debug": + return Debug + case "devel": + return Devel + case "trace": + return Trace + case "all": + return All + } + panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels)) +} + +func (l level) String() string { + switch l { + case None: + return "NONE" + case Fatal: + return "FATAL" + case Error: + return "ERROR" + case Warn: + return "WARN" + case Info: + return "INFO" + case Default: + return "DEFAULT" + case Verbose: + return "VERBOSE" + case Debug: + return "DEBUG" + case Devel: + return "DEVEL" + case Trace: + return "TRACE" + case All: + return "ALL" + } + panic("Unknown log level " + fmt.Sprintf("%d", l)) +} diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go new file mode 100644 index 0000000..a5cc7cf --- /dev/null +++ b/internal/io/dlog/loggers/factory.go @@ -0,0 +1,54 @@ +package loggers + +import ( + "fmt" + "strings" + "sync" +) + +var factoryMap map[string]Logger +var factoryMutex sync.Mutex + +// Factory is there to retrieve a logger based on various settings. +func Factory(sourceName, loggerName string, logRotation Strategy) Logger { + factoryMutex.Lock() + defer factoryMutex.Unlock() + + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, + logRotation.FileBase, loggerName) + if factoryMap == nil { + factoryMap = make(map[string]Logger) + } + + singleton, ok := factoryMap[id] + if !ok { + switch strings.ToLower(loggerName) { + case "none": + singleton = none{} + case "stdout": + singleton = newStdout() + factoryMap[id] = singleton + case "file": + singleton = newFile(logRotation) + factoryMap[id] = singleton + case "fout": + singleton = newFout(logRotation) + factoryMap[id] = singleton + default: + panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) + } + } + return singleton +} + +// FactoryRotate invokes a log rotation of all loggers. +func FactoryRotate() { + factoryMutex.Lock() + defer factoryMutex.Unlock() + if factoryMap == nil { + return + } + for _, logger := range factoryMap { + logger.Rotate() + } +} diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go new file mode 100644 index 0000000..94824fe --- /dev/null +++ b/internal/io/dlog/loggers/file.go @@ -0,0 +1,165 @@ +package loggers + +import ( + "bufio" + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/mimecast/dtail/internal/config" +) + +type fileWriter struct{} + +type fileMessageBuf struct { + now time.Time + message string +} + +type file struct { + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool + lastFileName string + strategy Strategy +} + +func newFile(strategy Strategy) *file { + return &file{ + bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + rotateCh: make(chan struct{}), + flushCh: make(chan struct{}), + strategy: strategy, + } +} + +func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { + f.mutex.Lock() + defer func() { + f.started = true + f.mutex.Unlock() + }() + + if f.started { + // Logger already started from another Goroutine. + wg.Done() + return + } + + pause := func(ctx context.Context) { + select { + case <-f.resumeCh: + return + case <-ctx.Done(): + return + } + } + + go func() { + defer wg.Done() + for { + select { + case m := <-f.bufferCh: + f.write(m) + case <-f.pauseCh: + pause(ctx) + case <-f.flushCh: + f.flush() + case <-ctx.Done(): + f.flush() + f.fd.Close() + return + } + } + }() +} + +func (f *file) Log(now time.Time, message string) { + f.bufferCh <- &fileMessageBuf{now, message} +} + +func (f *file) LogWithColors(now time.Time, message, coloredMessage string) { + panic("Colors not supported in file logger") +} + +func (f *file) Pause() { f.pauseCh <- struct{}{} } +func (f *file) Resume() { f.resumeCh <- struct{}{} } +func (f *file) Flush() { f.flushCh <- struct{}{} } + +func (f *file) Rotate() { f.rotateCh <- struct{}{} } +func (*file) SupportsColors() bool { return false } + +func (f *file) write(m *fileMessageBuf) { + select { + case <-f.rotateCh: + // Force re-opening the outfile next time in getWriter. + f.lastFileName = "" + default: + } + + var writer *bufio.Writer + if f.strategy.Rotation == DailyRotation { + writer = f.getWriter(m.now.Format("20060102")) + } else { + writer = f.getWriter(f.strategy.FileBase) + } + + writer.WriteString(m.message) + writer.WriteByte('\n') +} + +func (f *file) getWriter(name string) *bufio.Writer { + if f.lastFileName == name { + return f.writer + } + if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { + if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { + panic(err) + } + } + + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name) + newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + panic(err) + } + + // Close old writer. + if f.fd != nil { + f.writer.Flush() + f.fd.Close() + } + // Set new writer. + f.fd = newFd + f.writer = bufio.NewWriterSize(f.fd, 1) + f.lastFileName = name + + return f.writer +} + +func (f *file) flush() { + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() + for { + select { + case m := <-f.bufferCh: + f.write(m) + default: + return + } + } +} diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go new file mode 100644 index 0000000..60c318d --- /dev/null +++ b/internal/io/dlog/loggers/fout.go @@ -0,0 +1,46 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +type fout struct { + file *file + stdout *stdout +} + +// Logs to both, a file and stdout +func newFout(strategy Strategy) *fout { + return &fout{file: newFile(strategy), stdout: newStdout()} +} + +func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) { + go func() { + defer wg.Done() + + var wg2 sync.WaitGroup + wg2.Add(2) + f.file.Start(ctx, &wg2) + f.stdout.Start(ctx, &wg2) + wg2.Wait() + }() +} + +func (f *fout) Log(now time.Time, message string) { + f.stdout.Log(now, message) + f.file.Log(now, message) +} + +func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) { + f.stdout.LogWithColors(now, "", coloredMessage) + f.file.Log(now, message) +} + +func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() } +func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() } +func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() } +func (f *fout) Rotate() { f.file.Rotate() } + +func (fout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go new file mode 100644 index 0000000..d4e85de --- /dev/null +++ b/internal/io/dlog/loggers/logger.go @@ -0,0 +1,19 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +// Logger is there to plug in your own log implementation. +type Logger interface { + Log(now time.Time, message string) + LogWithColors(now time.Time, message, messageWithColors string) + Start(ctx context.Context, wg *sync.WaitGroup) + Flush() + Pause() + Resume() + Rotate() + SupportsColors() bool +} diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go new file mode 100644 index 0000000..270027f --- /dev/null +++ b/internal/io/dlog/loggers/none.go @@ -0,0 +1,21 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +// don't log anything +type none struct{} + +func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() } +func (none) Log(now time.Time, message string) {} + +func (none) LogWithColors(now time.Time, message, coloredMessage string) {} + +func (none) Flush() {} +func (none) Pause() {} +func (none) Resume() {} +func (none) Rotate() {} +func (none) SupportsColors() bool { return false } diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go new file mode 100644 index 0000000..05485c6 --- /dev/null +++ b/internal/io/dlog/loggers/stdout.go @@ -0,0 +1,54 @@ +package loggers + +import ( + "context" + "fmt" + "sync" + "time" +) + +type stdout struct { + pauseCh chan struct{} + resumeCh chan struct{} + mutex sync.Mutex +} + +func newStdout() *stdout { + return &stdout{ + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + } +} + +func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) { + wg.Done() +} + +func (s *stdout) Log(now time.Time, message string) { + s.log(message) +} + +func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) { + s.log(coloredMessage) +} + +func (s *stdout) log(message string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + select { + case <-s.pauseCh: + // Pause until resumed. + <-s.resumeCh + default: + } + + fmt.Println(message) +} + +func (s *stdout) Pause() { s.pauseCh <- struct{}{} } +func (s *stdout) Resume() { s.resumeCh <- struct{}{} } +func (s *stdout) Flush() {} +func (s *stdout) Rotate() {} + +func (stdout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go new file mode 100644 index 0000000..48e7d44 --- /dev/null +++ b/internal/io/dlog/loggers/strategy.go @@ -0,0 +1,35 @@ +package loggers + +import ( + "os" + "path/filepath" + "strings" +) + +// Rotation is the actual strategy used for log rotation.. +type Rotation int + +const ( + // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP. + DailyRotation Rotation = iota + // SignalRotation tells DTail to rotate its logs only on SIGHUP. + SignalRotation Rotation = iota +) + +// Strategy is a pair of the rotation and the file base. +type Strategy struct { + // Rotation is the actual rotation strategy used. + Rotation Rotation + // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used. + FileBase string +} + +// NewStrategy returns the stratey based on its name. +func NewStrategy(name string) Strategy { + switch strings.ToLower(name) { + case "daily": + return Strategy{DailyRotation, ""} + default: + return Strategy{SignalRotation, filepath.Base(os.Args[0])} + } +} diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go new file mode 100644 index 0000000..15ce1fd --- /dev/null +++ b/internal/io/dlog/rotation.go @@ -0,0 +1,27 @@ +package dlog + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/mimecast/dtail/internal/io/dlog/loggers" +) + +func rotation(ctx context.Context) { + rotateCh := make(chan os.Signal, 1) + signal.Notify(rotateCh, syscall.SIGHUP) + go func() { + for { + select { + case <-rotateCh: + Common.Debug("Invoking log rotation") + loggers.FactoryRotate() + return + case <-ctx.Done(): + return + } + } + }() +} |
