package pipeline

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"testing"
	"time"

	"github.com/internetarchive/gowarcprox/internal/models"
	"github.com/internetarchive/gowarcprox/pkg/config"
)

// mockProcessor is a test implementation of Processor
type mockProcessor struct {
	name          string
	processFunc   func(ctx context.Context, ru *models.RecordedURL) error
	processedURLs []*models.RecordedURL
	mu            sync.Mutex
}

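// Process records the URL as seen and delegates to processFunc when one is set.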
func (m *mockProcessor) Process(ctx context.Context, ru *models.RecordedURL) error {
	m.mu.Lock()
	m.processedURLs = append(m.processedURLs, ru)
	m.mu.Unlock()

	if m.processFunc != nil {
		return m.processFunc(ctx, ru)
	}
	return nil
}

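// Name returns the processor's configured name.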
func (m *mockProcessor) Name() string {
	return m.name
}

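// Count returns the number of URLs the processor has seen so far.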
func (m *mockProcessor) Count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.processedURLs)
}

func TestNewPipeline(t *testing.T) {
	cfg := config.NewDefaultConfig()
	logger := slog.Default()

	p := NewPipeline(cfg, logger)

	if p == nil {
		t.Fatal("NewPipeline returned nil")
	}
	if p.config != cfg {
		t.Error("config not set correctly")
	}
	if p.logger != logger {
		t.Error("logger not set correctly")
	}
	if p.inputCh == nil {
		t.Error("inputCh not initialized")
	}
	if cap(p.inputCh) != cfg.QueueSize {
		t.Errorf("inputCh capacity = %d, want %d", cap(p.inputCh), cfg.QueueSize)
	}
	if p.ctx == nil {
		t.Error("ctx not initialized")
	}
	if p.cancel == nil {
		t.Error("cancel not initialized")
	}
	if p.stats == nil {
		t.Error("stats not initialized")
	}
	if len(p.processors) != 0 {
		t.Errorf("processors length = %d, want 0", len(p.processors))
	}
}

func TestNewPipeline_NilLogger(t *testing.T) {
	cfg := config.NewDefaultConfig()

	p := NewPipeline(cfg, nil)

	if p.logger == nil {
		t.Error("logger should be set to default when nil passed")
	}
}

func TestPipeline_AddProcessor(t *testing.T) {
	cfg := config.NewDefaultConfig()
	p := NewPipeline(cfg, slog.Default())

	proc1 := &mockProcessor{name: "processor1"}
	proc2 := &mockProcessor{name: "processor2"}

	p.AddProcessor(proc1)
	if len(p.processors) != 1 {
		t.Errorf("after AddProcessor(1), processors length = %d, want 1", len(p.processors))
	}

	p.AddProcessor(proc2)
	if len(p.processors) != 2 {
		t.Errorf("after AddProcessor(2), processors length = %d, want 2", len(p.processors))
	}

	// Verify order is preserved
	if p.processors[0].Name() != "processor1" {
		t.Errorf("processors[0].Name() = %q, want %q", p.processors[0].Name(), "processor1")
	}
	if p.processors[1].Name() != "processor2" {
		t.Errorf("processors[1].Name() = %q, want %q", p.processors[1].Name(), "processor2")
	}
}

func TestPipeline_StartStop(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Start pipeline
	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v, want nil", err)
	}

	// Give workers time to start
	time.Sleep(10 * time.Millisecond)

	// Stop pipeline
	if err := p.Stop(); err != nil {
		t.Fatalf("Stop() error = %v, want nil", err)
	}

	// Verify stats are accessible after stop
	stats := p.GetStats()
	if stats.URLsQueued < 0 {
		t.Error("stats should be valid after Stop()")
	}
}

func TestPipeline_Enqueue(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a mock processor
	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	// Start pipeline
	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{
		URL:    "http://example.com",
		Method: "GET",
	}

	if err := p.Enqueue(ru); err != nil {
		t.Fatalf("Enqueue() error = %v, want nil", err)
	}

	// Wait for processing
	time.Sleep(50 * time.Millisecond)

	// Check stats
	stats := p.GetStats()
	if stats.URLsQueued != 1 {
		t.Errorf("URLsQueued = %d, want 1", stats.URLsQueued)
	}
	if stats.URLsProcessed != 1 {
		t.Errorf("URLsProcessed = %d, want 1", stats.URLsProcessed)
	}
	if stats.URLsFailed != 0 {
		t.Errorf("URLsFailed = %d, want 0", stats.URLsFailed)
	}

	// Verify processor was called
	if proc.Count() != 1 {
		t.Errorf("processor called %d times, want 1", proc.Count())
	}
}

func TestPipeline_MultipleProcessors(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add multiple processors
	proc1 := &mockProcessor{name: "processor1"}
	proc2 := &mockProcessor{name: "processor2"}
	proc3 := &mockProcessor{name: "processor3"}

	p.AddProcessor(proc1)
	p.AddProcessor(proc2)
	p.AddProcessor(proc3)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{
		URL:    "http://example.com",
		Method: "GET",
	}

	if err := p.Enqueue(ru); err != nil {
		t.Fatalf("Enqueue() error = %v", err)
	}
	time.Sleep(50 * time.Millisecond)

	// All processors should have processed the URL
	if proc1.Count() != 1 {
		t.Errorf("proc1 called %d times, want 1", proc1.Count())
	}
	if proc2.Count() != 1 {
		t.Errorf("proc2 called %d times, want 1", proc2.Count())
	}
	if proc3.Count() != 1 {
		t.Errorf("proc3 called %d times, want 1", proc3.Count())
	}
}

func TestPipeline_ProcessorError(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a processor that returns an error
	testError := errors.New("test error")
	proc := &mockProcessor{
		name: "failing-processor",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			return testError
		},
	}
	p.AddProcessor(proc)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue a URL
	ru := &models.RecordedURL{URL: "http://example.com"}
	if err := p.Enqueue(ru); err != nil {
		t.Fatalf("Enqueue() error = %v", err)
	}

	time.Sleep(50 * time.Millisecond)

	// Check stats - should be marked as failed
	stats := p.GetStats()
	if stats.URLsProcessed != 0 {
		t.Errorf("URLsProcessed = %d, want 0", stats.URLsProcessed)
	}
	if stats.URLsFailed != 1 {
		t.Errorf("URLsFailed = %d, want 1", stats.URLsFailed)
	}
}

func TestPipeline_ProcessorChain_StopsOnError(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add processors: 1st succeeds, 2nd fails, 3rd should not be called
	proc1 := &mockProcessor{name: "proc1"}
	proc2 := &mockProcessor{
		name: "proc2",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			return errors.New("proc2 error")
		},
	}
	proc3 := &mockProcessor{name: "proc3"}

	p.AddProcessor(proc1)
	p.AddProcessor(proc2)
	p.AddProcessor(proc3)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	ru := &models.RecordedURL{URL: "http://example.com"}
	if err := p.Enqueue(ru); err != nil {
		t.Fatalf("Enqueue() error = %v", err)
	}

	time.Sleep(50 * time.Millisecond)

	// proc1 should have been called
	if proc1.Count() != 1 {
		t.Errorf("proc1 called %d times, want 1", proc1.Count())
	}

	// proc2 should have been called (and failed)
	if proc2.Count() != 1 {
		t.Errorf("proc2 called %d times, want 1", proc2.Count())
	}

	// proc3 should NOT have been called (chain stopped on error)
	if proc3.Count() != 0 {
		t.Errorf("proc3 called %d times, want 0 (chain should stop on error)", proc3.Count())
	}
}

func TestPipeline_ConcurrentEnqueue(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 100
	p := NewPipeline(cfg, slog.Default())

	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue many URLs concurrently
	numURLs := 50
	var wg sync.WaitGroup
	for i := 0; i < numURLs; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ru := &models.RecordedURL{
				URL:    fmt.Sprintf("http://example.com/%d", id),
				Method: "GET",
			}
			if err := p.Enqueue(ru); err != nil {
				t.Errorf("Enqueue(%d) error = %v", id, err)
			}
		}(i)
	}

	wg.Wait()

	// Wait for all processing to complete
	time.Sleep(200 * time.Millisecond)

	// Verify all URLs were processed
	stats := p.GetStats()
	if stats.URLsQueued != int64(numURLs) {
		t.Errorf("URLsQueued = %d, want %d", stats.URLsQueued, numURLs)
	}
	if stats.URLsProcessed != int64(numURLs) {
		t.Errorf("URLsProcessed = %d, want %d", stats.URLsProcessed, numURLs)
	}
	if proc.Count() != numURLs {
		t.Errorf("processor called %d times, want %d", proc.Count(), numURLs)
	}
}

func TestPipeline_GetStats(t *testing.T) {
	cfg := config.NewDefaultConfig()
	p := NewPipeline(cfg, slog.Default())

	// Initial stats should be zero
	stats := p.GetStats()
	if stats.URLsQueued != 0 {
		t.Errorf("initial URLsQueued = %d, want 0", stats.URLsQueued)
	}
	if stats.URLsProcessed != 0 {
		t.Errorf("initial URLsProcessed = %d, want 0", stats.URLsProcessed)
	}
	if stats.URLsFailed != 0 {
		t.Errorf("initial URLsFailed = %d, want 0", stats.URLsFailed)
	}

	// Modify internal stats
	p.stats.mu.Lock()
	p.stats.URLsQueued = 10
	p.stats.URLsProcessed = 8
	p.stats.URLsFailed = 2
	p.stats.mu.Unlock()

	// GetStats should return a copy
	stats = p.GetStats()
	if stats.URLsQueued != 10 {
		t.Errorf("URLsQueued = %d, want 10", stats.URLsQueued)
	}
	if stats.URLsProcessed != 8 {
		t.Errorf("URLsProcessed = %d, want 8", stats.URLsProcessed)
	}
	if stats.URLsFailed != 2 {
		t.Errorf("URLsFailed = %d, want 2", stats.URLsFailed)
	}
}

func TestPipeline_QueueLength(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 100
	p := NewPipeline(cfg, slog.Default())

	// Initially empty
	if qlen := p.QueueLength(); qlen != 0 {
		t.Errorf("initial QueueLength() = %d, want 0", qlen)
	}

	// Add a slow processor to keep items in queue
	proc := &mockProcessor{
		name: "slow",
		processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
			time.Sleep(100 * time.Millisecond)
			return nil
		},
	}
	p.AddProcessor(proc)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	defer p.Stop()

	// Enqueue multiple URLs quickly
	for i := 0; i < 10; i++ {
		if err := p.Enqueue(&models.RecordedURL{
			URL: fmt.Sprintf("http://example.com/%d", i),
		}); err != nil {
			t.Fatalf("Enqueue(%d) error = %v", i, err)
		}
	}

	// Queue should have items
	qlen := p.QueueLength()
	if qlen == 0 {
		t.Error("QueueLength() = 0, expected > 0 after enqueueing with slow processor")
	}
}

func TestPipeline_ContextCancellation(t *testing.T) {
	cfg := config.NewDefaultConfig()
	cfg.QueueSize = 10
	p := NewPipeline(cfg, slog.Default())

	// Add a simple processor
	proc := &mockProcessor{name: "test"}
	p.AddProcessor(proc)

	if err := p.Start(); err != nil {
		t.Fatalf("Start() error = %v", err)
	}

	// Enqueue a URL
	if err := p.Enqueue(&models.RecordedURL{URL: "http://example.com"}); err != nil {
		t.Fatalf("Enqueue() error = %v", err)
	}

	// Stop pipeline (cancels context)
	if err := p.Stop(); err != nil {
		t.Fatalf("Stop() error = %v", err)
	}

	// Context should have been cancelled
	if p.ctx.Err() == nil {
		t.Error("context should be cancelled after Stop()")
	}
}