// Package pipeline provides a processing pipeline for recorded URLs.
//
// The Pipeline manages a queue of RecordedURL objects and processes them
// through a chain of Processor implementations. URLs are enqueued by the
// proxy handlers and processed asynchronously by worker goroutines.
//
// Key features:
// - Buffered input channel for decoupling URL capture from processing
// - Multiple processors executed in sequence (e.g., WARC writer, dedup checker)
// - Statistics tracking for monitoring (queued, processed, failed counts)
// - Graceful shutdown with context cancellation
//
// The typical flow (see the usage sketch below) is:
// 1. Create a pipeline with NewPipeline
// 2. Add processors with AddProcessor
// 3. Start workers with Start
// 4. Enqueue URLs from proxy handlers
// 5. Stop gracefully with Stop when shutting down
//
// Thread-safety: Enqueue, GetStats, and QueueLength are safe for concurrent
// use while the pipeline is running. AddProcessor must be called before
// Start, and Enqueue must not be called concurrently with or after Stop.
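//
// A minimal usage sketch (illustrative only: the QueueSize value is
// arbitrary, warcWriter stands in for any Processor implementation such as
// the WARC writer, and recordedURL is a *models.RecordedURL built by a
// proxy handler):
//
//	cfg := &config.Config{QueueSize: 1000}
//	p := pipeline.NewPipeline(cfg, slog.Default())
//	p.AddProcessor(warcWriter)
//	if err := p.Start(); err != nil {
//		// handle startup error
//	}
//
//	// From a proxy handler:
//	if err := p.Enqueue(recordedURL); err != nil {
//		slog.Warn("could not enqueue URL", "error", err)
//	}
//
//	// On shutdown:
//	p.Stop()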
package pipeline

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	"github.com/internetarchive/gowarcprox/internal/models"
	"github.com/internetarchive/gowarcprox/pkg/config"
)

// Pipeline processes recorded URLs through a chain of processors.
type Pipeline struct {
	config     *config.Config
	processors []Processor
	inputCh    chan *models.RecordedURL
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
	logger     *slog.Logger
	stats      *Stats
}

// Stats tracks pipeline statistics.
type Stats struct {
	mu              sync.Mutex
	URLsQueued      int64
	URLsProcessed   int64
	URLsFailed      int64
	CurrentQueueLen int
}

// NewPipeline creates a new processing pipeline.
func NewPipeline(cfg *config.Config, logger *slog.Logger) *Pipeline {
	if logger == nil {
		logger = slog.Default()
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &Pipeline{
		config:     cfg,
		processors: make([]Processor, 0),
		inputCh:    make(chan *models.RecordedURL, cfg.QueueSize),
		ctx:        ctx,
		cancel:     cancel,
		logger:     logger,
		stats:      &Stats{},
	}
}

// AddProcessor adds a processor to the pipeline. Processors run in the
// order they were added. AddProcessor must be called before Start; it is
// not safe to call once workers are running.
func (p *Pipeline) AddProcessor(processor Processor) {
	p.processors = append(p.processors, processor)
	p.logger.Info("added processor to pipeline", "processor", processor.Name())
}
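
// For reference, a processor only needs to satisfy the package's Processor
// interface. The sketch below is illustrative and infers the interface's
// shape (Name and Process) from the calls made in this file; the actual
// definition lives elsewhere in the package:
//
//	type countingProcessor struct{ count atomic.Int64 }
//
//	func (c *countingProcessor) Name() string { return "counter" }
//
//	func (c *countingProcessor) Process(ctx context.Context, ru *models.RecordedURL) error {
//		c.count.Add(1)
//		return nil
//	}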

// Start starts the pipeline workers.
func (p *Pipeline) Start() error {
	p.logger.Info("starting pipeline", "queue_size", p.config.QueueSize)

	// Start worker goroutines
	numWorkers := 1 // Can be made configurable later
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}

	p.logger.Info("pipeline started", "workers", numWorkers)
	return nil
}

// Stop gracefully stops the pipeline. It drains any queued URLs before
// returning. After Stop has been called, the pipeline must not be reused.
func (p *Pipeline) Stop() error {
	p.logger.Info("stopping pipeline")

	// Close the input channel to signal workers to drain the queue and stop.
	close(p.inputCh)

	// Wait for all workers to finish processing queued URLs before
	// cancelling the context, so in-flight processing is not aborted.
	p.wg.Wait()

	// Cancel the context to release anything tied to it.
	p.cancel()

	p.logger.Info("pipeline stopped",
		"urls_processed", p.stats.URLsProcessed,
		"urls_failed", p.stats.URLsFailed)
	return nil
}

// Enqueue adds a recorded URL to the pipeline. It blocks while the queue is
// full. Enqueue must not be called concurrently with or after Stop: once the
// input channel has been closed, sending on it would panic.
func (p *Pipeline) Enqueue(ru *models.RecordedURL) error {
	select {
	case p.inputCh <- ru:
		p.stats.mu.Lock()
		p.stats.URLsQueued++
		p.stats.CurrentQueueLen = len(p.inputCh)
		p.stats.mu.Unlock()
		return nil
	case <-p.ctx.Done():
		return fmt.Errorf("pipeline is shutting down")
	}
}

// worker processes URLs from the input channel.
func (p *Pipeline) worker(id int) {
	defer p.wg.Done()

	p.logger.Debug("pipeline worker started", "worker_id", id)

	for {
		select {
		case ru, ok := <-p.inputCh:
			if !ok {
				// Channel closed, worker should exit
				p.logger.Debug("pipeline worker stopping", "worker_id", id)
				return
			}

			// Process through all processors
			if err := p.processURL(ru); err != nil {
				p.logger.Error("failed to process URL",
					"worker_id", id,
					"url", ru.URL,
					"error", err)

				p.stats.mu.Lock()
				p.stats.URLsFailed++
				p.stats.mu.Unlock()
			} else {
				p.stats.mu.Lock()
				p.stats.URLsProcessed++
				p.stats.mu.Unlock()

				p.logger.Debug("processed URL",
					"worker_id", id,
					"url", ru.URL,
					"method", ru.Method,
					"status", ru.StatusCode,
					"digest", ru.PayloadDigest)
			}

			// Update queue length
			p.stats.mu.Lock()
			p.stats.CurrentQueueLen = len(p.inputCh)
			p.stats.mu.Unlock()

		case <-p.ctx.Done():
			p.logger.Debug("pipeline worker cancelled", "worker_id", id)
			return
		}
	}
}

// processURL processes a URL through all processors.
func (p *Pipeline) processURL(ru *models.RecordedURL) error {
	for _, processor := range p.processors {
		if err := processor.Process(p.ctx, ru); err != nil {
			return fmt.Errorf("processor %s failed: %w", processor.Name(), err)
		}
	}
	return nil
}

// GetStats returns a copy of the current pipeline statistics.
func (p *Pipeline) GetStats() Stats {
	p.stats.mu.Lock()
	defer p.stats.mu.Unlock()
	// Copy the counters field by field instead of dereferencing the struct,
	// so the mutex itself is never copied (go vet's copylocks check).
	return Stats{
		URLsQueued:      p.stats.URLsQueued,
		URLsProcessed:   p.stats.URLsProcessed,
		URLsFailed:      p.stats.URLsFailed,
		CurrentQueueLen: p.stats.CurrentQueueLen,
	}
}
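
// The statistics above are intended for monitoring, as noted in the package
// documentation. A hypothetical polling loop (the interval and log fields
// are illustrative, not part of this package):
//
//	ticker := time.NewTicker(10 * time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		s := p.GetStats()
//		slog.Info("pipeline stats",
//			"queued", s.URLsQueued,
//			"processed", s.URLsProcessed,
//			"failed", s.URLsFailed,
//			"queue_len", s.CurrentQueueLen)
//	}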

// QueueLength returns the current queue length.
// Note: this is the length recorded at the last enqueue or dequeue rather
// than a live read of the channel, so it may lag slightly behind the actual
// queue depth.
func (p *Pipeline) QueueLength() int {
	p.stats.mu.Lock()
	defer p.stats.mu.Unlock()
	return p.stats.CurrentQueueLen
201}