summaryrefslogtreecommitdiff
path: root/internal/storage/db.go
blob: d5005096c3892fc651027e9257786d89e63de5c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package storage

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"io/fs"
	"os"

	"codeberg.org/snonux/goprecords/internal/recordline"
	"codeberg.org/snonux/goprecords/internal/recordsdir"
	_ "modernc.org/sqlite"
)

const schemaSQL = `
CREATE TABLE IF NOT EXISTS record (
	host TEXT NOT NULL,
	uptime_sec INTEGER NOT NULL,
	boot_time INTEGER NOT NULL,
	os TEXT NOT NULL,
	os_kernel_name TEXT NOT NULL,
	os_kernel_major TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_record_host ON record(host);
CREATE INDEX IF NOT EXISTS idx_record_os ON record(os);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major);
CREATE TABLE IF NOT EXISTS excluded_host (
	host TEXT NOT NULL PRIMARY KEY,
	reason TEXT NOT NULL DEFAULT '',
	excluded_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))
);
`

// Record is one uptimed boot row stored in the record table.
type Record struct {
	Host        string
	Uptime      uint64
	BootTime    uint64
	OS          string
	KernelName  string
	KernelMajor string
}

// Open opens a SQLite database at path and verifies connectivity.
func Open(ctx context.Context, path string) (*sql.DB, error) {
	db, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}
	if err := db.PingContext(ctx); err != nil {
		db.Close()
		return nil, fmt.Errorf("ping sqlite: %w", err)
	}
	if _, err := db.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); err != nil {
		db.Close()
		return nil, fmt.Errorf("pragma foreign_keys: %w", err)
	}
	return db, nil
}

// CreateSchema creates the record table and indexes if they do not exist.
func CreateSchema(ctx context.Context, db *sql.DB) error {
	_, err := db.ExecContext(ctx, schemaSQL)
	return err
}

// ResetRecords deletes all rows from the record table.
func ResetRecords(ctx context.Context, db *sql.DB) error {
	_, err := db.ExecContext(ctx, "DELETE FROM record")
	return err
}

// ImportFromDir imports non-empty .records files from statsDir into the database,
// replacing existing rows. It is equivalent to ImportFromFS with os.DirFS(statsDir).
func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error {
	return ImportFromFS(ctx, db, os.DirFS(statsDir))
}

// ImportFromFS reads non-empty .records files from the root of fsys into the database.
func ImportFromFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
	if err := ResetRecords(ctx, db); err != nil {
		return fmt.Errorf("reset records: %w", err)
	}
	files, err := recordsdir.ListNonEmptyFilesFS(fsys, ".")
	if err != nil {
		return fmt.Errorf("read dir: %w", err)
	}
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	defer tx.Rollback()
	insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)")
	if err != nil {
		return fmt.Errorf("prepare insert: %w", err)
	}
	defer insert.Close()
	for _, f := range files {
		if err := importFile(ctx, insert, fsys, f.Path, f.Host); err != nil {
			return err
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit transaction: %w", err)
	}
	return nil
}

// LoadRecords returns all rows from the record table ordered by host and boot time.
func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) {
	var n int
	if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM record").Scan(&n); err != nil {
		return nil, fmt.Errorf("count records: %w", err)
	}
	rows, err := db.QueryContext(ctx, "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time")
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()
	out := make([]Record, 0, n)
	for rows.Next() {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		var rec Record
		var uptimeSec, bootTime int64
		if err := rows.Scan(&rec.Host, &uptimeSec, &bootTime, &rec.OS, &rec.KernelName, &rec.KernelMajor); err != nil {
			return nil, fmt.Errorf("scan row: %w", err)
		}
		rec.Uptime = uint64(uptimeSec)
		rec.BootTime = uint64(bootTime)
		out = append(out, rec)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows: %w", err)
	}
	return out, nil
}

// ExcludedHost holds an entry from the excluded_host table.
type ExcludedHost struct {
	Host       string
	Reason     string
	ExcludedAt int64
}

// AddExcludedHost inserts or replaces a host in the excluded_host table.
func AddExcludedHost(ctx context.Context, db *sql.DB, host, reason string) error {
	_, err := db.ExecContext(ctx,
		"INSERT OR REPLACE INTO excluded_host (host, reason) VALUES (?, ?)",
		host, reason)
	if err != nil {
		return fmt.Errorf("add excluded host: %w", err)
	}
	return nil
}

// RemoveExcludedHost removes a host from the excluded_host table.
func RemoveExcludedHost(ctx context.Context, db *sql.DB, host string) error {
	_, err := db.ExecContext(ctx, "DELETE FROM excluded_host WHERE host = ?", host)
	if err != nil {
		return fmt.Errorf("remove excluded host: %w", err)
	}
	return nil
}

// LoadExcludedHosts returns all rows from the excluded_host table.
func LoadExcludedHosts(ctx context.Context, db *sql.DB) ([]ExcludedHost, error) {
	rows, err := db.QueryContext(ctx, "SELECT host, reason, excluded_at FROM excluded_host ORDER BY host")
	if err != nil {
		return nil, fmt.Errorf("query excluded hosts: %w", err)
	}
	defer rows.Close()
	var out []ExcludedHost
	for rows.Next() {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
		var e ExcludedHost
		if err := rows.Scan(&e.Host, &e.Reason, &e.ExcludedAt); err != nil {
			return nil, fmt.Errorf("scan excluded host: %w", err)
		}
		out = append(out, e)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows excluded hosts: %w", err)
	}
	return out, nil
}

// IsExcludedHost reports whether a host is in the excluded_host table.
func IsExcludedHost(ctx context.Context, db *sql.DB, host string) (bool, error) {
	var count int
	err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM excluded_host WHERE host = ?", host).Scan(&count)
	if err != nil {
		return false, fmt.Errorf("check excluded host: %w", err)
	}
	return count > 0, nil
}

func importFile(ctx context.Context, insert *sql.Stmt, fsys fs.FS, relPath, host string) error {
	f, err := fsys.Open(relPath)
	if err != nil {
		return fmt.Errorf("open %s: %w", relPath, err)
	}
	defer f.Close()
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		rec, ok := recordline.Parse(sc.Text())
		if !ok {
			continue
		}
		if _, err := insert.ExecContext(ctx, host, rec.Uptime, rec.BootTime, rec.OS, rec.KernelName, rec.KernelMajor); err != nil {
			return fmt.Errorf("insert: %w", err)
		}
	}
	if err := sc.Err(); err != nil {
		return fmt.Errorf("scan %s: %w", relPath, err)
	}
	return nil
}