main
Raw Download raw file
  1// Package pipeline provides a processing pipeline for recorded URLs.
  2//
  3// The Pipeline manages a queue of RecordedURL objects and processes them
  4// through a chain of Processor implementations. URLs are enqueued by the
  5// proxy handlers and processed asynchronously by worker goroutines.
  6//
  7// Key features:
  8//   - Buffered input channel for decoupling URL capture from processing
  9//   - Multiple processors executed in sequence (e.g., WARC writer, dedup checker)
 10//   - Statistics tracking for monitoring (queued, processed, failed counts)
 11//   - Graceful shutdown with context cancellation
 12//
 13// The typical flow is:
 14//  1. Create pipeline with NewPipeline
 15//  2. Add processors with AddProcessor
 16//  3. Start workers with Start
 17//  4. Enqueue URLs from proxy handlers
 18//  5. Stop gracefully with Stop when shutting down
 19//
 20// Thread-safety: All public methods are safe for concurrent use.
 21package pipeline
 22
 23import (
 24	"context"
 25	"fmt"
 26	"log/slog"
 27	"sync"
 28
 29	"github.com/internetarchive/gowarcprox/internal/models"
 30	"github.com/internetarchive/gowarcprox/pkg/config"
 31)
 32
 33// Pipeline processes recorded URLs through a chain of processors
 34type Pipeline struct {
 35	config     *config.Config
 36	processors []Processor
 37	inputCh    chan *models.RecordedURL
 38	ctx        context.Context
 39	cancel     context.CancelFunc
 40	wg         sync.WaitGroup
 41	logger     *slog.Logger
 42	stats      *Stats
 43}
 44
 45// Stats tracks pipeline statistics
 46type Stats struct {
 47	mu              sync.Mutex
 48	URLsQueued      int64
 49	URLsProcessed   int64
 50	URLsFailed      int64
 51	CurrentQueueLen int
 52}
 53
 54// NewPipeline creates a new processing pipeline
 55func NewPipeline(cfg *config.Config, logger *slog.Logger) *Pipeline {
 56	if logger == nil {
 57		logger = slog.Default()
 58	}
 59
 60	ctx, cancel := context.WithCancel(context.Background())
 61
 62	return &Pipeline{
 63		config:     cfg,
 64		processors: make([]Processor, 0),
 65		inputCh:    make(chan *models.RecordedURL, cfg.QueueSize),
 66		ctx:        ctx,
 67		cancel:     cancel,
 68		logger:     logger,
 69		stats:      &Stats{},
 70	}
 71}
 72
 73// AddProcessor adds a processor to the pipeline
 74func (p *Pipeline) AddProcessor(processor Processor) {
 75	p.processors = append(p.processors, processor)
 76	p.logger.Info("added processor to pipeline", "processor", processor.Name())
 77}
 78
 79// Start starts the pipeline workers
 80func (p *Pipeline) Start() error {
 81	p.logger.Info("starting pipeline", "queue_size", p.config.QueueSize)
 82
 83	// Start worker goroutines
 84	numWorkers := 1 // Can be made configurable later
 85	for i := 0; i < numWorkers; i++ {
 86		p.wg.Add(1)
 87		go p.worker(i)
 88	}
 89
 90	p.logger.Info("pipeline started", "workers", numWorkers)
 91	return nil
 92}
 93
 94// Stop gracefully stops the pipeline
 95func (p *Pipeline) Stop() error {
 96	p.logger.Info("stopping pipeline")
 97
 98	// Close input channel to signal workers to stop
 99	close(p.inputCh)
100
101	// Cancel context
102	p.cancel()
103
104	// Wait for all workers to finish
105	p.wg.Wait()
106
107	p.logger.Info("pipeline stopped",
108		"urls_processed", p.stats.URLsProcessed,
109		"urls_failed", p.stats.URLsFailed)
110	return nil
111}
112
113// Enqueue adds a recorded URL to the pipeline
114func (p *Pipeline) Enqueue(ru *models.RecordedURL) error {
115	select {
116	case p.inputCh <- ru:
117		p.stats.mu.Lock()
118		p.stats.URLsQueued++
119		p.stats.CurrentQueueLen = len(p.inputCh)
120		p.stats.mu.Unlock()
121		return nil
122	case <-p.ctx.Done():
123		return fmt.Errorf("pipeline is shutting down")
124	}
125}
126
127// worker processes URLs from the input channel
128func (p *Pipeline) worker(id int) {
129	defer p.wg.Done()
130
131	p.logger.Debug("pipeline worker started", "worker_id", id)
132
133	for {
134		select {
135		case ru, ok := <-p.inputCh:
136			if !ok {
137				// Channel closed, worker should exit
138				p.logger.Debug("pipeline worker stopping", "worker_id", id)
139				return
140			}
141
142			// Process through all processors
143			if err := p.processURL(ru); err != nil {
144				p.logger.Error("failed to process URL",
145					"worker_id", id,
146					"url", ru.URL,
147					"error", err)
148
149				p.stats.mu.Lock()
150				p.stats.URLsFailed++
151				p.stats.mu.Unlock()
152			} else {
153				p.stats.mu.Lock()
154				p.stats.URLsProcessed++
155				p.stats.mu.Unlock()
156
157				p.logger.Debug("processed URL",
158					"worker_id", id,
159					"url", ru.URL,
160					"method", ru.Method,
161					"status", ru.StatusCode,
162					"digest", ru.PayloadDigest)
163			}
164
165			// Update queue length
166			p.stats.mu.Lock()
167			p.stats.CurrentQueueLen = len(p.inputCh)
168			p.stats.mu.Unlock()
169
170		case <-p.ctx.Done():
171			p.logger.Debug("pipeline worker cancelled", "worker_id", id)
172			return
173		}
174	}
175}
176
177// processURL processes a URL through all processors
178func (p *Pipeline) processURL(ru *models.RecordedURL) error {
179	for _, processor := range p.processors {
180		if err := processor.Process(p.ctx, ru); err != nil {
181			return fmt.Errorf("processor %s failed: %w", processor.Name(), err)
182		}
183	}
184	return nil
185}
186
187// GetStats returns a copy of the current pipeline statistics
188func (p *Pipeline) GetStats() Stats {
189	p.stats.mu.Lock()
190	defer p.stats.mu.Unlock()
191	return *p.stats
192}
193
194// QueueLength returns the current queue length.
195// Note: This returns the last recorded length, not the real-time channel length,
196// to avoid data races with concurrent channel operations.
197func (p *Pipeline) QueueLength() int {
198	p.stats.mu.Lock()
199	defer p.stats.mu.Unlock()
200	return p.stats.CurrentQueueLen
201}