diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2022-03-15 11:18:02 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2022-03-15 11:18:02 +0000 |
| commit | 5b8f5b0835fd41ae9a74d2ebce5e9d522de83cb1 (patch) | |
| tree | d50ab31411a1a13d8fd352aeb670f636cd26b908 /internal/mapr/groupsetresult.go | |
| parent | 45b2e60f7206f90af977e95e44e2529aa2ce8081 (diff) | |
| parent | d9601ca860367ec8ddf6bc5c108c826723a81abd (diff) | |
Merge branch 'append-query-results-to-outfile-when-in-streaming-mode' into 'master'
Add "append" modifier to "outfile" keyword in mapreduce query.
See merge request Storage/dtail!6
Diffstat (limited to 'internal/mapr/groupsetresult.go')
| -rw-r--r-- | internal/mapr/groupsetresult.go | 57 |
1 files changed, 39 insertions, 18 deletions
diff --git a/internal/mapr/groupsetresult.go b/internal/mapr/groupsetresult.go index 6d0ac1f..9c3c134 100644 --- a/internal/mapr/groupsetresult.go +++ b/internal/mapr/groupsetresult.go @@ -159,11 +159,11 @@ func (g *GroupSet) resultWriteFormattedDataEntry(query *Query, sb *strings.Build } func (*GroupSet) writeQueryFile(query *Query) error { - queryFile := fmt.Sprintf("%s.query", query.Outfile) + queryFile := fmt.Sprintf("%s.query", query.Outfile.FilePath) tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile) dlog.Common.Debug("Writing query file", queryFile) - fd, err := os.Create(tmpQueryFile) + fd, err := os.OpenFile(tmpQueryFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { return err } @@ -187,31 +187,49 @@ func (g *GroupSet) WriteResult(query *Query) error { return err } - dlog.Common.Info("Writing outfile", query.Outfile) - tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) + // By default, also write the CSV header. + writeHeader := true - fd, err := os.Create(tmpOutfile) + // In append mode, only write CSV header when file doesn't exist yet or is empty. + if query.Outfile.AppendMode { + if info, err := os.Stat(query.Outfile.FilePath); err == nil && info.Size() > 0 { + writeHeader = false + } + } + + fd, err := g.getOutfileFD(query) if err != nil { return err } defer fd.Close() - return g.resultWriteUnformatted(query, rows, tmpOutfile, fd) + return g.resultWriteUnformatted(query, rows, fd, writeHeader) } -func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, tmpOutfile string, - fd *os.File) error { +func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) { + if !query.Outfile.AppendMode { + dlog.Common.Info("Writing to outfile", query.Outfile.FilePath) + tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath) + return os.OpenFile(tmpOutfile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + } + + dlog.Common.Info("Appending to outfile", query.Outfile.FilePath) + return os.OpenFile(query.Outfile.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) +} - // Generate header now +func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader bool) error { lastColumn := len(query.Select) - 1 - for i, sc := range query.Select { - fd.WriteString(sc.FieldStorage) - if i == lastColumn { - continue + + if writeHeader { + for i, sc := range query.Select { + fd.WriteString(sc.FieldStorage) + if i == lastColumn { + continue + } + fd.WriteString(protocol.CSVDelimiter) } - fd.WriteString(protocol.CSVDelimiter) + fd.WriteString("\n") } - fd.WriteString("\n") // And now write the data for i, r := range rows { @@ -228,9 +246,12 @@ func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, tmpOutfil fd.WriteString("\n") } - if err := os.Rename(tmpOutfile, query.Outfile); err != nil { - os.Remove(tmpOutfile) - return err + if !query.Outfile.AppendMode { + tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath) + if err := os.Rename(tmpOutfile, query.Outfile.FilePath); err != nil { + os.Remove(tmpOutfile) + return err + } } return nil |
