package pipeline

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"testing"
	"time"

	"github.com/internetarchive/gowarcprox/internal/models"
	"github.com/internetarchive/gowarcprox/pkg/config"
)

// mockProcessor is a test implementation of Processor that records every URL it receives.
type mockProcessor struct {
	name          string
	processFunc   func(ctx context.Context, ru *models.RecordedURL) error
	processedURLs []*models.RecordedURL
	mu            sync.Mutex
}

// Process records the URL and delegates to processFunc when one is set.
func (m *mockProcessor) Process(ctx context.Context, ru *models.RecordedURL) error {
	m.mu.Lock()
	m.processedURLs = append(m.processedURLs, ru)
	m.mu.Unlock()

	if m.processFunc != nil {
		return m.processFunc(ctx, ru)
	}
	return nil
}

// Name returns the processor's name.
func (m *mockProcessor) Name() string {
	return m.name
}

// Count returns how many URLs this processor has seen (safe for concurrent use).
func (m *mockProcessor) Count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.processedURLs)
}

func TestNewPipeline(t *testing.T) {
	cfg := config.NewDefaultConfig()
	logger := slog.Default()

	p := NewPipeline(cfg, logger)

	if p == nil {
		t.Fatal("NewPipeline returned nil")
	}
	if p.config != cfg {
		t.Error("config not set correctly")
	}
	if p.logger != logger {
		t.Error("logger not set correctly")
	}
	if p.inputCh == nil {
		t.Error("inputCh not initialized")
	}
	if cap(p.inputCh) != cfg.QueueSize {
		t.Errorf("inputCh capacity = %d, want %d", cap(p.inputCh), cfg.QueueSize)
	}
	if p.ctx == nil {
		t.Error("ctx not initialized")
	}
	if p.cancel == nil {
		t.Error("cancel not initialized")
	}
	if p.stats == nil {
		t.Error("stats not initialized")
	}
	if len(p.processors) != 0 {
		t.Errorf("processors length = %d, want 0", len(p.processors))
	}
}

func TestNewPipeline_NilLogger(t *testing.T) {
	cfg := config.NewDefaultConfig()

	p := NewPipeline(cfg, nil)

	if p.logger == nil {
		t.Error("logger should be set to a default when nil is passed")
	}
}

func TestPipeline_AddProcessor(t *testing.T) {
	cfg := config.NewDefaultConfig()
	p := NewPipeline(cfg, slog.Default())

	proc1 := &mockProcessor{name: "processor1"}
	proc2 := &mockProcessor{name: "processor2"}

	p.AddProcessor(proc1)
	if len(p.processors) != 1 {
		t.Errorf("after AddProcessor(1), processors length = %d, want 1", len(p.processors))
	}

	p.AddProcessor(proc2)
	if len(p.processors) != 2 {
		t.Errorf("after AddProcessor(2), processors length = %d, want 2", len(p.processors))
	}

	// Verify order is preserved
	if p.processors[0].Name() != "processor1" {
		t.Errorf("processors[0].Name() = %q, want %q", p.processors[0].Name(), "processor1")
	}
	if p.processors[1].Name() != "processor2" {
		t.Errorf("processors[1].Name() = %q, want %q", p.processors[1].Name(), "processor2")
	}
}

func TestPipeline_StartStop(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Start pipeline
	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v, want nil", err)
	}

	// Give workers time to start
	time.Sleep(10 * time.Millisecond)

	// Stop pipeline
	if err := p.Stop(); err != nil {
		t.Fatalf("Stop() error = %v, want nil", err)
	}

	// Verify stats are accessible after stop
	stats := p.GetStats()
	if stats.URLsQueued < 0 {
		t.Error("stats should be valid after Stop()")
	}
}

func TestPipeline_Enqueue(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a mock processor
	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	// Start pipeline
	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{
		URL:    "http://example.com",
		Method: "GET",
	}

	if err := p.Enqueue(ru); err != nil {
		t.Fatalf("Enqueue() error = %v, want nil", err)
	}

	// Wait for processing
	time.Sleep(50 * time.Millisecond)

	// Check stats
	stats := p.GetStats()
	if stats.URLsQueued != 1 {
		t.Errorf("URLsQueued = %d, want 1", stats.URLsQueued)
	}
	if stats.URLsProcessed != 1 {
		t.Errorf("URLsProcessed = %d, want 1", stats.URLsProcessed)
	}
	if stats.URLsFailed != 0 {
		t.Errorf("URLsFailed = %d, want 0", stats.URLsFailed)
	}

	// Verify processor was called
	if proc.Count() != 1 {
		t.Errorf("processor called %d times, want 1", proc.Count())
	}
}

func TestPipeline_MultipleProcessors(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add multiple processors
	proc1 := &mockProcessor{name: "processor1"}
	proc2 := &mockProcessor{name: "processor2"}
	proc3 := &mockProcessor{name: "processor3"}

	p.AddProcessor(proc1)
	p.AddProcessor(proc2)
	p.AddProcessor(proc3)

	p.Start()
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{
		URL:    "http://example.com",
		Method: "GET",
	}

	p.Enqueue(ru)
	time.Sleep(50 * time.Millisecond)

	// All processors should have processed the URL
	if proc1.Count() != 1 {
		t.Errorf("proc1 called %d times, want 1", proc1.Count())
	}
	if proc2.Count() != 1 {
		t.Errorf("proc2 called %d times, want 1", proc2.Count())
	}
	if proc3.Count() != 1 {
		t.Errorf("proc3 called %d times, want 1", proc3.Count())
	}
}

func TestPipeline_ProcessorError(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a processor that returns an error
	testError := errors.New("test error")
	proc := &mockProcessor{
		name: "failing-processor",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			return testError
		},
	}
	p.AddProcessor(proc)

	p.Start()
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{URL: "http://example.com"}
	p.Enqueue(ru)

	time.Sleep(50 * time.Millisecond)

	// Check stats - should be marked as failed
	stats := p.GetStats()
	if stats.URLsProcessed != 0 {
		t.Errorf("URLsProcessed = %d, want 0", stats.URLsProcessed)
	}
	if stats.URLsFailed != 1 {
		t.Errorf("URLsFailed = %d, want 1", stats.URLsFailed)
	}
}

func TestPipeline_ProcessorChain_StopsOnError(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add processors: 1st succeeds, 2nd fails, 3rd should not be called
	proc1 := &mockProcessor{name: "proc1"}
	proc2 := &mockProcessor{
		name: "proc2",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			return errors.New("proc2 error")
		},
	}
	proc3 := &mockProcessor{name: "proc3"}

	p.AddProcessor(proc1)
	p.AddProcessor(proc2)
	p.AddProcessor(proc3)

	p.Start()
	defer p.Stop()

	ru := &models.RecordedURL{URL: "http://example.com"}
	p.Enqueue(ru)

	time.Sleep(50 * time.Millisecond)

	// proc1 should have been called
	if proc1.Count() != 1 {
		t.Errorf("proc1 called %d times, want 1", proc1.Count())
	}

	// proc2 should have been called (and failed)
	if proc2.Count() != 1 {
		t.Errorf("proc2 called %d times, want 1", proc2.Count())
	}

	// proc3 should NOT have been called (chain stopped on error)
	if proc3.Count() != 0 {
		t.Errorf("proc3 called %d times, want 0 (chain should stop on error)", proc3.Count())
	}
}

func TestPipeline_ConcurrentEnqueue(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 100
	p := NewPipeline(cfg, slog.Default())

	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	p.Start()
	defer p.Stop()

	// Enqueue many URLs concurrently
	numURLs := 50
	var wg sync.WaitGroup
	for i := 0; i < numURLs; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ru := &models.RecordedURL{
				URL:    fmt.Sprintf("http://example.com/%d", id),
				Method: "GET",
			}
			if err := p.Enqueue(ru); err != nil {
				t.Errorf("Enqueue(%d) error = %v", id, err)
			}
		}(i)
	}

	wg.Wait()

	// Wait for all processing to complete
	time.Sleep(200 * time.Millisecond)

	// Verify all URLs were processed
	stats := p.GetStats()
	if stats.URLsQueued != int64(numURLs) {
		t.Errorf("URLsQueued = %d, want %d", stats.URLsQueued, numURLs)
	}
	if stats.URLsProcessed != int64(numURLs) {
		t.Errorf("URLsProcessed = %d, want %d", stats.URLsProcessed, numURLs)
	}
	if proc.Count() != numURLs {
		t.Errorf("processor called %d times, want %d", proc.Count(), numURLs)
	}
}

func TestPipeline_GetStats(t *testing.T) {
	cfg := config.NewDefaultConfig()
	p := NewPipeline(cfg, slog.Default())

	// Initial stats should be zero
	stats := p.GetStats()
	if stats.URLsQueued != 0 {
		t.Errorf("initial URLsQueued = %d, want 0", stats.URLsQueued)
	}
	if stats.URLsProcessed != 0 {
		t.Errorf("initial URLsProcessed = %d, want 0", stats.URLsProcessed)
	}
	if stats.URLsFailed != 0 {
		t.Errorf("initial URLsFailed = %d, want 0", stats.URLsFailed)
	}

	// Modify internal stats
	p.stats.mu.Lock()
	p.stats.URLsQueued = 10
	p.stats.URLsProcessed = 8
	p.stats.URLsFailed = 2
	p.stats.mu.Unlock()

	// GetStats should return a copy
	stats = p.GetStats()
	if stats.URLsQueued != 10 {
		t.Errorf("URLsQueued = %d, want 10", stats.URLsQueued)
	}
	if stats.URLsProcessed != 8 {
		t.Errorf("URLsProcessed = %d, want 8", stats.URLsProcessed)
	}
	if stats.URLsFailed != 2 {
		t.Errorf("URLsFailed = %d, want 2", stats.URLsFailed)
	}
}

func TestPipeline_QueueLength(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 100
	p := NewPipeline(cfg, slog.Default())

	// Initially empty
	if qlen := p.QueueLength(); qlen != 0 {
		t.Errorf("initial QueueLength() = %d, want 0", qlen)
	}

	// Add a slow processor to keep items in queue
	proc := &mockProcessor{
		name: "slow",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			time.Sleep(100 * time.Millisecond)
			return nil
		},
	}
	p.AddProcessor(proc)

	p.Start()
	defer p.Stop()

	// Enqueue multiple URLs quickly
	for i := 0; i < 10; i++ {
		p.Enqueue(&models.RecordedURL{
			URL: fmt.Sprintf("http://example.com/%d", i),
		})
	}

	// Queue should have items
	qlen := p.QueueLength()
	if qlen == 0 {
		t.Error("QueueLength() = 0, expected > 0 after enqueueing with slow processor")
	}
}

func TestPipeline_ContextCancellation(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a simple processor
	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	p.Start()

	// Enqueue a URL
	p.Enqueue(&models.RecordedURL{URL: "http://example.com"})

	// Stop pipeline (cancels context)
	p.Stop()

	// Context should have been cancelled
	if p.ctx.Err() == nil {
		t.Error("context should be cancelled after Stop()")
	}
}