summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 21:10:29 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:41 +0300
commit97747ea0f3178f7f5890512d483fdccaa82846b0 (patch)
tree9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/io
parent7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff)
vetting and linting and some code restyling
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/dlog/dlog.go22
-rw-r--r--internal/io/dlog/level.go5
-rw-r--r--internal/io/dlog/loggers/factory.go6
-rw-r--r--internal/io/dlog/loggers/file.go17
-rw-r--r--internal/io/dlog/loggers/logger.go1
-rw-r--r--internal/io/dlog/loggers/strategy.go11
-rw-r--r--internal/io/fs/catfile.go4
-rw-r--r--internal/io/fs/filereader.go3
-rw-r--r--internal/io/fs/permissions/permission_linuxacl.go2
-rw-r--r--internal/io/fs/readfile.go29
-rw-r--r--internal/io/fs/tailfile.go4
-rw-r--r--internal/io/pool/builder.go3
-rw-r--r--internal/io/pool/bytesbuffer.go3
-rw-r--r--internal/io/prompt/prompt.go7
-rw-r--r--internal/io/signal/signal.go3
15 files changed, 75 insertions, 45 deletions
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index 28e6882..da67585 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -82,7 +82,7 @@ func new(sourceProcess, sourcePackage source.Source) *DLog {
if err != nil {
panic(err)
}
- strategy := loggers.GetStrategy(config.Common.LogStrategy)
+ strategy := loggers.NewStrategy(config.Common.LogStrategy)
loggerName := config.Common.Logger
level := newLevel(config.Common.LogLevel)
@@ -153,6 +153,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) {
}
}
+// FatalPanic terminates the process with a fatal error.
func (d *DLog) FatalPanic(args ...interface{}) {
d.log(Fatal, args)
d.Flush()
@@ -162,42 +163,51 @@ func (d *DLog) FatalPanic(args ...interface{}) {
panic(sb.String())
}
+// Fatal logs a fatal error.
func (d *DLog) Fatal(args ...interface{}) string {
return d.log(Fatal, args)
}
+// Error logging.
func (d *DLog) Error(args ...interface{}) string {
return d.log(Error, args)
}
+// Warn logs a warning message.
func (d *DLog) Warn(args ...interface{}) string {
return d.log(Warn, args)
}
+// Info logging.
func (d *DLog) Info(args ...interface{}) string {
return d.log(Info, args)
}
+// Verbose logging.
func (d *DLog) Verbose(args ...interface{}) string {
return d.log(Verbose, args)
}
+// Debug logging.
func (d *DLog) Debug(args ...interface{}) string {
return d.log(Debug, args)
}
+// Trace logging.
func (d *DLog) Trace(args ...interface{}) string {
_, file, line, _ := runtime.Caller(1)
args = append(args, fmt.Sprintf("at %s:%d", file, line))
return d.log(Trace, args)
}
+// Devel used for development purpose only logging (e.g. "print" debugging).
func (d *DLog) Devel(args ...interface{}) string {
_, file, line, _ := runtime.Caller(1)
args = append(args, fmt.Sprintf("at %s:%d", file, line))
return d.log(Devel, args)
}
+// Raw message logging.
func (d *DLog) Raw(message string) string {
if !config.Client.TermColorsEnable || !d.logger.SupportsColors() {
d.logger.Log(time.Now(), message)
@@ -207,6 +217,7 @@ func (d *DLog) Raw(message string) string {
return message
}
+// Mapreduce logging.
func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
args := make([]interface{}, len(data)+1)
@@ -251,6 +262,11 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
return d.log(Info, args)
}
-func (d *DLog) Flush() { d.logger.Flush() }
-func (d *DLog) Pause() { d.logger.Pause() }
+// Flush the log buffers.
+func (d *DLog) Flush() { d.logger.Flush() }
+
+// Pause the logging.
+func (d *DLog) Pause() { d.logger.Pause() }
+
+// Resume the logging.
func (d *DLog) Resume() { d.logger.Resume() }
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
index 248ad83..0971094 100644
--- a/internal/io/dlog/level.go
+++ b/internal/io/dlog/level.go
@@ -7,6 +7,7 @@ import (
type level int
+// Available log levels.
const (
Fatal level = iota
Error level = iota
@@ -20,7 +21,8 @@ const (
All level = iota
)
-var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All}
+var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug,
+ Devel, Trace, All}
func newLevel(l string) level {
switch strings.ToLower(l) {
@@ -73,6 +75,5 @@ func (l level) String() string {
case All:
return "ALL"
}
-
panic("Unknown log level " + fmt.Sprintf("%d", l))
}
diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go
index dda3ee6..415d7fb 100644
--- a/internal/io/dlog/loggers/factory.go
+++ b/internal/io/dlog/loggers/factory.go
@@ -9,11 +9,13 @@ import (
var factoryMap map[string]Logger
var factoryMutex sync.Mutex
+// Factory is there to retrieve a logger based on various settings.
func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger {
factoryMutex.Lock()
defer factoryMutex.Unlock()
- id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName)
+ id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName,
+ rotationStrategy.FileBase, loggerName)
if factoryMap == nil {
factoryMap = make(map[string]Logger)
}
@@ -36,10 +38,10 @@ func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger {
panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName))
}
}
-
return singleton
}
+// FactoryRotate invokes a log rotation of all loggers.
func FactoryRotate() {
factoryMutex.Lock()
defer factoryMutex.Unlock()
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
index 87280fd..94824fe 100644
--- a/internal/io/dlog/loggers/file.go
+++ b/internal/io/dlog/loggers/file.go
@@ -12,8 +12,7 @@ import (
"github.com/mimecast/dtail/internal/config"
)
-type fileWriter struct {
-}
+type fileWriter struct{}
type fileMessageBuf struct {
now time.Time
@@ -35,7 +34,7 @@ type file struct {
}
func newFile(strategy Strategy) *file {
- f := file{
+ return &file{
bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100),
pauseCh: make(chan struct{}),
resumeCh: make(chan struct{}),
@@ -43,16 +42,17 @@ func newFile(strategy Strategy) *file {
flushCh: make(chan struct{}),
strategy: strategy,
}
-
- return &f
}
func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) {
f.mutex.Lock()
- defer f.mutex.Unlock()
+ defer func() {
+ f.started = true
+ f.mutex.Unlock()
+ }()
- // Logger already started from another Goroutine.
if f.started {
+ // Logger already started from another Goroutine.
wg.Done()
return
}
@@ -68,7 +68,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) {
go func() {
defer wg.Done()
-
for {
select {
case m := <-f.bufferCh:
@@ -84,8 +83,6 @@ func (f *file) Start(ctx context.Context, wg *sync.WaitGroup) {
}
}
}()
-
- f.started = true
}
func (f *file) Log(now time.Time, message string) {
diff --git a/internal/io/dlog/loggers/logger.go b/internal/io/dlog/loggers/logger.go
index c88900d..d4e85de 100644
--- a/internal/io/dlog/loggers/logger.go
+++ b/internal/io/dlog/loggers/logger.go
@@ -6,6 +6,7 @@ import (
"time"
)
+// Logger is there to plug in your own log implementation.
type Logger interface {
Log(now time.Time, message string)
LogWithColors(now time.Time, message, messageWithColors string)
diff --git a/internal/io/dlog/loggers/strategy.go b/internal/io/dlog/loggers/strategy.go
index a1b9355..25d10f0 100644
--- a/internal/io/dlog/loggers/strategy.go
+++ b/internal/io/dlog/loggers/strategy.go
@@ -5,19 +5,26 @@ import (
"path/filepath"
)
+// Rotation is the actual strategy used for log rotation..
type Rotation int
const (
- DailyRotation Rotation = iota
+ // DailyRotation tells DTail to rotate its logs on a daily basis or on SIGHUP.
+ DailyRotation Rotation = iota
+ // SignalRotation tells DTail to rotate its logs only on SIGHUP.
SignalRotation Rotation = iota
)
+// Strategy is a pair of the rotation and the file base.
type Strategy struct {
+ // Rotation is the actual rotation strategy used.
Rotation Rotation
+ // FileBase can be a name (e.g. "dserver", "dmap") when signal rotation is used.
FileBase string
}
-func GetStrategy(name string) Strategy {
+// NewStrategy returns the stratey based on its name.
+func NewStrategy(name string) Strategy {
switch name {
case "daily":
return Strategy{DailyRotation, ""}
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index 7f387bc..01c15ba 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,7 +6,9 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile {
+func NewCatFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) CatFile {
+
return CatFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 0774837..7773142 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -7,7 +7,8 @@ import (
"github.com/mimecast/dtail/internal/regex"
)
-// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
+// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
+// a file.
type FileReader interface {
Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
FilePath() string
diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go
index 7d2d7ca..904b90f 100644
--- a/internal/io/fs/permissions/permission_linuxacl.go
+++ b/internal/io/fs/permissions/permission_linuxacl.go
@@ -13,7 +13,7 @@ import (
"unsafe"
)
-// ToRead checks whether user has Linux file system permissions to read a given file.
+// ToRead checks whether user has Linux file system permissions to read a file.
func ToRead(user, filePath string) (bool, error) {
cUser := C.CString(user)
cFilePath := C.CString(filePath)
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index f128c07..92f85b6 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -42,7 +42,8 @@ type readFile struct {
// String returns the string representation of the readFile
func (f readFile) String() string {
- return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
+ return fmt.Sprintf(
+ "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
f.filePath,
f.globID,
f.retry,
@@ -61,7 +62,9 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, lines chan<- line.Line,
+ re regex.Regex) error {
+
dlog.Common.Debug("readFile", f)
defer func() {
select {
@@ -74,7 +77,8 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
case f.limiter <- struct{}{}:
default:
select {
- case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID,
+ "Server limit reached. Queuing file..."):
case <-ctx.Done():
return nil
}
@@ -139,13 +143,11 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
default:
reader = bufio.NewReader(fd)
}
-
return
}
func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
var offset uint64
-
reader, err := f.makeReader(fd)
if err != nil {
return err
@@ -193,7 +195,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
default:
if message.Len() >= lineLengthThreshold {
if !warnedAboutLongLine {
- f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines")
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines")
warnedAboutLongLine = true
}
message.WriteString("\n")
@@ -210,9 +213,10 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
+func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+ defer wg.Done()
for {
select {
case line, ok := <-rawLines:
@@ -231,9 +235,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha
}
}
-func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) {
- var read line.Line
+func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int,
+ re regex.Regex) (line.Line, bool) {
+ var read line.Line
if !re.Match(lineBytes.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
@@ -254,7 +259,6 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, r
Count: f.totalLineCount(),
TransmittedPerc: f.transmittedPerc(),
}
-
return read, true
}
@@ -267,7 +271,6 @@ func (f readFile) truncated(fd *os.File) (bool, error) {
if err != nil {
return true, err
}
-
// Can not open file at original path.
pathFd, err := os.Open(f.filePath)
if err != nil {
@@ -280,10 +283,8 @@ func (f readFile) truncated(fd *os.File) (bool, error) {
if err != nil {
return true, err
}
-
if curPos > pathPos {
return true, errors.New("File got truncated")
}
-
return false, nil
}
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index 14994e5..b03b45d 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,7 +6,9 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile {
+func NewTailFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) TailFile {
+
return TailFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/pool/builder.go b/internal/io/pool/builder.go
index c9dc221..89fcf81 100644
--- a/internal/io/pool/builder.go
+++ b/internal/io/pool/builder.go
@@ -5,6 +5,8 @@ import (
"sync"
)
+// BuilderBuffer is there to optimize memory allocations (DTail allocates a lot
+// of memory while reading log data otherwise)
var BuilderBuffer = sync.Pool{
New: func() interface{} {
sb := strings.Builder{}
@@ -12,6 +14,7 @@ var BuilderBuffer = sync.Pool{
},
}
+// RecycleBuilderBuffer recycles the buffer again.
func RecycleBuilderBuffer(sb *strings.Builder) {
sb.Reset()
BuilderBuffer.Put(sb)
diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go
index 0a159f5..3d48f2c 100644
--- a/internal/io/pool/bytesbuffer.go
+++ b/internal/io/pool/bytesbuffer.go
@@ -5,6 +5,8 @@ import (
"sync"
)
+// BytesBuffer is there to optimize memory allocations. DTail otherwise allocates
+// a lot of memory while reading logs.
var BytesBuffer = sync.Pool{
New: func() interface{} {
b := bytes.Buffer{}
@@ -13,6 +15,7 @@ var BytesBuffer = sync.Pool{
},
}
+// RecycleBytesBuffer recycles the buffer again.
func RecycleBytesBuffer(b *bytes.Buffer) {
b.Reset()
BytesBuffer.Put(b)
diff --git a/internal/io/prompt/prompt.go b/internal/io/prompt/prompt.go
index 7c3cdb5..e82132d 100644
--- a/internal/io/prompt/prompt.go
+++ b/internal/io/prompt/prompt.go
@@ -19,7 +19,8 @@ type Answer struct {
Callback func()
// Runs after Callback and after logging resumes
EndCallback func()
- AskAgain bool
+ // AskAgain can be used to not to ask again about the question.
+ AskAgain bool
}
// Prompt used for interactive user input.
@@ -30,7 +31,6 @@ type Prompt struct {
func (p *Prompt) askString() string {
var sb strings.Builder
-
sb.WriteString(p.question)
sb.WriteString("? (")
@@ -41,7 +41,6 @@ func (p *Prompt) askString() string {
sb.WriteString(strings.Join(ax, ","))
sb.WriteString("): ")
-
return sb.String()
}
@@ -68,7 +67,6 @@ func (p *Prompt) Ask() {
if a.Callback != nil {
a.Callback()
}
-
if !a.AskAgain {
dlog.Common.Resume()
if a.EndCallback != nil {
@@ -90,6 +88,5 @@ func (p *Prompt) answer(answerStr string) (*Answer, bool) {
default:
}
}
-
return nil, false
}
diff --git a/internal/io/signal/signal.go b/internal/io/signal/signal.go
index 14056c4..584b59c 100644
--- a/internal/io/signal/signal.go
+++ b/internal/io/signal/signal.go
@@ -14,10 +14,8 @@ import (
func InterruptCh(ctx context.Context) <-chan string {
sigIntCh := make(chan os.Signal)
gosignal.Notify(sigIntCh, os.Interrupt)
-
sigOtherCh := make(chan os.Signal)
gosignal.Notify(sigOtherCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
-
statsCh := make(chan string)
go func() {
@@ -41,7 +39,6 @@ func InterruptCh(ctx context.Context) <-chan string {
}
}
}()
-
return statsCh
}