main
Raw Download raw file
  1// Package workerpool provides a bounded concurrent worker pool.
  2package workerpool
  3
  4import (
  5	"context"
  6	"sync"
  7)
  8
// Task represents a unit of work. A Pool invokes it with the context that was
// passed to Run or RunWithProgress; the returned error is recorded in the
// Result for that task.
type Task func(ctx context.Context) error
 11
// Result holds the outcome of a task.
type Result struct {
	// Index is the position of the task in the slice handed to the pool.
	Index int
	// Error is the value returned by the task, or the context's error when the
	// context was already done at the moment a worker picked the task up.
	Error error
}
 17
// Pool executes tasks with bounded concurrency.
type Pool struct {
	// workers is the concurrency limit configured through New, which
	// guarantees a value of at least 1.
	workers int
}
 22
 23// New creates a pool with the specified number of workers.
 24func New(workers int) *Pool {
 25	if workers < 1 {
 26		workers = 1
 27	}
 28	return &Pool{workers: workers}
 29}
 30
 31// Run executes all tasks with bounded concurrency.
 32// Returns results in task order (not completion order).
 33// Continues processing remaining tasks even if some fail.
 34func (p *Pool) Run(ctx context.Context, tasks []Task) []Result {
 35	results := make([]Result, len(tasks))
 36
 37	if len(tasks) == 0 {
 38		return results
 39	}
 40
 41	taskCh := make(chan int, len(tasks))
 42	resultCh := make(chan Result, len(tasks))
 43
 44	var wg sync.WaitGroup
 45
 46	// Start workers
 47	for i := 0; i < p.workers; i++ {
 48		wg.Add(1)
 49		go func() {
 50			defer wg.Done()
 51			for idx := range taskCh {
 52				select {
 53				case <-ctx.Done():
 54					resultCh <- Result{Index: idx, Error: ctx.Err()}
 55				default:
 56					err := tasks[idx](ctx)
 57					resultCh <- Result{Index: idx, Error: err}
 58				}
 59			}
 60		}()
 61	}
 62
 63	// Send tasks
 64	for i := range tasks {
 65		taskCh <- i
 66	}
 67	close(taskCh)
 68
 69	// Wait for completion in background
 70	go func() {
 71		wg.Wait()
 72		close(resultCh)
 73	}()
 74
 75	// Collect results
 76	for result := range resultCh {
 77		results[result.Index] = result
 78	}
 79
 80	return results
 81}
 82
 83// RunWithProgress executes tasks and calls progress after each completion.
 84func (p *Pool) RunWithProgress(ctx context.Context, tasks []Task, progress func(completed, total int)) []Result {
 85	results := make([]Result, len(tasks))
 86
 87	if len(tasks) == 0 {
 88		return results
 89	}
 90
 91	taskCh := make(chan int, len(tasks))
 92	resultCh := make(chan Result, len(tasks))
 93
 94	var wg sync.WaitGroup
 95
 96	// Start workers
 97	for i := 0; i < p.workers; i++ {
 98		wg.Add(1)
 99		go func() {
100			defer wg.Done()
101			for idx := range taskCh {
102				select {
103				case <-ctx.Done():
104					resultCh <- Result{Index: idx, Error: ctx.Err()}
105				default:
106					err := tasks[idx](ctx)
107					resultCh <- Result{Index: idx, Error: err}
108				}
109			}
110		}()
111	}
112
113	// Send tasks
114	for i := range tasks {
115		taskCh <- i
116	}
117	close(taskCh)
118
119	// Wait for completion in background
120	go func() {
121		wg.Wait()
122		close(resultCh)
123	}()
124
125	// Collect results with progress
126	completed := 0
127	for result := range resultCh {
128		results[result.Index] = result
129		completed++
130		if progress != nil {
131			progress(completed, len(tasks))
132		}
133	}
134
135	return results
136}