main
1// Package workerpool provides a bounded concurrent worker pool.
2package workerpool
3
4import (
5 "context"
6 "sync"
7)
8
9// Task represents a unit of work.
10type Task func(ctx context.Context) error
11
12// Result holds the outcome of a task.
13type Result struct {
14 Index int
15 Error error
16}
17
18// Pool executes tasks with bounded concurrency.
19type Pool struct {
20 workers int
21}
22
23// New creates a pool with the specified number of workers.
24func New(workers int) *Pool {
25 if workers < 1 {
26 workers = 1
27 }
28 return &Pool{workers: workers}
29}
30
31// Run executes all tasks with bounded concurrency.
32// Returns results in task order (not completion order).
33// Continues processing remaining tasks even if some fail.
34func (p *Pool) Run(ctx context.Context, tasks []Task) []Result {
35 results := make([]Result, len(tasks))
36
37 if len(tasks) == 0 {
38 return results
39 }
40
41 taskCh := make(chan int, len(tasks))
42 resultCh := make(chan Result, len(tasks))
43
44 var wg sync.WaitGroup
45
46 // Start workers
47 for i := 0; i < p.workers; i++ {
48 wg.Add(1)
49 go func() {
50 defer wg.Done()
51 for idx := range taskCh {
52 select {
53 case <-ctx.Done():
54 resultCh <- Result{Index: idx, Error: ctx.Err()}
55 default:
56 err := tasks[idx](ctx)
57 resultCh <- Result{Index: idx, Error: err}
58 }
59 }
60 }()
61 }
62
63 // Send tasks
64 for i := range tasks {
65 taskCh <- i
66 }
67 close(taskCh)
68
69 // Wait for completion in background
70 go func() {
71 wg.Wait()
72 close(resultCh)
73 }()
74
75 // Collect results
76 for result := range resultCh {
77 results[result.Index] = result
78 }
79
80 return results
81}
82
83// RunWithProgress executes tasks and calls progress after each completion.
84func (p *Pool) RunWithProgress(ctx context.Context, tasks []Task, progress func(completed, total int)) []Result {
85 results := make([]Result, len(tasks))
86
87 if len(tasks) == 0 {
88 return results
89 }
90
91 taskCh := make(chan int, len(tasks))
92 resultCh := make(chan Result, len(tasks))
93
94 var wg sync.WaitGroup
95
96 // Start workers
97 for i := 0; i < p.workers; i++ {
98 wg.Add(1)
99 go func() {
100 defer wg.Done()
101 for idx := range taskCh {
102 select {
103 case <-ctx.Done():
104 resultCh <- Result{Index: idx, Error: ctx.Err()}
105 default:
106 err := tasks[idx](ctx)
107 resultCh <- Result{Index: idx, Error: err}
108 }
109 }
110 }()
111 }
112
113 // Send tasks
114 for i := range tasks {
115 taskCh <- i
116 }
117 close(taskCh)
118
119 // Wait for completion in background
120 go func() {
121 wg.Wait()
122 close(resultCh)
123 }()
124
125 // Collect results with progress
126 completed := 0
127 for result := range resultCh {
128 results[result.Index] = result
129 completed++
130 if progress != nil {
131 progress(completed, len(tasks))
132 }
133 }
134
135 return results
136}