diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/io/dlog/loggers/file.go | 2 | ||||
| -rw-r--r-- | internal/mapr/groupsetresult.go | 57 | ||||
| -rw-r--r-- | internal/mapr/query.go | 27 | ||||
| -rw-r--r-- | internal/mapr/query_test.go | 46 |
4 files changed, 107 insertions, 25 deletions
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 9dce251..6a09353 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -141,7 +141,7 @@ func (f *file) getWriter(name string) *bufio.Writer { } logFile := fmt.Sprintf("%s/%s.log", config.Common.LogDir, name) - newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + newFd, err := os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) if err != nil { panic(err) } diff --git a/internal/mapr/groupsetresult.go b/internal/mapr/groupsetresult.go index 6d0ac1f..9c3c134 100644 --- a/internal/mapr/groupsetresult.go +++ b/internal/mapr/groupsetresult.go @@ -159,11 +159,11 @@ func (g *GroupSet) resultWriteFormattedDataEntry(query *Query, sb *strings.Build } func (*GroupSet) writeQueryFile(query *Query) error { - queryFile := fmt.Sprintf("%s.query", query.Outfile) + queryFile := fmt.Sprintf("%s.query", query.Outfile.FilePath) tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile) dlog.Common.Debug("Writing query file", queryFile) - fd, err := os.Create(tmpQueryFile) + fd, err := os.OpenFile(tmpQueryFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { return err } @@ -187,31 +187,49 @@ func (g *GroupSet) WriteResult(query *Query) error { return err } - dlog.Common.Info("Writing outfile", query.Outfile) - tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) + // By default, also write the CSV header. + writeHeader := true - fd, err := os.Create(tmpOutfile) + // In append mode, only write CSV header when file doesn't exist yet or is empty. + if query.Outfile.AppendMode { + if info, err := os.Stat(query.Outfile.FilePath); err == nil && info.Size() > 0 { + writeHeader = false + } + } + + fd, err := g.getOutfileFD(query) if err != nil { return err } defer fd.Close() - return g.resultWriteUnformatted(query, rows, tmpOutfile, fd) + return g.resultWriteUnformatted(query, rows, fd, writeHeader) } -func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, tmpOutfile string, - fd *os.File) error { +func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) { + if !query.Outfile.AppendMode { + dlog.Common.Info("Writing to outfile", query.Outfile.FilePath) + tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath) + return os.OpenFile(tmpOutfile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + } + + dlog.Common.Info("Appending to outfile", query.Outfile.FilePath) + return os.OpenFile(query.Outfile.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) +} - // Generate header now +func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader bool) error { lastColumn := len(query.Select) - 1 - for i, sc := range query.Select { - fd.WriteString(sc.FieldStorage) - if i == lastColumn { - continue + + if writeHeader { + for i, sc := range query.Select { + fd.WriteString(sc.FieldStorage) + if i == lastColumn { + continue + } + fd.WriteString(protocol.CSVDelimiter) } - fd.WriteString(protocol.CSVDelimiter) + fd.WriteString("\n") } - fd.WriteString("\n") // And now write the data for i, r := range rows { @@ -228,9 +246,12 @@ func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, tmpOutfil fd.WriteString("\n") } - if err := os.Rename(tmpOutfile, query.Outfile); err != nil { - os.Remove(tmpOutfile) - return err + if !query.Outfile.AppendMode { + tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath) + if err := os.Rename(tmpOutfile, query.Outfile.FilePath); err != nil { + os.Remove(tmpOutfile) + return err + } } return nil diff --git a/internal/mapr/query.go b/internal/mapr/query.go index d70675f..4eeb7b6 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -13,6 +13,15 @@ const ( unexpectedEnd string = "Unexpected end of query" ) +type Outfile struct { + FilePath string + AppendMode bool +} + +func (o Outfile) String() string { + return fmt.Sprintf("Outfile(FilePath:%v,AppendMode:%v)", o.FilePath, o.AppendMode) +} + // Query represents a parsed mapr query. type Query struct { Select []selectCondition @@ -25,7 +34,7 @@ type Query struct { GroupKey string Interval time.Duration Limit int - Outfile string + Outfile *Outfile RawQuery string tokens []token LogFormat string @@ -68,7 +77,7 @@ func NewQuery(queryStr string) (*Query, error) { // HasOutfile returns true if query result will be written to a CVS output file. func (q *Query) HasOutfile() bool { - return q.Outfile != "" + return q.Outfile != nil } // Has is a helper to determine whether a query contains a substring @@ -193,10 +202,18 @@ func (q *Query) parseTokens(tokens []token) ([]token, error) { q.Limit = i case "outfile": tokens, found = tokensConsume(tokens[1:]) - if len(found) == 0 { - return tokens, errors.New(invalidQuery + unexpectedEnd) + switch len(found) { + case 1: + q.Outfile = &Outfile{FilePath: found[0].str, AppendMode: false} + case 2: + if found[0].str == "append" { + q.Outfile = &Outfile{FilePath: found[1].str, AppendMode: true} + } else { + return tokens, errors.New(invalidQuery + invalidQuery) + } + default: + return tokens, errors.New(invalidQuery + invalidQuery) } - q.Outfile = found[0].str case "logformat": tokens, found = tokensConsume(tokens[1:]) if len(found) == 0 { diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go index a0913fd..f03ccba 100644 --- a/internal/mapr/query_test.go +++ b/internal/mapr/query_test.go @@ -5,6 +5,48 @@ import ( "time" ) +func TestParseQueryOutfile(t *testing.T) { + queryStr := "select foo from bar outfile \"baz.csv\"" + + q, err := NewQuery(queryStr) + if err != nil { + t.Errorf("Query parse error: %s\n%v: %v", queryStr, q, err) + } + + if q.Outfile == nil { + t.Errorf("Expected non-nil outfile: %s\n%v", queryStr, q) + } + + if q.Outfile.FilePath != "baz.csv" { + t.Errorf("Expected \"baz.csv\" as outfile file path: %s\n%v", queryStr, q) + } + + if q.Outfile.AppendMode { + t.Errorf("Expected append mode of outfile to be false: %s\n%v", queryStr, q) + } +} + +func TestParseQueryOutfileAppend(t *testing.T) { + queryStr := "select foo from bar outfile append \"baz.csv\"" + + q, err := NewQuery(queryStr) + if err != nil { + t.Errorf("Query parse error: %s\n%v: %v", queryStr, q, err) + } + + if q.Outfile == nil { + t.Errorf("Expected non-nil outfile: %s\n%v", queryStr, q) + } + + if q.Outfile.FilePath != "baz.csv" { + t.Errorf("Expected \"baz.csv\" as outfile file path: %s\n%v", queryStr, q) + } + + if !q.Outfile.AppendMode { + t.Errorf("Expected append mode of outfile to be true: %s\n%v", queryStr, q) + } +} + func TestParseQuerySimple(t *testing.T) { errorQueries := []string{ "select", @@ -30,7 +72,9 @@ func TestParseQuerySimple(t *testing.T) { "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + "order by foo limit 23 outfile \"result.csv\"", "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + - "order by foo limit 23 outfile \"result.csv\" " + + "order by foo limit 23 outfile append \"result.csv\"", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile append \"result.csv\" " + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", } |
