summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-26 11:26:53 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-07 13:31:15 +0000
commit0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
treef06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/io
parent2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/fs/catfile.go21
-rw-r--r--internal/io/fs/filereader.go14
-rw-r--r--internal/io/fs/permissions/permission.go14
-rw-r--r--internal/io/fs/permissions/permission_linux.c395
-rw-r--r--internal/io/fs/permissions/permission_linux.go33
-rw-r--r--internal/io/fs/permissions/permission_linux.h60
-rw-r--r--internal/io/fs/permissions/permission_test.go112
-rw-r--r--internal/io/fs/readfile.go307
-rw-r--r--internal/io/fs/stats.go69
-rw-r--r--internal/io/fs/tailfile.go21
-rw-r--r--internal/io/line/line.go28
-rw-r--r--internal/io/logger/logger.go445
-rw-r--r--internal/io/run/run.go104
13 files changed, 1623 insertions, 0 deletions
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
new file mode 100644
index 0000000..7f387bc
--- /dev/null
+++ b/internal/io/fs/catfile.go
@@ -0,0 +1,21 @@
+package fs
+
+// CatFile is for reading a whole file.
+type CatFile struct {
+ readFile
+}
+
+// NewCatFile returns a new file catter.
+func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile {
+ return CatFile{
+ readFile: readFile{
+ filePath: filePath,
+ globID: globID,
+ serverMessages: serverMessages,
+ retry: false,
+ canSkipLines: false,
+ seekEOF: false,
+ limiter: limiter,
+ },
+ }
+}
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
new file mode 100644
index 0000000..05e58a1
--- /dev/null
+++ b/internal/io/fs/filereader.go
@@ -0,0 +1,14 @@
+package fs
+
+import (
+ "context"
+
+ "github.com/mimecast/dtail/internal/io/line"
+)
+
+// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
+type FileReader interface {
+ Start(ctx context.Context, lines chan<- line.Line, regex string) error
+ FilePath() string
+ Retry() bool
+}
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
new file mode 100644
index 0000000..0ed4f17
--- /dev/null
+++ b/internal/io/fs/permissions/permission.go
@@ -0,0 +1,14 @@
+// +build !linux
+
+package permissions
+
+import (
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+// ToRead is to check whether user has read permissions to a given file.
+func ToRead(user, filePath string) (bool, error) {
+ // Only implemented for Linux, always expect true
+ logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
+ return true, nil
+}
diff --git a/internal/io/fs/permissions/permission_linux.c b/internal/io/fs/permissions/permission_linux.c
new file mode 100644
index 0000000..cd10525
--- /dev/null
+++ b/internal/io/fs/permissions/permission_linux.c
@@ -0,0 +1,395 @@
+#include "permission_linux.h"
+
+#ifdef DEBUG
+void debug_print_checker(struct permission_checker *pc) {
+ fprintf(stderr, "DEBUG: user_name:%s (%d)\n",
+ pc->user_name, pc->uid);
+
+ fprintf(stderr, "DEBUG: ngids:%d\n", pc->ngids);
+ int j;
+ for (j = 0; j < pc->ngids; j++) {
+ fprintf(stderr, "DEBUG: %d", pc->gids[j]);
+ struct group *gr = getgrgid(pc->gids[j]);
+ if (gr != NULL)
+ fprintf(stderr, " (%s)", gr->gr_name);
+ fprintf(stderr, "\n");
+ }
+
+ fprintf(stderr, "DEBUG: file_path:%s (%d:%d)\n",
+ pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid);
+}
+#endif // DEBUG
+
+int stat_file(struct permission_checker *pc) {
+ if (stat(pc->file_path, &pc->file_stat) != 0)
+ return -1;
+
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: File'%s' is owned by '%d:%d'\n",
+ pc->file_path, pc->file_stat.st_uid, pc->file_stat.st_gid);
+#endif
+
+ return 0;
+}
+
+int get_user_uid(struct permission_checker *pc) {
+ struct passwd *result = NULL;
+
+ size_t bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
+ if (bufsize == -1)
+ bufsize = 16384;
+
+ char *buf = malloc(bufsize);
+ if (buf == NULL) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unabel to allocate bufer while retrieving user '%s'\n", pc->user_name);
+#endif
+ return -1;
+ }
+
+ int rc = getpwnam_r(pc->user_name, &pc->pw, buf, bufsize, &result);
+
+ if (result == NULL) {
+#ifdef DEBUG
+ if (rc == 0) {
+ fprintf(stderr, "DEBUG: No user '%s' found\n", pc->user_name);
+ } else {
+ fprintf(stderr, "DEBUG: Unknown error while retrieving user '%s'\n", pc->user_name);
+ }
+#endif
+
+ free(buf);
+ return -1;
+ }
+
+ pc->uid = pc->pw.pw_uid;
+
+ free(buf);
+ return 0;
+}
+
+int get_user_groups(struct permission_checker *pc) {
+ // First assume we are in 10 groups max
+ pc->ngids = 10;
+ pc->gids = malloc(pc->ngids * sizeof(gid_t));
+
+ if (pc->gids == NULL) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unable to allocate space for gids.");
+#endif
+ return -1;
+ }
+
+ // Try so many times to load group list until it fits into group array.
+ while (getgrouplist(pc->user_name, pc->pw.pw_gid, pc->gids, &pc->ngids) == -1) {
+ // Too many groups, enlarge group array and try again
+ int newngids = pc->ngids + 100;
+ size_t newsize = newngids * sizeof(gid_t);
+
+ if (SIZE_MAX / newngids < sizeof(gid_t)) {
+ // Overflow
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Overflow detected.");
+#endif
+ return -1;
+ }
+
+ gid_t *newgids = realloc(pc->gids, newsize);
+ if (newgids == NULL) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unable to allocate space for gids.");
+#endif
+ free(pc->gids);
+ return -1;
+ }
+
+ pc->gids = newgids;
+ pc->ngids = newngids;
+ }
+
+ return 0;
+}
+
+int is_member_of_group(struct permission_checker *pc, gid_t gid) {
+ int j;
+ for (j = 0; j < pc->ngids; j++)
+ if (pc->gids[j] == gid)
+ return 1;
+ return 0;
+}
+
+int check_acl_uid_matches(uid_t uid, acl_entry_t entry) {
+ int ret = -1;
+ uid_t *acl_uid = acl_get_qualifier(entry);
+ if (acl_uid == NULL) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry");
+#endif
+ return -1;
+ }
+
+ ret = *acl_uid == uid ? 0 : -1;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL user match?: %d <=> %d: %d\n", *acl_uid, uid, ret);
+#endif
+ acl_free(acl_uid);
+ return ret;
+}
+
+int check_acl_gid_matches(gid_t *gids, int ngids, acl_entry_t entry) {
+ int ret = -1;
+ gid_t *acl_gid = acl_get_qualifier(entry);
+ if (acl_gid == NULL) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unable to retrieve user uid from ACL entry");
+#endif
+ return -1;
+ }
+
+ int j;
+ for (j = 0; j < ngids; j++) {
+ if (*acl_gid == gids[j]) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: User is in group %d", *acl_gid);
+#endif
+ ret = 0;
+ break;
+ }
+ }
+
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL group match?: %d <=> ...: %d\n", *acl_gid, ret);
+#endif
+ acl_free(acl_gid);
+ return ret;
+}
+
+int check_acl(struct permission_checker *pc, const int flag) {
+ // By default user has no read perm.
+ int has_read_perm = 0;
+
+ // By default mask tells that there are read perm. However in order to have
+ // read permissions both, has_read_perm and mask_allows_read_access must be 1!
+ int mask_allows_read_access = 1;
+
+ acl_type_t type = ACL_TYPE_ACCESS;
+ acl_t acl = acl_get_file(pc->file_path, type);
+
+ if (acl == NULL)
+ // Unable to retrieve ACL.
+ return -1;
+
+ // Walk through each entry of this ACL.
+ int id;
+ for (id = ACL_FIRST_ENTRY; ; id = ACL_NEXT_ENTRY) {
+ acl_entry_t entry;
+ if (acl_get_entry(acl, id, &entry) != 1)
+ // No more ACL entries.
+ break;
+
+ acl_tag_t tag;
+ if (acl_get_tag_type(entry, &tag) == -1)
+ // Unable to retrieve ACL tag.
+ return -1;
+
+ switch (tag) {
+ case ACL_USER_OBJ:
+ if (flag == GROUP_CHECK)
+ continue;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_USER_OBJ\n");
+#endif
+ // Ignore this ACL entry if user is not owner of file.
+ if (pc->uid != pc->file_stat.st_uid)
+ continue;
+ break;
+ case ACL_USER:
+ if (flag == GROUP_CHECK)
+ continue;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_USER\n");
+#endif
+ // Ignore this ACL entry if uid does not match.
+ if (check_acl_uid_matches(pc->uid, entry) != 0)
+ continue;
+ break;
+ case ACL_GROUP_OBJ:
+ if (flag == USER_CHECK)
+ continue;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_GROUP_OBJ\n");
+#endif
+ // Ignore ACL entry if user is not in group of file.
+ if (!is_member_of_group(pc, pc->file_stat.st_gid))
+ continue;
+ break;
+ case ACL_GROUP:
+ if (flag == USER_CHECK)
+ continue;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_GROUP\n");
+#endif
+ // Ignore ACL entry if user is not in group of entry.
+ if (check_acl_gid_matches(pc->gids, pc->ngids, entry) != 0)
+ continue;
+ break;
+ case ACL_OTHER:
+ if (flag == GROUP_CHECK)
+ continue;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_OTHER\n");
+#endif
+ break;
+ case ACL_MASK:
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL_MASK\n");
+#endif
+ break;
+ default:
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Unknown ACL tag\n");
+#endif
+ return -1;
+ }
+
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Retrieving permset\n");
+#endif
+ acl_permset_t permset;
+ int permission;
+ if (acl_get_permset(entry, &permset) == -1)
+ // Unable to retrieve permset.
+ return -1;
+
+ if ((permission = acl_get_perm(permset, ACL_READ)) == -1)
+ // Unable to retrieve permset value.
+ return -1;
+
+ if (permission == 1 && tag != ACL_MASK) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL says user has permission to read file.\n");
+#endif
+ has_read_perm = 1;
+ } else if (permission == 0 && tag == ACL_MASK) {
+ // Mask says that there are no permissions to read.
+ mask_allows_read_access = 0;
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL mask says no permission to read file.\n");
+#endif
+ }
+ }
+
+ if (has_read_perm && mask_allows_read_access) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL end result: User has permission to read file.\n");
+#endif
+ return 1;
+ }
+
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: ACL end result: User has no permission to read file.\n");
+#endif
+ return 0;
+}
+
+int check_traditional(struct permission_checker *pc, const int flag) {
+ mode_t mode = pc->file_stat.st_mode;
+ uid_t uid = pc->file_stat.st_uid;
+ gid_t gid = pc->file_stat.st_gid;
+
+ if (flag == USER_CHECK && (mode & S_IROTH)) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: Others can read file '%s'\n",
+ pc->file_path);
+#endif
+ return 1;
+
+ } else if (flag == USER_CHECK && (mode & S_IRUSR) && uid == pc->uid) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: User '%s' can read file '%s'\n",
+ pc->user_name, pc->file_path);
+#endif
+ return 1;
+
+ } else if (flag == GROUP_CHECK && (mode & S_IRGRP) && is_member_of_group(pc, gid)) {
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: User's '%s' group can read file '%s'\n",
+ pc->user_name, pc->file_path);
+#endif
+ return 1;
+ }
+
+ return 0;
+}
+
+int permission_to_read(char* user_name, char *file_path) {
+ int rc = -1;
+
+#ifdef DEBUG
+ fprintf(stderr, "DEBUG: User check '%s' for file '%s'\n", user_name, file_path);
+#endif
+ struct permission_checker pc = {
+ .user_name = user_name,
+ .gids = NULL,
+ .ngids = 0,
+ .file_path = file_path,
+ };
+
+ // Gather user's UID.
+ if ((rc = get_user_uid(&pc)) == -1)
+ // Could not retrieve UID.
+ goto cleanup;
+
+ // Gather file owner (user and group).
+ if ((rc = stat_file(&pc)) == -1)
+ // Could not stat file.
+ goto cleanup;
+
+ // Check whether there is an ACL entry which would allow the user
+ // to read the file. Don't check for any groups yet. The issue with
+ // groups is that it can be very slow to retrieve the list of groups
+ // of a specific user when done via a remote LDAP server!
+ if ((rc = check_acl(&pc, USER_CHECK)) == 1)
+ // Yes, has permissions.
+ goto cleanup;
+
+ // Check whether ACLs of file could be retrieved.
+ if (rc == -1) {
+ if (errno != ENOTSUP)
+ // Unknown error.
+ goto cleanup;
+
+ // File system does not support ACLs.
+ // Fallback to traditional permissions.
+ if ((rc = check_traditional(&pc, USER_CHECK)) == 1)
+ // Yes, has traditional permissions.
+ goto cleanup;
+
+ if ((rc = get_user_groups(&pc)) == -1)
+ // Can not retrieve user's groups.
+ goto cleanup;
+
+ rc = check_traditional(&pc, GROUP_CHECK);
+ goto cleanup;
+ }
+
+ if ((rc = get_user_groups(&pc)) == -1)
+ // Can not retrieve use'r groups.
+ goto cleanup;
+
+ // Check whether there is an ACL entry which would allow any of the
+ // user's groups to read the file.
+ rc = check_acl(&pc, GROUP_CHECK);
+
+cleanup:
+#ifdef DEBUG
+ debug_print_checker(&pc);
+#endif
+
+ if (pc.ngids)
+ free(pc.gids);
+
+ return rc;
+}
+
+// vim: set tabstop=8 softtabstop=0 expandtab shiftwidth=4 smarttab
diff --git a/internal/io/fs/permissions/permission_linux.go b/internal/io/fs/permissions/permission_linux.go
new file mode 100644
index 0000000..feae729
--- /dev/null
+++ b/internal/io/fs/permissions/permission_linux.go
@@ -0,0 +1,33 @@
+package permissions
+
+/*
+#include "permission_linux.h"
+#cgo LDFLAGS: -L. -lacl
+*/
+import "C"
+
+import (
+ "errors"
+ "unsafe"
+)
+
+// To check whether user has Linux file system permissions to read a given file.
+func ToRead(user, filePath string) (bool, error) {
+ cUser := C.CString(user)
+ cFilePath := C.CString(filePath)
+
+ defer C.free(unsafe.Pointer(cUser))
+ defer C.free(unsafe.Pointer(cFilePath))
+
+ cOk, err := C.permission_to_read(cUser, cFilePath)
+ if cOk == 1 {
+ return true, nil
+ }
+
+ if err != nil {
+ // err contains errno message
+ return false, err
+ }
+
+ return false, errors.New("User without permission to read file")
+}
diff --git a/internal/io/fs/permissions/permission_linux.h b/internal/io/fs/permissions/permission_linux.h
new file mode 100644
index 0000000..a2c266e
--- /dev/null
+++ b/internal/io/fs/permissions/permission_linux.h
@@ -0,0 +1,60 @@
+#ifndef PERMISSION_LINUX_H
+#define PERMISSION_LINUX_H
+
+#include <acl/libacl.h>
+#include <errno.h>
+#include <grp.h>
+#include <pwd.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/acl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+//#define DEBUG
+#define USER_CHECK 0
+#define GROUP_CHECK 1
+
+struct permission_checker {
+ char *user_name;
+ uid_t uid;
+ gid_t *gids;
+ int ngids;
+ char *file_path;
+ struct stat file_stat;
+ struct passwd pw;
+};
+
+
+#ifdef DEBUG
+// Print out permission_checker struct.
+void debug_print_checker(struct permission_checker *pc);
+#endif
+
+// Stat a given file to retrieve traditional UNIX permissions.
+int stat_file(struct permission_checker *pc);
+
+// Retrieve UID of user.
+int get_user_uid(struct permission_checker *pc);
+
+// Retrieve all groups of the user.
+int get_user_groups(struct permission_checker *pc);
+
+// Check whether user is member of a group or not.
+int is_member_of_group(struct permission_checker *pc, gid_t gid);
+
+// Check whether user can read file according Linux ACLs.
+// As flag use either USER_CHECK or GROUP_CHECK.
+int check_acl(struct permission_checker *pc, const int flag);
+
+// Check whether user has permissions to read file according traditional
+// UNIX permissions. As flag use either USER_CHECK or GROUP_CHECK.
+int check_traditional(struct permission_checker *pc, const int flag);
+
+// Returns 1 if user has permission to read file.
+// Returns <0 on error and returns 0 if no permissions.
+int permission_to_read(char* user, char *file_path);
+
+#endif // PERMISSION_LINUX_H
diff --git a/internal/io/fs/permissions/permission_test.go b/internal/io/fs/permissions/permission_test.go
new file mode 100644
index 0000000..d415ac2
--- /dev/null
+++ b/internal/io/fs/permissions/permission_test.go
@@ -0,0 +1,112 @@
+// +build linux
+
+package permissions
+
+import (
+ "os"
+ "os/exec"
+ "os/user"
+ "strings"
+ "testing"
+)
+
+const (
+ setfacl string = "/usr/bin/setfacl"
+ file string = "/tmp/acltest"
+)
+
+func TestLinuxACL(t *testing.T) {
+ setfacl := "/usr/bin/setfacl"
+ file := "/tmp/acltest"
+
+ // Delete file if it exists.
+ if _, err := os.Stat(file); err == nil {
+ os.Remove(file)
+ }
+
+ f, err := os.Create(file)
+ if err != nil {
+ t.Errorf("%v", err)
+ }
+ defer func() {
+ f.Close()
+ //os.Remove(file)
+ }()
+
+ user, err := user.Current()
+ if err != nil {
+ t.Errorf("Unable to retrieve current user: %v", err)
+ }
+
+ // Test 1: Remove all permissions and perform a permission check
+ cmd := exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, _ := ToRead(user.Username, file); ok {
+ t.Errorf("Didn't expect permissions to read file!")
+ }
+
+ // Test 2: Add read permission to file owner
+ cmd = exec.Command(setfacl, "-b", "-m", "u::r--,g::---,o::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, err := ToRead(user.Username, file); !ok {
+ t.Errorf("Expected permissions to read file: %v", err)
+ }
+
+ // Test 3: Add read permission to file group
+ cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::r--,o::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, err := ToRead(user.Username, file); !ok {
+ t.Errorf("Expected permissions to read file: %v", err)
+ }
+
+ // Test 4: Add read permission to others
+ cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::r--", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+
+ if ok, err := ToRead(user.Username, file); !ok {
+ t.Errorf("Expected permissions to read file: %v", err)
+ }
+
+ // Test 5: Remove read permission from mask
+ cmd = exec.Command(setfacl, "-m", "m::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, _ := ToRead(user.Username, file); ok {
+ t.Errorf("Didn't expect permissions to read file!")
+ }
+ cmd = exec.Command(setfacl, "-m", "m::r--", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+
+ // Test 6: Add read permission to specific group
+ cmd = exec.Command(setfacl, "-b", "-m", "u::---,g:"+user.Username+":r--,o::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, err := ToRead(user.Username, file); !ok {
+ t.Errorf("Expected permissions to read file for user %v: %v", user.Username, err)
+ }
+
+ // Test 7: Remove all permissions but mask
+ cmd = exec.Command(setfacl, "-b", "-m", "u::---,g::---,o::---", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ cmd = exec.Command(setfacl, "-m", "m::r--", file)
+ if err := cmd.Run(); err != nil {
+ t.Errorf("%s -> %v", strings.Join(cmd.Args, " "), err)
+ }
+ if ok, _ := ToRead(user.Username, file); ok {
+ t.Errorf("Didn't expect permissions to read file!")
+ }
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
new file mode 100644
index 0000000..321432e
--- /dev/null
+++ b/internal/io/fs/readfile.go
@@ -0,0 +1,307 @@
+package fs
+
+import (
+ "bufio"
+ "compress/gzip"
+ "context"
+ "errors"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/logger"
+
+ "github.com/DataDog/zstd"
+)
+
+// Used to tail and filter a local log file.
+type readFile struct {
+ // Various statistics (e.g. regex hit percentage, transfer percentage).
+ stats
+ // Path of log file to tail.
+ filePath string
+ // Only consider all log lines matching this regular expression.
+ re *regexp.Regexp
+ // The glob identifier of the file.
+ globID string
+ // Channel to send a server message to the dtail client
+ serverMessages chan<- string
+ // Periodically retry reading file.
+ retry bool
+ // Can I skip messages when there are too many?
+ canSkipLines bool
+ // Seek to the EOF before processing file?
+ seekEOF bool
+ limiter chan struct{}
+}
+
+// FilePath returns the full file path.
+func (f readFile) FilePath() string {
+ return f.filePath
+}
+
+// Retry reading the file on error?
+func (f readFile) Retry() bool {
+ return f.retry
+}
+
+// Start tailing a log file.
+func (f readFile) Start(ctx context.Context, lines chan<- line.Line, regex string) error {
+ defer func() {
+ select {
+ case <-f.limiter:
+ default:
+ }
+ }()
+
+ select {
+ case f.limiter <- struct{}{}:
+ default:
+ select {
+ case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case <-ctx.Done():
+ return nil
+ }
+ f.limiter <- struct{}{}
+ }
+
+ fd, err := os.Open(f.filePath)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ if f.seekEOF {
+ fd.Seek(0, io.SeekEnd)
+ }
+
+ rawLines := make(chan []byte, 100)
+ truncate := make(chan struct{})
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ go f.periodicTruncateCheck(ctx, truncate)
+ go f.filter(ctx, &wg, rawLines, lines, regex)
+
+ err = f.read(ctx, fd, rawLines, truncate)
+ close(rawLines)
+ wg.Wait()
+
+ return err
+}
+
+func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case truncate <- struct{}{}:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+ var offset uint64
+
+ reader, err := f.makeReader(fd)
+ if err != nil {
+ return err
+ }
+ rawLine := make([]byte, 0, 512)
+
+ lineLengthThreshold := 1024 * 1024 // 1mb
+ longLineWarning := false
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ }
+
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return err
+ }
+ logger.Info(f.filePath, "Current offset", offset)
+ default:
+ }
+
+ // Read some bytes (max 4k at once as of go 1.12). isPrefix will
+ // be set if line does not fit into 4k buffer.
+ bytes, isPrefix, err := reader.ReadLine()
+
+ if err != nil {
+ // If EOF, sleep a couple of ms and return with nil error.
+ // If other error, return with non-nil error.
+ if err != io.EOF {
+ return err
+ }
+ if !f.seekEOF {
+ logger.Debug(f.FilePath(), "End of file reached")
+ return nil
+ }
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ rawLine = append(rawLine, bytes...)
+ offset += uint64(len(bytes))
+
+ if !isPrefix {
+ // last LineRead call returned contend until end of line.
+ rawLine = append(rawLine, '\n')
+ select {
+ case rawLines <- rawLine:
+ case <-ctx.Done():
+ return nil
+ }
+ rawLine = make([]byte, 0, 512)
+ if longLineWarning {
+ longLineWarning = false
+ }
+ continue
+ }
+
+ // Last LineRead call could not read content until end of line, buffer
+ // was too small. Determine whether we exceed the max line length we
+ // want dtail to send to the client at once. Possibly split up log line
+ // into multiple log lines.
+ if len(rawLine) >= lineLengthThreshold {
+ if !longLineWarning {
+ f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
+ // Only print out one warning per long log line.
+ longLineWarning = true
+ }
+ rawLine = append(rawLine, '\n')
+ select {
+ case rawLines <- rawLine:
+ case <-ctx.Done():
+ return nil
+ }
+ rawLine = make([]byte, 0, 512)
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, regex string) {
+ defer wg.Done()
+
+ if regex == "" {
+ regex = "."
+ }
+
+ re, err := regexp.Compile(regex)
+ if err != nil {
+ logger.Error(regex, "Can't compile regex, using '.' instead", err)
+ re = regexp.MustCompile(".")
+ }
+ f.re = re
+
+ for {
+ select {
+ case line, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+ if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok {
+ select {
+ case lines <- filteredLine:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+}
+
+func (f readFile) transmittable(lineBytes []byte, length, capacity int) (line.Line, bool) {
+ var read line.Line
+
+ if !f.re.Match(lineBytes) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineTransmitted()
+
+ read = line.Line{
+ Content: lineBytes,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: f.transmittedPerc(),
+ }
+
+ return read, true
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ logger.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
+ }
+
+ return false, nil
+}
diff --git a/internal/io/fs/stats.go b/internal/io/fs/stats.go
new file mode 100644
index 0000000..4121ff7
--- /dev/null
+++ b/internal/io/fs/stats.go
@@ -0,0 +1,69 @@
+package fs
+
+// Used to calculate how many log lines matched the regular expression
+// and how many log files could be transmitted from the server to the client.
+// Hit and transmit percentage takes only the last 100 log lines into calculation.
+type stats struct {
+ pos int
+ lineCount uint64
+ matched [100]bool
+ matchCount uint64
+ transmitted [100]bool
+ transmitCount int
+}
+
+// Return the total line count.
+func (f *stats) totalLineCount() uint64 {
+ return f.lineCount
+}
+
+// Calculate the percentage of log lines transmitted to the client.
+func (f *stats) transmittedPerc() int {
+ return int(percentOf(float64(f.matchCount), float64(f.transmitCount)))
+}
+
+// Update bucket position. We only take into consideration the last 100
+// lines for stats.
+func (f *stats) updatePosition() {
+ f.pos = (f.pos + 1) % 100
+ f.lineCount++
+}
+
+// Increment match counter.
+func (f *stats) updateLineMatched() {
+ if !f.matched[f.pos] {
+ f.matchCount++
+ f.matched[f.pos] = true
+ }
+}
+
+// Increment transmitted counter.
+func (f *stats) updateLineTransmitted() {
+ if !f.transmitted[f.pos] {
+ f.transmitCount++
+ f.transmitted[f.pos] = true
+ }
+}
+
+// Decrement match counter.
+func (f *stats) updateLineNotMatched() {
+ if f.matched[f.pos] {
+ f.matchCount--
+ f.matched[f.pos] = false
+ }
+}
+
+// Decrement transmitted counter.
+func (f *stats) updateLineNotTransmitted() {
+ if f.transmitted[f.pos] {
+ f.transmitCount--
+ f.transmitted[f.pos] = false
+ }
+}
+
+func percentOf(total float64, value float64) float64 {
+ if total == 0 || total == value {
+ return 100
+ }
+ return value / (total / 100.0)
+}
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
new file mode 100644
index 0000000..14994e5
--- /dev/null
+++ b/internal/io/fs/tailfile.go
@@ -0,0 +1,21 @@
+package fs
+
+// TailFile is to tail and filter a log file.
+type TailFile struct {
+ readFile
+}
+
+// NewTailFile returns a new file tailer.
+func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile {
+ return TailFile{
+ readFile: readFile{
+ filePath: filePath,
+ globID: globID,
+ serverMessages: serverMessages,
+ retry: true,
+ canSkipLines: true,
+ seekEOF: true,
+ limiter: limiter,
+ },
+ }
+}
diff --git a/internal/io/line/line.go b/internal/io/line/line.go
new file mode 100644
index 0000000..9db93c0
--- /dev/null
+++ b/internal/io/line/line.go
@@ -0,0 +1,28 @@
+package line
+
+import (
+ "fmt"
+)
+
+// Line represents a read log line.
+type Line struct {
+ // The content of the log line.
+ Content []byte
+ // Until now, how many log lines were processed?
+ Count uint64
+ // Sometimes we produce too many log lines so that the client
+ // is too slow to process all of them. The server will drop log
+ // lines if that happens but it will signal to the client how
+ // many log lines in % could be transmitted to the client.
+ TransmittedPerc int
+ SourceID string
+}
+
+// Return a human readable representation of the followed line.
+func (l Line) String() string {
+ return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)",
+ string(l.Content),
+ l.TransmittedPerc,
+ l.Count,
+ l.SourceID)
+}
diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go
new file mode 100644
index 0000000..e30b907
--- /dev/null
+++ b/internal/io/logger/logger.go
@@ -0,0 +1,445 @@
+package logger
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "os/signal"
+ "runtime"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
+)
+
+const (
+ clientStr string = "CLIENT"
+ serverStr string = "SERVER"
+ infoStr string = "INFO"
+ warnStr string = "WARN"
+ errorStr string = "ERROR"
+ fatalStr string = "FATAL"
+ debugStr string = "DEBUG"
+ traceStr string = "TRACE"
+)
+
+// Synchronise access to logging.
+var mutex sync.Mutex
+
+// File descriptor of log file when logToFile enabled.
+var fd *os.File
+
+// File write buffer of log file when logToFile enabled.
+var writer *bufio.Writer
+
+// File write buffer of stdout when logToStdout enabled.
+var stdoutWriter *bufio.Writer
+
+// Current hostname.
+var hostname string
+
+// Used to detect change of day (create one log file per day0
+var lastDateStr string
+
+// True if log in server mode, false if log in client mode.
+var serverEnable bool
+
+// Used to make logging non-blocking.
+var fileLogBufCh chan buf
+var stdoutBufCh chan string
+
+// Stdout channel, required to pause output
+var pauseCh chan struct{}
+var resumeCh chan struct{}
+
+// Tell the logger about logrotation
+var rotateCh chan os.Signal
+
+// LogMode allows to specify the verbosity of logging.
+type LogMode int
+
+// Possible log modes.
+const (
+ NormalMode LogMode = iota
+ DebugMode LogMode = iota
+ SilentMode LogMode = iota
+ TraceMode LogMode = iota
+ NothingMode LogMode = iota
+)
+
+// Mode is the current log mode in use.
+var Mode LogMode
+
+// LogStrategy allows to specify a log rotation strategy.
+type LogStrategy int
+
+// Possible log strategies.
+const (
+ NormalStrategy LogStrategy = iota
+ DailyStrategy LogStrategy = iota
+ StdoutStrategy LogStrategy = iota
+)
+
+// Strategy is the current log strattegy used.
+var Strategy LogStrategy
+
+// Enables logging to stdout.
+var logToStdout bool
+
+// Enables logging to file.
+var logToFile bool
+
+// Helper type to make logging non-blocking.
+type buf struct {
+ time time.Time
+ message string
+}
+
+// Start logging.
+func Start(ctx context.Context, myServerEnable, debugEnable, silentEnable, nothingEnable bool) {
+ serverEnable = myServerEnable
+
+ mode := logMode(debugEnable, silentEnable, nothingEnable)
+ strategy := logStrategy()
+
+ stdoutWriter = bufio.NewWriter(os.Stdout)
+ Mode = mode
+ Strategy = strategy
+
+ if Mode == NothingMode {
+ return
+ }
+
+ switch Strategy {
+ case DailyStrategy:
+ _, err := os.Stat(config.Common.LogDir)
+ logToFile = !os.IsNotExist(err)
+ logToStdout = !serverEnable || Mode == DebugMode || Mode == TraceMode
+ case StdoutStrategy:
+ fallthrough
+ default:
+ logToFile = !serverEnable
+ logToStdout = true
+ }
+
+ fqdn, err := os.Hostname()
+ if err != nil {
+ panic(err)
+ }
+ s := strings.Split(fqdn, ".")
+ hostname = s[0]
+
+ pauseCh = make(chan struct{})
+ resumeCh = make(chan struct{})
+
+ // Setup logrotation
+ rotateCh = make(chan os.Signal, 1)
+ signal.Notify(rotateCh, syscall.SIGHUP)
+
+ if logToStdout {
+ stdoutBufCh = make(chan string, runtime.NumCPU()*100)
+ go writeToStdout(ctx)
+ }
+
+ if logToFile {
+ fileLogBufCh = make(chan buf, runtime.NumCPU()*100)
+ go writeToFile(ctx)
+ }
+}
+
+func logMode(debugEnable, silentEnable, nothingEnable bool) LogMode {
+ switch {
+ case debugEnable:
+ return DebugMode
+ case nothingEnable:
+ return NothingMode
+ case config.Common.TraceEnable:
+ return TraceMode
+ case config.Common.DebugEnable:
+ return DebugMode
+ case silentEnable:
+ return SilentMode
+ default:
+ }
+ return NormalMode
+}
+
+func logStrategy() LogStrategy {
+ switch config.Common.LogStrategy {
+ case "daily":
+ return DailyStrategy
+ default:
+ }
+ return StdoutStrategy
+}
+
+// Info message logging.
+func Info(args ...interface{}) string {
+ if serverEnable {
+ return log(serverStr, infoStr, args)
+ }
+
+ return log(clientStr, infoStr, args)
+}
+
+// Warn message logging.
+func Warn(args ...interface{}) string {
+ if serverEnable {
+ return log(serverStr, warnStr, args)
+ }
+
+ return log(clientStr, warnStr, args)
+}
+
+// Error message logging.
+func Error(args ...interface{}) string {
+ if serverEnable {
+ return log(serverStr, errorStr, args)
+ }
+
+ return log(clientStr, errorStr, args)
+}
+
+// FatalExit logs an error and exists the process.
+func FatalExit(args ...interface{}) {
+ what := clientStr
+ if serverEnable {
+ what = serverStr
+ }
+ log(what, fatalStr, args)
+
+ time.Sleep(time.Second)
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ closeWriter()
+ os.Exit(3)
+}
+
+// Debug message logging.
+func Debug(args ...interface{}) string {
+ if Mode == DebugMode || Mode == TraceMode {
+ if serverEnable {
+ return log(serverStr, debugStr, args)
+ }
+ return log(clientStr, debugStr, args)
+ }
+
+ return ""
+}
+
+// Trace message logging.
+func Trace(args ...interface{}) string {
+ if Mode == TraceMode {
+ if serverEnable {
+ return log(serverStr, traceStr, args)
+ }
+ return log(clientStr, traceStr, args)
+ }
+
+ return ""
+}
+
+// Write log line to buffer and/or log file.
+func write(what, severity, message string) {
+ if logToStdout && (Mode != SilentMode || severity != warnStr) {
+ line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message)
+
+ if color.Colored {
+ line = color.Colorfy(line)
+ }
+
+ stdoutBufCh <- line
+ }
+
+ if logToFile {
+ t := time.Now()
+ timeStr := t.Format("20060102-150405")
+ fileLogBufCh <- buf{
+ time: t,
+ message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message),
+ }
+ }
+}
+
+// Generig log message.
+func log(what string, severity string, args []interface{}) string {
+ if Mode == NothingMode {
+ return ""
+ }
+
+ var messages []string
+
+ for _, arg := range args {
+ switch v := arg.(type) {
+ case string:
+ messages = append(messages, v)
+ case int:
+ messages = append(messages, fmt.Sprintf("%d", v))
+ case error:
+ messages = append(messages, v.Error())
+ default:
+ messages = append(messages, fmt.Sprintf("%v", v))
+ }
+ }
+
+ message := strings.Join(messages, "|")
+ write(what, severity, message)
+
+ return fmt.Sprintf("%s|%s", severity, message)
+}
+
+// Raw message logging.
+func Raw(message string) {
+ if Mode == NothingMode {
+ return
+ }
+
+ if logToFile {
+ fileLogBufCh <- buf{time.Now(), message}
+ }
+
+ if logToStdout {
+ if color.Colored {
+ message = color.Colorfy(message)
+ }
+ stdoutBufCh <- message
+ }
+}
+
+// Close log writer (e.g. on change of day).
+func closeWriter() {
+ if writer != nil {
+ writer.Flush()
+ fd.Close()
+ }
+}
+
+// Return the correct log file writer
+func fileWriter(dateStr string) *bufio.Writer {
+ if dateStr != lastDateStr {
+ return updateFileWriter(dateStr)
+ }
+
+ // Check for log rotation signal
+ select {
+ case <-rotateCh:
+ stdoutWriter.WriteString("Received signal for logrotation\n")
+ return updateFileWriter(dateStr)
+ default:
+ }
+
+ return writer
+}
+
+// Update log file writer
+func updateFileWriter(dateStr string) *bufio.Writer {
+ // Detected change of day. Close current writer and create a new one.
+ mutex.Lock()
+ defer mutex.Unlock()
+ closeWriter()
+
+ if _, err := os.Stat(config.Common.LogDir); os.IsNotExist(err) {
+ if err = os.MkdirAll(config.Common.LogDir, 0755); err != nil {
+ panic(err)
+ }
+ }
+
+ logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, dateStr)
+ newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ panic(err)
+ }
+
+ fd = newFd
+ writer = bufio.NewWriterSize(fd, 1)
+ lastDateStr = dateStr
+
+ return writer
+}
+
+// Flush all outstanding lines.
+func Flush() {
+ for {
+ select {
+ case message := <-stdoutBufCh:
+ stdoutWriter.WriteString(message)
+ default:
+ stdoutWriter.Flush()
+ return
+ }
+ }
+}
+
+func writeToStdout(ctx context.Context) {
+ for {
+ select {
+ case message := <-stdoutBufCh:
+ stdoutWriter.WriteString(message)
+ case <-time.After(time.Millisecond * 100):
+ stdoutWriter.Flush()
+ case <-pauseCh:
+ PAUSE:
+ for {
+ select {
+ case <-stdoutBufCh:
+ case <-resumeCh:
+ break PAUSE
+ case <-ctx.Done():
+ return
+ }
+ }
+ case <-ctx.Done():
+ Flush()
+ return
+ }
+ }
+}
+
+func writeToFile(ctx context.Context) {
+ for {
+ select {
+ case buf := <-fileLogBufCh:
+ dateStr := buf.time.Format("20060102")
+ w := fileWriter(dateStr)
+ w.WriteString(buf.message)
+ case <-pauseCh:
+ PAUSE:
+ for {
+ select {
+ case <-stdoutBufCh:
+ case <-resumeCh:
+ break PAUSE
+ case <-ctx.Done():
+ return
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// Pause logging.
+func Pause() {
+ if logToStdout {
+ pauseCh <- struct{}{}
+ }
+ if logToFile {
+ pauseCh <- struct{}{}
+ }
+}
+
+// Resume logging (after pausing).
+func Resume() {
+ if logToStdout {
+ resumeCh <- struct{}{}
+ }
+ if logToFile {
+ resumeCh <- struct{}{}
+ }
+}
diff --git a/internal/io/run/run.go b/internal/io/run/run.go
new file mode 100644
index 0000000..b608639
--- /dev/null
+++ b/internal/io/run/run.go
@@ -0,0 +1,104 @@
+package run
+
+import (
+ "bufio"
+ "context"
+ "io"
+ "os/exec"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+// Run is for execute a command.
+type Run struct {
+ commandPath string
+ args []string
+ cmd *exec.Cmd
+}
+
+// New returns a new command runner.
+func New(commandPath string, args []string) Run {
+ return Run{
+ commandPath: commandPath,
+ args: args,
+ }
+}
+
+// Start running the command.
+func (r Run) Start(ctx context.Context, lines chan<- line.Line) (pid int, ec int, err error) {
+ done := make(chan struct{})
+ defer close(done)
+
+ ec = -1
+ pid = -1
+
+ if len(r.args) > 0 {
+ logger.Debug(r.commandPath, strings.Join(r.args, " "))
+ r.cmd = exec.CommandContext(ctx, r.commandPath, strings.Join(r.args, " "))
+ } else {
+ logger.Debug(r.commandPath)
+ r.cmd = exec.CommandContext(ctx, r.commandPath)
+ }
+
+ stdoutPipe, myErr := r.cmd.StdoutPipe()
+ if err != nil {
+ err = myErr
+ return
+ }
+
+ stderrPipe, myErr := r.cmd.StderrPipe()
+ if myErr != nil {
+ err = myErr
+ return
+ }
+
+ if myErr := r.cmd.Start(); err != nil {
+ err = myErr
+ return
+ }
+
+ pid = r.cmd.Process.Pid
+ ec = 0
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go r.pipeToLines(done, &wg, pid, stdoutPipe, "STDOUT", lines)
+ go r.pipeToLines(done, &wg, pid, stderrPipe, "STDERR", lines)
+
+ if err = r.cmd.Wait(); err != nil {
+ if exitError, ok := err.(*exec.ExitError); ok {
+ ec = exitError.ExitCode()
+ }
+ }
+
+ return
+}
+
+func (r Run) pipeToLines(done chan struct{}, wg *sync.WaitGroup, pid int, reader io.Reader, what string, lines chan<- line.Line) {
+ defer wg.Done()
+ bufReader := bufio.NewReader(reader)
+
+ for {
+ lineStr, err := bufReader.ReadString('\n')
+ for err == nil {
+ lines <- line.Line{
+ Content: []byte(lineStr),
+ Count: uint64(pid),
+ TransmittedPerc: 100,
+ SourceID: what,
+ }
+ lineStr, err = bufReader.ReadString('\n')
+ }
+ select {
+ case <-done:
+ return
+ default:
+ }
+ time.Sleep(time.Millisecond * 10)
+ }
+}