summaryrefslogtreecommitdiff
path: root/internal/gui/queue.go
blob: cbf814c103c472d81a1faa8a1de0609a95c1d8b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package gui

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// WordJob represents a single word processing job tracked by a WordQueue.
//
// The queue hands out pointers to the same WordJob it stores internally, so
// status and result fields change in place as the job moves through the
// queue.
type WordJob struct {
	ID               int       // Identifier assigned by the queue when the job is added
	Word             string    // Word to process
	Translation      string    // Translation result, filled in when the job completes
	AudioFile        string    // Audio file result, filled in when the job completes
	AudioFileBack    string    // Back audio file (only for bg-bg cards)
	ImageFile        string    // Single image file (previously ImageFiles []string)
	Status           JobStatus // Current lifecycle state
	Error            error     // Failure cause, set when the job fails
	StartedAt        time.Time // Set when the job is pulled for processing
	CompletedAt      time.Time // Set when the job completes or fails
	CustomPrompt     string    // Custom prompt for image generation
	NeedsTranslation bool      // Whether translation is needed
	CardType         string    // Card type: "en-bg" or "bg-bg"
}

// JobStatus identifies the lifecycle state a WordJob is currently in.
type JobStatus int

// Job lifecycle states. Values are assigned sequentially starting at zero
// with StatusQueued.
const (
	StatusQueued JobStatus = iota
	StatusProcessing
	StatusCompleted
	StatusFailed
)

// String returns the human-readable name of the status, or "Unknown" for any
// value outside the declared set.
func (s JobStatus) String() string {
	labels := [...]string{
		StatusQueued:     "Queued",
		StatusProcessing: "Processing",
		StatusCompleted:  "Completed",
		StatusFailed:     "Failed",
	}
	if s < 0 || int(s) >= len(labels) {
		return "Unknown"
	}
	return labels[s]
}

// WordQueue manages the queue of words to be processed.
//
// Pending jobs wait in the buffered jobs channel until ProcessNextJob pulls
// them. Each job is also indexed by ID in results so it can be looked up and
// counted by status.
type WordQueue struct {
	jobs       chan *WordJob    // Pending jobs waiting to be pulled
	results    map[int]*WordJob // Jobs keyed by ID, in any status, until removed
	processing map[int]*WordJob // Jobs pulled for processing and not yet completed or failed
	completed  []*WordJob       // Successfully completed jobs, in completion order

	nextID int          // ID that will be given to the next job added
	mu     sync.RWMutex // Guards results, processing, completed, nextID and callback assignment

	// Callbacks for UI updates
	onStatusUpdate func(job *WordJob) // Invoked when a job is queued or starts processing
	onJobComplete  func(job *WordJob) // Invoked when a job completes or fails

	ctx    context.Context    // Derived from the constructor's context; cancelled by Stop
	cancel context.CancelFunc // Cancels ctx
}

// NewWordQueue builds an empty word processing queue. The queue's internal
// context is derived from ctx, so cancelling ctx (or calling Stop) shuts the
// queue down. No worker goroutine is started: the GUI is expected to pull
// jobs itself.
func NewWordQueue(ctx context.Context) *WordQueue {
	// Capacity of the pending-jobs channel.
	const jobBuffer = 100

	q := new(WordQueue)
	q.ctx, q.cancel = context.WithCancel(ctx)
	q.jobs = make(chan *WordJob, jobBuffer)
	q.results = map[int]*WordJob{}
	q.processing = map[int]*WordJob{}
	q.completed = []*WordJob{}
	q.nextID = 1
	return q
}

// SetCallbacks installs the UI notification hooks: onStatusUpdate for status
// changes and onJobComplete for jobs that complete or fail. Passing nil for
// either one disables that notification.
func (q *WordQueue) SetCallbacks(onStatusUpdate func(*WordJob), onJobComplete func(*WordJob)) {
	q.mu.Lock()
	q.onStatusUpdate, q.onJobComplete = onStatusUpdate, onJobComplete
	q.mu.Unlock()
}

// AddWord enqueues word with no custom image prompt. It is shorthand for
// AddWordWithPrompt(word, "").
func (q *WordQueue) AddWord(word string) *WordJob {
	const noPrompt = ""
	job := q.AddWordWithPrompt(word, noPrompt)
	return job
}

// AddWordWithPrompt registers a new job for word, using customPrompt for
// image generation, and places it on the pending queue.
//
// The returned job is never nil. On success its status is StatusQueued (or
// later, if a consumer has already picked it up) and the status-update
// callback has been invoked once. If the queue's context is cancelled before
// the job can be enqueued, the job is returned with StatusFailed and Error
// set, and no callback is invoked.
//
// The call blocks while the pending queue is full, until space becomes
// available or the queue is shut down.
func (q *WordQueue) AddWordWithPrompt(word, customPrompt string) *WordJob {
	q.mu.Lock()
	job := &WordJob{
		ID:           q.nextID,
		Word:         word,
		Status:       StatusQueued,
		CustomPrompt: customPrompt,
	}
	q.nextID++
	q.results[job.ID] = job
	// Snapshot the callback under the lock; SetCallbacks writes it under the
	// same lock.
	notify := q.onStatusUpdate
	q.mu.Unlock()

	// Refuse up front once the queue has been shut down. Without this check
	// the select below could pick the send case even though Done is ready,
	// which panics if the jobs channel has been closed.
	if q.ctx.Err() == nil {
		select {
		case q.jobs <- job:
			// Do not assign job.Status here: a consumer may already have
			// received the job and advanced it to StatusProcessing, and the
			// job was created as StatusQueued anyway.
			if notify != nil {
				notify(job)
			}
			return job
		case <-q.ctx.Done():
		}
	}

	q.mu.Lock()
	job.Status = StatusFailed
	job.Error = fmt.Errorf("queue is shutting down")
	q.mu.Unlock()
	return job
}

// GetJob looks up a job by its ID. It returns nil when no job with that ID
// is known to the queue.
func (q *WordQueue) GetJob(id int) *WordJob {
	q.mu.RLock()
	job := q.results[id]
	q.mu.RUnlock()
	return job
}

// GetQueueStatus reports how many known jobs are in each lifecycle state.
// The counts are derived from each job's Status field rather than from the
// queue's internal containers.
func (q *WordQueue) GetQueueStatus() (queued, processing, completed, failed int) {
	q.mu.RLock()
	defer q.mu.RUnlock()

	tally := make(map[JobStatus]int, 4)
	for _, job := range q.results {
		tally[job.Status]++
	}

	// Statuses outside the four known values are tallied but never reported.
	return tally[StatusQueued], tally[StatusProcessing], tally[StatusCompleted], tally[StatusFailed]
}

// GetActiveJobs returns all jobs that are currently queued or processing:
// processing jobs first, then queued jobs, each group in ascending ID order
// (which is submission order, since IDs are handed out sequentially).
// It returns nil when there are no active jobs.
//
// The result is computed from each job's Status rather than by draining and
// refilling the jobs channel. Touching the channel here would reorder or
// race with concurrent producers and consumers, could block forever on the
// refill if the channel filled up in the meantime, and would spin forever on
// a closed channel.
func (q *WordQueue) GetActiveJobs() []*WordJob {
	q.mu.RLock()
	defer q.mu.RUnlock()

	var processing, queued []*WordJob

	// IDs start at 1 and every issued ID is below nextID, so walking the ID
	// range visits jobs in submission order without needing a sort.
	for id := 1; id < q.nextID; id++ {
		job, ok := q.results[id]
		if !ok {
			continue
		}
		switch job.Status {
		case StatusProcessing:
			processing = append(processing, job)
		case StatusQueued:
			queued = append(queued, job)
		}
	}

	return append(processing, queued...)
}

// GetCompletedJobs returns a snapshot of the completed jobs. The returned
// slice is a fresh copy (never nil), so callers may modify it freely; the
// jobs it points to are shared with the queue.
func (q *WordQueue) GetCompletedJobs() []*WordJob {
	q.mu.RLock()
	defer q.mu.RUnlock()

	snapshot := make([]*WordJob, len(q.completed))
	copy(snapshot, q.completed)
	return snapshot
}

// Stop shuts the queue down by cancelling its context. After Stop, attempts
// to add a word observe the cancelled context and fail. Stop is safe to call
// more than once.
func (q *WordQueue) Stop() {
	// The jobs channel is intentionally left open. Several methods send on
	// it, and a send on a closed channel panics; closing it twice panics as
	// well, and receivers would see nil jobs once it drained. Cancelling the
	// context is the shutdown signal, and the channel is reclaimed by the
	// garbage collector along with the queue.
	q.cancel()
}

// CompleteJob marks the job with the given ID as completed, records its
// translation, audio file and image file, moves it from the processing set
// to the completed list, and then invokes the job-complete callback.
// Unknown IDs are ignored.
//
// The callback runs after the queue's lock has been released, so it may
// safely call back into the queue (for example to refresh counters).
func (q *WordQueue) CompleteJob(jobID int, translation, audioFile, imageFile string) {
	q.mu.Lock()
	job, exists := q.results[jobID]
	if !exists {
		q.mu.Unlock()
		return
	}

	job.Status = StatusCompleted
	job.Translation = translation
	job.AudioFile = audioFile
	job.ImageFile = imageFile
	job.CompletedAt = time.Now()

	delete(q.processing, jobID)
	q.completed = append(q.completed, job)

	notify := q.onJobComplete
	// Release the lock before notifying: sync.RWMutex is not reentrant, so a
	// callback that touched the queue while the lock was held would deadlock.
	q.mu.Unlock()

	if notify != nil {
		notify(job)
	}
}

// FailJob marks the job with the given ID as failed with err, removes it
// from the processing set, and then invokes the job-complete callback.
// Unknown IDs are ignored.
//
// The callback runs after the queue's lock has been released, so it may
// safely call back into the queue.
func (q *WordQueue) FailJob(jobID int, err error) {
	q.mu.Lock()
	job, exists := q.results[jobID]
	if !exists {
		q.mu.Unlock()
		return
	}

	job.Status = StatusFailed
	job.Error = err
	job.CompletedAt = time.Now()

	delete(q.processing, jobID)

	notify := q.onJobComplete
	// Release the lock before notifying: sync.RWMutex is not reentrant, so a
	// callback that touched the queue while the lock was held would deadlock.
	q.mu.Unlock()

	if notify != nil {
		notify(job)
	}
}

// updateJobStatus sets job's status and then invokes the status-update
// callback, if one is installed.
//
// The status write and the callback lookup happen under q.mu, matching the
// other methods that mutate job state; the callback itself runs with the
// lock released. Callers must therefore not hold q.mu when calling this.
func (q *WordQueue) updateJobStatus(job *WordJob, status JobStatus) {
	q.mu.Lock()
	job.Status = status
	notify := q.onStatusUpdate
	q.mu.Unlock()

	if notify != nil {
		notify(job)
	}
}

// ProcessNextJob should be called by the GUI to take the next pending job.
// It never blocks: when no job is waiting it returns nil. Otherwise the job
// is recorded as processing, its status becomes StatusProcessing, StartedAt
// is stamped, the status-update callback is invoked, and the job is
// returned.
func (q *WordQueue) ProcessNextJob() *WordJob {
	select {
	case job, ok := <-q.jobs:
		// A closed channel yields (nil, false) immediately; treat that the
		// same as "nothing to do" instead of dereferencing a nil job.
		if !ok || job == nil {
			return nil
		}

		q.mu.Lock()
		q.processing[job.ID] = job
		job.Status = StatusProcessing
		job.StartedAt = time.Now()
		// Snapshot the callback under the lock; SetCallbacks writes it under
		// the same lock.
		notify := q.onStatusUpdate
		q.mu.Unlock()

		if notify != nil {
			notify(job)
		}
		return job

	default:
		return nil
	}
}

// RemoveCompletedJobByWord forgets every completed job for word: matching
// entries are dropped from the completed list, and matching jobs whose
// status is StatusCompleted are dropped from the results index.
func (q *WordQueue) RemoveCompletedJobByWord(word string) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Rebuild the completed list without the entries for this word.
	kept := make([]*WordJob, 0, len(q.completed))
	for i := range q.completed {
		if entry := q.completed[i]; entry.Word != word {
			kept = append(kept, entry)
		}
	}
	q.completed = kept

	// Purge the same jobs from the ID index; deleting during range is safe.
	for jobID, entry := range q.results {
		if entry.Word != word || entry.Status != StatusCompleted {
			continue
		}
		delete(q.results, jobID)
	}
}

// IsWordProcessing reports whether any known job for word is currently
// queued or being processed.
//
// This is a read-only scan, so it takes the read lock like the other query
// methods, allowing it to run concurrently with them.
func (q *WordQueue) IsWordProcessing(word string) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()

	for _, job := range q.results {
		if job.Word != word {
			continue
		}
		if job.Status == StatusQueued || job.Status == StatusProcessing {
			return true
		}
	}

	return false
}