diff options
Diffstat (limited to 'internal/io')
26 files changed, 1143 insertions, 772 deletions
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go new file mode 100644 index 0000000..5e0c3a1 --- /dev/null +++ b/internal/io/dlog/dlog.go @@ -0,0 +1,272 @@ +package dlog + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/mimecast/dtail/internal/color/brush" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog/loggers" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" + "github.com/mimecast/dtail/internal/source" +) + +// Client is the log handler for the client packages. +var Client *DLog + +// Server is the log handler for the server packages. +var Server *DLog + +// Common is the log handler for all other packages. +var Common *DLog + +var mutex sync.Mutex +var started bool + +// Start logger(s). +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) { + mutex.Lock() + defer mutex.Unlock() + + if started { + Common.FatalPanic("Logger already started") + } + + Client = new(sourceProcess, source.Client) + Server = new(sourceProcess, source.Server) + Common = Client + if sourceProcess == source.Server { + Common = Server + } + + var wg2 sync.WaitGroup + wg2.Add(2) + go Client.start(ctx, &wg2) + go Server.start(ctx, &wg2) + + go rotation(ctx) + go func() { + wg2.Wait() + wg.Done() + }() + + started = true +} + +// DLog is the DTail logger. +type DLog struct { + logger loggers.Logger + // Is this a DTail server or client process logging? + sourceProcess source.Source + // Is this a DTail server or client package logging? In serverless mode + // the client can also execute code from the server package. + sourcePackage source.Source + // Max log level to log. + maxLevel level + // Current hostname. + hostname string +} + +// new creates a new DTail logger. +func new(sourceProcess, sourcePackage source.Source) *DLog { + hostname, err := os.Hostname() + if err != nil { + panic(err) + } + logRotation := loggers.NewStrategy(config.Common.LogRotation) + loggerName := config.Common.Logger + level := newLevel(config.Common.LogLevel) + + return &DLog{ + logger: loggers.Factory(sourceProcess.String(), loggerName, logRotation), + sourceProcess: sourceProcess, + sourcePackage: sourcePackage, + maxLevel: level, + hostname: hostname, + } +} + +func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + var wg2 sync.WaitGroup + wg2.Add(1) + d.logger.Start(ctx, &wg2) + <-ctx.Done() + wg2.Wait() +} + +func (d *DLog) log(level level, args []interface{}) string { + if d.maxLevel < level { + return "" + } + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + now := time.Now() + + switch d.sourceProcess { + case source.Client: + sb.WriteString(d.sourcePackage.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(d.hostname) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(level.String()) + default: + sb.WriteString(level.String()) + sb.WriteString(protocol.FieldDelimiter) + sb.WriteString(now.Format("20060102-150405")) + } + sb.WriteString(protocol.FieldDelimiter) + d.writeArgStrings(sb, args) + + message := sb.String() + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(now, message) + return message + } + + d.logger.LogWithColors(now, message, brush.Colorfy(message)) + return message +} + +func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { + for i, arg := range args { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } + switch v := arg.(type) { + case string: + sb.WriteString(v) + case error: + sb.WriteString(v.Error()) + default: + sb.WriteString(fmt.Sprintf("%v", v)) + } + } +} + +// FatalPanic terminates the process with a fatal error. +func (d *DLog) FatalPanic(args ...interface{}) { + d.log(Fatal, args) + d.Flush() + + var sb strings.Builder + d.writeArgStrings(&sb, args) + panic(sb.String()) +} + +// Fatal logs a fatal error. +func (d *DLog) Fatal(args ...interface{}) string { + return d.log(Fatal, args) +} + +// Error logging. +func (d *DLog) Error(args ...interface{}) string { + return d.log(Error, args) +} + +// Warn logs a warning message. +func (d *DLog) Warn(args ...interface{}) string { + return d.log(Warn, args) +} + +// Info logging. +func (d *DLog) Info(args ...interface{}) string { + return d.log(Info, args) +} + +// Verbose logging. +func (d *DLog) Verbose(args ...interface{}) string { + return d.log(Verbose, args) +} + +// Debug logging. +func (d *DLog) Debug(args ...interface{}) string { + return d.log(Debug, args) +} + +// Trace logging. +func (d *DLog) Trace(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) + return d.log(Trace, args) +} + +// Devel used for development purpose only logging (e.g. "print" debugging). +func (d *DLog) Devel(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) + return d.log(Devel, args) +} + +// Raw message logging. +func (d *DLog) Raw(message string) string { + if !config.Client.TermColorsEnable || !d.logger.SupportsColors() { + d.logger.Log(time.Now(), message) + return message + } + d.logger.LogWithColors(time.Now(), message, brush.Colorfy(message)) + return message +} + +// Mapreduce logging. +func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { + args := make([]interface{}, len(data)+1) + + if d.sourceProcess == source.Server { + // level|date-time|process|caller|cpus|goroutines|cgocalls|loadavg|uptime|MAPREDUCE:TABLE|key=value|... + + var loadAvg string + if loadAvgBytes, err := ioutil.ReadFile("/proc/loadavg"); err == nil { + tmp := string(loadAvgBytes) + s := strings.SplitN(tmp, " ", 2) + loadAvg = s[0] + } + + var uptime string + if uptimeBytes, err := ioutil.ReadFile("/proc/uptime"); err == nil { + tmp := string(uptimeBytes) + s := strings.SplitN(tmp, ".", 2) + i, _ := strconv.ParseInt(s[0], 10, 64) + t := time.Duration(i) * time.Second + uptime = fmt.Sprintf("%v", t) + } + + _, file, line, _ := runtime.Caller(1) + args[0] = fmt.Sprintf("%d|%s:%d|%d|%d|%d|%s|%s|MAPREDUCE:%s", + os.Getpid(), + filepath.Base(file), line, + runtime.NumCPU(), + runtime.NumGoroutine(), + runtime.NumCgoCall(), + loadAvg, + uptime, + strings.ToUpper(table)) + } else { + args[0] = fmt.Sprintf("STATS:%s", strings.ToUpper(table)) + } + + i := 1 + for k, v := range data { + args[i] = fmt.Sprintf("%s=%v", k, v) + i++ + } + return d.log(Info, args) +} + +// Flush the log buffers. +func (d *DLog) Flush() { d.logger.Flush() } + +// Pause the logging. +func (d *DLog) Pause() { d.logger.Pause() } + +// Resume the logging. +func (d *DLog) Resume() { d.logger.Resume() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go new file mode 100644 index 0000000..05d9ed9 --- /dev/null +++ b/internal/io/dlog/level.go @@ -0,0 +1,84 @@ +package dlog + +import ( + "fmt" + "strings" +) + +type level int + +// Available log levels. +const ( + None level = iota + Fatal level = iota + Error level = iota + Warn level = iota + Info level = iota + Default level = iota + Verbose level = iota + Debug level = iota + Devel level = iota + Trace level = iota + All level = iota +) + +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, + Devel, Trace, All} + +func newLevel(l string) level { + switch strings.ToLower(l) { + case "none": + return None + case "fatal": + return Fatal + case "error": + return Error + case "warn": + return Warn + case "info": + return Info + case "": + fallthrough + case "default": + return Default + case "verbose": + return Verbose + case "debug": + return Debug + case "devel": + return Devel + case "trace": + return Trace + case "all": + return All + } + panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels)) +} + +func (l level) String() string { + switch l { + case None: + return "NONE" + case Fatal: + return "FATAL" + case Error: + return "ERROR" + case Warn: + return "WARN" + case Info: + return "INFO" + case Default: + return "DEFAULT" + case Verbose: + return "VERBOSE" + case Debug: + return "DEBUG" + case Devel: + return "DEVEL" + case Trace: + return "TRACE" + case All: + return "ALL" + } + panic("Unknown log level " + fmt.Sprintf("%d", l)) +} diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go new file mode 100644 index 0000000..a5cc7cf --- /dev/null +++ b/internal/io/dlog/loggers/factory.go @@ -0,0 +1,54 @@ +package loggers + +import ( + "fmt" + "strings" + "sync" +) + +var factoryMap map[string]Logger +var factoryMutex sync.Mutex + +// Factory is there to retrieve a logger based on various settings. +func Factory(sourceName, loggerName string, logRotation Strategy) Logger { + factoryMutex.Lock() + defer factoryMutex.Unlock() + + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, + logRotation.FileBase, loggerName) + if factoryMap == nil { + factoryMap = make(map[string]Logger) + } + + singleton, ok := factoryMap[id] + if !ok { + switch strings.ToLower(loggerName) { + case "none": + singleton = none{} + case "stdout": + singleton = newStdout() + factoryMap[id] = singleton + case "file": + singleton = newFile(logRotation) + factoryMap[id] = singleton + case "fout": + singleton = newFout(logRotation) + factoryMap[id] = singleton + default: + panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) + } + } + return singleton +} + +// FactoryRotate invokes a log rotation of all loggers. +func FactoryRotate() { + factoryMutex.Lock() + defer factoryMutex.Unlock() + if factoryMap == nil { + return + } + for _, logger := range factoryMap { + logger.Rotate() + } +} diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go new file mode 100644 index 0000000..94824fe --- /dev/null +++ b/internal/io/dlog/loggers/file.go @@ -0,0 +1,165 @@ +package loggers + +import ( + "bufio" + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/mimecast/dtail/internal/config" +) + +type fileWriter struct{} + +type fileMessageBuf struct { + now time.Time + message string +} + +type file struct { + bufferCh chan *fileMessageBuf + pauseCh chan struct{} + resumeCh chan struct{} + rotateCh chan struct{} + flushCh chan struct{} + fd *os.File + writer *bufio.Writer + mutex sync.Mutex + started bool + lastFileName string + strategy Strategy +} + +func newFile(strategy Strategy) *file { + return &file{ + bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + rotateCh: make(chan struct{}), + flushCh: make(chan struct{}), + strategy: strategy, + } +} + +func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) { + f.mutex.Lock() + defer func() { + f.started = true + f.mutex.Unlock() + }() + + if f.started { + // Logger already started from another Goroutine. + wg.Done() + return + } + + pause := func(ctx context.Context) { + select { + case <-f.resumeCh: + return + case <-ctx.Done(): + return + } + } + + go func() { + defer wg.Done() + for { + select { + case m := <-f.bufferCh: + f.write(m) + case <-f.pauseCh: + pause(ctx) + case <-f.flushCh: + f.flush() + case <-ctx.Done(): + f.flush() + f.fd.Close() + return + } + } + }() +} + +func (f *file) Log(now time.Time, message string) { + f.bufferCh <- &fileMessageBuf{now, message} +} + +func (f *file) LogWithColors(now time.Time, message, coloredMessage string) { + panic("Colors not supported in file logger") +} + +func (f *file) Pause() { f.pauseCh <- struct{}{} } +func (f *file) Resume() { f.resumeCh <- struct{}{} } +func (f *file) Flush() { f.flushCh <- struct{}{} } + +func (f *file) Rotate() { f.rotateCh <- struct{}{} } +func (*file) SupportsColors() bool { return false } + +func (f *file) write(m *fileMessageBuf) { + select { + case <-f.rotateCh: + // Force re-opening the outfile next time in getWriter. + f.lastFileName = "" + default: + } + + var writer *bufio.Writer + if f.strategy.Rotation == DailyRotation { + writer = f.getWriter(m.now.Format("20060102")) + } else { + writer = f.getWriter(f.strategy.FileBase) + } + + writer.WriteString(m.message) + writer.WriteByte('\n') +} + +func (f *file) getWriter(name string) *bufio.Writer { + if f.lastFileName == name { + return f.writer + } + if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { + if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { + panic(err) + } + } + + logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name) + newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + panic(err) + } + + // Close old writer. + if f.fd != nil { + f.writer.Flush() + f.fd.Close() + } + // Set new writer. + f.fd = newFd + f.writer = bufio.NewWriterSize(f.fd, 1) + f.lastFileName = name + + return f.writer +} + +func (f *file) flush() { + defer func() { + if f.writer != nil { + f.writer.Flush() + } + }() + for { + select { + case m := <-f.bufferCh: + f.write(m) + default: + return + } + } +} diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go new file mode 100644 index 0000000..60c318d --- /dev/null +++ b/internal/io/dlog/loggers/fout.go @@ -0,0 +1,46 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +type fout struct { + file *file + stdout *stdout +} + +// Logs to both, a file and stdout +func newFout(strategy Strategy) *fout { + return &fout{file: newFile(strategy), stdout: newStdout()} +} + +func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) { + go func() { + defer wg.Done() + + var wg2 sync.WaitGroup + wg2.Add(2) + f.file.Start(ctx, &wg2) + f.stdout.Start(ctx, &wg2) + wg2.Wait() + }() +} + +func (f *fout) Log(now time.Time, message string) { + f.stdout.Log(now, message) + f.file.Log(now, message) +} + +func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) { + f.stdout.LogWithColors(now, "", coloredMessage) + f.file.Log(now, message) +} + +func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() } +func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() } +func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() } +func (f *fout) Rotate() { f.file.Rotate() } + +func (fout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go new file mode 100644 index 0000000..d4e85de --- /dev/null +++ b/internal/io/dlog/loggers/logger.go @@ -0,0 +1,19 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +// Logger is there to plug in your own log implementation. +type Logger interface { + Log(now time.Time, message string) + LogWithColors(now time.Time, message, messageWithColors string) + Start(ctx context.Context, wg *sync.WaitGroup) + Flush() + Pause() + Resume() + Rotate() + SupportsColors() bool +} diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go new file mode 100644 index 0000000..270027f --- /dev/null +++ b/internal/io/dlog/loggers/none.go @@ -0,0 +1,21 @@ +package loggers + +import ( + "context" + "sync" + "time" +) + +// don't log anything +type none struct{} + +func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() } +func (none) Log(now time.Time, message string) {} + +func (none) LogWithColors(now time.Time, message, coloredMessage string) {} + +func (none) Flush() {} +func (none) Pause() {} +func (none) Resume() {} +func (none) Rotate() {} +func (none) SupportsColors() bool { return false } diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go new file mode 100644 index 0000000..05485c6 --- /dev/null +++ b/internal/io/dlog/loggers/stdout.go @@ -0,0 +1,54 @@ +package loggers + +import ( + "context" + "fmt" + "sync" + "time" +) + +type stdout struct { + pauseCh chan struct{} + resumeCh chan struct{} + mutex sync.Mutex +} + +func newStdout() *stdout { + return &stdout{ + pauseCh: make(chan struct{}), + resumeCh: make(chan struct{}), + } +} + +func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) { + wg.Done() +} + +func (s *stdout) Log(now time.Time, message string) { + s.log(message) +} + +func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) { + s.log(coloredMessage) +} + +func (s *stdout) log(message string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + select { + case <-s.pauseCh: + // Pause until resumed. + <-s.resumeCh + default: + } + + fmt.Println(message) +} + +func (s *stdout) Pause() { s.pauseCh <- struct{}{} } +func (s *stdout) Resume() { s.resumeCh <- struct{}{} } +func (s *stdout) Flush() {} +func (s *stdout) Rotate() {} + +func (stdout) SupportsColors() bool { return true } diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go new file mode 100644 index 0000000..48e7d44 --- /dev/null +++ b/internal/io/dlog/loggers/strategy.go @@ -0,0 +1,35 @@ +package loggers + +import ( + "os" + "path/filepath" + "strings" +) + +// Rotation is the actual strategy used for log rotation.. +type Rotation int + +const ( + // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP. + DailyRotation Rotation = iota + // SignalRotation tells DTail to rotate its logs only on SIGHUP. + SignalRotation Rotation = iota +) + +// Strategy is a pair of the rotation and the file base. +type Strategy struct { + // Rotation is the actual rotation strategy used. + Rotation Rotation + // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used. + FileBase string +} + +// NewStrategy returns the stratey based on its name. +func NewStrategy(name string) Strategy { + switch strings.ToLower(name) { + case "daily": + return Strategy{DailyRotation, ""} + default: + return Strategy{SignalRotation, filepath.Base(os.Args[0])} + } +} diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go new file mode 100644 index 0000000..15ce1fd --- /dev/null +++ b/internal/io/dlog/rotation.go @@ -0,0 +1,27 @@ +package dlog + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/mimecast/dtail/internal/io/dlog/loggers" +) + +func rotation(ctx context.Context) { + rotateCh := make(chan os.Signal, 1) + signal.Notify(rotateCh, syscall.SIGHUP) + go func() { + for { + select { + case <-rotateCh: + Common.Debug("Invoking log rotation") + loggers.FactoryRotate() + return + case <-ctx.Done(): + return + } + } + }() +} diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index 7f387bc..01c15ba 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index efd410e..b05fd39 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -8,9 +8,11 @@ import ( "github.com/mimecast/dtail/internal/regex" ) -// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. +// FileReader is the interface used on the dtail server to read/cat/grep/mapr... +// a file. type FileReader interface { - Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go deleted file mode 100644 index c4f605e..0000000 --- a/internal/io/fs/filter.go +++ /dev/null @@ -1,167 +0,0 @@ -package fs - -import ( - "context" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/lcontext" - "github.com/mimecast/dtail/internal/regex" -) - -func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { - // Do we have any kind of local context settings? If so then run the more complex - // filterWithLContext method. - if lContext.Has() { - // We can not skip transmitting any lines to the client with a local - // grep context specified. - f.canSkipLines = false - f.filterWithLContext(ctx, rawLines, lines, re, lContext) - return - } - - f.filterWithoutLContext(ctx, rawLines, lines, re) -} - -// Filter log lines matching a given regular expression, however with local grep context. -func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { - // Scenario 1: Finish once maxCount hits found - maxCount := lContext.MaxCount - processMaxCount := maxCount > 0 - maxReached := false - - // Scenario 2: Print prev. N lines when current line matches. - before := lContext.BeforeContext - processBefore := before > 0 - var beforeBuf chan []byte - if processBefore { - beforeBuf = make(chan []byte, before) - } - - // Screnario 3: Print next N lines when current line matches. - after := 0 - processAfter := lContext.AfterContext > 0 - - for rawLine := range rawLines { - // logger.Debug("rawLine", string(rawLine)) - f.updatePosition() - - if !re.Match(rawLine) { - f.updateLineNotMatched() - - if processAfter && after > 0 { - after-- - myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} - select { - case lines <- myLine: - case <-ctx.Done(): - return - } - - } else if processBefore { - // Keep last num BeforeContext raw messages. - select { - case beforeBuf <- rawLine: - default: - <-beforeBuf - beforeBuf <- rawLine - } - } - continue - } - - f.updateLineMatched() - - if processAfter { - if maxReached { - return - } - after = lContext.AfterContext - } - - if processBefore { - i := uint64(len(beforeBuf)) - for { - select { - case myRawLine := <-beforeBuf: - myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100} - i-- - select { - case lines <- myLine: - case <-ctx.Done(): - return - } - default: - // beforeBuf is now empty. - } - if len(beforeBuf) == 0 { - break - } - } - } - - line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} - - select { - case lines <- line: - if processMaxCount { - maxCount-- - if maxCount == 0 { - if !processAfter || after == 0 { - return - } - // Unfortunatley we have to continue filter, as there might be more lines to print - maxReached = true - } - } - case <-ctx.Done(): - return - } - } -} - -// Filter log lines matching a given regular expression, there is no local grep context specified. -func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) { - continue - } - - line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()} - - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - } -} - -func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool { - if !re.Match(rawLine) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - // Regex dosn't match, so not interested in it. - return true - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - // Matching, not transmittable - return true - } - f.updateLineTransmitted() - - // Matching, transmittable - return false -} diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index cc5dd9b..d621c09 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -3,12 +3,12 @@ package permissions import ( - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in") return true, nil } diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go index 7d2d7ca..904b90f 100644 --- a/internal/io/fs/permissions/permission_linuxacl.go +++ b/internal/io/fs/permissions/permission_linuxacl.go @@ -13,7 +13,7 @@ import ( "unsafe" ) -// ToRead checks whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 161e3f0..28cbe58 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -2,16 +2,20 @@ package fs import ( "bufio" + "bytes" "compress/gzip" "context" + "errors" "fmt" "io" "os" "strings" + "sync" "time" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" @@ -37,31 +41,10 @@ type readFile struct { limiter chan struct{} } -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } - - return -} - // String returns the string representation of the readFile func (f readFile) String() string { - return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", + return fmt.Sprintf( + "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", f.filePath, f.globID, f.retry, @@ -80,8 +63,10 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { - logger.Debug("readFile", f) +func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, + lines chan<- line.Line, re regex.Regex) error { + + dlog.Common.Debug("readFile", f) defer func() { select { case <-f.limiter: @@ -93,7 +78,8 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c case f.limiter <- struct{}{}: default: select { - case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, + "Server limit reached. Queuing file..."): case <-ctx.Done(): return nil } @@ -110,111 +96,335 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c fd.Seek(0, io.SeekEnd) } - rawLines := make(chan []byte, 100) + rawLines := make(chan *bytes.Buffer, 100) + truncate := make(chan struct{}) + readCtx, readCancel := context.WithCancel(ctx) + var filterWg sync.WaitGroup + filterWg.Add(1) - filterDone := make(chan struct{}) + go f.periodicTruncateCheck(ctx, truncate) go func() { - f.filter(ctx, rawLines, lines, re, lContext) - close(filterDone) + f.filter(ctx, ltx, rawLines, lines, re) + filterWg.Done() // If the filter stopped, make the reader stop too, no need to read // more data if there is nothing more the filter wants to filter for! // E.g. it could be that we only want to filter N matches but not more. readCancel() }() - err = f.read(readCtx, fd, rawLines) + err = f.read(readCtx, fd, rawLines, truncate) close(rawLines) - - // Filter may flushes some data still. So wait until it is done here. - <-filterDone + // Filter may sends some data still. So wait until it is done here. + filterWg.Wait() return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { - var offset uint64 +func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { + for { + select { + case <-time.After(time.Second * 3): + select { + case truncate <- struct{}{}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } +} +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + dlog.Common.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + dlog.Common.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + return +} + +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { + var offset uint64 reader, err := f.makeReader(fd) if err != nil { return err } - rawLine := make([]byte, 0, 512) lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false - - checkTruncate := f.truncateTimer(ctx) + warnedAboutLongLine := false + message := pool.BytesBuffer.Get().(*bytes.Buffer) for { - select { - case <-ctx.Done(): - return nil - default: - } - - select { - case <-checkTruncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - logger.Info(f.filePath, "Current offset", offset) - default: - } - - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() + b, err := reader.ReadByte() if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. if err != io.EOF { return err } + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + case <-ctx.Done(): + return nil + default: + } if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") + dlog.Common.Info(f.FilePath(), "End of file reached") return nil } time.Sleep(time.Millisecond * 100) continue } + offset++ - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') + switch b { + case '\n': select { - case rawLines <- rawLine: + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) + warnedAboutLongLine = false case <-ctx.Done(): return nil } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false + default: + if message.Len() >= lineLengthThreshold { + if !warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + warnedAboutLongLine = true + } + message.WriteString("\n") + select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + case <-ctx.Done(): + return nil + } + } + message.WriteByte(b) + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if ltx.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, ltx, rawLines, lines, re) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, + lines chan<- line.Line, re regex.Regex) { + + for { + select { + case line, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + select { + case lines <- filteredLine: + case <-ctx.Done(): + return + } + } + } + } +} + +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Scenario 1: Finish once maxCount hits found + maxCount := ltx.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := ltx.BeforeContext + processBefore := before > 0 + var beforeBuf chan *bytes.Buffer + if processBefore { + beforeBuf = make(chan *bytes.Buffer, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := ltx.AfterContext > 0 + + for lineBytesBuffer := range rawLines { + f.updatePosition() + + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- lineBytesBuffer: + default: + pool.RecycleBytesBuffer(<-beforeBuf) + beforeBuf <- lineBytesBuffer + } } continue } - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true + f.updateLineMatched() + + if processAfter { + if maxReached { + return } - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-ctx.Done(): - return nil + after = ltx.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case lineBytesBuffer := <-beforeBuf: + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount() - i, + TransmittedPerc: 100, + } + i-- + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } } - rawLine = make([]byte, 0, 512) } + + line := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, + re regex.Regex) (line.Line, bool) { + + var read line.Line + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + return read, false + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + return read, false + } + f.updateLineTransmitted() + + read = line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: f.transmittedPerc(), + } + return read, true +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + dlog.Common.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + if curPos > pathPos { + return true, errors.New("File got truncated") } + return false, nil } diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 14994e5..b03b45d 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go deleted file mode 100644 index a8d59ac..0000000 --- a/internal/io/fs/truncate.go +++ /dev/null @@ -1,61 +0,0 @@ -package fs - -import ( - "context" - "errors" - "io" - "os" - "time" - - "github.com/mimecast/dtail/internal/io/logger" -) - -func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) { - checkTruncate = make(chan struct{}) - - go func() { - for { - select { - case <-time.After(time.Second * 3): - select { - case checkTruncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } - }() - - return -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 715be34..d306c88 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -1,13 +1,14 @@ package line import ( + "bytes" "fmt" ) // Line represents a read log line. type Line struct { // The content of the log line. - Content []byte + Content *bytes.Buffer // Until now, how many log lines were processed? Count uint64 // Sometimes we produce too many log lines so that the client @@ -25,7 +26,7 @@ type Line struct { // Return a human readable representation of the followed line. func (l Line) String() string { return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)", - string(l.Content), + l.Content.String(), l.TransmittedPerc, l.Count, l.SourceID) diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go deleted file mode 100644 index 4254eef..0000000 --- a/internal/io/logger/logger.go +++ /dev/null @@ -1,403 +0,0 @@ -package logger - -import ( - "bufio" - "context" - "fmt" - "os" - "os/signal" - "runtime" - "strings" - "sync" - "syscall" - "time" - - "github.com/mimecast/dtail/internal/color" - "github.com/mimecast/dtail/internal/config" -) - -const ( - clientStr string = "CLIENT" - serverStr string = "SERVER" - infoStr string = "INFO" - warnStr string = "WARN" - errorStr string = "ERROR" - fatalStr string = "FATAL" - debugStr string = "DEBUG" - traceStr string = "TRACE" -) - -// Mode specifies the configured logging mode(s) -var Mode Modes - -// Strategy is the current log strattegy used. -var strategy Strategy - -// Synchronise access to logging. -var mutex sync.Mutex - -// File descriptor of log file when Mode.logToFile enabled. -var fd *os.File - -// File write buffer of log file when Mode.logToFile enabled. -var writer *bufio.Writer - -// File write buffer of stdout when Mode.logToStdout enabled. -var stdoutWriter *bufio.Writer - -// Current hostname. -var hostname string - -// Used to detect change of day (create one log file per day0 -var lastDateStr string - -// Used to make logging non-blocking. -var fileLogBufCh chan buf -var stdoutBufCh chan string - -// Stdout channel, required to pause output -var pauseCh chan struct{} -var resumeCh chan struct{} - -// Tell the logger about logrotation -var rotateCh chan os.Signal - -// Helper type to make logging non-blocking. -type buf struct { - time time.Time - message string -} - -// Start logging. -func Start(ctx context.Context, mode Modes) { - Mode = mode - - switch { - case Mode.Nothing: - return - case Mode.Quiet: - Mode.Trace = false - Mode.Debug = false - case Mode.Trace: - Mode.Debug = true - default: - } - - strategy := logStrategy() - stdoutWriter = bufio.NewWriter(os.Stdout) - - switch strategy { - case DailyStrategy: - _, err := os.Stat(config.Common.LogDir) - Mode.logToFile = !os.IsNotExist(err) - Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet - case StdoutStrategy: - fallthrough - default: - Mode.logToFile = !Mode.Server - Mode.logToStdout = true - } - - fqdn, err := os.Hostname() - if err != nil { - panic(err) - } - s := strings.Split(fqdn, ".") - hostname = s[0] - - pauseCh = make(chan struct{}) - resumeCh = make(chan struct{}) - - // Setup logrotation - rotateCh = make(chan os.Signal, 1) - signal.Notify(rotateCh, syscall.SIGHUP) - - if Mode.logToStdout { - stdoutBufCh = make(chan string, runtime.NumCPU()*100) - go writeToStdout(ctx) - } - - if Mode.logToFile { - fileLogBufCh = make(chan buf, runtime.NumCPU()*100) - go writeToFile(ctx) - } -} - -// Info message logging. -func Info(args ...interface{}) string { - if Mode.Server { - return log(serverStr, infoStr, args) - } - - return log(clientStr, infoStr, args) -} - -// Warn message logging. -func Warn(args ...interface{}) string { - if !Mode.Quiet { - if Mode.Server { - return log(serverStr, warnStr, args) - } - return log(clientStr, warnStr, args) - } - - return "" -} - -// Error message logging. -func Error(args ...interface{}) string { - if Mode.Server { - return log(serverStr, errorStr, args) - } - - return log(clientStr, errorStr, args) -} - -// Fatal message logging. -func Fatal(args ...interface{}) string { - if Mode.Server { - return log(serverStr, fatalStr, args) - } - - return log(clientStr, fatalStr, args) -} - -// FatalExit logs an error and exists the process. -func FatalExit(args ...interface{}) { - what := clientStr - if Mode.Server { - what = serverStr - } - log(what, fatalStr, args) - - time.Sleep(time.Second) - mutex.Lock() - defer mutex.Unlock() - - closeWriter() - os.Exit(3) -} - -// Debug message logging. -func Debug(args ...interface{}) string { - if Mode.Debug { - if Mode.Server { - return log(serverStr, debugStr, args) - } - return log(clientStr, debugStr, args) - } - - return "" -} - -// Trace message logging. -func Trace(args ...interface{}) string { - if Mode.Trace { - if Mode.Server { - return log(serverStr, traceStr, args) - } - return log(clientStr, traceStr, args) - } - - return "" -} - -// Write log line to buffer and/or log file. -func write(what, severity, message string) { - if Mode.logToStdout { - line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message) - - if color.Colored { - line = color.Colorfy(line) - } - - stdoutBufCh <- line - } - - if Mode.logToFile { - t := time.Now() - timeStr := t.Format("20060102-150405") - fileLogBufCh <- buf{ - time: t, - message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message), - } - } -} - -// Generig log message. -func log(what string, severity string, args []interface{}) string { - if Mode.Nothing { - return "" - } - - messages := []string{} - - for _, arg := range args { - switch v := arg.(type) { - case string: - messages = append(messages, v) - case int: - messages = append(messages, fmt.Sprintf("%d", v)) - case error: - messages = append(messages, v.Error()) - default: - messages = append(messages, fmt.Sprintf("%v", v)) - } - } - - message := strings.Join(messages, "|") - write(what, severity, message) - - return fmt.Sprintf("%s|%s", severity, message) -} - -// Raw message logging. -func Raw(message string) { - if Mode.Nothing { - return - } - - if Mode.logToFile { - fileLogBufCh <- buf{time.Now(), message} - } - - if Mode.logToStdout { - if color.Colored { - message = color.Colorfy(message) - } - stdoutBufCh <- message - } -} - -// Close log writer (e.g. on change of day). -func closeWriter() { - if writer != nil { - writer.Flush() - fd.Close() - } -} - -// Return the correct log file writer -func fileWriter(dateStr string) *bufio.Writer { - if dateStr != lastDateStr { - return updateFileWriter(dateStr) - } - - // Check for log rotation signal - select { - case <-rotateCh: - stdoutWriter.WriteString("Received signal for logrotation\n") - return updateFileWriter(dateStr) - default: - } - - return writer -} - -// Update log file writer -func updateFileWriter(dateStr string) *bufio.Writer { - // Detected change of day. Close current writer and create a new one. - mutex.Lock() - defer mutex.Unlock() - closeWriter() - - if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) { - if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil { - panic(err) - } - } - - logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr) - newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - panic(err) - } - - fd = newFd - writer = bufio.NewWriterSize(fd, 1) - lastDateStr = dateStr - - return writer -} - -// Flush all outstanding lines. -func Flush() { - for { - select { - case message := <-stdoutBufCh: - stdoutWriter.WriteString(message) - default: - stdoutWriter.Flush() - return - } - } -} - -func writeToStdout(ctx context.Context) { - for { - select { - case message := <-stdoutBufCh: - stdoutWriter.WriteString(message) - case <-time.After(time.Millisecond * 100): - stdoutWriter.Flush() - case <-pauseCh: - PAUSE: - for { - select { - case <-stdoutBufCh: - case <-resumeCh: - break PAUSE - case <-ctx.Done(): - return - } - } - case <-ctx.Done(): - Flush() - return - } - } -} - -func writeToFile(ctx context.Context) { - for { - select { - case buf := <-fileLogBufCh: - dateStr := buf.time.Format("20060102") - w := fileWriter(dateStr) - w.WriteString(buf.message) - case <-pauseCh: - PAUSE: - for { - select { - case <-stdoutBufCh: - case <-resumeCh: - break PAUSE - case <-ctx.Done(): - return - } - } - case <-ctx.Done(): - return - } - } -} - -// Pause logging. -func Pause() { - if Mode.logToStdout { - pauseCh <- struct{}{} - } - if Mode.logToFile { - pauseCh <- struct{}{} - } -} - -// Resume logging (after pausing). -func Resume() { - if Mode.logToStdout { - resumeCh <- struct{}{} - } - if Mode.logToFile { - resumeCh <- struct{}{} - } -} diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go deleted file mode 100644 index 8864179..0000000 --- a/internal/io/logger/modes.go +++ /dev/null @@ -1,12 +0,0 @@ -package logger - -// Modes specifies the logging mode. -type Modes struct { - Server bool - Trace bool - Debug bool - Nothing bool - Quiet bool - logToStdout bool - logToFile bool -} diff --git a/internal/io/logger/strategy.go b/internal/io/logger/strategy.go deleted file mode 100644 index 44bf393..0000000 --- a/internal/io/logger/strategy.go +++ /dev/null @@ -1,22 +0,0 @@ -package logger - -import "github.com/mimecast/dtail/internal/config" - -// Strategy allows to specify a log rotation strategy. -type Strategy int - -// Possible log strategies. -const ( - NormalStrategy Strategy = iota - DailyStrategy Strategy = iota - StdoutStrategy Strategy = iota -) - -func logStrategy() Strategy { - switch config.Common.LogStrategy { - case "daily": - return DailyStrategy - default: - } - return StdoutStrategy -} diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go new file mode 100644 index 0000000..89fcf81 --- /dev/null +++ b/internal/io/pool/builder.go @@ -0,0 +1,21 @@ +package pool + +import ( + "strings" + "sync" +) + +// BuilderBuffer is there to optimize memory allocations (DTail allocates a lot +// of memory while reading log data otherwise) +var BuilderBuffer = sync.Pool{ + New: func() interface{} { + sb := strings.Builder{} + return &sb + }, +} + +// RecycleBuilderBuffer recycles the buffer again. +func RecycleBuilderBuffer(sb *strings.Builder) { + sb.Reset() + BuilderBuffer.Put(sb) +} diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go new file mode 100644 index 0000000..3d48f2c --- /dev/null +++ b/internal/io/pool/bytesbuffer.go @@ -0,0 +1,22 @@ +package pool + +import ( + "bytes" + "sync" +) + +// BytesBuffer is there to optimize memory allocations. DTail otherwise allocates +// a lot of memory while reading logs. +var BytesBuffer = sync.Pool{ + New: func() interface{} { + b := bytes.Buffer{} + b.Grow(128) + return &b + }, +} + +// RecycleBytesBuffer recycles the buffer again. +func RecycleBytesBuffer(b *bytes.Buffer) { + b.Reset() + BytesBuffer.Put(b) +} diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go index 36ebdb5..e82132d 100644 --- a/internal/io/prompt/prompt.go +++ b/internal/io/prompt/prompt.go @@ -6,7 +6,7 @@ import ( "os" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // Answer is a user input of a prompt question. @@ -19,7 +19,8 @@ type Answer struct { Callback func() // Runs after Callback and after logging resumes EndCallback func() - AskAgain bool + // AskAgain can be used to not to ask again about the question. + AskAgain bool } // Prompt used for interactive user input. @@ -30,7 +31,6 @@ type Prompt struct { func (p *Prompt) askString() string { var sb strings.Builder - sb.WriteString(p.question) sb.WriteString("? (") @@ -41,7 +41,6 @@ func (p *Prompt) askString() string { sb.WriteString(strings.Join(ax, ",")) sb.WriteString("): ") - return sb.String() } @@ -58,7 +57,7 @@ func (p *Prompt) Add(answer Answer) { // Ask a question. func (p *Prompt) Ask() { reader := bufio.NewReader(os.Stdin) - logger.Pause() + dlog.Common.Pause() for { fmt.Print(p.askString()) @@ -68,9 +67,8 @@ func (p *Prompt) Ask() { if a.Callback != nil { a.Callback() } - if !a.AskAgain { - logger.Resume() + dlog.Common.Resume() if a.EndCallback != nil { a.EndCallback() } @@ -90,6 +88,5 @@ func (p *Prompt) answer(answerStr string) (*Answer, bool) { default: } } - return nil, false } diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go index 500c530..584b59c 100644 --- a/internal/io/signal/signal.go +++ b/internal/io/signal/signal.go @@ -14,10 +14,8 @@ import ( func InterruptCh(ctx context.Context) <-chan string { sigIntCh := make(chan os.Signal) gosignal.Notify(sigIntCh, os.Interrupt) - sigOtherCh := make(chan os.Signal) gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT) - statsCh := make(chan string) go func() { @@ -41,6 +39,10 @@ func InterruptCh(ctx context.Context) <-chan string { } } }() - return statsCh } + +// NoCh doesn't listen on a signal. +func NoCh(ctx context.Context) <-chan string { + return make(chan string) +} |
