summaryrefslogtreecommitdiff
path: root/internal/parquet/writer_test.go
blob: 0bd7d97c7feb764ff658b71f7d75910713e32205 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package parquet

import (
	"io"
	"os"
	"path/filepath"
	"reflect"
	"testing"

	parquetgo "github.com/parquet-go/parquet-go"
)

func TestWriterRoundTripAndFinalize(t *testing.T) {
	dir := t.TempDir()
	writer, err := NewWriter(filepath.Join(dir, "trace"), WriterConfig{}, FileMetadata{
		Hostname:          "test-host",
		StartedAtUnixNano: 1234,
		Mode:              "tui",
	})
	if err != nil {
		t.Fatalf("NewWriter() error = %v", err)
	}

	rows := []Record{
		{
			Seq:         1,
			TimeNS:      10,
			GapNS:       2,
			LatencyNS:   5,
			Comm:        "cat",
			PID:         11,
			TID:         12,
			Syscall:     "read",
			FD:          3,
			Ret:         42,
			Bytes:       42,
			File:        "/tmp/input",
			IsError:     false,
			FilterEpoch: 7,
		},
		{
			Seq:         2,
			TimeNS:      20,
			GapNS:       3,
			LatencyNS:   6,
			Comm:        "cp",
			PID:         21,
			TID:         22,
			Syscall:     "write",
			FD:          4,
			Ret:         -1,
			Bytes:       99,
			File:        "/tmp/output",
			IsError:     true,
			FilterEpoch: 8,
		},
	}

	if err := writer.WriteRows(rows); err != nil {
		t.Fatalf("WriteRows() error = %v", err)
	}
	if _, err := os.Stat(writer.TempPath()); err != nil {
		t.Fatalf("Stat(%q) error = %v, want temp file present", writer.TempPath(), err)
	}
	if _, err := os.Stat(writer.FinalPath()); !os.IsNotExist(err) {
		t.Fatalf("Stat(%q) error = %v, want not-exist before Close", writer.FinalPath(), err)
	}

	if err := writer.Close(); err != nil {
		t.Fatalf("Close() error = %v", err)
	}

	if _, err := os.Stat(writer.TempPath()); !os.IsNotExist(err) {
		t.Fatalf("Stat(%q) error = %v, want temp removed after Close", writer.TempPath(), err)
	}
	if _, err := os.Stat(writer.FinalPath()); err != nil {
		t.Fatalf("Stat(%q) error = %v, want finalized parquet file", writer.FinalPath(), err)
	}

	got := readAllRecords(t, writer.FinalPath())
	if !reflect.DeepEqual(got, rows) {
		t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, rows)
	}
}

// readAllRecords opens the parquet file at path and returns every Record it
// contains, in the order the reader yields them. Any failure to open or read
// the file aborts the calling test via t.Fatalf.
func readAllRecords(t *testing.T, path string) []Record {
	t.Helper()

	file, err := os.Open(path)
	if err != nil {
		t.Fatalf("Open(%q) error = %v", path, err)
	}
	defer file.Close()

	r := parquetgo.NewGenericReader[Record](file)
	defer r.Close()

	var (
		out   []Record
		batch = make([]Record, 4)
	)
	for {
		n, readErr := r.Read(batch)
		// Keep whatever was read before looking at the error: a call may
		// return rows together with io.EOF.
		out = append(out, batch[:n]...)

		switch readErr {
		case nil:
			// More rows may follow; keep reading.
		case io.EOF:
			return out
		default:
			t.Fatalf("Read() error = %v", readErr)
		}
	}
}