summaryrefslogtreecommitdiff
path: root/internal/fs
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-26 11:26:53 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-07 13:31:15 +0000
commit0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
treef06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/fs
parent2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/fs')
-rw-r--r--internal/fs/catfile.go27
-rw-r--r--internal/fs/filereader.go9
-rw-r--r--internal/fs/lineread.go28
-rw-r--r--internal/fs/permissions/permission.go14
-rw-r--r--internal/fs/permissions/permission_linux.c395
-rw-r--r--internal/fs/permissions/permission_linux.go33
-rw-r--r--internal/fs/permissions/permission_linux.h60
-rw-r--r--internal/fs/permissions/permission_test.go112
-rw-r--r--internal/fs/readfile.go318
-rw-r--r--internal/fs/stats.go69
-rw-r--r--internal/fs/tailfile.go27
11 files changed, 0 insertions, 1092 deletions
diff --git a/internal/fs/catfile.go b/internal/fs/catfile.go
deleted file mode 100644
index 99f521f..0000000
--- a/internal/fs/catfile.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package fs
-
-import "sync"
-
-// CatFile is for reading a whole file.
-type CatFile struct {
- readFile
-}
-
-// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile {
- var mutex sync.Mutex
-
- return CatFile{
- readFile: readFile{
- filePath: filePath,
- stop: make(chan struct{}),
- globID: globID,
- serverMessages: serverMessages,
- retry: false,
- canSkipLines: false,
- seekEOF: false,
- limiter: limiter,
- mutex: &mutex,
- },
- }
-}
diff --git a/internal/fs/filereader.go b/internal/fs/filereader.go
deleted file mode 100644
index 5a08e27..0000000
--- a/internal/fs/filereader.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package fs
-
-// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
-type FileReader interface {
- Start(lines chan<- LineRead, regex string) error
- FilePath() string
- Retry() bool
- Stop()
-}
diff --git a/internal/fs/lineread.go b/internal/fs/lineread.go
deleted file mode 100644
index 7ee558e..0000000
--- a/internal/fs/lineread.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package fs
-
-import (
- "fmt"
-)
-
-// LineRead represents a read log line.
-type LineRead struct {
- // The content of the log line.
- Content []byte
- // Until now, how many log lines were processed?
- Count uint64
- // Sometimes we produce too many log lines so that the client
- // is too slow to process all of them. The server will drop log
- // lines if that happens but it will signal to the client how
- // many log lines in % could be transmitted to the client.
- TransmittedPerc int
- GlobID *string
-}
-
-// Return a human readable representation of the followed line.
-func (l LineRead) String() string {
- return fmt.Sprintf("LineRead(Content:%s,TransmittedPerc:%v,Count:%v,GlobID:%s)",
- string(l.Content),
- l.TransmittedPerc,
- l.Count,
- *l.GlobID)
-}
diff --git a/internal/fs/permissions/permission.go b/internal/fs/permissions/permission.go
deleted file mode 100644
index 6e83309..0000000
--- a/internal/fs/permissions/permission.go
+++ /dev/null
@@ -1,14 +0,0 @@
-// +build !linux
-
-package permissions
-
-import (
- "github.com/mimecast/dtail/internal/logger"
-)
-
-// ToRead is to check whether user has read permissions to a given file.
-func ToRead(user, filePath string) (bool, error) {
- // Only implemented for Linux, always expect true
- logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
- return true, nil
-}
diff --git a/internal/fs/permissions/permission_linux.c b/internal/fs/permissions/permission_linux.c
deleted file mode 100644
index cd10525..0000000
--- a/internal/fs/permissions/permission_linux.c
+++ /dev/null
@@ -1,395 +0,0 @@
-#include "permission_linux.h"
-
-#ifdef DEBUG
-void debug_print_checker(struct permission_checker *pc) {
- fprintf(stderr, "DEBUG: user_name:%s (%d)\n",
- pc->user_name, pc->uid);
-
- fprintf(stderr, "DEBUG: ngids:%d\n", pc->ngids);
- int j;
- for (j = 0; j < pc->ngids; j++) {
- fprintf(stderr, "DEBUG: %d", pc->gids[j]);
- struct group *gr = getgrgid(pc->gids[j]);
- if (gr != NULL)
- fprintf(stderr, " (%s)", gr->gr_name);
- fprintf(stderr, "\n");
- }
-
- fprintf(stderr, "DEBUG: file_path:%s (%d:%d)\n",
- pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid);
-}
-#endif // DEBUG
-
-int stat_file(struct permission_checker *pc) {
- if (stat(pc->file_path, &pc->file_stat) != 0)
- return -1;
-
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: File'%s' is owned by '%d:%d'\n",
- pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid);
-#endif
-
- return 0;
-}
-
-int get_user_uid(struct permission_checker *pc) {
- struct passwd *result = NULL;
-
- size_t bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
- if (bufsize == -1)
- bufsize = 16384;
-
- char *buf = malloc(bufsize);
- if (buf == NULL) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unabel to allocate bufer while retrieving user '%s'\n", pc->user_name);
-#endif
- return -1;
- }
-
- int rc = getpwnam_r(pc->user_name, &pc->pw, buf, bufsize, &result);
-
- if (result == NULL) {
-#ifdef DEBUG
- if (rc == 0) {
- fprintf(stderr, "DEBUG: No user '%s' found\n", pc->user_name);
- } else {
- fprintf(stderr, "DEBUG: Unknown error while retrieving user '%s'\n", pc->user_name);
- }
-#endif
-
- free(buf);
- return -1;
- }
-
- pc->uid = pc->pw.pw_uid;
-
- free(buf);
- return 0;
-}
-
-int get_user_groups(struct permission_checker *pc) {
- // First assume we are in 10 groups max
- pc->ngids = 10;
- pc->gids = malloc(pc->ngids * sizeof(gid_t));
-
- if (pc->gids == NULL) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unable to allocate space for gids.");
-#endif
- return -1;
- }
-
- // Try so many times to load group list until it fits into group array.
- while (getgrouplist(pc->user_name, pc->pw.pw_gid, pc->gids, &pc->ngids) == -1) {
- // Too many groups, enlarge group array and try again
- int newngids = pc->ngids + 100;
- size_t newsize = newngids * sizeof(gid_t);
-
- if (SIZE_MAX / newngids < sizeof(gid_t)) {
- // Overflow
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Overflow detected.");
-#endif
- return -1;
- }
-
- gid_t *newgids = realloc(pc->gids, newsize);
- if (newgids == NULL) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unable to allocate space for gids.");
-#endif
- free(pc->gids);
- return -1;
- }
-
- pc->gids = newgids;
- pc->ngids = newngids;
- }
-
- return 0;
-}
-
-int is_member_of_group(struct permission_checker *pc, gid_t gid) {
- int j;
- for (j = 0; j < pc->ngids; j++)
- if (pc->gids[j] == gid)
- return 1;
- return 0;
-}
-
-int check_acl_uid_matches(uid_t uid, acl_entry_t entry) {
- int ret = -1;
- uid_t *acl_uid = acl_get_qualifier(entry);
- if (acl_uid == NULL) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry");
-#endif
- return -1;
- }
-
- ret = *acl_uid == uid ? 0 : -1;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL user match?: %d <=> %d: %d\n", *acl_uid, uid, ret);
-#endif
- acl_free(acl_uid);
- return ret;
-}
-
-int check_acl_gid_matches(gid_t *gids, int ngids, acl_entry_t entry) {
- int ret = -1;
- gid_t *acl_gid = acl_get_qualifier(entry);
- if (acl_gid == NULL) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry");
-#endif
- return -1;
- }
-
- int j;
- for (j = 0; j < ngids; j++) {
- if (*acl_gid == gids[j]) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: User is in group %d", *acl_gid);
-#endif
- ret = 0;
- break;
- }
- }
-
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL group match?: %d <=> ...: %d\n", *acl_gid, ret);
-#endif
- acl_free(acl_gid);
- return ret;
-}
-
-int check_acl(struct permission_checker *pc, const int flag) {
- // By default user has no read perm.
- int has_read_perm = 0;
-
- // By default mask tells that there are read perm. However in order to have
- // read permissions both, has_read_perm and mask_allows_read_access must be 1!
- int mask_allows_read_access = 1;
-
- acl_type_t type = ACL_TYPE_ACCESS;
- acl_t acl = acl_get_file(pc->file_path, type);
-
- if (acl == NULL)
- // Unable to retrieve ACL.
- return -1;
-
- // Walk through each entry of this ACL.
- int id;
- for (id = ACL_FIRST_ENTRY; ; id = ACL_NEXT_ENTRY) {
- acl_entry_t entry;
- if (acl_get_entry(acl, id, &entry) != 1)
- // No more ACL entries.
- break;
-
- acl_tag_t tag;
- if (acl_get_tag_type(entry, &tag) == -1)
- // Unable to retrieve ACL tag.
- return -1;
-
- switch (tag) {
- case ACL_USER_OBJ:
- if (flag == GROUP_CHECK)
- continue;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_USER_OBJ\n");
-#endif
- // Ignore this ACL entry if user is not owner of file.
- if (pc->uid != pc->file_stat.st_uid)
- continue;
- break;
- case ACL_USER:
- if (flag == GROUP_CHECK)
- continue;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_USER\n");
-#endif
- // Ignore this ACL entry if uid does not match.
- if (check_acl_uid_matches(pc->uid, entry) != 0)
- continue;
- break;
- case ACL_GROUP_OBJ:
- if (flag == USER_CHECK)
- continue;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_GROUP_OBJ\n");
-#endif
- // Ignore ACL entry if user is not in group of file.
- if (!is_member_of_group(pc, pc->file_stat.st_gid))
- continue;
- break;
- case ACL_GROUP:
- if (flag == USER_CHECK)
- continue;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_GROUP\n");
-#endif
- // Ignore ACL entry if user is not in group of entry.
- if (check_acl_gid_matches(pc->gids, pc->ngids, entry) != 0)
- continue;
- break;
- case ACL_OTHER:
- if (flag == GROUP_CHECK)
- continue;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_OTHER\n");
-#endif
- break;
- case ACL_MASK:
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL_MASK\n");
-#endif
- break;
- default:
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Unknown ACL tag\n");
-#endif
- return -1;
- }
-
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Retrieving permset\n");
-#endif
- acl_permset_t permset;
- int permission;
- if (acl_get_permset(entry, &permset) == -1)
- // Unable to retrieve permset.
- return -1;
-
- if ((permission = acl_get_perm(permset, ACL_READ)) == -1)
- // Unable to retrieve permset value.
- return -1;
-
- if (permission == 1 && tag != ACL_MASK) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL says user has permission to read file.\n");
-#endif
- has_read_perm = 1;
- } else if (permission == 0 && tag == ACL_MASK) {
- // Mask says that there are no permissions to read.
- mask_allows_read_access = 0;
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL mask says no permission to read file.\n");
-#endif
- }
- }
-
- if (has_read_perm && mask_allows_read_access) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL end result: User has permission to read file.\n");
-#endif
- return 1;
- }
-
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: ACL end result: User has no permission to read file.\n");
-#endif
- return 0;
-}
-
-int check_traditional(struct permission_checker *pc, const int flag) {
- mode_t mode = pc->file_stat.st_mode;
- uid_t uid = pc->file_stat.st_uid;
- gid_t gid = pc->file_stat.st_gid;
-
- if (flag == USER_CHECK && (mode & S_IROTH)) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: Others can read file '%s'\n",
- pc->file_path);
-#endif
- return 1;
-
- } else if (flag == USER_CHECK && (mode & S_IRUSR) && uid == pc->uid) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: User '%s' can read file '%s'\n",
- pc->user_name, pc->file_path);
-#endif
- return 1;
-
- } else if (flag == GROUP_CHECK && (mode & S_IRGRP) && is_member_of_group(pc, gid)) {
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: User's '%s' group can read file '%s'\n",
- pc->user_name, pc->file_path);
-#endif
- return 1;
- }
-
- return 0;
-}
-
-int permission_to_read(char* user_name, char *file_path) {
- int rc = -1;
-
-#ifdef DEBUG
- fprintf(stderr, "DEBUG: User check '%s' for file '%s'\n", user_name, file_path);
-#endif
- struct permission_checker pc = {
- .user_name = user_name,
- .gids = NULL,
- .ngids = 0,
- .file_path = file_path,
- };
-
- // Gather user's UID.
- if ((rc = get_user_uid(&pc)) == -1)
- // Could not retrieve UID.
- goto cleanup;
-
- // Gather file owner (user and group).
- if ((rc = stat_file(&pc)) == -1)
- // Could not stat file.
- goto cleanup;
-
- // Check whether there is an ACL entry which would allow the user
- // to read the file. Don't check for any groups yet. The issue with
- // groups is that it can be very slow to retrieve the list of groups
- // of a specific user when done via a remote LDAP server!
- if ((rc = check_acl(&pc, USER_CHECK)) == 1)
- // Yes, has permissions.
- goto cleanup;
-
- // Check whether ACLs of file could be retrieved.
- if (rc == -1) {
- if (errno != ENOTSUP)
- // Unknown error.
- goto cleanup;
-
- // File system does not support ACLs.
- // Fallback to traditional permissions.
- if ((rc = check_traditional(&pc, USER_CHECK)) == 1)
- // Yes, has traditional permissions.
- goto cleanup;
-
- if ((rc = get_user_groups(&pc)) == -1)
- // Can not retrieve user's groups.
- goto cleanup;
-
- rc = check_traditional(&pc, GROUP_CHECK);
- goto cleanup;
- }
-
- if ((rc = get_user_groups(&pc)) == -1)
- // Can not retrieve use'r groups.
- goto cleanup;
-
- // Check whether there is an ACL entry which would allow any of the
- // user's groups to read the file.
- rc = check_acl(&pc, GROUP_CHECK);
-
-cleanup:
-#ifdef DEBUG
- debug_print_checker(&pc);
-#endif
-
- if (pc.ngids)
- free(pc.gids);
-
- return rc;
-}
-
-// vim: set tabstop=8 softtabstop=0 expandtab shiftwidth=4 smarttab
diff --git a/internal/fs/permissions/permission_linux.go b/internal/fs/permissions/permission_linux.go
deleted file mode 100644
index feae729..0000000
--- a/internal/fs/permissions/permission_linux.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package permissions
-
-/*
-#include "permission_linux.h"
-#cgo LDFLAGS: -L. -lacl
-*/
-import "C"
-
-import (
- "errors"
- "unsafe"
-)
-
-// To check whether user has Linux file system permissions to read a given file.
-func ToRead(user, filePath string) (bool, error) {
- cUser := C.CString(user)
- cFilePath := C.CString(filePath)
-
- defer C.free(unsafe.Pointer(cUser))
- defer C.free(unsafe.Pointer(cFilePath))
-
- cOk, err := C.permission_to_read(cUser, cFilePath)
- if cOk == 1 {
- return true, nil
- }
-
- if err != nil {
- // err contains errno message
- return false, err
- }
-
- return false, errors.New("User without permission to read file")
-}
diff --git a/internal/fs/permissions/permission_linux.h b/internal/fs/permissions/permission_linux.h
deleted file mode 100644
index a2c266e..0000000
--- a/internal/fs/permissions/permission_linux.h
+++ /dev/null
@@ -1,60 +0,0 @@
-#ifndef PERMISSION_LINUX_H
-#define PERMISSION_LINUX_H
-
-#include <acl/libacl.h>
-#include <errno.h>
-#include <grp.h>
-#include <pwd.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <sys/acl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-//#define DEBUG
-#define USER_CHECK 0
-#define GROUP_CHECK 1
-
-struct permission_checker {
- char *user_name;
- uid_t uid;
- gid_t *gids;
- int ngids;
- char *file_path;
- struct stat file_stat;
- struct passwd pw;
-};
-
-
-#ifdef DEBUG
-// Print out permission_checker struct.
-void debug_print_checker(struct permission_checker *pc);
-#endif
-
-// Stat a given file to retrieve traditional UNIX permissions.
-int stat_file(struct permission_checker *pc);
-
-// Retrieve UID of user.
-int get_user_uid(struct permission_checker *pc);
-
-// Retrieve all groups of the user.
-int get_user_groups(struct permission_checker *pc);
-
-// Check whether user is member of a group or not.
-int is_member_of_group(struct permission_checker *pc, gid_t gid);
-
-// Check whether user can read file according Linux ACLs.
-// As flag use either USER_CHECK or GROUP_CHECK.
-int check_acl(struct permission_checker *pc, const int flag);
-
-// Check whether user has permissions to read file according traditional
-// UNIX permissions. As flag use either USER_CHECK or GROUP_CHECK.
-int check_traditional(struct permission_checker *pc, const int flag);
-
-// Returns 1 if user has permission to read file.
-// Returns <0 on error and returns 0 if no permissions.
-int permission_to_read(char* user, char *file_path);
-
-#endif // PERMISSION_LINUX_H
diff --git a/internal/fs/permissions/permission_test.go b/internal/fs/permissions/permission_test.go
deleted file mode 100644
index d415ac2..0000000
--- a/internal/fs/permissions/permission_test.go
+++ /dev/null
@@ -1,112 +0,0 @@
-// +build linux
-
-package permissions
-
-import (
- "os"
- "os/exec"
- "os/user"
- "strings"
- "testing"
-)
-
-const (
- setfacl string = "/usr/bin/setfacl"
- file string = "/tmp/acltest"
-)
-
-func TestLinuxACL(t *testing.T) {
- setfacl := "/usr/bin/setfacl"
- file := "/tmp/acltest"
-
- // Delete file if it exists.
- if _, err := os.Stat(file); err == nil {
- os.Remove(file)
- }
-
- f, err := os.Create(file)
- if err != nil {
- t.Errorf("%v", err)
- }
- defer func() {
- f.Close()
- //os.Remove(file)
- }()
-
- user, err := user.Current()
- if err != nil {
- t.Errorf("Unable to retrieve current user: %v", err)
- }
-
- // Test 1: Remove all permissions and perform a permission check
- cmd := exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, _ := ToRead(user.Username, file); ok {
- t.Errorf("Didn't expect permissions to read file!")
- }
-
- // Test 2: Add read permission to file owner
- cmd = exec.Command(setfacl, "-b", "-m", "u::r--,g::---,o::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, err := ToRead(user.Username, file); !ok {
- t.Errorf("Expected permissions to read file: %v", err)
- }
-
- // Test 3: Add read permission to file group
- cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::r--,o::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, err := ToRead(user.Username, file); !ok {
- t.Errorf("Expected permissions to read file: %v", err)
- }
-
- // Test 4: Add read permission to others
- cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::r--", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
-
- if ok, err := ToRead(user.Username, file); !ok {
- t.Errorf("Expected permissions to read file: %v", err)
- }
-
- // Test 5: Remove read permission from mask
- cmd = exec.Command(setfacl, "-m", "m::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, _ := ToRead(user.Username, file); ok {
- t.Errorf("Didn't expect permissions to read file!")
- }
- cmd = exec.Command(setfacl, "-m", "m::r--", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
-
- // Test 6: Add read permission to specific group
- cmd = exec.Command(setfacl, "-b", "-m", "u::---,g:"+user.Username+":r--,o::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, err := ToRead(user.Username, file); !ok {
- t.Errorf("Expected permissions to read file for user %v: %v", user.Username, err)
- }
-
- // Test 7: Remove all permissions but mask
- cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- cmd = exec.Command(setfacl, "-m", "m::r--", file)
- if err := cmd.Run(); err != nil {
- t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
- }
- if ok, _ := ToRead(user.Username, file); ok {
- t.Errorf("Didn't expect permissions to read file!")
- }
-}
diff --git a/internal/fs/readfile.go b/internal/fs/readfile.go
deleted file mode 100644
index 312447a..0000000
--- a/internal/fs/readfile.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package fs
-
-import (
- "bufio"
- "compress/gzip"
- "github.com/mimecast/dtail/internal/logger"
- "errors"
- "io"
- "os"
- "regexp"
- "strings"
- "sync"
- "time"
-
- "github.com/DataDog/zstd"
-)
-
-// Used to tail and filter a local log file.
-type readFile struct {
- // Various statistics (e.g. regex hit percentage, transfer percentage).
- stats
- // Path of log file to tail.
- filePath string
- // Only consider all log lines matching this regular expression.
- re *regexp.Regexp
- // The glob identifier of the file.
- globID string
- // Channel to send a server message to the dtail client
- serverMessages chan<- string
- // Signals to stop tailing the log file.
- stop chan struct{}
- // Periodically retry reading file.
- retry bool
- // Can I skip messages when there are too many?
- canSkipLines bool
- // Seek to the EOF before processing file?
- seekEOF bool
- // Mutex to control the stopping of the file
- mutex *sync.Mutex
- limiter chan struct{}
-}
-
-// FilePath returns the full file path.
-func (f readFile) FilePath() string {
- return f.filePath
-}
-
-// Retry reading the file on error?
-func (f readFile) Retry() bool {
- return f.retry
-}
-
-// Start tailing a log file.
-func (f readFile) Start(lines chan<- LineRead, regex string) error {
- defer func() {
- select {
- case <-f.limiter:
- default:
- }
- }()
-
- select {
- case f.limiter <- struct{}{}:
- default:
- select {
- case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
- case <-f.stop:
- return nil
- }
- f.limiter <- struct{}{}
- }
-
- fd, err := os.Open(f.filePath)
- if err != nil {
- return err
- }
- defer fd.Close()
-
- if f.seekEOF {
- fd.Seek(0, io.SeekEnd)
- }
-
- rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(truncate)
- go f.filter(&wg, rawLines, lines, regex)
-
- err = f.read(fd, rawLines, truncate)
- close(rawLines)
- wg.Wait()
-
- return err
-}
-
-func (f readFile) periodicTruncateCheck(truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-f.stop:
- }
- case <-f.stop:
- return
- }
- }
-}
-
-// Stop reading file.
-func (f readFile) Stop() {
- f.mutex.Lock()
- defer f.mutex.Unlock()
-
- select {
- case <-f.stop:
- return
- default:
- }
-
- close(f.stop)
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
-
- return
-}
-
-func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
- reader, err := f.makeReader(fd)
- if err != nil {
- return err
- }
- rawLine := make([]byte, 0, 512)
- var offset uint64
-
- lineLengthThreshold := 1024 * 1024 // 1mb
- longLineWarning := false
-
- for {
- select {
- case <-truncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return err
- }
- logger.Info(f.filePath, "Current offset", offset)
-
- case <-f.stop:
- return nil
- default:
- }
-
- // Read some bytes (max 4k at once as of go 1.12). isPrefix will
- // be set if line does not fit into 4k buffer.
- bytes, isPrefix, err := reader.ReadLine()
-
- if err != nil {
- // If EOF, sleep a couple of ms and return with nil error.
- // If other error, return with non-nil error.
- if err != io.EOF {
- return err
- }
- if !f.seekEOF {
- logger.Debug(f.FilePath(), "End of file reached")
- return nil
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- rawLine = append(rawLine, bytes...)
- offset += uint64(len(bytes))
-
- if !isPrefix {
- // last LineRead call returned contend until end of line.
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-f.stop:
- return nil
- }
- rawLine = make([]byte, 0, 512)
- if longLineWarning {
- longLineWarning = false
- }
- continue
- }
-
- // Last LineRead call could not read content until end of line, buffer
- // was too small. Determine whether we exceed the max line length we
- // want dtail to send to the client at once. Possibly split up log line
- // into multiple log lines.
- if len(rawLine) >= lineLengthThreshold {
- if !longLineWarning {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
- // Only print out one warning per long log line.
- longLineWarning = true
- }
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-f.stop:
- return nil
- }
- rawLine = make([]byte, 0, 512)
- }
- }
-}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) {
- defer wg.Done()
-
- if regex == "" {
- regex = "."
- }
-
- re, err := regexp.Compile(regex)
- if err != nil {
- logger.Error(regex, "Can't compile regex, using '.' instead", err)
- re = regexp.MustCompile(".")
- }
- f.re = re
-
- for {
- select {
- case line, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok {
- select {
- case lines <- filteredLine:
- case <-f.stop:
- return
- }
- }
- }
- }
-}
-
-func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) {
- var read LineRead
-
- if !f.re.Match(line) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineTransmitted()
-
- read = LineRead{
- Content: line,
- GlobID: &f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}
diff --git a/internal/fs/stats.go b/internal/fs/stats.go
deleted file mode 100644
index 4121ff7..0000000
--- a/internal/fs/stats.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package fs
-
-// Used to calculate how many log lines matched the regular expression
-// and how many log files could be transmitted from the server to the client.
-// Hit and transmit percentage takes only the last 100 log lines into calculation.
-type stats struct {
- pos int
- lineCount uint64
- matched [100]bool
- matchCount uint64
- transmitted [100]bool
- transmitCount int
-}
-
-// Return the total line count.
-func (f *stats) totalLineCount() uint64 {
- return f.lineCount
-}
-
-// Calculate the percentage of log lines transmitted to the client.
-func (f *stats) transmittedPerc() int {
- return int(percentOf(float64(f.matchCount), float64(f.transmitCount)))
-}
-
-// Update bucket position. We only take into consideration the last 100
-// lines for stats.
-func (f *stats) updatePosition() {
- f.pos = (f.pos + 1) % 100
- f.lineCount++
-}
-
-// Increment match counter.
-func (f *stats) updateLineMatched() {
- if !f.matched[f.pos] {
- f.matchCount++
- f.matched[f.pos] = true
- }
-}
-
-// Increment transmitted counter.
-func (f *stats) updateLineTransmitted() {
- if !f.transmitted[f.pos] {
- f.transmitCount++
- f.transmitted[f.pos] = true
- }
-}
-
-// Decrement match counter.
-func (f *stats) updateLineNotMatched() {
- if f.matched[f.pos] {
- f.matchCount--
- f.matched[f.pos] = false
- }
-}
-
-// Decrement transmitted counter.
-func (f *stats) updateLineNotTransmitted() {
- if f.transmitted[f.pos] {
- f.transmitCount--
- f.transmitted[f.pos] = false
- }
-}
-
-func percentOf(total float64, value float64) float64 {
- if total == 0 || total == value {
- return 100
- }
- return value / (total / 100.0)
-}
diff --git a/internal/fs/tailfile.go b/internal/fs/tailfile.go
deleted file mode 100644
index a19d4e6..0000000
--- a/internal/fs/tailfile.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package fs
-
-import "sync"
-
-// TailFile is to tail and filter a log file.
-type TailFile struct {
- readFile
-}
-
-// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile {
- var mutex sync.Mutex
-
- return TailFile{
- readFile: readFile{
- filePath: filePath,
- stop: make(chan struct{}),
- globID: globID,
- serverMessages: serverMessages,
- retry: true,
- canSkipLines: true,
- seekEOF: true,
- limiter: limiter,
- mutex: &mutex,
- },
- }
-}