Documentation ¶
Overview ¶
Package pool provides a simple, single-stage worker pool. Tasks submitted to the pool are processed in parallel by a configurable number of workers.
The package supports both stateless and stateful workers through two distinct constructors:
- New - for pools with a single shared worker instance
- NewStateful - for pools where each goroutine gets its own worker instance
Worker Types:
The package provides a simple Worker interface that can be implemented in two ways:
type Worker[T any] interface {
Do(ctx context.Context, v T) error
}
1. Direct implementation for complex stateful workers:
type dbWorker struct {
conn *sql.DB
}
func (w *dbWorker) Do(ctx context.Context, v string) error {
// ExecContext returns (sql.Result, error); only the error is relevant here
_, err := w.conn.ExecContext(ctx, "INSERT INTO items (value) VALUES (?)", v)
return err
}
2. Function adapter for simple stateless workers:
worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
// process the value
return nil
})
Basic Usage:
For stateless operations (like HTTP requests, parsing operations, etc.):
worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
resp, err := http.Get(v)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
})
p := pool.New[string](2, worker)
if err := p.Go(context.Background()); err != nil {
return err
}
// submit work
p.Submit("task1")
p.Submit("task2")
if err := p.Close(context.Background()); err != nil {
return err
}
Note: all With* configuration methods are builder-style and must be called before Go(). Calling them after Go() is unsupported.
For stateful operations (like database connections, file handles, etc.):
maker := func() pool.Worker[string] {
return &dbWorker{
conn: openConnection(),
}
}
p := pool.NewStateful[string](2, maker)
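Why a per-goroutine instance matters can be illustrated with a standard-library sketch. It does not use the pool package; counter, runStateful, and the plain channel fan-out are assumptions made for the example. Each goroutine calls the maker once and then mutates its own instance without locking.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical stateful worker: it mutates its own field without locking.
type counter struct{ seen int }

func (c *counter) do(int) { c.seen++ }

// runStateful starts size goroutines, each with its own counter from maker,
// and returns the per-goroutine counts once the input is drained.
func runStateful(size int, maker func() *counter, items []int) []int {
	in := make(chan int)
	counts := make([]int, size)
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w := maker() // one instance per goroutine, so no shared mutable state
			for v := range in {
				w.do(v)
			}
			counts[id] = w.seen // each goroutine writes only its own slot
		}(i)
	}
	for _, v := range items {
		in <- v
	}
	close(in)
	wg.Wait()
	return counts
}

func main() {
	counts := runStateful(2, func() *counter { return &counter{} }, []int{1, 2, 3, 4, 5})
	fmt.Println(counts) // the split between workers varies from run to run
}
```

If all goroutines shared a single counter, the increment would need a mutex or an atomic; giving each goroutine its own instance avoids that.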
Features:
- Generic worker pool implementation supporting any data type
- Configurable number of workers running in parallel
- Support for both stateless shared workers and per-worker instances
- Batching capability for processing multiple items at once
- Customizable work distribution through chunk functions
- Built-in metrics collection including processing times and counts
- Error handling with options to continue or stop on errors
- Context-based cancellation and timeouts
- Optional completion callbacks
Advanced Features:
Batching:
p := New[string](2, worker).WithBatchSize(10)
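The general idea of batching, accumulating items until a batch is full and then handing the whole batch over, can be sketched as follows. This is a standalone illustration, not the package's internal code; batcher, add, drain, and batches are hypothetical names.

```go
package main

import "fmt"

// batcher accumulates items and hands them to flush once size items are buffered.
type batcher[T any] struct {
	size  int
	buf   []T
	flush func(batch []T)
}

// add buffers one item and flushes when the batch is full.
func (b *batcher[T]) add(v T) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.drain()
	}
}

// drain flushes whatever is buffered; call it once at the end for the partial batch.
func (b *batcher[T]) drain() {
	if len(b.buf) == 0 {
		return
	}
	b.flush(b.buf)
	b.buf = nil // start a fresh slice so flushed batches are not overwritten
}

// batches splits items into groups of at most size elements using a batcher.
func batches(items []int, size int) [][]int {
	var out [][]int
	b := &batcher[int]{size: size, flush: func(batch []int) { out = append(out, batch) }}
	for _, v := range items {
		b.add(v)
	}
	b.drain() // flush the final partial batch
	return out
}

func main() {
	fmt.Println(batches([]int{1, 2, 3, 4, 5, 6, 7}, 3)) // [[1 2 3] [4 5 6] [7]]
}
```

The final drain matters: without it, items left in a partially filled batch would never be processed.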
Chunked distribution:
p := New[string](2, worker).WithChunkFn(func(v string) string {
return v // items with same hash go to same worker
})
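One common way to route items with the same key to the same worker is to hash the key and take it modulo the worker count. The sketch below uses FNV-1a from the standard library; the hash actually used by the package is not stated here, so treat the choice, and the name workerIndex, as assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex maps a key to one of n workers.
// FNV-1a is an assumption of this sketch, not necessarily what the pool uses.
func workerIndex(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		// equal keys always land on the same index
		fmt.Printf("%s -> worker %d\n", key, workerIndex(key, 4))
	}
}
```

Because the mapping is deterministic, items sharing a key are handled by one worker in submission order, while different keys may still collide on the same worker.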
Error handling:
p := New[string](2, worker).WithContinueOnError()
Metrics:
The pool automatically tracks standard stats metrics (processed counts, errors, timings). Workers can also record additional custom metrics:
m := metrics.Get(ctx)
m.Inc("custom-counter")
Access metrics:
metrics := p.Metrics()
value := metrics.Get("custom-counter")
The pool also records statistical metrics, including:
- Number of processed items
- Number of errors
- Number of dropped items
- Processing time
- Wait time
- Initialization time
- Total time
Access stats:
metrics := p.Metrics()
stats := metrics.GetStats()
fmt.Printf("processed: %d, errors: %d", stats.Processed, stats.Errors)
Data Collection:
For collecting results from workers, use the Collector:
collector := pool.NewCollector[Result](ctx, 10)
worker := pool.WorkerFunc[Input](func(ctx context.Context, v Input) error {
result := process(v)
collector.Submit(result)
return nil
})
Results can be retrieved either through iteration:
for v, err := range collector.Iter() {
if err != nil {
return err
}
// use v
}
Or by collecting all at once:
results, err := collector.All()
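The collector pattern, a buffered channel that workers write to and a consumer drains, can be sketched with the standard library alone. The lowercase collector type below is a hypothetical stand-in for illustration and is not the package's Collector; squares is likewise an invented helper.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// collector is a hypothetical channel-backed result sink.
type collector[V any] struct {
	ctx context.Context
	ch  chan V
}

func newCollector[V any](ctx context.Context, size int) *collector[V] {
	return &collector[V]{ctx: ctx, ch: make(chan V, size)}
}

// submit sends a value, giving up if the context is cancelled first.
func (c *collector[V]) submit(v V) {
	select {
	case c.ch <- v:
	case <-c.ctx.Done():
	}
}

// close signals that no more values will be submitted.
func (c *collector[V]) close() { close(c.ch) }

// all drains the channel until it is closed or the context is cancelled.
func (c *collector[V]) all() ([]V, error) {
	var res []V
	for {
		select {
		case v, ok := <-c.ch:
			if !ok {
				return res, nil
			}
			res = append(res, v)
		case <-c.ctx.Done():
			return res, c.ctx.Err()
		}
	}
}

// squares computes n*n for each input concurrently and collects the results.
func squares(inputs []int) ([]int, error) {
	c := newCollector[int](context.Background(), 2)
	var wg sync.WaitGroup
	for _, n := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.submit(n * n)
		}(n)
	}
	go func() {
		wg.Wait()
		c.close() // close only after every producer has finished
	}()
	res, err := c.all()
	sort.Ints(res) // arrival order is not deterministic
	return res, err
}

func main() {
	fmt.Println(squares([]int{3, 1, 2})) // [1 4 9] <nil>
}
```

Note the ordering: the collector is closed only after all producers are done, and the consumer drains concurrently so that producers do not block forever on a full buffer.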
Middleware Support:
The pool supports a middleware pattern similar to HTTP middleware in Go. Middleware can add functionality such as retries, timeouts, metrics, or error handling:
// retry middleware
retryMiddleware := func(next Worker[string]) Worker[string] {
return WorkerFunc[string](func(ctx context.Context, v string) error {
var lastErr error
for i := 0; i < 3; i++ {
if err := next.Do(ctx, v); err == nil {
return nil
} else {
lastErr = err
}
time.Sleep(time.Second * time.Duration(i))
}
return fmt.Errorf("failed after 3 attempts: %w", lastErr)
})
}
p := New[string](2, worker).Use(retryMiddleware)
Multiple middlewares can be chained; they execute in the order provided:
p.Use(logging, metrics, retry) // executes: logging -> metrics -> retry -> worker
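The execution order above follows from how the wrappers are nested. A minimal sketch, assuming local stand-ins for Worker, WorkerFunc, and Middleware and a hypothetical chain helper: wrapping in reverse order makes the first middleware the outermost one.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins so the sketch compiles without the pool package.
type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

type WorkerFunc[T any] func(ctx context.Context, v T) error

func (f WorkerFunc[T]) Do(ctx context.Context, v T) error { return f(ctx, v) }

type Middleware[T any] func(next Worker[T]) Worker[T]

// chain wraps w so that mws[0] ends up outermost: wrappers are applied in reverse.
func chain[T any](w Worker[T], mws ...Middleware[T]) Worker[T] {
	for i := len(mws) - 1; i >= 0; i-- {
		w = mws[i](w)
	}
	return w
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) Middleware[string] {
	return func(next Worker[string]) Worker[string] {
		return WorkerFunc[string](func(ctx context.Context, v string) error {
			*trace = append(*trace, name)
			return next.Do(ctx, v)
		})
	}
}

// callOrder runs one item through three tagged middlewares and reports the call order.
func callOrder() string {
	var trace []string
	worker := WorkerFunc[string](func(context.Context, string) error {
		trace = append(trace, "worker")
		return nil
	})
	w := chain[string](worker, tag("logging", &trace), tag("metrics", &trace), tag("retry", &trace))
	_ = w.Do(context.Background(), "x")
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(callOrder()) // logging -> metrics -> retry -> worker
}
```

A practical consequence: a middleware listed before retry runs once per item, while one listed after retry runs once per attempt.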
Example (Basic) ¶
// collect output
var out []string
var mu sync.Mutex
worker := WorkerFunc[int](func(_ context.Context, v int) error {
mu.Lock()
out = append(out, fmt.Sprintf("processed: %d", v))
mu.Unlock()
return nil
})
p := New[int](2, worker)
if err := p.Go(context.Background()); err != nil {
panic(err) // handle error, don't panic in real code
}
// submit work
p.Submit(1)
p.Submit(2)
p.Submit(3)
_ = p.Close(context.Background())
// print collected output in sorted order
sort.Strings(out)
for _, s := range out {
fmt.Println(s)
}
Output:
processed: 1
processed: 2
processed: 3
Example (ChainedCalculation) ¶
// stage 1: calculate fibonacci numbers in parallel
type FibResult struct {
n int
fib uint64
}
stage1Collector := NewCollector[FibResult](context.Background(), 10)
fibWorker := WorkerFunc[int](func(_ context.Context, n int) error {
var a, b uint64 = 0, 1
for range n {
a, b = b, a+b
}
stage1Collector.Submit(FibResult{n: n, fib: a})
return nil
})
// stage 2: calculate factors for each fibonacci number
type FactorsResult struct {
n uint64
factors []uint64
}
stage2Collector := NewCollector[FactorsResult](context.Background(), 10)
factorsWorker := WorkerFunc[FibResult](func(_ context.Context, res FibResult) error {
if res.fib <= 1 {
stage2Collector.Submit(FactorsResult{n: res.fib, factors: []uint64{res.fib}})
return nil
}
var factors []uint64
n := res.fib
for i := uint64(2); i*i <= n; i++ {
for n%i == 0 {
factors = append(factors, i)
n /= i
}
}
if n > 1 {
factors = append(factors, n)
}
stage2Collector.Submit(FactorsResult{n: res.fib, factors: factors})
return nil
})
// create and start both pools
pool1 := New[int](3, fibWorker)
pool1.Go(context.Background())
pool2 := NewStateful[FibResult](2, func() Worker[FibResult] {
return factorsWorker
})
pool2.Go(context.Background())
// submit numbers to calculate
numbers := []int{5, 7, 10}
for _, n := range numbers {
pool1.Submit(n)
}
// close pools and collectors in order
pool1.Close(context.Background())
stage1Collector.Close()
// process stage 1 results in stage 2
for fibRes, err := range stage1Collector.Iter() {
if err != nil {
fmt.Printf("stage 1 error: %v\n", err)
continue
}
pool2.Submit(fibRes)
}
pool2.Close(context.Background())
stage2Collector.Close()
// collect and sort final results to ensure deterministic output order
results, _ := stage2Collector.All()
sort.Slice(results, func(i, j int) bool {
return results[i].n < results[j].n
})
// print results in sorted order
for _, res := range results {
fmt.Printf("number %d has factors %v\n", res.n, res.factors)
}
Output:
number 5 has factors [5]
number 13 has factors [13]
number 55 has factors [5 11]
Example (FibCalculator) ¶
// FibResult type to store both input and calculated Fibonacci number
type FibResult struct {
n int
fib uint64
}
// create collector for results
collector := NewCollector[FibResult](context.Background(), 10)
// worker calculating fibonacci numbers
worker := WorkerFunc[int](func(_ context.Context, n int) error {
if n <= 0 {
return fmt.Errorf("invalid input: %d", n)
}
// calculate fibonacci number
var a, b uint64 = 0, 1
for range n {
a, b = b, a+b
}
collector.Submit(FibResult{n: n, fib: a})
return nil
})
// create pool with 3 workers
p := New[int](3, worker)
p.Go(context.Background())
// submit numbers to calculate asynchronously
go func() {
numbers := []int{5, 7, 10, 3, 8}
for _, n := range numbers {
p.Submit(n)
}
p.Close(context.Background())
collector.Close()
}()
// collect results and sort them by input number for consistent output
results, _ := collector.All()
sort.Slice(results, func(i, j int) bool {
return results[i].n < results[j].n
})
// print results
for _, res := range results {
fmt.Printf("fib(%d) = %d\n", res.n, res.fib)
}
Output:
fib(3) = 2
fib(5) = 5
fib(7) = 13
fib(8) = 21
fib(10) = 55
Example (Middleware) ¶
// create a worker that sometimes fails
worker := WorkerFunc[string](func(_ context.Context, v string) error {
if v == "fail" {
return errors.New("simulated failure")
}
fmt.Printf("processed: %s\n", v)
return nil
})
// create logging middleware
logging := func(next Worker[string]) Worker[string] {
return WorkerFunc[string](func(ctx context.Context, v string) error {
fmt.Printf("starting: %s\n", v)
err := next.Do(ctx, v)
fmt.Printf("completed: %s, err: %v\n", v, err)
return err
})
}
// create retry middleware
retry := func(attempts int) Middleware[string] {
return func(next Worker[string]) Worker[string] {
return WorkerFunc[string](func(ctx context.Context, v string) error {
var lastErr error
for i := range attempts {
var err error
if err = next.Do(ctx, v); err == nil {
return nil
}
lastErr = err
fmt.Printf("attempt %d failed: %v\n", i+1, err)
}
return fmt.Errorf("failed after %d attempts: %w", attempts, lastErr)
})
}
}
// create pool with both middlewares - retry is listed first, so it is the outermost wrapper and logging runs on every attempt
p := New[string](1, worker).Use(retry(2), logging)
p.Go(context.Background())
// process items
p.Submit("ok") // should succeed first time
p.Submit("fail") // should fail after retries
p.Close(context.Background())
Output:
starting: ok
processed: ok
completed: ok, err: <nil>
starting: fail
completed: fail, err: simulated failure
attempt 1 failed: simulated failure
starting: fail
completed: fail, err: simulated failure
attempt 2 failed: simulated failure
Example (WithCollector) ¶
type Item struct {
val int
label string
}
// create collector for results with buffer size 10
collector := NewCollector[Item](context.Background(), 10)
// create worker that processes numbers and sends results to collector
worker := WorkerFunc[int](func(_ context.Context, v int) error {
result := Item{
val: v * 2, // double the value
label: "proc", // add label
}
collector.Submit(result)
return nil
})
// create and start pool
p := New[int](2, worker)
p.Go(context.Background())
// submit items asynchronously
go func() {
for i := 1; i <= 3; i++ {
p.Submit(i)
}
p.Close(context.Background())
collector.Close() // close collector after pool is done
}()
// collect results and sort them for deterministic output
results, _ := collector.All()
sort.Slice(results, func(i, j int) bool {
return results[i].val < results[j].val
})
// print sorted results
for _, res := range results {
fmt.Printf("got result: %d (%s)\n", res.val, res.label)
}
Output:
got result: 2 (proc)
got result: 4 (proc)
got result: 6 (proc)
Example (WithCollectorIterator) ¶
collector := NewCollector[string](context.Background(), 5)
worker := WorkerFunc[int](func(_ context.Context, v int) error {
collector.Submit(fmt.Sprintf("value %d", v))
return nil
})
p := New[int](2, worker)
p.Go(context.Background())
// submit items asynchronously
go func() {
for i := 1; i <= 3; i++ {
p.Submit(i)
}
p.Close(context.Background())
collector.Close()
}()
// collect all values first
var values []string
for val, err := range collector.Iter() {
if err != nil {
fmt.Printf("error: %v\n", err)
continue
}
values = append(values, val)
}
// sort and print values for deterministic output
sort.Strings(values)
for _, val := range values {
fmt.Printf("processed: %s\n", val)
}
Output:
processed: value 1
processed: value 2
processed: value 3
Example (WithContext) ¶
started := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker := WorkerFunc[int](func(ctx context.Context, v int) error {
close(started) // signal that worker started
<-ctx.Done() // wait for cancellation
return ctx.Err()
})
p := New[int](1, worker).WithBatchSize(0) // disable batching
p.Go(ctx)
p.Submit(1)
<-started // wait for worker to start
cancel() // cancel context
err := p.Close(context.Background())
fmt.Printf("got error: %v\n", err != nil)
Output:
got error: true
Example (WithError) ¶
// collect output to ensure deterministic order
var out []string
var mu sync.Mutex
worker := WorkerFunc[int](func(_ context.Context, v int) error {
if v == 0 {
return fmt.Errorf("zero value not allowed")
}
mu.Lock()
out = append(out, fmt.Sprintf("processed: %d", v))
mu.Unlock()
return nil
})
p := New[int](1, worker).WithContinueOnError() // don't stop on errors
p.Go(context.Background())
p.Submit(1)
p.Submit(0) // this will fail but processing continues
p.Submit(2)
err := p.Close(context.Background())
if err != nil {
mu.Lock()
out = append(out, fmt.Sprintf("finished with error: %v", err))
mu.Unlock()
}
// print collected output in sorted order
sort.Strings(out)
for _, s := range out {
fmt.Println(s)
}
Output:
finished with error: total errors: 1, last error: worker 0 failed: zero value not allowed
processed: 1
processed: 2
Example (WithRouting) ¶
// collect output with sync.Map for thread safety
var out sync.Map
worker := WorkerFunc[int](func(ctx context.Context, v int) error {
out.Store(v, fmt.Sprintf("worker %d got %d", metrics.WorkerID(ctx), v))
return nil
})
// create pool with chunk function that routes based on even/odd
p := New[int](2, worker).WithChunkFn(func(v int) string {
if v%2 == 0 {
return "even"
}
return "odd"
},
)
p.Go(context.Background())
// submit all numbers
for i := 1; i <= 4; i++ {
p.Submit(i)
}
p.Close(context.Background())
// print in order to ensure deterministic output
for i := 1; i <= 4; i++ {
if v, ok := out.Load(i); ok {
fmt.Println(v)
}
}
Output:
worker 0 got 1
worker 1 got 2
worker 0 got 3
worker 1 got 4
Example (WorkerTypes) ¶
// these two workers are functionally equivalent:
// 1. Implementing Worker interface explicitly
// 2. Using WorkerFunc adapter - same thing, just shorter
workerFn := WorkerFunc[string](func(_ context.Context, v string) error {
fmt.Printf("processed: %s\n", v)
return nil
})
// run first pool to completion
p1 := New[string](1, &processingWorker{})
p1.Go(context.Background())
p1.Submit("task1")
p1.Close(context.Background())
// then run second pool
p2 := New[string](1, workerFn)
p2.Go(context.Background())
p2.Submit("task2")
p2.Close(context.Background())
Output:
processed: task1
processed: task2
Index ¶
- type Collector
- type GroupCompleteFn
- type Middleware
- type Send
- type Worker
- type WorkerCompleteFn
- type WorkerFunc
- type WorkerGroup
- func (p *WorkerGroup[T]) Close(ctx context.Context) error
- func (p *WorkerGroup[T]) Go(ctx context.Context) error
- func (p *WorkerGroup[T]) Metrics() *metrics.Value
- func (p *WorkerGroup[T]) Send(v T)
- func (p *WorkerGroup[T]) Submit(v T)
- func (p *WorkerGroup[T]) Use(middlewares ...Middleware[T]) *WorkerGroup[T]
- func (p *WorkerGroup[T]) Wait(ctx context.Context) error
- func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithChunkFn(fn func(T) string) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithPoolCompleteFn(fn GroupCompleteFn[T]) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithWorkerCompleteFn(fn WorkerCompleteFn[T]) *WorkerGroup[T]
- type WorkerMaker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Collector ¶
type Collector[V any] struct { // contains filtered or unexported fields }
Collector provides synchronous access to async data from pool's response channel
func NewCollector ¶
NewCollector creates a new collector with a given context and buffer size for the channel
type GroupCompleteFn ¶ added in v0.6.0
GroupCompleteFn called once when all workers are done
type Middleware ¶ added in v0.4.0
Middleware wraps worker and adds functionality
type WorkerCompleteFn ¶ added in v0.6.0
WorkerCompleteFn called on worker completion
type WorkerFunc ¶
WorkerFunc is an adapter to allow the use of ordinary functions as Workers.
type WorkerGroup ¶
type WorkerGroup[T any] struct { // contains filtered or unexported fields }
WorkerGroup represents a pool of workers processing items in parallel. Supports both direct item processing and batching modes.
func New ¶
func New[T any](size int, worker Worker[T]) *WorkerGroup[T]
New creates a worker pool with a shared worker instance. All goroutines share the same worker, suitable for stateless processing.
func NewStateful ¶ added in v0.2.0
func NewStateful[T any](size int, maker func() Worker[T]) *WorkerGroup[T]
NewStateful creates a worker pool where each goroutine gets its own worker instance. Suitable for operations requiring state (e.g., database connections).
func (*WorkerGroup[T]) Close ¶
func (p *WorkerGroup[T]) Close(ctx context.Context) error
Close closes the pool. The consumer must call it to indicate that all records have been submitted. The call blocks until workers have completed all processing or the context is cancelled. After this call the pool can't be reused. Returns an error if any occurred during the run. Note: Close always closes channels to ensure workers can exit, even if the context times out. Workers must respect either the context or channel closure to exit cleanly.
func (*WorkerGroup[T]) Go ¶
func (p *WorkerGroup[T]) Go(ctx context.Context) error
Go activates the pool and starts worker goroutines. Must be called before submitting items.
func (*WorkerGroup[T]) Metrics ¶
func (p *WorkerGroup[T]) Metrics() *metrics.Value
Metrics returns combined metrics from all workers
func (*WorkerGroup[T]) Send ¶ added in v0.6.0
func (p *WorkerGroup[T]) Send(v T)
Send adds an item to the pool for processing. Safe for concurrent use, intended for worker-to-pool submissions or for use by multiple concurrent producers.
func (*WorkerGroup[T]) Submit ¶
func (p *WorkerGroup[T]) Submit(v T)
Submit adds an item to the pool for processing. May block if worker channels are full. Not thread-safe; intended for use by the main thread or a single producer's thread.
func (*WorkerGroup[T]) Use ¶ added in v0.4.0
func (p *WorkerGroup[T]) Use(middlewares ...Middleware[T]) *WorkerGroup[T]
Use applies middlewares to the worker group's worker. Middlewares are applied in the same order as they are provided, matching the HTTP middleware pattern in Go. The first middleware is the outermost wrapper, and the last middleware is the innermost wrapper closest to the original worker.
func (*WorkerGroup[T]) Wait ¶
func (p *WorkerGroup[T]) Wait(ctx context.Context) error
Wait blocks until all workers have completed and the result channel is closed. Respects context cancellation and timeouts.
func (*WorkerGroup[T]) WithBatchSize ¶ added in v0.3.0
func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T]
WithBatchSize enables item batching with specified size. Items are accumulated until batch is full before processing. Set to 0 to disable batching. Negative values are treated as 0. Default: 10
func (*WorkerGroup[T]) WithChunkFn ¶ added in v0.3.0
func (p *WorkerGroup[T]) WithChunkFn(fn func(T) string) *WorkerGroup[T]
WithChunkFn enables predictable item distribution. Items with the same key (returned by fn) are processed by the same worker. Useful for maintaining order within groups of related items. Default: none (random distribution)
func (*WorkerGroup[T]) WithContinueOnError ¶ added in v0.3.0
func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T]
WithContinueOnError makes the pool continue processing after a worker returns an error. Default: false (the pool stops on error)
func (*WorkerGroup[T]) WithPoolCompleteFn ¶ added in v0.6.0
func (p *WorkerGroup[T]) WithPoolCompleteFn(fn GroupCompleteFn[T]) *WorkerGroup[T]
WithPoolCompleteFn sets callback executed once when all workers are done
func (*WorkerGroup[T]) WithWorkerChanSize ¶ added in v0.3.0
func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T]
WithWorkerChanSize sets channel buffer size for each worker. Larger sizes can help with bursty workloads but increase memory usage. Default: 1
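How the buffer size affects a producer can be seen with a plain Go channel. The sketch below is independent of the pool package; acceptedWithoutBlocking is a hypothetical helper that counts how many sends succeed immediately when nothing is receiving.

```go
package main

import "fmt"

// acceptedWithoutBlocking reports how many sends succeed immediately on a
// channel with the given buffer size when no receiver is running.
func acceptedWithoutBlocking(size, attempts int) int {
	ch := make(chan int, size)
	n := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			n++
		default:
			return n // the next send would block
		}
	}
	return n
}

func main() {
	fmt.Println(acceptedWithoutBlocking(1, 5)) // 1
	fmt.Println(acceptedWithoutBlocking(4, 5)) // 4
}
```

A larger buffer lets a bursty producer run ahead of slow workers by that many items, at the cost of holding those items in memory.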
func (*WorkerGroup[T]) WithWorkerCompleteFn ¶ added in v0.6.0
func (p *WorkerGroup[T]) WithWorkerCompleteFn(fn WorkerCompleteFn[T]) *WorkerGroup[T]
WithWorkerCompleteFn sets callback executed on worker completion. Useful for cleanup or finalization of worker resources. Default: none (disabled)
type WorkerMaker ¶
WorkerMaker is a function that returns a new Worker
Directories ¶
| Path | Synopsis |
|---|---|
| metrics | Package metrics provides a way to collect metrics in a thread-safe way |
| middleware | Package middleware provides common middleware implementations for the pool package. |