summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/dlog/dlog.go272
-rw-r--r--internal/io/dlog/level.go84
-rw-r--r--internal/io/dlog/loggers/factory.go54
-rw-r--r--internal/io/dlog/loggers/file.go165
-rw-r--r--internal/io/dlog/loggers/fout.go46
-rw-r--r--internal/io/dlog/loggers/logger.go19
-rw-r--r--internal/io/dlog/loggers/none.go21
-rw-r--r--internal/io/dlog/loggers/stdout.go54
-rw-r--r--internal/io/dlog/loggers/strategy.go35
-rw-r--r--internal/io/dlog/rotation.go27
-rw-r--r--internal/io/fs/catfile.go4
-rw-r--r--internal/io/fs/filereader.go6
-rw-r--r--internal/io/fs/filter.go167
-rw-r--r--internal/io/fs/permissions/permission.go4
-rw-r--r--internal/io/fs/permissions/permission_linuxacl.go2
-rw-r--r--internal/io/fs/readfile.go384
-rw-r--r--internal/io/fs/tailfile.go4
-rw-r--r--internal/io/fs/truncate.go61
-rw-r--r--internal/io/line/line.go5
-rw-r--r--internal/io/logger/logger.go403
-rw-r--r--internal/io/logger/modes.go12
-rw-r--r--internal/io/logger/strategy.go22
-rw-r--r--internal/io/pool/builder.go21
-rw-r--r--internal/io/pool/bytesbuffer.go22
-rw-r--r--internal/io/prompt/prompt.go13
-rw-r--r--internal/io/signal/signal.go8
26 files changed, 1143 insertions, 772 deletions
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
new file mode 100644
index 0000000..5e0c3a1
--- /dev/null
+++ b/internal/io/dlog/dlog.go
@@ -0,0 +1,272 @@
+package dlog
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color/brush"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+ "github.com/mimecast/dtail/internal/source"
+)
+
+// Client is the log handler for the client packages.
+var Client *DLog
+
+// Server is the log handler for the server packages.
+var Server *DLog
+
+// Common is the log handler for all other packages.
+var Common *DLog
+
+var mutex sync.Mutex
+var started bool
+
+// Start logger(s).
+func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) {
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ if started {
+ Common.FatalPanic("Logger already started")
+ }
+
+ Client = new(sourceProcess, source.Client)
+ Server = new(sourceProcess, source.Server)
+ Common = Client
+ if sourceProcess == source.Server {
+ Common = Server
+ }
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ go Client.start(ctx, &wg2)
+ go Server.start(ctx, &wg2)
+
+ go rotation(ctx)
+ go func() {
+ wg2.Wait()
+ wg.Done()
+ }()
+
+ started = true
+}
+
+// DLog is the DTail logger.
+type DLog struct {
+ logger loggers.Logger
+ // Is this a DTail server or client process logging?
+ sourceProcess source.Source
+ // Is this a DTail server or client package logging? In serverless mode
+ // the client can also execute code from the server package.
+ sourcePackage source.Source
+ // Max log level to log.
+ maxLevel level
+ // Current hostname.
+ hostname string
+}
+
+// new creates a new DTail logger.
+func new(sourceProcess, sourcePackage source.Source) *DLog {
+ hostname, err := os.Hostname()
+ if err != nil {
+ panic(err)
+ }
+ logRotation := loggers.NewStrategy(config.Common.LogRotation)
+ loggerName := config.Common.Logger
+ level := newLevel(config.Common.LogLevel)
+
+ return &DLog{
+ logger: loggers.Factory(sourceProcess.String(), loggerName, logRotation),
+ sourceProcess: sourceProcess,
+ sourcePackage: sourcePackage,
+ maxLevel: level,
+ hostname: hostname,
+ }
+}
+
+func (d *DLog) start(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ var wg2 sync.WaitGroup
+ wg2.Add(1)
+ d.logger.Start(ctx, &wg2)
+ <-ctx.Done()
+ wg2.Wait()
+}
+
+func (d *DLog) log(level level, args []interface{}) string {
+ if d.maxLevel < level {
+ return ""
+ }
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+ now := time.Now()
+
+ switch d.sourceProcess {
+ case source.Client:
+ sb.WriteString(d.sourcePackage.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(d.hostname)
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(level.String())
+ default:
+ sb.WriteString(level.String())
+ sb.WriteString(protocol.FieldDelimiter)
+ sb.WriteString(now.Format("20060102-150405"))
+ }
+ sb.WriteString(protocol.FieldDelimiter)
+ d.writeArgStrings(sb, args)
+
+ message := sb.String()
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(now, message)
+ return message
+ }
+
+ d.logger.LogWithColors(now, message, brush.Colorfy(message))
+ return message
+}
+
+func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) {
+ for i, arg := range args {
+ if i > 0 {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ switch v := arg.(type) {
+ case string:
+ sb.WriteString(v)
+ case error:
+ sb.WriteString(v.Error())
+ default:
+ sb.WriteString(fmt.Sprintf("%v", v))
+ }
+ }
+}
+
+// FatalPanic terminates the process with a fatal error.
+func (d *DLog) FatalPanic(args ...interface{}) {
+ d.log(Fatal, args)
+ d.Flush()
+
+ var sb strings.Builder
+ d.writeArgStrings(&sb, args)
+ panic(sb.String())
+}
+
+// Fatal logs a fatal error.
+func (d *DLog) Fatal(args ...interface{}) string {
+ return d.log(Fatal, args)
+}
+
+// Error logging.
+func (d *DLog) Error(args ...interface{}) string {
+ return d.log(Error, args)
+}
+
+// Warn logs a warning message.
+func (d *DLog) Warn(args ...interface{}) string {
+ return d.log(Warn, args)
+}
+
+// Info logging.
+func (d *DLog) Info(args ...interface{}) string {
+ return d.log(Info, args)
+}
+
+// Verbose logging.
+func (d *DLog) Verbose(args ...interface{}) string {
+ return d.log(Verbose, args)
+}
+
+// Debug logging.
+func (d *DLog) Debug(args ...interface{}) string {
+ return d.log(Debug, args)
+}
+
+// Trace logging.
+func (d *DLog) Trace(args ...interface{}) string {
+ _, file, line, _ := runtime.Caller(1)
+ args = append(args, fmt.Sprintf("at %s:%d", file, line))
+ return d.log(Trace, args)
+}
+
+// Devel used for development purpose only logging (e.g. "print" debugging).
+func (d *DLog) Devel(args ...interface{}) string {
+ _, file, line, _ := runtime.Caller(1)
+ args = append(args, fmt.Sprintf("at %s:%d", file, line))
+ return d.log(Devel, args)
+}
+
+// Raw message logging.
+func (d *DLog) Raw(message string) string {
+ if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
+ d.logger.Log(time.Now(), message)
+ return message
+ }
+ d.logger.LogWithColors(time.Now(), message, brush.Colorfy(message))
+ return message
+}
+
+// Mapreduce logging.
+func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
+ args := make([]interface{}, len(data)+1)
+
+ if d.sourceProcess == source.Server {
+ // level|date-time|process|caller|cpus|goroutines|cgocalls|loadavg|uptime|MAPREDUCE:TABLE|key=value|...
+
+ var loadAvg string
+ if loadAvgBytes, err := ioutil.ReadFile("/proc/loadavg"); err == nil {
+ tmp := string(loadAvgBytes)
+ s := strings.SplitN(tmp, " ", 2)
+ loadAvg = s[0]
+ }
+
+ var uptime string
+ if uptimeBytes, err := ioutil.ReadFile("/proc/uptime"); err == nil {
+ tmp := string(uptimeBytes)
+ s := strings.SplitN(tmp, ".", 2)
+ i, _ := strconv.ParseInt(s[0], 10, 64)
+ t := time.Duration(i) * time.Second
+ uptime = fmt.Sprintf("%v", t)
+ }
+
+ _, file, line, _ := runtime.Caller(1)
+ args[0] = fmt.Sprintf("%d|%s:%d|%d|%d|%d|%s|%s|MAPREDUCE:%s",
+ os.Getpid(),
+ filepath.Base(file), line,
+ runtime.NumCPU(),
+ runtime.NumGoroutine(),
+ runtime.NumCgoCall(),
+ loadAvg,
+ uptime,
+ strings.ToUpper(table))
+ } else {
+ args[0] = fmt.Sprintf("STATS:%s", strings.ToUpper(table))
+ }
+
+ i := 1
+ for k, v := range data {
+ args[i] = fmt.Sprintf("%s=%v", k, v)
+ i++
+ }
+ return d.log(Info, args)
+}
+
+// Flush the log buffers.
+func (d *DLog) Flush() { d.logger.Flush() }
+
+// Pause the logging.
+func (d *DLog) Pause() { d.logger.Pause() }
+
+// Resume the logging.
+func (d *DLog) Resume() { d.logger.Resume() }
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
new file mode 100644
index 0000000..05d9ed9
--- /dev/null
+++ b/internal/io/dlog/level.go
@@ -0,0 +1,84 @@
+package dlog
+
+import (
+ "fmt"
+ "strings"
+)
+
+type level int
+
+// Available log levels.
+const (
+ None level = iota
+ Fatal level = iota
+ Error level = iota
+ Warn level = iota
+ Info level = iota
+ Default level = iota
+ Verbose level = iota
+ Debug level = iota
+ Devel level = iota
+ Trace level = iota
+ All level = iota
+)
+
+var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug,
+ Devel, Trace, All}
+
+func newLevel(l string) level {
+ switch strings.ToLower(l) {
+ case "none":
+ return None
+ case "fatal":
+ return Fatal
+ case "error":
+ return Error
+ case "warn":
+ return Warn
+ case "info":
+ return Info
+ case "":
+ fallthrough
+ case "default":
+ return Default
+ case "verbose":
+ return Verbose
+ case "debug":
+ return Debug
+ case "devel":
+ return Devel
+ case "trace":
+ return Trace
+ case "all":
+ return All
+ }
+ panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels))
+}
+
+func (l level) String() string {
+ switch l {
+ case None:
+ return "NONE"
+ case Fatal:
+ return "FATAL"
+ case Error:
+ return "ERROR"
+ case Warn:
+ return "WARN"
+ case Info:
+ return "INFO"
+ case Default:
+ return "DEFAULT"
+ case Verbose:
+ return "VERBOSE"
+ case Debug:
+ return "DEBUG"
+ case Devel:
+ return "DEVEL"
+ case Trace:
+ return "TRACE"
+ case All:
+ return "ALL"
+ }
+ panic("Unknown log level " + fmt.Sprintf("%d", l))
+}
diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go
new file mode 100644
index 0000000..a5cc7cf
--- /dev/null
+++ b/internal/io/dlog/loggers/factory.go
@@ -0,0 +1,54 @@
+package loggers
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+)
+
+var factoryMap map[string]Logger
+var factoryMutex sync.Mutex
+
+// Factory is there to retrieve a logger based on various settings.
+func Factory(sourceName, loggerName string, logRotation Strategy) Logger {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+
+ id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName,
+ logRotation.FileBase, loggerName)
+ if factoryMap == nil {
+ factoryMap = make(map[string]Logger)
+ }
+
+ singleton, ok := factoryMap[id]
+ if !ok {
+ switch strings.ToLower(loggerName) {
+ case "none":
+ singleton = none{}
+ case "stdout":
+ singleton = newStdout()
+ factoryMap[id] = singleton
+ case "file":
+ singleton = newFile(logRotation)
+ factoryMap[id] = singleton
+ case "fout":
+ singleton = newFout(logRotation)
+ factoryMap[id] = singleton
+ default:
+ panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName))
+ }
+ }
+ return singleton
+}
+
+// FactoryRotate invokes a log rotation of all loggers.
+func FactoryRotate() {
+ factoryMutex.Lock()
+ defer factoryMutex.Unlock()
+ if factoryMap == nil {
+ return
+ }
+ for _, logger := range factoryMap {
+ logger.Rotate()
+ }
+}
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
new file mode 100644
index 0000000..94824fe
--- /dev/null
+++ b/internal/io/dlog/loggers/file.go
@@ -0,0 +1,165 @@
+package loggers
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+)
+
+type fileWriter struct{}
+
+type fileMessageBuf struct {
+ now time.Time
+ message string
+}
+
+type file struct {
+ bufferCh chan *fileMessageBuf
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+ rotateCh chan struct{}
+ flushCh chan struct{}
+ fd *os.File
+ writer *bufio.Writer
+ mutex sync.Mutex
+ started bool
+ lastFileName string
+ strategy Strategy
+}
+
+func newFile(strategy Strategy) *file {
+ return &file{
+ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100),
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ rotateCh: make(chan struct{}),
+ flushCh: make(chan struct{}),
+ strategy: strategy,
+ }
+}
+
+func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) {
+ f.mutex.Lock()
+ defer func() {
+ f.started = true
+ f.mutex.Unlock()
+ }()
+
+ if f.started {
+ // Logger already started from another Goroutine.
+ wg.Done()
+ return
+ }
+
+ pause := func(ctx context.Context) {
+ select {
+ case <-f.resumeCh:
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case m := <-f.bufferCh:
+ f.write(m)
+ case <-f.pauseCh:
+ pause(ctx)
+ case <-f.flushCh:
+ f.flush()
+ case <-ctx.Done():
+ f.flush()
+ f.fd.Close()
+ return
+ }
+ }
+ }()
+}
+
+func (f *file) Log(now time.Time, message string) {
+ f.bufferCh <- &fileMessageBuf{now, message}
+}
+
+func (f *file) LogWithColors(now time.Time, message, coloredMessage string) {
+ panic("Colors not supported in file logger")
+}
+
+func (f *file) Pause() { f.pauseCh <- struct{}{} }
+func (f *file) Resume() { f.resumeCh <- struct{}{} }
+func (f *file) Flush() { f.flushCh <- struct{}{} }
+
+func (f *file) Rotate() { f.rotateCh <- struct{}{} }
+func (*file) SupportsColors() bool { return false }
+
+func (f *file) write(m *fileMessageBuf) {
+ select {
+ case <-f.rotateCh:
+ // Force re-opening the outfile next time in getWriter.
+ f.lastFileName = ""
+ default:
+ }
+
+ var writer *bufio.Writer
+ if f.strategy.Rotation == DailyRotation {
+ writer = f.getWriter(m.now.Format("20060102"))
+ } else {
+ writer = f.getWriter(f.strategy.FileBase)
+ }
+
+ writer.WriteString(m.message)
+ writer.WriteByte('\n')
+}
+
+func (f *file) getWriter(name string) *bufio.Writer {
+ if f.lastFileName == name {
+ return f.writer
+ }
+ if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
+ if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
+ panic(err)
+ }
+ }
+
+ logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name)
+ newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ panic(err)
+ }
+
+ // Close old writer.
+ if f.fd != nil {
+ f.writer.Flush()
+ f.fd.Close()
+ }
+ // Set new writer.
+ f.fd = newFd
+ f.writer = bufio.NewWriterSize(f.fd, 1)
+ f.lastFileName = name
+
+ return f.writer
+}
+
+func (f *file) flush() {
+ defer func() {
+ if f.writer != nil {
+ f.writer.Flush()
+ }
+ }()
+ for {
+ select {
+ case m := <-f.bufferCh:
+ f.write(m)
+ default:
+ return
+ }
+ }
+}
diff --git a/internal/io/dlog/loggers/fout.go b/internal/io/dlog/loggers/fout.go
new file mode 100644
index 0000000..60c318d
--- /dev/null
+++ b/internal/io/dlog/loggers/fout.go
@@ -0,0 +1,46 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+type fout struct {
+ file *file
+ stdout *stdout
+}
+
+// Logs to both, a file and stdout
+func newFout(strategy Strategy) *fout {
+ return &fout{file: newFile(strategy), stdout: newStdout()}
+}
+
+func (f *fout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ go func() {
+ defer wg.Done()
+
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ f.file.Start(ctx, &wg2)
+ f.stdout.Start(ctx, &wg2)
+ wg2.Wait()
+ }()
+}
+
+func (f *fout) Log(now time.Time, message string) {
+ f.stdout.Log(now, message)
+ f.file.Log(now, message)
+}
+
+func (f *fout) LogWithColors(now time.Time, message, coloredMessage string) {
+ f.stdout.LogWithColors(now, "", coloredMessage)
+ f.file.Log(now, message)
+}
+
+func (f *fout) Flush() { f.stdout.Flush(); f.file.Flush() }
+func (f *fout) Pause() { f.stdout.Pause(); f.file.Pause() }
+func (f *fout) Resume() { f.stdout.Resume(); f.file.Resume() }
+func (f *fout) Rotate() { f.file.Rotate() }
+
+func (fout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go
new file mode 100644
index 0000000..d4e85de
--- /dev/null
+++ b/internal/io/dlog/loggers/logger.go
@@ -0,0 +1,19 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+// Logger is there to plug in your own log implementation.
+type Logger interface {
+ Log(now time.Time, message string)
+ LogWithColors(now time.Time, message, messageWithColors string)
+ Start(ctx context.Context, wg *sync.WaitGroup)
+ Flush()
+ Pause()
+ Resume()
+ Rotate()
+ SupportsColors() bool
+}
diff --git a/internal/io/dlog/loggers/none.go b/internal/io/dlog/loggers/none.go
new file mode 100644
index 0000000..270027f
--- /dev/null
+++ b/internal/io/dlog/loggers/none.go
@@ -0,0 +1,21 @@
+package loggers
+
+import (
+ "context"
+ "sync"
+ "time"
+)
+
+// don't log anything
+type none struct{}
+
+func (none) Start(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }
+func (none) Log(now time.Time, message string) {}
+
+func (none) LogWithColors(now time.Time, message, coloredMessage string) {}
+
+func (none) Flush() {}
+func (none) Pause() {}
+func (none) Resume() {}
+func (none) Rotate() {}
+func (none) SupportsColors() bool { return false }
diff --git a/internal/io/dlog/loggers/stdout.go b/internal/io/dlog/loggers/stdout.go
new file mode 100644
index 0000000..05485c6
--- /dev/null
+++ b/internal/io/dlog/loggers/stdout.go
@@ -0,0 +1,54 @@
+package loggers
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+)
+
+type stdout struct {
+ pauseCh chan struct{}
+ resumeCh chan struct{}
+ mutex sync.Mutex
+}
+
+func newStdout() *stdout {
+ return &stdout{
+ pauseCh: make(chan struct{}),
+ resumeCh: make(chan struct{}),
+ }
+}
+
+func (s *stdout) Start(ctx context.Context, wg *sync.WaitGroup) {
+ wg.Done()
+}
+
+func (s *stdout) Log(now time.Time, message string) {
+ s.log(message)
+}
+
+func (s *stdout) LogWithColors(now time.Time, message, coloredMessage string) {
+ s.log(coloredMessage)
+}
+
+func (s *stdout) log(message string) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ select {
+ case <-s.pauseCh:
+ // Pause until resumed.
+ <-s.resumeCh
+ default:
+ }
+
+ fmt.Println(message)
+}
+
+func (s *stdout) Pause() { s.pauseCh <- struct{}{} }
+func (s *stdout) Resume() { s.resumeCh <- struct{}{} }
+func (s *stdout) Flush() {}
+func (s *stdout) Rotate() {}
+
+func (stdout) SupportsColors() bool { return true }
diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go
new file mode 100644
index 0000000..48e7d44
--- /dev/null
+++ b/internal/io/dlog/loggers/strategy.go
@@ -0,0 +1,35 @@
+package loggers
+
+import (
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+// Rotation is the actual strategy used for log rotation..
+type Rotation int
+
+const (
+ // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP.
+ DailyRotation Rotation = iota
+ // SignalRotation tells DTail to rotate its logs only on SIGHUP.
+ SignalRotation Rotation = iota
+)
+
+// Strategy is a pair of the rotation and the file base.
+type Strategy struct {
+ // Rotation is the actual rotation strategy used.
+ Rotation Rotation
+ // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used.
+ FileBase string
+}
+
+// NewStrategy returns the stratey based on its name.
+func NewStrategy(name string) Strategy {
+ switch strings.ToLower(name) {
+ case "daily":
+ return Strategy{DailyRotation, ""}
+ default:
+ return Strategy{SignalRotation, filepath.Base(os.Args[0])}
+ }
+}
diff --git a/internal/io/dlog/rotation.go b/internal/io/dlog/rotation.go
new file mode 100644
index 0000000..15ce1fd
--- /dev/null
+++ b/internal/io/dlog/rotation.go
@@ -0,0 +1,27 @@
+package dlog
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/mimecast/dtail/internal/io/dlog/loggers"
+)
+
+func rotation(ctx context.Context) {
+ rotateCh := make(chan os.Signal, 1)
+ signal.Notify(rotateCh, syscall.SIGHUP)
+ go func() {
+ for {
+ select {
+ case <-rotateCh:
+ Common.Debug("Invoking log rotation")
+ loggers.FactoryRotate()
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index 7f387bc..01c15ba 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,7 +6,9 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile {
+func NewCatFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) CatFile {
+
return CatFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index efd410e..b05fd39 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -8,9 +8,11 @@ import (
"github.com/mimecast/dtail/internal/regex"
)
-// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
+// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
+// a file.
type FileReader interface {
- Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line,
+ re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go
deleted file mode 100644
index c4f605e..0000000
--- a/internal/io/fs/filter.go
+++ /dev/null
@@ -1,167 +0,0 @@
-package fs
-
-import (
- "context"
-
- "github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/lcontext"
- "github.com/mimecast/dtail/internal/regex"
-)
-
-func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
- // Do we have any kind of local context settings? If so then run the more complex
- // filterWithLContext method.
- if lContext.Has() {
- // We can not skip transmitting any lines to the client with a local
- // grep context specified.
- f.canSkipLines = false
- f.filterWithLContext(ctx, rawLines, lines, re, lContext)
- return
- }
-
- f.filterWithoutLContext(ctx, rawLines, lines, re)
-}
-
-// Filter log lines matching a given regular expression, however with local grep context.
-func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
- // Scenario 1: Finish once maxCount hits found
- maxCount := lContext.MaxCount
- processMaxCount := maxCount > 0
- maxReached := false
-
- // Scenario 2: Print prev. N lines when current line matches.
- before := lContext.BeforeContext
- processBefore := before > 0
- var beforeBuf chan []byte
- if processBefore {
- beforeBuf = make(chan []byte, before)
- }
-
- // Screnario 3: Print next N lines when current line matches.
- after := 0
- processAfter := lContext.AfterContext > 0
-
- for rawLine := range rawLines {
- // logger.Debug("rawLine", string(rawLine))
- f.updatePosition()
-
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
-
- if processAfter && after > 0 {
- after--
- myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return
- }
-
- } else if processBefore {
- // Keep last num BeforeContext raw messages.
- select {
- case beforeBuf <- rawLine:
- default:
- <-beforeBuf
- beforeBuf <- rawLine
- }
- }
- continue
- }
-
- f.updateLineMatched()
-
- if processAfter {
- if maxReached {
- return
- }
- after = lContext.AfterContext
- }
-
- if processBefore {
- i := uint64(len(beforeBuf))
- for {
- select {
- case myRawLine := <-beforeBuf:
- myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100}
- i--
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return
- }
- default:
- // beforeBuf is now empty.
- }
- if len(beforeBuf) == 0 {
- break
- }
- }
- }
-
- line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
-
- select {
- case lines <- line:
- if processMaxCount {
- maxCount--
- if maxCount == 0 {
- if !processAfter || after == 0 {
- return
- }
- // Unfortunatley we have to continue filter, as there might be more lines to print
- maxReached = true
- }
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-// Filter log lines matching a given regular expression, there is no local grep context specified.
-func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) {
- continue
- }
-
- line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()}
-
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- }
-}
-
-func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool {
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- // Regex dosn't match, so not interested in it.
- return true
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- // Matching, not transmittable
- return true
- }
- f.updateLineTransmitted()
-
- // Matching, transmittable
- return false
-}
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
index cc5dd9b..d621c09 100644
--- a/internal/io/fs/permissions/permission.go
+++ b/internal/io/fs/permissions/permission.go
@@ -3,12 +3,12 @@
package permissions
import (
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ToRead is to check whether user has read permissions to a given file.
func ToRead(user, filePath string) (bool, error) {
// Only implemented for Linux, always expect true
- logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
+ dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in")
return true, nil
}
diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go
index 7d2d7ca..904b90f 100644
--- a/internal/io/fs/permissions/permission_linuxacl.go
+++ b/internal/io/fs/permissions/permission_linuxacl.go
@@ -13,7 +13,7 @@ import (
"unsafe"
)
-// ToRead checks whether user has Linux file system permissions to read a given file.
+// ToRead checks whether user has Linux file system permissions to read a file.
func ToRead(user, filePath string) (bool, error) {
cUser := C.CString(user)
cFilePath := C.CString(filePath)
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 161e3f0..28cbe58 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -2,16 +2,20 @@ package fs
import (
"bufio"
+ "bytes"
"compress/gzip"
"context"
+ "errors"
"fmt"
"io"
"os"
"strings"
+ "sync"
"time"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
@@ -37,31 +41,10 @@ type readFile struct {
limiter chan struct{}
}
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
-
- return
-}
-
// String returns the string representation of the readFile
func (f readFile) String() string {
- return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
+ return fmt.Sprintf(
+ "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
f.filePath,
f.globID,
f.retry,
@@ -80,8 +63,10 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
- logger.Debug("readFile", f)
+func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
+ lines chan<- line.Line, re regex.Regex) error {
+
+ dlog.Common.Debug("readFile", f)
defer func() {
select {
case <-f.limiter:
@@ -93,7 +78,8 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
case f.limiter <- struct{}{}:
default:
select {
- case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID,
+ "Server limit reached. Queuing file..."):
case <-ctx.Done():
return nil
}
@@ -110,111 +96,335 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
fd.Seek(0, io.SeekEnd)
}
- rawLines := make(chan []byte, 100)
+ rawLines := make(chan *bytes.Buffer, 100)
+ truncate := make(chan struct{})
+
readCtx, readCancel := context.WithCancel(ctx)
+ var filterWg sync.WaitGroup
+ filterWg.Add(1)
- filterDone := make(chan struct{})
+ go f.periodicTruncateCheck(ctx, truncate)
go func() {
- f.filter(ctx, rawLines, lines, re, lContext)
- close(filterDone)
+ f.filter(ctx, ltx, rawLines, lines, re)
+ filterWg.Done()
// If the filter stopped, make the reader stop too, no need to read
// more data if there is nothing more the filter wants to filter for!
// E.g. it could be that we only want to filter N matches but not more.
readCancel()
}()
- err = f.read(readCtx, fd, rawLines)
+ err = f.read(readCtx, fd, rawLines, truncate)
close(rawLines)
-
- // Filter may flushes some data still. So wait until it is done here.
- <-filterDone
+ // Filter may sends some data still. So wait until it is done here.
+ filterWg.Wait()
return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
- var offset uint64
+func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case truncate <- struct{}{}:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ dlog.Common.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ dlog.Common.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+ return
+}
+
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
+ var offset uint64
reader, err := f.makeReader(fd)
if err != nil {
return err
}
- rawLine := make([]byte, 0, 512)
lineLengthThreshold := 1024 * 1024 // 1mb
- longLineWarning := false
-
- checkTruncate := f.truncateTimer(ctx)
+ warnedAboutLongLine := false
+ message := pool.BytesBuffer.Get().(*bytes.Buffer)
for {
- select {
- case <-ctx.Done():
- return nil
- default:
- }
-
- select {
- case <-checkTruncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return err
- }
- logger.Info(f.filePath, "Current offset", offset)
- default:
- }
-
- // Read some bytes (max 4k at once as of go 1.12). isPrefix will
- // be set if line does not fit into 4k buffer.
- bytes, isPrefix, err := reader.ReadLine()
+ b, err := reader.ReadByte()
if err != nil {
- // If EOF, sleep a couple of ms and return with nil error.
- // If other error, return with non-nil error.
if err != io.EOF {
return err
}
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return err
+ }
+ case <-ctx.Done():
+ return nil
+ default:
+ }
if !f.seekEOF {
- logger.Debug(f.FilePath(), "End of file reached")
+ dlog.Common.Info(f.FilePath(), "End of file reached")
return nil
}
time.Sleep(time.Millisecond * 100)
continue
}
+ offset++
- rawLine = append(rawLine, bytes...)
- offset += uint64(len(bytes))
-
- if !isPrefix {
- // last LineRead call returned contend until end of line.
- rawLine = append(rawLine, '\n')
+ switch b {
+ case '\n':
select {
- case rawLines <- rawLine:
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
+ warnedAboutLongLine = false
case <-ctx.Done():
return nil
}
- rawLine = make([]byte, 0, 512)
- if longLineWarning {
- longLineWarning = false
+ default:
+ if message.Len() >= lineLengthThreshold {
+ if !warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines")
+ warnedAboutLongLine = true
+ }
+ message.WriteString("\n")
+ select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ case <-ctx.Done():
+ return nil
+ }
+ }
+ message.WriteByte(b)
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if ltx.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, ltx, rawLines, lines, re)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
+ lines chan<- line.Line, re regex.Regex) {
+
+ for {
+ select {
+ case line, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+ if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+ select {
+ case lines <- filteredLine:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := ltx.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := ltx.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan *bytes.Buffer
+ if processBefore {
+ beforeBuf = make(chan *bytes.Buffer, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := ltx.AfterContext > 0
+
+ for lineBytesBuffer := range rawLines {
+ f.updatePosition()
+
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- lineBytesBuffer:
+ default:
+ pool.RecycleBytesBuffer(<-beforeBuf)
+ beforeBuf <- lineBytesBuffer
+ }
}
continue
}
- // Last LineRead call could not read content until end of line, buffer
- // was too small. Determine whether we exceed the max line length we
- // want dtail to send to the client at once. Possibly split up log line
- // into multiple log lines.
- if len(rawLine) >= lineLengthThreshold {
- if !longLineWarning {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
- // Only print out one warning per long log line.
- longLineWarning = true
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
}
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-ctx.Done():
- return nil
+ after = ltx.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case lineBytesBuffer := <-beforeBuf:
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount() - i,
+ TransmittedPerc: 100,
+ }
+ i--
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
}
- rawLine = make([]byte, 0, 512)
}
+
+ line := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
+ re regex.Regex) (line.Line, bool) {
+
+ var read line.Line
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineTransmitted()
+
+ read = line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: f.transmittedPerc(),
+ }
+ return read, true
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ dlog.Common.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
}
+ return false, nil
}
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index 14994e5..b03b45d 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,7 +6,9 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile {
+func NewTailFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) TailFile {
+
return TailFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go
deleted file mode 100644
index a8d59ac..0000000
--- a/internal/io/fs/truncate.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package fs
-
-import (
- "context"
- "errors"
- "io"
- "os"
- "time"
-
- "github.com/mimecast/dtail/internal/io/logger"
-)
-
-func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) {
- checkTruncate = make(chan struct{})
-
- go func() {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case checkTruncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
- }()
-
- return
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}
diff --git a/internal/io/line/line.go b/internal/io/line/line.go
index 715be34..d306c88 100644
--- a/internal/io/line/line.go
+++ b/internal/io/line/line.go
@@ -1,13 +1,14 @@
package line
import (
+ "bytes"
"fmt"
)
// Line represents a read log line.
type Line struct {
// The content of the log line.
- Content []byte
+ Content *bytes.Buffer
// Until now, how many log lines were processed?
Count uint64
// Sometimes we produce too many log lines so that the client
@@ -25,7 +26,7 @@ type Line struct {
// Return a human readable representation of the followed line.
func (l Line) String() string {
return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)",
- string(l.Content),
+ l.Content.String(),
l.TransmittedPerc,
l.Count,
l.SourceID)
diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go
deleted file mode 100644
index 4254eef..0000000
--- a/internal/io/logger/logger.go
+++ /dev/null
@@ -1,403 +0,0 @@
-package logger
-
-import (
- "bufio"
- "context"
- "fmt"
- "os"
- "os/signal"
- "runtime"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/mimecast/dtail/internal/color"
- "github.com/mimecast/dtail/internal/config"
-)
-
-const (
- clientStr string = "CLIENT"
- serverStr string = "SERVER"
- infoStr string = "INFO"
- warnStr string = "WARN"
- errorStr string = "ERROR"
- fatalStr string = "FATAL"
- debugStr string = "DEBUG"
- traceStr string = "TRACE"
-)
-
-// Mode specifies the configured logging mode(s)
-var Mode Modes
-
-// Strategy is the current log strattegy used.
-var strategy Strategy
-
-// Synchronise access to logging.
-var mutex sync.Mutex
-
-// File descriptor of log file when Mode.logToFile enabled.
-var fd *os.File
-
-// File write buffer of log file when Mode.logToFile enabled.
-var writer *bufio.Writer
-
-// File write buffer of stdout when Mode.logToStdout enabled.
-var stdoutWriter *bufio.Writer
-
-// Current hostname.
-var hostname string
-
-// Used to detect change of day (create one log file per day0
-var lastDateStr string
-
-// Used to make logging non-blocking.
-var fileLogBufCh chan buf
-var stdoutBufCh chan string
-
-// Stdout channel, required to pause output
-var pauseCh chan struct{}
-var resumeCh chan struct{}
-
-// Tell the logger about logrotation
-var rotateCh chan os.Signal
-
-// Helper type to make logging non-blocking.
-type buf struct {
- time time.Time
- message string
-}
-
-// Start logging.
-func Start(ctx context.Context, mode Modes) {
- Mode = mode
-
- switch {
- case Mode.Nothing:
- return
- case Mode.Quiet:
- Mode.Trace = false
- Mode.Debug = false
- case Mode.Trace:
- Mode.Debug = true
- default:
- }
-
- strategy := logStrategy()
- stdoutWriter = bufio.NewWriter(os.Stdout)
-
- switch strategy {
- case DailyStrategy:
- _, err := os.Stat(config.Common.LogDir)
- Mode.logToFile = !os.IsNotExist(err)
- Mode.logToStdout = !Mode.Server || Mode.Debug || Mode.Trace || Mode.Quiet
- case StdoutStrategy:
- fallthrough
- default:
- Mode.logToFile = !Mode.Server
- Mode.logToStdout = true
- }
-
- fqdn, err := os.Hostname()
- if err != nil {
- panic(err)
- }
- s := strings.Split(fqdn, ".")
- hostname = s[0]
-
- pauseCh = make(chan struct{})
- resumeCh = make(chan struct{})
-
- // Setup logrotation
- rotateCh = make(chan os.Signal, 1)
- signal.Notify(rotateCh, syscall.SIGHUP)
-
- if Mode.logToStdout {
- stdoutBufCh = make(chan string, runtime.NumCPU()*100)
- go writeToStdout(ctx)
- }
-
- if Mode.logToFile {
- fileLogBufCh = make(chan buf, runtime.NumCPU()*100)
- go writeToFile(ctx)
- }
-}
-
-// Info message logging.
-func Info(args ...interface{}) string {
- if Mode.Server {
- return log(serverStr, infoStr, args)
- }
-
- return log(clientStr, infoStr, args)
-}
-
-// Warn message logging.
-func Warn(args ...interface{}) string {
- if !Mode.Quiet {
- if Mode.Server {
- return log(serverStr, warnStr, args)
- }
- return log(clientStr, warnStr, args)
- }
-
- return ""
-}
-
-// Error message logging.
-func Error(args ...interface{}) string {
- if Mode.Server {
- return log(serverStr, errorStr, args)
- }
-
- return log(clientStr, errorStr, args)
-}
-
-// Fatal message logging.
-func Fatal(args ...interface{}) string {
- if Mode.Server {
- return log(serverStr, fatalStr, args)
- }
-
- return log(clientStr, fatalStr, args)
-}
-
-// FatalExit logs an error and exists the process.
-func FatalExit(args ...interface{}) {
- what := clientStr
- if Mode.Server {
- what = serverStr
- }
- log(what, fatalStr, args)
-
- time.Sleep(time.Second)
- mutex.Lock()
- defer mutex.Unlock()
-
- closeWriter()
- os.Exit(3)
-}
-
-// Debug message logging.
-func Debug(args ...interface{}) string {
- if Mode.Debug {
- if Mode.Server {
- return log(serverStr, debugStr, args)
- }
- return log(clientStr, debugStr, args)
- }
-
- return ""
-}
-
-// Trace message logging.
-func Trace(args ...interface{}) string {
- if Mode.Trace {
- if Mode.Server {
- return log(serverStr, traceStr, args)
- }
- return log(clientStr, traceStr, args)
- }
-
- return ""
-}
-
-// Write log line to buffer and/or log file.
-func write(what, severity, message string) {
- if Mode.logToStdout {
- line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message)
-
- if color.Colored {
- line = color.Colorfy(line)
- }
-
- stdoutBufCh <- line
- }
-
- if Mode.logToFile {
- t := time.Now()
- timeStr := t.Format("20060102-150405")
- fileLogBufCh <- buf{
- time: t,
- message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message),
- }
- }
-}
-
-// Generig log message.
-func log(what string, severity string, args []interface{}) string {
- if Mode.Nothing {
- return ""
- }
-
- messages := []string{}
-
- for _, arg := range args {
- switch v := arg.(type) {
- case string:
- messages = append(messages, v)
- case int:
- messages = append(messages, fmt.Sprintf("%d", v))
- case error:
- messages = append(messages, v.Error())
- default:
- messages = append(messages, fmt.Sprintf("%v", v))
- }
- }
-
- message := strings.Join(messages, "|")
- write(what, severity, message)
-
- return fmt.Sprintf("%s|%s", severity, message)
-}
-
-// Raw message logging.
-func Raw(message string) {
- if Mode.Nothing {
- return
- }
-
- if Mode.logToFile {
- fileLogBufCh <- buf{time.Now(), message}
- }
-
- if Mode.logToStdout {
- if color.Colored {
- message = color.Colorfy(message)
- }
- stdoutBufCh <- message
- }
-}
-
-// Close log writer (e.g. on change of day).
-func closeWriter() {
- if writer != nil {
- writer.Flush()
- fd.Close()
- }
-}
-
-// Return the correct log file writer
-func fileWriter(dateStr string) *bufio.Writer {
- if dateStr != lastDateStr {
- return updateFileWriter(dateStr)
- }
-
- // Check for log rotation signal
- select {
- case <-rotateCh:
- stdoutWriter.WriteString("Received signal for logrotation\n")
- return updateFileWriter(dateStr)
- default:
- }
-
- return writer
-}
-
-// Update log file writer
-func updateFileWriter(dateStr string) *bufio.Writer {
- // Detected change of day. Close current writer and create a new one.
- mutex.Lock()
- defer mutex.Unlock()
- closeWriter()
-
- if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
- if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
- panic(err)
- }
- }
-
- logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr)
- newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
- if err != nil {
- panic(err)
- }
-
- fd = newFd
- writer = bufio.NewWriterSize(fd, 1)
- lastDateStr = dateStr
-
- return writer
-}
-
-// Flush all outstanding lines.
-func Flush() {
- for {
- select {
- case message := <-stdoutBufCh:
- stdoutWriter.WriteString(message)
- default:
- stdoutWriter.Flush()
- return
- }
- }
-}
-
-func writeToStdout(ctx context.Context) {
- for {
- select {
- case message := <-stdoutBufCh:
- stdoutWriter.WriteString(message)
- case <-time.After(time.Millisecond * 100):
- stdoutWriter.Flush()
- case <-pauseCh:
- PAUSE:
- for {
- select {
- case <-stdoutBufCh:
- case <-resumeCh:
- break PAUSE
- case <-ctx.Done():
- return
- }
- }
- case <-ctx.Done():
- Flush()
- return
- }
- }
-}
-
-func writeToFile(ctx context.Context) {
- for {
- select {
- case buf := <-fileLogBufCh:
- dateStr := buf.time.Format("20060102")
- w := fileWriter(dateStr)
- w.WriteString(buf.message)
- case <-pauseCh:
- PAUSE:
- for {
- select {
- case <-stdoutBufCh:
- case <-resumeCh:
- break PAUSE
- case <-ctx.Done():
- return
- }
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-// Pause logging.
-func Pause() {
- if Mode.logToStdout {
- pauseCh <- struct{}{}
- }
- if Mode.logToFile {
- pauseCh <- struct{}{}
- }
-}
-
-// Resume logging (after pausing).
-func Resume() {
- if Mode.logToStdout {
- resumeCh <- struct{}{}
- }
- if Mode.logToFile {
- resumeCh <- struct{}{}
- }
-}
diff --git a/internal/io/logger/modes.go b/internal/io/logger/modes.go
deleted file mode 100644
index 8864179..0000000
--- a/internal/io/logger/modes.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package logger
-
-// Modes specifies the logging mode.
-type Modes struct {
- Server bool
- Trace bool
- Debug bool
- Nothing bool
- Quiet bool
- logToStdout bool
- logToFile bool
-}
diff --git a/internal/io/logger/strategy.go b/internal/io/logger/strategy.go
deleted file mode 100644
index 44bf393..0000000
--- a/internal/io/logger/strategy.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package logger
-
-import "github.com/mimecast/dtail/internal/config"
-
-// Strategy allows to specify a log rotation strategy.
-type Strategy int
-
-// Possible log strategies.
-const (
- NormalStrategy Strategy = iota
- DailyStrategy Strategy = iota
- StdoutStrategy Strategy = iota
-)
-
-func logStrategy() Strategy {
- switch config.Common.LogStrategy {
- case "daily":
- return DailyStrategy
- default:
- }
- return StdoutStrategy
-}
diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go
new file mode 100644
index 0000000..89fcf81
--- /dev/null
+++ b/internal/io/pool/builder.go
@@ -0,0 +1,21 @@
+package pool
+
+import (
+ "strings"
+ "sync"
+)
+
+// BuilderBuffer is there to optimize memory allocations (DTail allocates a lot
+// of memory while reading log data otherwise)
+var BuilderBuffer = sync.Pool{
+ New: func() interface{} {
+ sb := strings.Builder{}
+ return &sb
+ },
+}
+
+// RecycleBuilderBuffer recycles the buffer again.
+func RecycleBuilderBuffer(sb *strings.Builder) {
+ sb.Reset()
+ BuilderBuffer.Put(sb)
+}
diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go
new file mode 100644
index 0000000..3d48f2c
--- /dev/null
+++ b/internal/io/pool/bytesbuffer.go
@@ -0,0 +1,22 @@
+package pool
+
+import (
+ "bytes"
+ "sync"
+)
+
+// BytesBuffer is there to optimize memory allocations. DTail otherwise allocates
+// a lot of memory while reading logs.
+var BytesBuffer = sync.Pool{
+ New: func() interface{} {
+ b := bytes.Buffer{}
+ b.Grow(128)
+ return &b
+ },
+}
+
+// RecycleBytesBuffer recycles the buffer again.
+func RecycleBytesBuffer(b *bytes.Buffer) {
+ b.Reset()
+ BytesBuffer.Put(b)
+}
diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go
index 36ebdb5..e82132d 100644
--- a/internal/io/prompt/prompt.go
+++ b/internal/io/prompt/prompt.go
@@ -6,7 +6,7 @@ import (
"os"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// Answer is a user input of a prompt question.
@@ -19,7 +19,8 @@ type Answer struct {
Callback func()
// Runs after Callback and after logging resumes
EndCallback func()
- AskAgain bool
+ // AskAgain can be used to not to ask again about the question.
+ AskAgain bool
}
// Prompt used for interactive user input.
@@ -30,7 +31,6 @@ type Prompt struct {
func (p *Prompt) askString() string {
var sb strings.Builder
-
sb.WriteString(p.question)
sb.WriteString("? (")
@@ -41,7 +41,6 @@ func (p *Prompt) askString() string {
sb.WriteString(strings.Join(ax, ","))
sb.WriteString("): ")
-
return sb.String()
}
@@ -58,7 +57,7 @@ func (p *Prompt) Add(answer Answer) {
// Ask a question.
func (p *Prompt) Ask() {
reader := bufio.NewReader(os.Stdin)
- logger.Pause()
+ dlog.Common.Pause()
for {
fmt.Print(p.askString())
@@ -68,9 +67,8 @@ func (p *Prompt) Ask() {
if a.Callback != nil {
a.Callback()
}
-
if !a.AskAgain {
- logger.Resume()
+ dlog.Common.Resume()
if a.EndCallback != nil {
a.EndCallback()
}
@@ -90,6 +88,5 @@ func (p *Prompt) answer(answerStr string) (*Answer, bool) {
default:
}
}
-
return nil, false
}
diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go
index 500c530..584b59c 100644
--- a/internal/io/signal/signal.go
+++ b/internal/io/signal/signal.go
@@ -14,10 +14,8 @@ import (
func InterruptCh(ctx context.Context) <-chan string {
sigIntCh := make(chan os.Signal)
gosignal.Notify(sigIntCh, os.Interrupt)
-
sigOtherCh := make(chan os.Signal)
gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
-
statsCh := make(chan string)
go func() {
@@ -41,6 +39,10 @@ func InterruptCh(ctx context.Context) <-chan string {
}
}
}()
-
return statsCh
}
+
+// NoCh doesn't listen on a signal.
+func NoCh(ctx context.Context) <-chan string {
+ return make(chan string)
+}