task

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package task provides small, standard-library flavored background task primitives.

Design highlights

  • Manager: holds tasks, starts schedulers, and coordinates graceful shutdown.
  • Trigger task: runs on demand via Handle.Trigger/TryTrigger/TriggerAndWait.
  • Every task: runs periodically, with either fixed-delay or fixed-rate scheduling.
  • Overlap policy: Skip or Merge (bounded; no unbounded queue).
  • Panic/error reporting: uses safego-style handlers and tags; by default reports to stderr.

Lifecycle

Manager must be started explicitly:

m := task.NewManager()
h, _ := m.Add(task.Every(10*time.Second, work), task.WithName("refresh"))
_ = m.Start(ctx)
defer m.Shutdown(context.Background())

Start is not idempotent: calling Start more than once returns ErrAlreadyStarted.

Shutdown is safe to call even if Start was never called; it marks all registered tasks as stopped for observability.

Shutdown cancels the manager context and waits for all internal goroutines (schedulers + runs) to exit. During/after Shutdown:

  • Add returns ErrClosed
  • Trigger/TryTrigger/TriggerAndWait are no-ops (TryTrigger=false; TriggerAndWait=ErrClosed)

Triggering before Start is also a no-op (TryTrigger=false; TriggerAndWait=ErrNotRunning).

Trigger vs Every

Trigger tasks run when triggered:

h, _ := m.Add(task.Trigger(func(ctx context.Context) error {
	return rebuildIndex(ctx)
}), task.WithName("rebuild-index"))

h.Trigger() // fire-and-forget

By default, Trigger uses OverlapMerge (maxConcurrent defaults to 1).

Every tasks run periodically:

_, _ = m.Add(task.Every(10*time.Second, refreshCache),
	task.WithName("cache-refresh"),
)

By default, Every uses OverlapSkip (maxConcurrent defaults to 1).

By default, Every does NOT run immediately on Start (first run happens after one interval). Use WithStartImmediately(true) to run immediately.

Fixed-delay vs fixed-rate

EveryFixedDelay (default): next run is scheduled after a run finishes, then waits interval.

Note: fixed-delay uses the completion time of the latest run, regardless of whether the run was triggered manually (Handle.Trigger) or scheduled. In other words, manual triggers also "reset" the delay window: the next fixed-delay tick will be interval after the most recent completion. If a tick happens while a run is still in-flight and the overlap policy is OverlapSkip, that tick is dropped; the scheduler still waits for the next completion to compute the following tick.

EveryFixedRate: schedules run opportunities aligned to a base time and interval, but it never "catches up" by emitting multiple missed ticks; it only schedules the next tick.

Base time:

  • If a task is added before Manager.Start, base time is the Start time.
  • If a task is added after Manager.Start, base time is the Add time.

Overlap policies (Skip / Merge)

Under contention (max concurrency reached), run opportunities are handled as:

  • OverlapSkip: drop the opportunity (TriggerAndWait returns ErrSkipped)
  • OverlapMerge: coalesce all overlapping opportunities into one pending run

This keeps behavior bounded; task does not provide unbounded queues.

Hooks

Manager and task options may provide OnRunStart/OnRunFinish hooks.

Hooks are called synchronously on the task execution path. They must be fast and must not block (avoid network I/O and long computations). If you need asynchronous processing, start your own goroutine or send to a buffered channel from the hook.

TriggerAndWait

TriggerAndWait is useful for admin/ops endpoints or startup "run once now":

if err := h.TriggerAndWait(ctx); err != nil {
	// ErrNotRunning / ErrClosed / ErrSkipped / ErrPanicked / ctx.Err() / or the task's error
}

Observability

Task status can be observed via Handle.Status() or Manager.Snapshot(), which is designed for consumption by an ops layer:

snap := m.Snapshot()
if st, ok := snap.Get("cache-refresh"); ok {
	_ = st.LastError
	_ = st.NextRun
}

Note on Status.LastError: LastError is updated only when a run fails (or panics), and it is not cleared on success. Treat it as "last failure" rather than "last run error"; use the timestamps/counters to interpret recency and outcome.

Names and lookup

Task names are optional. If a task is named (WithName), the name is:

  • normalized by strings.TrimSpace
  • validated against [A-Za-z0-9._-]
  • unique within the Manager

Named tasks can be looked up by name:

h, ok := m.Lookup("cache-refresh")
_ = h

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrAlreadyStarted is returned by Start when called more than once.
	ErrAlreadyStarted = errors.New("task: manager already started")
	// ErrClosed is returned when the manager is shutting down or already stopped.
	ErrClosed = errors.New("task: manager closed")
	// ErrNotRunning is returned when an operation requires a running manager.
	ErrNotRunning = errors.New("task: manager not running")
	// ErrSkipped indicates a run opportunity was dropped due to OverlapSkip.
	ErrSkipped = errors.New("task: trigger skipped")
	// ErrPanicked indicates a run panicked (panic is recovered and reported).
	ErrPanicked = errors.New("task: run panicked")

	// ErrInvalidName is returned by Add when a task name is invalid.
	//
	// Name rules:
	//   - name is optional (empty means unnamed)
	//   - non-empty name must match [A-Za-z0-9._-]
	//   - name is normalized by strings.TrimSpace before validation
	ErrInvalidName = errors.New("task: invalid name")

	// ErrDuplicateName is returned by Add when a non-empty task name is already registered.
	//
	// Names are unique within a manager (after normalization).
	ErrDuplicateName = errors.New("task: duplicate name")
)

Functions

This section is empty.

Types

type EveryMode

type EveryMode int

EveryMode controls the scheduling semantics of an Every task.

const (
	// EveryFixedDelay schedules the next run after a run finishes, then waits interval.
	EveryFixedDelay EveryMode = iota
	// EveryFixedRate schedules run opportunities aligned to a base time and interval.
	// It never "catches up" by emitting multiple missed ticks; it only schedules the next tick.
	EveryFixedRate
)

func (EveryMode) String

func (m EveryMode) String() string

type Func

type Func func(context.Context) error

Func is the user-provided function executed by a task.

Returning a non-nil error is reported via the configured error handler (unless filtered), and may also be observed via TriggerAndWait.

type Handle

type Handle interface {
	// Name returns the configured name (may be empty).
	Name() string

	// Trigger requests a run opportunity. It is equivalent to calling TryTrigger and
	// ignoring the return value.
	Trigger()

	// TryTrigger requests a run opportunity and reports whether it was accepted.
	TryTrigger() bool

	// TriggerAndWait requests a run opportunity and waits for its completion.
	//
	// Errors:
	//   - ErrNotRunning: Manager not started yet.
	//   - ErrClosed: Manager is shutting down or already stopped.
	//   - ErrSkipped: OverlapSkip drops this trigger due to concurrency.
	//   - ctx.Err(): ctx canceled/timeout while waiting.
	//
	// If the run panicked, TriggerAndWait returns ErrPanicked.
	// If the run returned an error (and it was not filtered), that error is returned.
	//
	// If ctx is nil, it is treated as context.Background().
	TriggerAndWait(ctx context.Context) error

	// Status returns a snapshot of the task's current status.
	Status() Status
}

Handle is a registered task handle.

Triggering before Start is a no-op (TryTrigger returns false). Triggering during/after Shutdown is a no-op (TryTrigger returns false).

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates tasks and their lifecycles.

It is safe for concurrent use.

The zero value is ready to use with default configuration. To apply ManagerOption (hooks/handlers), use NewManager.

Example (ShutdownWithoutStart)
package main

import (
	"context"
	"fmt"

	"github.com/evan-idocoding/zkit/rt/task"
)

func main() {
	m := task.NewManager()
	h := m.MustAdd(task.Trigger(func(context.Context) error { return nil }), task.WithName("job"))

	_ = m.Shutdown(context.Background())

	fmt.Println(h.Status().State)

}
Output:

stopped
Example (Snapshot)
package main

import (
	"context"
	"fmt"

	"github.com/evan-idocoding/zkit/rt/task"
)

func main() {
	m := task.NewManager()
	h := m.MustAdd(task.Trigger(func(context.Context) error { return nil }), task.WithName("job"))

	_ = m.Start(context.Background())
	defer m.Shutdown(context.Background())

	_ = h.TriggerAndWait(context.Background())

	snap := m.Snapshot()
	st, ok := snap.Get("job")
	fmt.Println(ok, st.RunCount, st.SuccessCount, st.FailCount, st.CanceledCount)

}
Output:

true 1 1 0 0
Example (TriggerAndWait)
package main

import (
	"context"
	"fmt"

	"github.com/evan-idocoding/zkit/rt/task"
)

func main() {
	m := task.NewManager()
	h := m.MustAdd(task.Trigger(func(context.Context) error {
		fmt.Println("ran")
		return nil
	}), task.WithName("rebuild-index"))

	_ = m.Start(context.Background())
	defer m.Shutdown(context.Background())

	_ = h.TriggerAndWait(context.Background())

}
Output:

ran

func NewManager

func NewManager(opts ...ManagerOption) *Manager

NewManager creates a new Manager.

func (*Manager) Add

func (m *Manager) Add(t Task, opts ...Option) (Handle, error)

Add registers a task and returns its handle.

It can be called before or after Start. If called during/after Shutdown, it returns ErrClosed.

func (*Manager) Lookup

func (m *Manager) Lookup(name string) (Handle, bool)

Lookup finds a task handle by name.

Name is normalized by strings.TrimSpace. Empty names are not indexed and always return (nil, false). Lookup is safe for concurrent use.

func (*Manager) MustAdd

func (m *Manager) MustAdd(t Task, opts ...Option) Handle

MustAdd is like Add but panics on error.

It is intended for initialization-time wiring where an error indicates a programming/configuration mistake (for example, invalid name or duplicate name).

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown stops scheduling new work, cancels task contexts, and waits for running tasks to finish.

Shutdown is safe to call multiple times. It is also safe to call without a prior Start; in that case, it marks tasks as stopped for observability and returns nil.

If ctx is nil, it is treated as context.Background().

func (*Manager) Snapshot

func (m *Manager) Snapshot() Snapshot

Snapshot returns a point-in-time view of all tasks.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts all tasks and their schedulers.

Start is not idempotent: calling it more than once returns ErrAlreadyStarted. If ctx is nil, it is treated as context.Background().

func (*Manager) Wait

func (m *Manager) Wait()

Wait waits until all internal goroutines (schedulers + runs) have exited.

type ManagerOption

type ManagerOption func(*managerConfig)

func WithManagerErrorHandler

func WithManagerErrorHandler(h safego.ErrorHandler) ManagerOption

WithManagerErrorHandler sets a default error handler for tasks added to this manager.

func WithManagerOnRunFinish

func WithManagerOnRunFinish(fn func(info RunFinishInfo)) ManagerOption

WithManagerOnRunFinish sets a global hook for all tasks in this manager.

func WithManagerOnRunStart

func WithManagerOnRunStart(fn func(info RunStartInfo)) ManagerOption

WithManagerOnRunStart sets a global hook for all tasks in this manager.

func WithManagerPanicHandler

func WithManagerPanicHandler(h safego.PanicHandler) ManagerOption

WithManagerPanicHandler sets a default panic handler for tasks added to this manager.

func WithManagerReportContextCancel

func WithManagerReportContextCancel(report bool) ManagerOption

WithManagerReportContextCancel sets the default reportContextCancel for tasks added to this manager.

type Option

type Option func(*taskConfig)

func WithErrorHandler

func WithErrorHandler(h safego.ErrorHandler) Option

WithErrorHandler sets the error handler. If not set, errors are reported to stderr by default.

func WithEveryMode

func WithEveryMode(mode EveryMode) Option

WithEveryMode sets the scheduling mode for an Every task. Default is EveryFixedDelay.

func WithMaxConcurrent

func WithMaxConcurrent(n int) Option

WithMaxConcurrent sets the max concurrent runs for a task.

If n <= 0, Add panics (configuration error).

func WithName

func WithName(name string) Option

WithName sets a human-friendly task name.

Notes:

  • Name is optional (empty means unnamed).
  • Name is normalized by strings.TrimSpace.
  • Non-empty names must match [A-Za-z0-9._-].
  • Non-empty names are unique within a Manager; Add returns ErrDuplicateName on duplicates.

func WithOnRunFinish

func WithOnRunFinish(fn func(info RunFinishInfo)) Option

WithOnRunFinish sets a hook to observe run finishes. Hooks are called synchronously.

func WithOnRunStart

func WithOnRunStart(fn func(info RunStartInfo)) Option

WithOnRunStart sets a hook to observe run starts. Hooks are called synchronously.

func WithOverlapPolicy

func WithOverlapPolicy(p OverlapPolicy) Option

WithOverlapPolicy sets how run opportunities are handled under contention.

func WithPanicHandler

func WithPanicHandler(h safego.PanicHandler) Option

WithPanicHandler sets the panic handler. If not set, panics are reported to stderr by default.

func WithReportContextCancel

func WithReportContextCancel(report bool) Option

WithReportContextCancel controls whether context.Canceled and context.DeadlineExceeded are reported.

func WithStartImmediately

func WithStartImmediately(v bool) Option

WithStartImmediately controls whether an Every task runs immediately upon Start/Add. Default is false.

func WithTags

func WithTags(tags ...safego.Tag) Option

WithTags appends tags for error/panic reports and hooks.

type OverlapPolicy

type OverlapPolicy int

OverlapPolicy controls how overlapping run opportunities are handled.

const (
	// OverlapSkip drops a run opportunity if max concurrency is reached.
	OverlapSkip OverlapPolicy = iota
	// OverlapMerge merges all overlapping run opportunities into a single pending run.
	OverlapMerge
)

func (OverlapPolicy) String

func (p OverlapPolicy) String() string

type RunFinishInfo

type RunFinishInfo struct {
	Name string
	Tags []safego.Tag

	Kind        RunKind
	ScheduledAt time.Time

	StartedAt  time.Time
	FinishedAt time.Time
	Duration   time.Duration

	Err      string
	Panicked bool
}

RunFinishInfo is passed to OnRunFinish hooks.

type RunKind

type RunKind int

RunKind indicates why a run started.

const (
	RunKindTrigger RunKind = iota
	RunKindSchedule
)

func (RunKind) String

func (k RunKind) String() string

type RunStartInfo

type RunStartInfo struct {
	Name string
	Tags []safego.Tag

	Kind        RunKind
	ScheduledAt time.Time // non-zero for schedule-based runs (best-effort).

	StartedAt time.Time
}

RunStartInfo is passed to OnRunStart hooks.

type Runner

type Runner interface {
	Start(context.Context) error
	Shutdown(context.Context) error
	Wait()
}

Runner is a small lifecycle interface implemented by Manager for app assembly.

type Snapshot

type Snapshot struct {
	Tasks []Status
}

Snapshot is a point-in-time view of all tasks in a Manager.

func (Snapshot) Get

func (s Snapshot) Get(name string) (Status, bool)

Get finds a task status by name.

type State

type State int

State is the high-level lifecycle state of a task.

const (
	StateNotStarted State = iota
	StateIdle
	StateRunning
	StateStopping
	StateStopped
)

func (State) String

func (s State) String() string

type Status

type Status struct {
	Name  string
	Tags  []safego.Tag
	State State

	Running int
	// Pending is true when OverlapMerge has a pending run opportunity.
	// Pending does not imply the task is currently running; check State/Running for that.
	Pending bool

	RunCount     uint64
	FailCount    uint64
	SuccessCount uint64
	// CanceledCount counts context cancellation / deadline exceeded that is filtered by
	// reportContextCancel=false (i.e. not reported, and not treated as success or failure).
	CanceledCount uint64

	LastStarted  time.Time
	LastFinished time.Time
	LastSuccess  time.Time

	LastDuration time.Duration
	// LastError is the most recent run error for this task *when the run failed*.
	//
	// Important semantics:
	//   - LastError is updated only when a run fails (non-nil error) or panics ("panic").
	//   - LastError is NOT cleared on success.
	//   - Therefore, LastError represents "last failure" rather than "last run error".
	//
	// Use LastFinished/LastSuccess and the counters (FailCount/SuccessCount/CanceledCount)
	// to interpret recency and outcome.
	LastError string

	// NextRun is the next scheduled time (Every tasks only). Zero for Trigger tasks.
	NextRun time.Time
}

Status is a task state snapshot.

type Task

type Task interface {
	// contains filtered or unexported methods
}

Task is a task definition that can be registered to a Manager.

func Every

func Every(interval time.Duration, fn Func) Task

Every creates a periodic task that runs at the given interval.

Default scheduling mode is EveryFixedDelay. Default startImmediately is false (first run happens after one interval).

The interval must be > 0; otherwise Manager.Add panics (configuration error).

func Trigger

func Trigger(fn Func) Task

Trigger creates a trigger-based task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL