diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-26 11:26:53 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-07 13:31:15 +0000 |
| commit | 0945da8dfefcbb723eecea0e5f4eafff63398253 (patch) | |
| tree | f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/fs | |
| parent | 2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff) | |
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/fs')
| -rw-r--r-- | internal/fs/catfile.go | 27 | ||||
| -rw-r--r-- | internal/fs/filereader.go | 9 | ||||
| -rw-r--r-- | internal/fs/lineread.go | 28 | ||||
| -rw-r--r-- | internal/fs/permissions/permission.go | 14 | ||||
| -rw-r--r-- | internal/fs/permissions/permission_linux.c | 395 | ||||
| -rw-r--r-- | internal/fs/permissions/permission_linux.go | 33 | ||||
| -rw-r--r-- | internal/fs/permissions/permission_linux.h | 60 | ||||
| -rw-r--r-- | internal/fs/permissions/permission_test.go | 112 | ||||
| -rw-r--r-- | internal/fs/readfile.go | 318 | ||||
| -rw-r--r-- | internal/fs/stats.go | 69 | ||||
| -rw-r--r-- | internal/fs/tailfile.go | 27 |
11 files changed, 0 insertions, 1092 deletions
diff --git a/internal/fs/catfile.go b/internal/fs/catfile.go deleted file mode 100644 index 99f521f..0000000 --- a/internal/fs/catfile.go +++ /dev/null @@ -1,27 +0,0 @@ -package fs - -import "sync" - -// CatFile is for reading a whole file. -type CatFile struct { - readFile -} - -// NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { - var mutex sync.Mutex - - return CatFile{ - readFile: readFile{ - filePath: filePath, - stop: make(chan struct{}), - globID: globID, - serverMessages: serverMessages, - retry: false, - canSkipLines: false, - seekEOF: false, - limiter: limiter, - mutex: &mutex, - }, - } -} diff --git a/internal/fs/filereader.go b/internal/fs/filereader.go deleted file mode 100644 index 5a08e27..0000000 --- a/internal/fs/filereader.go +++ /dev/null @@ -1,9 +0,0 @@ -package fs - -// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. -type FileReader interface { - Start(lines chan<- LineRead, regex string) error - FilePath() string - Retry() bool - Stop() -} diff --git a/internal/fs/lineread.go b/internal/fs/lineread.go deleted file mode 100644 index 7ee558e..0000000 --- a/internal/fs/lineread.go +++ /dev/null @@ -1,28 +0,0 @@ -package fs - -import ( - "fmt" -) - -// LineRead represents a read log line. -type LineRead struct { - // The content of the log line. - Content []byte - // Until now, how many log lines were processed? - Count uint64 - // Sometimes we produce too many log lines so that the client - // is too slow to process all of them. The server will drop log - // lines if that happens but it will signal to the client how - // many log lines in % could be transmitted to the client. - TransmittedPerc int - GlobID *string -} - -// Return a human readable representation of the followed line. -func (l LineRead) String() string { - return fmt.Sprintf("LineRead(Content:%s,TransmittedPerc:%v,Count:%v,GlobID:%s)", - string(l.Content), - l.TransmittedPerc, - l.Count, - *l.GlobID) -} diff --git a/internal/fs/permissions/permission.go b/internal/fs/permissions/permission.go deleted file mode 100644 index 6e83309..0000000 --- a/internal/fs/permissions/permission.go +++ /dev/null @@ -1,14 +0,0 @@ -// +build !linux - -package permissions - -import ( - "github.com/mimecast/dtail/internal/logger" -) - -// ToRead is to check whether user has read permissions to a given file. -func ToRead(user, filePath string) (bool, error) { - // Only implemented for Linux, always expect true - logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform") - return true, nil -} diff --git a/internal/fs/permissions/permission_linux.c b/internal/fs/permissions/permission_linux.c deleted file mode 100644 index cd10525..0000000 --- a/internal/fs/permissions/permission_linux.c +++ /dev/null @@ -1,395 +0,0 @@ -#include "permission_linux.h" - -#ifdef DEBUG -void debug_print_checker(struct permission_checker *pc) { - fprintf(stderr, "DEBUG: user_name:%s (%d)\n", - pc->user_name, pc->uid); - - fprintf(stderr, "DEBUG: ngids:%d\n", pc->ngids); - int j; - for (j = 0; j < pc->ngids; j++) { - fprintf(stderr, "DEBUG: %d", pc->gids[j]); - struct group *gr = getgrgid(pc->gids[j]); - if (gr != NULL) - fprintf(stderr, " (%s)", gr->gr_name); - fprintf(stderr, "\n"); - } - - fprintf(stderr, "DEBUG: file_path:%s (%d:%d)\n", - pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid); -} -#endif // DEBUG - -int stat_file(struct permission_checker *pc) { - if (stat(pc->file_path, &pc->file_stat) != 0) - return -1; - -#ifdef DEBUG - fprintf(stderr, "DEBUG: File'%s' is owned by '%d:%d'\n", - pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid); -#endif - - return 0; -} - -int get_user_uid(struct permission_checker *pc) { - struct passwd *result = NULL; - - size_t bufsize = sysconf(_SC_GETPW_R_SIZE_MAX); - if (bufsize == -1) - bufsize = 16384; - - char *buf = malloc(bufsize); - if (buf == NULL) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unabel to allocate bufer while retrieving user '%s'\n", pc->user_name); -#endif - return -1; - } - - int rc = getpwnam_r(pc->user_name, &pc->pw, buf, bufsize, &result); - - if (result == NULL) { -#ifdef DEBUG - if (rc == 0) { - fprintf(stderr, "DEBUG: No user '%s' found\n", pc->user_name); - } else { - fprintf(stderr, "DEBUG: Unknown error while retrieving user '%s'\n", pc->user_name); - } -#endif - - free(buf); - return -1; - } - - pc->uid = pc->pw.pw_uid; - - free(buf); - return 0; -} - -int get_user_groups(struct permission_checker *pc) { - // First assume we are in 10 groups max - pc->ngids = 10; - pc->gids = malloc(pc->ngids * sizeof(gid_t)); - - if (pc->gids == NULL) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unable to allocate space for gids."); -#endif - return -1; - } - - // Try so many times to load group list until it fits into group array. - while (getgrouplist(pc->user_name, pc->pw.pw_gid, pc->gids, &pc->ngids) == -1) { - // Too many groups, enlarge group array and try again - int newngids = pc->ngids + 100; - size_t newsize = newngids * sizeof(gid_t); - - if (SIZE_MAX / newngids < sizeof(gid_t)) { - // Overflow -#ifdef DEBUG - fprintf(stderr, "DEBUG: Overflow detected."); -#endif - return -1; - } - - gid_t *newgids = realloc(pc->gids, newsize); - if (newgids == NULL) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unable to allocate space for gids."); -#endif - free(pc->gids); - return -1; - } - - pc->gids = newgids; - pc->ngids = newngids; - } - - return 0; -} - -int is_member_of_group(struct permission_checker *pc, gid_t gid) { - int j; - for (j = 0; j < pc->ngids; j++) - if (pc->gids[j] == gid) - return 1; - return 0; -} - -int check_acl_uid_matches(uid_t uid, acl_entry_t entry) { - int ret = -1; - uid_t *acl_uid = acl_get_qualifier(entry); - if (acl_uid == NULL) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry"); -#endif - return -1; - } - - ret = *acl_uid == uid ? 0 : -1; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL user match?: %d <=> %d: %d\n", *acl_uid, uid, ret); -#endif - acl_free(acl_uid); - return ret; -} - -int check_acl_gid_matches(gid_t *gids, int ngids, acl_entry_t entry) { - int ret = -1; - gid_t *acl_gid = acl_get_qualifier(entry); - if (acl_gid == NULL) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry"); -#endif - return -1; - } - - int j; - for (j = 0; j < ngids; j++) { - if (*acl_gid == gids[j]) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: User is in group %d", *acl_gid); -#endif - ret = 0; - break; - } - } - -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL group match?: %d <=> ...: %d\n", *acl_gid, ret); -#endif - acl_free(acl_gid); - return ret; -} - -int check_acl(struct permission_checker *pc, const int flag) { - // By default user has no read perm. - int has_read_perm = 0; - - // By default mask tells that there are read perm. However in order to have - // read permissions both, has_read_perm and mask_allows_read_access must be 1! - int mask_allows_read_access = 1; - - acl_type_t type = ACL_TYPE_ACCESS; - acl_t acl = acl_get_file(pc->file_path, type); - - if (acl == NULL) - // Unable to retrieve ACL. - return -1; - - // Walk through each entry of this ACL. - int id; - for (id = ACL_FIRST_ENTRY; ; id = ACL_NEXT_ENTRY) { - acl_entry_t entry; - if (acl_get_entry(acl, id, &entry) != 1) - // No more ACL entries. - break; - - acl_tag_t tag; - if (acl_get_tag_type(entry, &tag) == -1) - // Unable to retrieve ACL tag. - return -1; - - switch (tag) { - case ACL_USER_OBJ: - if (flag == GROUP_CHECK) - continue; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_USER_OBJ\n"); -#endif - // Ignore this ACL entry if user is not owner of file. - if (pc->uid != pc->file_stat.st_uid) - continue; - break; - case ACL_USER: - if (flag == GROUP_CHECK) - continue; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_USER\n"); -#endif - // Ignore this ACL entry if uid does not match. - if (check_acl_uid_matches(pc->uid, entry) != 0) - continue; - break; - case ACL_GROUP_OBJ: - if (flag == USER_CHECK) - continue; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_GROUP_OBJ\n"); -#endif - // Ignore ACL entry if user is not in group of file. - if (!is_member_of_group(pc, pc->file_stat.st_gid)) - continue; - break; - case ACL_GROUP: - if (flag == USER_CHECK) - continue; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_GROUP\n"); -#endif - // Ignore ACL entry if user is not in group of entry. - if (check_acl_gid_matches(pc->gids, pc->ngids, entry) != 0) - continue; - break; - case ACL_OTHER: - if (flag == GROUP_CHECK) - continue; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_OTHER\n"); -#endif - break; - case ACL_MASK: -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL_MASK\n"); -#endif - break; - default: -#ifdef DEBUG - fprintf(stderr, "DEBUG: Unknown ACL tag\n"); -#endif - return -1; - } - -#ifdef DEBUG - fprintf(stderr, "DEBUG: Retrieving permset\n"); -#endif - acl_permset_t permset; - int permission; - if (acl_get_permset(entry, &permset) == -1) - // Unable to retrieve permset. - return -1; - - if ((permission = acl_get_perm(permset, ACL_READ)) == -1) - // Unable to retrieve permset value. - return -1; - - if (permission == 1 && tag != ACL_MASK) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL says user has permission to read file.\n"); -#endif - has_read_perm = 1; - } else if (permission == 0 && tag == ACL_MASK) { - // Mask says that there are no permissions to read. - mask_allows_read_access = 0; -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL mask says no permission to read file.\n"); -#endif - } - } - - if (has_read_perm && mask_allows_read_access) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL end result: User has permission to read file.\n"); -#endif - return 1; - } - -#ifdef DEBUG - fprintf(stderr, "DEBUG: ACL end result: User has no permission to read file.\n"); -#endif - return 0; -} - -int check_traditional(struct permission_checker *pc, const int flag) { - mode_t mode = pc->file_stat.st_mode; - uid_t uid = pc->file_stat.st_uid; - gid_t gid = pc->file_stat.st_gid; - - if (flag == USER_CHECK && (mode & S_IROTH)) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: Others can read file '%s'\n", - pc->file_path); -#endif - return 1; - - } else if (flag == USER_CHECK && (mode & S_IRUSR) && uid == pc->uid) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: User '%s' can read file '%s'\n", - pc->user_name, pc->file_path); -#endif - return 1; - - } else if (flag == GROUP_CHECK && (mode & S_IRGRP) && is_member_of_group(pc, gid)) { -#ifdef DEBUG - fprintf(stderr, "DEBUG: User's '%s' group can read file '%s'\n", - pc->user_name, pc->file_path); -#endif - return 1; - } - - return 0; -} - -int permission_to_read(char* user_name, char *file_path) { - int rc = -1; - -#ifdef DEBUG - fprintf(stderr, "DEBUG: User check '%s' for file '%s'\n", user_name, file_path); -#endif - struct permission_checker pc = { - .user_name = user_name, - .gids = NULL, - .ngids = 0, - .file_path = file_path, - }; - - // Gather user's UID. - if ((rc = get_user_uid(&pc)) == -1) - // Could not retrieve UID. - goto cleanup; - - // Gather file owner (user and group). - if ((rc = stat_file(&pc)) == -1) - // Could not stat file. - goto cleanup; - - // Check whether there is an ACL entry which would allow the user - // to read the file. Don't check for any groups yet. The issue with - // groups is that it can be very slow to retrieve the list of groups - // of a specific user when done via a remote LDAP server! - if ((rc = check_acl(&pc, USER_CHECK)) == 1) - // Yes, has permissions. - goto cleanup; - - // Check whether ACLs of file could be retrieved. - if (rc == -1) { - if (errno != ENOTSUP) - // Unknown error. - goto cleanup; - - // File system does not support ACLs. - // Fallback to traditional permissions. - if ((rc = check_traditional(&pc, USER_CHECK)) == 1) - // Yes, has traditional permissions. - goto cleanup; - - if ((rc = get_user_groups(&pc)) == -1) - // Can not retrieve user's groups. - goto cleanup; - - rc = check_traditional(&pc, GROUP_CHECK); - goto cleanup; - } - - if ((rc = get_user_groups(&pc)) == -1) - // Can not retrieve use'r groups. - goto cleanup; - - // Check whether there is an ACL entry which would allow any of the - // user's groups to read the file. - rc = check_acl(&pc, GROUP_CHECK); - -cleanup: -#ifdef DEBUG - debug_print_checker(&pc); -#endif - - if (pc.ngids) - free(pc.gids); - - return rc; -} - -// vim: set tabstop=8 softtabstop=0 expandtab shiftwidth=4 smarttab diff --git a/internal/fs/permissions/permission_linux.go b/internal/fs/permissions/permission_linux.go deleted file mode 100644 index feae729..0000000 --- a/internal/fs/permissions/permission_linux.go +++ /dev/null @@ -1,33 +0,0 @@ -package permissions - -/* -#include "permission_linux.h" -#cgo LDFLAGS: -L. -lacl -*/ -import "C" - -import ( - "errors" - "unsafe" -) - -// To check whether user has Linux file system permissions to read a given file. -func ToRead(user, filePath string) (bool, error) { - cUser := C.CString(user) - cFilePath := C.CString(filePath) - - defer C.free(unsafe.Pointer(cUser)) - defer C.free(unsafe.Pointer(cFilePath)) - - cOk, err := C.permission_to_read(cUser, cFilePath) - if cOk == 1 { - return true, nil - } - - if err != nil { - // err contains errno message - return false, err - } - - return false, errors.New("User without permission to read file") -} diff --git a/internal/fs/permissions/permission_linux.h b/internal/fs/permissions/permission_linux.h deleted file mode 100644 index a2c266e..0000000 --- a/internal/fs/permissions/permission_linux.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef PERMISSION_LINUX_H -#define PERMISSION_LINUX_H - -#include <acl/libacl.h> -#include <errno.h> -#include <grp.h> -#include <pwd.h> -#include <stdio.h> -#include <stdint.h> -#include <stdlib.h> -#include <sys/acl.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <unistd.h> - -//#define DEBUG -#define USER_CHECK 0 -#define GROUP_CHECK 1 - -struct permission_checker { - char *user_name; - uid_t uid; - gid_t *gids; - int ngids; - char *file_path; - struct stat file_stat; - struct passwd pw; -}; - - -#ifdef DEBUG -// Print out permission_checker struct. -void debug_print_checker(struct permission_checker *pc); -#endif - -// Stat a given file to retrieve traditional UNIX permissions. -int stat_file(struct permission_checker *pc); - -// Retrieve UID of user. -int get_user_uid(struct permission_checker *pc); - -// Retrieve all groups of the user. -int get_user_groups(struct permission_checker *pc); - -// Check whether user is member of a group or not. -int is_member_of_group(struct permission_checker *pc, gid_t gid); - -// Check whether user can read file according Linux ACLs. -// As flag use either USER_CHECK or GROUP_CHECK. -int check_acl(struct permission_checker *pc, const int flag); - -// Check whether user has permissions to read file according traditional -// UNIX permissions. As flag use either USER_CHECK or GROUP_CHECK. -int check_traditional(struct permission_checker *pc, const int flag); - -// Returns 1 if user has permission to read file. -// Returns <0 on error and returns 0 if no permissions. -int permission_to_read(char* user, char *file_path); - -#endif // PERMISSION_LINUX_H diff --git a/internal/fs/permissions/permission_test.go b/internal/fs/permissions/permission_test.go deleted file mode 100644 index d415ac2..0000000 --- a/internal/fs/permissions/permission_test.go +++ /dev/null @@ -1,112 +0,0 @@ -// +build linux - -package permissions - -import ( - "os" - "os/exec" - "os/user" - "strings" - "testing" -) - -const ( - setfacl string = "/usr/bin/setfacl" - file string = "/tmp/acltest" -) - -func TestLinuxACL(t *testing.T) { - setfacl := "/usr/bin/setfacl" - file := "/tmp/acltest" - - // Delete file if it exists. - if _, err := os.Stat(file); err == nil { - os.Remove(file) - } - - f, err := os.Create(file) - if err != nil { - t.Errorf("%v", err) - } - defer func() { - f.Close() - //os.Remove(file) - }() - - user, err := user.Current() - if err != nil { - t.Errorf("Unable to retrieve current user: %v", err) - } - - // Test 1: Remove all permissions and perform a permission check - cmd := exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, _ := ToRead(user.Username, file); ok { - t.Errorf("Didn't expect permissions to read file!") - } - - // Test 2: Add read permission to file owner - cmd = exec.Command(setfacl, "-b", "-m", "u::r--,g::---,o::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, err := ToRead(user.Username, file); !ok { - t.Errorf("Expected permissions to read file: %v", err) - } - - // Test 3: Add read permission to file group - cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::r--,o::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, err := ToRead(user.Username, file); !ok { - t.Errorf("Expected permissions to read file: %v", err) - } - - // Test 4: Add read permission to others - cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::r--", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - - if ok, err := ToRead(user.Username, file); !ok { - t.Errorf("Expected permissions to read file: %v", err) - } - - // Test 5: Remove read permission from mask - cmd = exec.Command(setfacl, "-m", "m::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, _ := ToRead(user.Username, file); ok { - t.Errorf("Didn't expect permissions to read file!") - } - cmd = exec.Command(setfacl, "-m", "m::r--", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - - // Test 6: Add read permission to specific group - cmd = exec.Command(setfacl, "-b", "-m", "u::---,g:"+user.Username+":r--,o::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, err := ToRead(user.Username, file); !ok { - t.Errorf("Expected permissions to read file for user %v: %v", user.Username, err) - } - - // Test 7: Remove all permissions but mask - cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - cmd = exec.Command(setfacl, "-m", "m::r--", file) - if err := cmd.Run(); err != nil { - t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err) - } - if ok, _ := ToRead(user.Username, file); ok { - t.Errorf("Didn't expect permissions to read file!") - } -} diff --git a/internal/fs/readfile.go b/internal/fs/readfile.go deleted file mode 100644 index 312447a..0000000 --- a/internal/fs/readfile.go +++ /dev/null @@ -1,318 +0,0 @@ -package fs - -import ( - "bufio" - "compress/gzip" - "github.com/mimecast/dtail/internal/logger" - "errors" - "io" - "os" - "regexp" - "strings" - "sync" - "time" - - "github.com/DataDog/zstd" -) - -// Used to tail and filter a local log file. -type readFile struct { - // Various statistics (e.g. regex hit percentage, transfer percentage). - stats - // Path of log file to tail. - filePath string - // Only consider all log lines matching this regular expression. - re *regexp.Regexp - // The glob identifier of the file. - globID string - // Channel to send a server message to the dtail client - serverMessages chan<- string - // Signals to stop tailing the log file. - stop chan struct{} - // Periodically retry reading file. - retry bool - // Can I skip messages when there are too many? - canSkipLines bool - // Seek to the EOF before processing file? - seekEOF bool - // Mutex to control the stopping of the file - mutex *sync.Mutex - limiter chan struct{} -} - -// FilePath returns the full file path. -func (f readFile) FilePath() string { - return f.filePath -} - -// Retry reading the file on error? -func (f readFile) Retry() bool { - return f.retry -} - -// Start tailing a log file. -func (f readFile) Start(lines chan<- LineRead, regex string) error { - defer func() { - select { - case <-f.limiter: - default: - } - }() - - select { - case f.limiter <- struct{}{}: - default: - select { - case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): - case <-f.stop: - return nil - } - f.limiter <- struct{}{} - } - - fd, err := os.Open(f.filePath) - if err != nil { - return err - } - defer fd.Close() - - if f.seekEOF { - fd.Seek(0, io.SeekEnd) - } - - rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(truncate) - go f.filter(&wg, rawLines, lines, regex) - - err = f.read(fd, rawLines, truncate) - close(rawLines) - wg.Wait() - - return err -} - -func (f readFile) periodicTruncateCheck(truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-f.stop: - } - case <-f.stop: - return - } - } -} - -// Stop reading file. -func (f readFile) Stop() { - f.mutex.Lock() - defer f.mutex.Unlock() - - select { - case <-f.stop: - return - default: - } - - close(f.stop) -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } - - return -} - -func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { - reader, err := f.makeReader(fd) - if err != nil { - return err - } - rawLine := make([]byte, 0, 512) - var offset uint64 - - lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false - - for { - select { - case <-truncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - logger.Info(f.filePath, "Current offset", offset) - - case <-f.stop: - return nil - default: - } - - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() - - if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. - if err != io.EOF { - return err - } - if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") - return nil - } - time.Sleep(time.Millisecond * 100) - continue - } - - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-f.stop: - return nil - } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false - } - continue - } - - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true - } - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-f.stop: - return nil - } - rawLine = make([]byte, 0, 512) - } - } -} - -// Filter log lines matching a given regular expression. -func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) { - defer wg.Done() - - if regex == "" { - regex = "." - } - - re, err := regexp.Compile(regex) - if err != nil { - logger.Error(regex, "Can't compile regex, using '.' instead", err) - re = regexp.MustCompile(".") - } - f.re = re - - for { - select { - case line, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok { - select { - case lines <- filteredLine: - case <-f.stop: - return - } - } - } - } -} - -func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) { - var read LineRead - - if !f.re.Match(line) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, false - } - f.updateLineTransmitted() - - read = LineRead{ - Content: line, - GlobID: &f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} diff --git a/internal/fs/stats.go b/internal/fs/stats.go deleted file mode 100644 index 4121ff7..0000000 --- a/internal/fs/stats.go +++ /dev/null @@ -1,69 +0,0 @@ -package fs - -// Used to calculate how many log lines matched the regular expression -// and how many log files could be transmitted from the server to the client. -// Hit and transmit percentage takes only the last 100 log lines into calculation. -type stats struct { - pos int - lineCount uint64 - matched [100]bool - matchCount uint64 - transmitted [100]bool - transmitCount int -} - -// Return the total line count. -func (f *stats) totalLineCount() uint64 { - return f.lineCount -} - -// Calculate the percentage of log lines transmitted to the client. -func (f *stats) transmittedPerc() int { - return int(percentOf(float64(f.matchCount), float64(f.transmitCount))) -} - -// Update bucket position. We only take into consideration the last 100 -// lines for stats. -func (f *stats) updatePosition() { - f.pos = (f.pos + 1) % 100 - f.lineCount++ -} - -// Increment match counter. -func (f *stats) updateLineMatched() { - if !f.matched[f.pos] { - f.matchCount++ - f.matched[f.pos] = true - } -} - -// Increment transmitted counter. -func (f *stats) updateLineTransmitted() { - if !f.transmitted[f.pos] { - f.transmitCount++ - f.transmitted[f.pos] = true - } -} - -// Decrement match counter. -func (f *stats) updateLineNotMatched() { - if f.matched[f.pos] { - f.matchCount-- - f.matched[f.pos] = false - } -} - -// Decrement transmitted counter. -func (f *stats) updateLineNotTransmitted() { - if f.transmitted[f.pos] { - f.transmitCount-- - f.transmitted[f.pos] = false - } -} - -func percentOf(total float64, value float64) float64 { - if total == 0 || total == value { - return 100 - } - return value / (total / 100.0) -} diff --git a/internal/fs/tailfile.go b/internal/fs/tailfile.go deleted file mode 100644 index a19d4e6..0000000 --- a/internal/fs/tailfile.go +++ /dev/null @@ -1,27 +0,0 @@ -package fs - -import "sync" - -// TailFile is to tail and filter a log file. -type TailFile struct { - readFile -} - -// NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { - var mutex sync.Mutex - - return TailFile{ - readFile: readFile{ - filePath: filePath, - stop: make(chan struct{}), - globID: globID, - serverMessages: serverMessages, - retry: true, - canSkipLines: true, - seekEOF: true, - limiter: limiter, - mutex: &mutex, - }, - } -} |
