Commit 397da66
Changed files (1)
internal
pipeline
internal/pipeline/pipeline_test.go
@@ -0,0 +1,442 @@
+package pipeline
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/internetarchive/gowarcprox/internal/models"
+ "github.com/internetarchive/gowarcprox/pkg/config"
+)
+
+// mockProcessor is a test implementation of Processor
+type mockProcessor struct {
+ name string
+ processFunc func(ctx context.Context, ru *models.RecordedURL) error
+ processedURLs []*models.RecordedURL
+ mu sync.Mutex
+}
+
+func (m *mockProcessor) Process(ctx context.Context, ru *models.RecordedURL) error {
+ m.mu.Lock()
+ m.processedURLs = append(m.processedURLs, ru)
+ m.mu.Unlock()
+
+ if m.processFunc != nil {
+ return m.processFunc(ctx, ru)
+ }
+ return nil
+}
+
+func (m *mockProcessor) Name() string {
+ return m.name
+}
+
+func (m *mockProcessor) Count() int {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return len(m.processedURLs)
+}
+
+func TestNewPipeline(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ logger := slog.Default()
+
+ p := NewPipeline(cfg, logger)
+
+ if p == nil {
+ t.Fatal("NewPipeline returned nil")
+ }
+ if p.config != cfg {
+ t.Error("config not set correctly")
+ }
+ if p.logger != logger {
+ t.Error("logger not set correctly")
+ }
+ if p.inputCh == nil {
+ t.Error("inputCh not initialized")
+ }
+ if cap(p.inputCh) != cfg.QueueSize {
+ t.Errorf("inputCh capacity = %d, want %d", cap(p.inputCh), cfg.QueueSize)
+ }
+ if p.ctx == nil {
+ t.Error("ctx not initialized")
+ }
+ if p.cancel == nil {
+ t.Error("cancel not initialized")
+ }
+ if p.stats == nil {
+ t.Error("stats not initialized")
+ }
+ if len(p.processors) != 0 {
+ t.Errorf("processors length = %d, want 0", len(p.processors))
+ }
+}
+
+func TestNewPipeline_NilLogger(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+
+ p := NewPipeline(cfg, nil)
+
+ if p.logger == nil {
+ t.Error("logger should be set to default when nil passed")
+ }
+}
+
+func TestPipeline_AddProcessor(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ p := NewPipeline(cfg, slog.Default())
+
+ proc1 := &mockProcessor{name: "processor1"}
+ proc2 := &mockProcessor{name: "processor2"}
+
+ p.AddProcessor(proc1)
+ if len(p.processors) != 1 {
+ t.Errorf("after AddProcessor(1), processors length = %d, want 1", len(p.processors))
+ }
+
+ p.AddProcessor(proc2)
+ if len(p.processors) != 2 {
+ t.Errorf("after AddProcessor(2), processors length = %d, want 2", len(p.processors))
+ }
+
+ // Verify order is preserved
+ if p.processors[0].Name() != "processor1" {
+ t.Errorf("processors[0].Name() = %q, want %q", p.processors[0].Name(), "processor1")
+ }
+ if p.processors[1].Name() != "processor2" {
+ t.Errorf("processors[1].Name() = %q, want %q", p.processors[1].Name(), "processor2")
+ }
+}
+
+func TestPipeline_StartStop(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Start pipeline
+ if err := p.Start(); err != nil {
+ t.Fatalf("Start() error = %v, want nil", err)
+ }
+
+ // Give workers time to start
+ time.Sleep(10 * time.Millisecond)
+
+ // Stop pipeline
+ if err := p.Stop(); err != nil {
+ t.Fatalf("Stop() error = %v, want nil", err)
+ }
+
+ // Verify stats are accessible after stop
+ stats := p.GetStats()
+ if stats.URLsQueued < 0 {
+ t.Error("stats should be valid after Stop()")
+ }
+}
+
+func TestPipeline_Enqueue(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Add a mock processor
+ proc := &mockProcessor{name: "test"}
+ p.AddProcessor(proc)
+
+ // Start pipeline
+ if err := p.Start(); err != nil {
+ t.Fatalf("Start() error = %v", err)
+ }
+ defer p.Stop()
+
+ // Enqueue a URL
+ ru := &models.RecordedURL{
+ URL: "http://example.com",
+ Method: "GET",
+ }
+
+ if err := p.Enqueue(ru); err != nil {
+ t.Fatalf("Enqueue() error = %v, want nil", err)
+ }
+
+ // Wait for processing
+ time.Sleep(50 * time.Millisecond)
+
+ // Check stats
+ stats := p.GetStats()
+ if stats.URLsQueued != 1 {
+ t.Errorf("URLsQueued = %d, want 1", stats.URLsQueued)
+ }
+ if stats.URLsProcessed != 1 {
+ t.Errorf("URLsProcessed = %d, want 1", stats.URLsProcessed)
+ }
+ if stats.URLsFailed != 0 {
+ t.Errorf("URLsFailed = %d, want 0", stats.URLsFailed)
+ }
+
+ // Verify processor was called
+ if proc.Count() != 1 {
+ t.Errorf("processor called %d times, want 1", proc.Count())
+ }
+}
+
+func TestPipeline_MultipleProcessors(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Add multiple processors
+ proc1 := &mockProcessor{name: "processor1"}
+ proc2 := &mockProcessor{name: "processor2"}
+ proc3 := &mockProcessor{name: "processor3"}
+
+ p.AddProcessor(proc1)
+ p.AddProcessor(proc2)
+ p.AddProcessor(proc3)
+
+ p.Start()
+ defer p.Stop()
+
+ // Enqueue a URL
+ ru := &models.RecordedURL{
+ URL: "http://example.com",
+ Method: "GET",
+ }
+
+ p.Enqueue(ru)
+ time.Sleep(50 * time.Millisecond)
+
+ // All processors should have processed the URL
+ if proc1.Count() != 1 {
+ t.Errorf("proc1 called %d times, want 1", proc1.Count())
+ }
+ if proc2.Count() != 1 {
+ t.Errorf("proc2 called %d times, want 1", proc2.Count())
+ }
+ if proc3.Count() != 1 {
+ t.Errorf("proc3 called %d times, want 1", proc3.Count())
+ }
+}
+
+func TestPipeline_ProcessorError(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Add a processor that returns an error
+ testError := errors.New("test error")
+ proc := &mockProcessor{
+ name: "failing-processor",
+ processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
+ return testError
+ },
+ }
+ p.AddProcessor(proc)
+
+ p.Start()
+ defer p.Stop()
+
+ // Enqueue a URL
+ ru := &models.RecordedURL{URL: "http://example.com"}
+ p.Enqueue(ru)
+
+ time.Sleep(50 * time.Millisecond)
+
+ // Check stats - should be marked as failed
+ stats := p.GetStats()
+ if stats.URLsProcessed != 0 {
+ t.Errorf("URLsProcessed = %d, want 0", stats.URLsProcessed)
+ }
+ if stats.URLsFailed != 1 {
+ t.Errorf("URLsFailed = %d, want 1", stats.URLsFailed)
+ }
+}
+
+func TestPipeline_ProcessorChain_StopsOnError(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Add processors: 1st succeeds, 2nd fails, 3rd should not be called
+ proc1 := &mockProcessor{name: "proc1"}
+ proc2 := &mockProcessor{
+ name: "proc2",
+ processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
+ return errors.New("proc2 error")
+ },
+ }
+ proc3 := &mockProcessor{name: "proc3"}
+
+ p.AddProcessor(proc1)
+ p.AddProcessor(proc2)
+ p.AddProcessor(proc3)
+
+ p.Start()
+ defer p.Stop()
+
+ ru := &models.RecordedURL{URL: "http://example.com"}
+ p.Enqueue(ru)
+
+ time.Sleep(50 * time.Millisecond)
+
+ // proc1 should have been called
+ if proc1.Count() != 1 {
+ t.Errorf("proc1 called %d times, want 1", proc1.Count())
+ }
+
+ // proc2 should have been called (and failed)
+ if proc2.Count() != 1 {
+ t.Errorf("proc2 called %d times, want 1", proc2.Count())
+ }
+
+ // proc3 should NOT have been called (chain stopped on error)
+ if proc3.Count() != 0 {
+ t.Errorf("proc3 called %d times, want 0 (chain should stop on error)", proc3.Count())
+ }
+}
+
+func TestPipeline_ConcurrentEnqueue(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 100
+ p := NewPipeline(cfg, slog.Default())
+
+ proc := &mockProcessor{name: "test"}
+ p.AddProcessor(proc)
+
+ p.Start()
+ defer p.Stop()
+
+ // Enqueue many URLs concurrently
+ numURLs := 50
+ var wg sync.WaitGroup
+ for i := 0; i < numURLs; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ ru := &models.RecordedURL{
+ URL: fmt.Sprintf("http://example.com/%d", id),
+ Method: "GET",
+ }
+ if err := p.Enqueue(ru); err != nil {
+ t.Errorf("Enqueue(%d) error = %v", id, err)
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // Wait for all processing to complete
+ time.Sleep(200 * time.Millisecond)
+
+ // Verify all URLs were processed
+ stats := p.GetStats()
+ if stats.URLsQueued != int64(numURLs) {
+ t.Errorf("URLsQueued = %d, want %d", stats.URLsQueued, numURLs)
+ }
+ if stats.URLsProcessed != int64(numURLs) {
+ t.Errorf("URLsProcessed = %d, want %d", stats.URLsProcessed, numURLs)
+ }
+ if proc.Count() != numURLs {
+ t.Errorf("processor called %d times, want %d", proc.Count(), numURLs)
+ }
+}
+
+func TestPipeline_GetStats(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ p := NewPipeline(cfg, slog.Default())
+
+ // Initial stats should be zero
+ stats := p.GetStats()
+ if stats.URLsQueued != 0 {
+ t.Errorf("initial URLsQueued = %d, want 0", stats.URLsQueued)
+ }
+ if stats.URLsProcessed != 0 {
+ t.Errorf("initial URLsProcessed = %d, want 0", stats.URLsProcessed)
+ }
+ if stats.URLsFailed != 0 {
+ t.Errorf("initial URLsFailed = %d, want 0", stats.URLsFailed)
+ }
+
+ // Modify internal stats
+ p.stats.mu.Lock()
+ p.stats.URLsQueued = 10
+ p.stats.URLsProcessed = 8
+ p.stats.URLsFailed = 2
+ p.stats.mu.Unlock()
+
+ // GetStats should return a copy
+ stats = p.GetStats()
+ if stats.URLsQueued != 10 {
+ t.Errorf("URLsQueued = %d, want 10", stats.URLsQueued)
+ }
+ if stats.URLsProcessed != 8 {
+ t.Errorf("URLsProcessed = %d, want 8", stats.URLsProcessed)
+ }
+ if stats.URLsFailed != 2 {
+ t.Errorf("URLsFailed = %d, want 2", stats.URLsFailed)
+ }
+}
+
+func TestPipeline_QueueLength(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 100
+ p := NewPipeline(cfg, slog.Default())
+
+ // Initially empty
+ if qlen := p.QueueLength(); qlen != 0 {
+ t.Errorf("initial QueueLength() = %d, want 0", qlen)
+ }
+
+ // Add a slow processor to keep items in queue
+ proc := &mockProcessor{
+ name: "slow",
+ processFunc: func(ctx context.Context, ru *models.RecordedURL) error {
+ time.Sleep(100 * time.Millisecond)
+ return nil
+ },
+ }
+ p.AddProcessor(proc)
+
+ p.Start()
+ defer p.Stop()
+
+ // Enqueue multiple URLs quickly
+ for i := 0; i < 10; i++ {
+ p.Enqueue(&models.RecordedURL{
+ URL: fmt.Sprintf("http://example.com/%d", i),
+ })
+ }
+
+ // Queue should have items
+ qlen := p.QueueLength()
+ if qlen == 0 {
+ t.Error("QueueLength() = 0, expected > 0 after enqueueing with slow processor")
+ }
+}
+
+func TestPipeline_ContextCancellation(t *testing.T) {
+ cfg := config.NewDefaultConfig()
+ cfg.QueueSize = 10
+ p := NewPipeline(cfg, slog.Default())
+
+ // Add a simple processor
+ proc := &mockProcessor{name: "test"}
+ p.AddProcessor(proc)
+
+ p.Start()
+
+ // Enqueue a URL
+ p.Enqueue(&models.RecordedURL{URL: "http://example.com"})
+
+ // Stop pipeline (cancels context)
+ p.Stop()
+
+ // Context should have been cancelled
+ if p.ctx.Err() == nil {
+ t.Error("context should be cancelled after Stop()")
+ }
+}