core

package
v0.20.0
Published: Feb 17, 2026 License: MIT Imports: 27 Imported by: 0

Documentation

Constants

const (
	// Exit codes for swarm service execution states
	// These are Ofelia-specific codes, not from Docker Swarm API
	// They indicate failure modes that don't map to container exit codes
	ExitCodeSwarmError = -999 // Swarm orchestration error (task not found, service unavailable)
	ExitCodeTimeout    = -998 // Max runtime exceeded before task completion
)
const DefaultStopTimeout = 30 * time.Second

DefaultStopTimeout is the default timeout for graceful shutdown.

const HashmeTagName = "hash"
const TriggeredSchedule = "@triggered"

TriggeredSchedule is a special schedule keyword for jobs that should only run when triggered by another job's on-success/on-failure, or manually via RunJob(). Jobs with this schedule are not added to the cron scheduler.

Variables

var (
	// Container errors
	ErrContainerNotFound     = errors.New("container not found")
	ErrContainerStartFailed  = errors.New("failed to start container")
	ErrContainerCreateFailed = errors.New("failed to create container")
	ErrContainerStopFailed   = errors.New("failed to stop container")
	ErrContainerRemoveFailed = errors.New("failed to remove container")

	// Image errors
	ErrImageNotFound      = errors.New("image not found")
	ErrImagePullFailed    = errors.New("failed to pull image")
	ErrLocalImageNotFound = errors.New("local image not found")

	// Service errors
	ErrServiceNotFound     = errors.New("service not found")
	ErrServiceCreateFailed = errors.New("failed to create service")
	ErrServiceStartFailed  = errors.New("failed to start service")
	ErrServiceRemoveFailed = errors.New("failed to remove service")

	// Job errors
	ErrJobNotFound      = errors.New("job not found")
	ErrJobAlreadyExists = errors.New("job already exists")
	ErrJobExecution     = errors.New("job execution failed")
	ErrMaxTimeRunning   = errors.New("max runtime exceeded")
	ErrUnexpected       = errors.New("unexpected error")

	// Workflow errors
	ErrCircularDependency = errors.New("circular dependency detected")
	ErrDependencyNotMet   = errors.New("job dependencies not met")
	ErrWorkflowInvalid    = errors.New("invalid workflow configuration")

	// Validation errors
	ErrEmptyCommand         = errors.New("command cannot be empty")
	ErrUnsupportedFieldType = errors.New("unsupported field type")
	ErrImageOrContainer     = errors.New("job-run requires either 'image' or 'container'")
	ErrImageRequired        = errors.New("job-service-run requires 'image' to create a new swarm service")

	// Scheduler errors
	ErrSchedulerTimeout = errors.New("scheduler stop timed out")

	// Shutdown errors
	ErrShutdownInProgress = errors.New("shutdown already in progress")
	ErrShutdownTimeout    = errors.New("shutdown timed out")
	ErrJobCanceled        = errors.New("job canceled: shutdown in progress")
	ErrCannotStartJob     = errors.New("cannot start job during shutdown")
	ErrWaitTimeout        = errors.New("timeout waiting for jobs to complete")

	// Resilience errors
	ErrCircuitBreakerOpen     = errors.New("circuit breaker is open")
	ErrCircuitBreakerHalfOpen = errors.New("circuit breaker is half-open but max calls reached")
	ErrCircuitBreakerUnknown  = errors.New("circuit breaker is in unknown state")
	ErrRateLimitExceeded      = errors.New("rate limit exceeded")
	ErrTokensExceedCapacity   = errors.New("requested tokens exceed capacity")
	ErrBulkheadFull           = errors.New("bulkhead is full")

	// Docker SDK errors
	ErrResponseChannelClosed = errors.New("response channel closed unexpectedly")
)

Common errors used across the package

var (
	ErrEmptyScheduler = errors.New("unable to start an empty scheduler")
	ErrEmptySchedule  = errors.New("unable to add a job with an empty schedule")
)
var (
	// DefaultBufferPool provides enhanced performance for job execution
	// This replaces the simple buffer pool with multi-tier adaptive pooling
	// Note: ShrinkInterval is set to 0 to prevent background goroutine at package init
	DefaultBufferPool = func() *EnhancedBufferPool {
		cfg := DefaultEnhancedBufferPoolConfig()
		cfg.ShrinkInterval = 0
		cfg.EnablePrewarming = false
		cfg.PoolSize = 0
		return NewEnhancedBufferPool(cfg, nil)
	}()
)

Global enhanced buffer pool instance

var ErrSkippedExecution = errors.New("skipped execution")

ErrSkippedExecution is the error to pass to `Execution.Stop` to mark an execution as skipped.

var GlobalPerformanceMetrics = NewPerformanceMetrics()

Global enhanced metrics instance

var Version = "dev"

Version is the Ofelia version, set via ldflags during build. Defaults to "dev" if not set.

Functions

func GetHash added in v0.10.1

func GetHash(t reflect.Type, v reflect.Value, hash *string) error

func IsNonZeroExitError added in v0.10.1

func IsNonZeroExitError(err error) bool

IsNonZeroExitError checks if the error is a non-zero exit code error

func IsRetryableError added in v0.10.1

func IsRetryableError(err error) bool

IsRetryableError checks if an error should trigger a retry

func IsTriggeredSchedule added in v0.14.0

func IsTriggeredSchedule(schedule string) bool

IsTriggeredSchedule returns true if the schedule indicates the job should only run when triggered (not on a time-based schedule).
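
A minimal sketch of how a caller might separate cron-scheduled jobs from triggered-only ones. The helper `isTriggered` is a local stand-in that assumes an exact string match against the documented keyword; the package's own `IsTriggeredSchedule` may normalise its input differently. `partition` is a hypothetical name used only for illustration.

```go
package main

import "fmt"

// triggeredSchedule mirrors the documented value of TriggeredSchedule.
const triggeredSchedule = "@triggered"

// isTriggered is a hypothetical stand-in for IsTriggeredSchedule. It assumes
// an exact string match, which the documentation does not confirm.
func isTriggered(schedule string) bool {
	return schedule == triggeredSchedule
}

// partition splits schedules into those that would be handed to the cron
// scheduler and those that only run when triggered.
func partition(schedules []string) (cron, triggered []string) {
	for _, s := range schedules {
		if isTriggered(s) {
			triggered = append(triggered, s)
		} else {
			cron = append(cron, s)
		}
	}
	return cron, triggered
}

func main() {
	cron, triggered := partition([]string{"@every 5m", "@triggered", "0 0 * * *"})
	fmt.Println(len(cron), len(triggered))
}
```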

func Retry added in v0.10.1

func Retry(ctx context.Context, policy *RetryPolicy, fn func() error) error

Retry executes a function with retry logic

func SetDefaultClock added in v0.17.0

func SetDefaultClock(c Clock)

func SetGlobalBufferPoolLogger added in v0.11.0

func SetGlobalBufferPoolLogger(logger *slog.Logger)

SetGlobalBufferPoolLogger sets the logger for the global buffer pool

func WrapContainerError added in v0.10.1

func WrapContainerError(op string, containerID string, err error) error

WrapContainerError wraps a container-related error with context

func WrapImageError added in v0.10.1

func WrapImageError(op string, image string, err error) error

WrapImageError wraps an image-related error with context

func WrapJobError added in v0.10.1

func WrapJobError(op string, jobName string, err error) error

WrapJobError wraps a job-related error with context

func WrapServiceError added in v0.10.1

func WrapServiceError(op string, serviceID string, err error) error

WrapServiceError wraps a service-related error with context

Types

type BareJob

type BareJob struct {
	Schedule string `hash:"true"`
	Name     string `hash:"true"`
	Command  string `hash:"true"`
	// RunOnStartup controls whether the job is executed once immediately when the scheduler starts,
	// before regular cron-based scheduling begins. This is a boolean flag with a default value of false.
	// Startup executions are dispatched in non-blocking goroutines so they do not delay scheduler startup.
	RunOnStartup     bool     `default:"false" gcfg:"run-on-startup" mapstructure:"run-on-startup" hash:"true"`
	HistoryLimit     int      `default:"10"`
	MaxRetries       int      `default:"0"`                                  // Maximum number of retry attempts (0 = no retries)
	RetryDelayMs     int      `default:"1000"`                               // Initial retry delay in milliseconds
	RetryExponential bool     `default:"true"`                               // Use exponential backoff for retries
	RetryMaxDelayMs  int      `default:"60000"`                              // Maximum retry delay in milliseconds (1 minute)
	Dependencies     []string `gcfg:"depends-on" mapstructure:"depends-on,"` // Jobs that must complete first
	OnSuccess        []string `gcfg:"on-success" mapstructure:"on-success,"` // Jobs to trigger on success
	OnFailure        []string `gcfg:"on-failure" mapstructure:"on-failure,"` // Jobs to trigger on failure
	AllowParallel    bool     `default:"true"`                               // Allow job to run in parallel with others
	// contains filtered or unexported fields
}

func (*BareJob) GetCommand

func (j *BareJob) GetCommand() string

func (*BareJob) GetCronJobID

func (j *BareJob) GetCronJobID() uint64

func (*BareJob) GetHistory added in v0.8.0

func (j *BareJob) GetHistory() []*Execution

GetHistory returns a copy of the job's execution history.

func (*BareJob) GetLastRun added in v0.8.0

func (j *BareJob) GetLastRun() *Execution

GetLastRun returns the last execution of the job, if any.

func (*BareJob) GetName

func (j *BareJob) GetName() string

func (*BareJob) GetRetryConfig added in v0.10.1

func (j *BareJob) GetRetryConfig() RetryConfig

GetRetryConfig returns the retry configuration for the job

func (*BareJob) GetSchedule

func (j *BareJob) GetSchedule() string

func (*BareJob) Hash

func (j *BareJob) Hash() (string, error)

Hash returns a hash of all the job attributes. It is used to detect changes.
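
How the hash is computed is not described on this page. As a rough illustration of tag-driven hashing, the sketch below walks a struct with `reflect` and feeds only the fields tagged `hash:"true"` into SHA-256. The `job` type, the `hashFields` helper, and the choice of SHA-256 are all assumptions, not the behaviour of `GetHash`.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"reflect"
)

// job is a hypothetical struct using the same `hash:"true"` tag as BareJob.
type job struct {
	Schedule     string `hash:"true"`
	Name         string `hash:"true"`
	HistoryLimit int    // untagged, so it does not affect the hash
}

// hashFields hashes the tagged string, int and bool fields of a struct value.
func hashFields(v any) (string, error) {
	val := reflect.ValueOf(v)
	if val.Kind() != reflect.Struct {
		return "", fmt.Errorf("expected struct, got %s", val.Kind())
	}
	typ := val.Type()
	h := sha256.New()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		if field.Tag.Get("hash") != "true" {
			continue
		}
		f := val.Field(i)
		switch f.Kind() {
		case reflect.String, reflect.Int, reflect.Bool:
			// Include the field name so values cannot run into each other.
			fmt.Fprintf(h, "%s=%v;", field.Name, f.Interface())
		default:
			return "", fmt.Errorf("unsupported field type %s", f.Kind())
		}
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	a, _ := hashFields(job{Schedule: "@daily", Name: "backup", HistoryLimit: 10})
	b, _ := hashFields(job{Schedule: "@daily", Name: "backup", HistoryLimit: 99})
	c, _ := hashFields(job{Schedule: "@hourly", Name: "backup", HistoryLimit: 10})
	fmt.Println(a == b, a == c)
}
```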

func (*BareJob) Middlewares

func (c *BareJob) Middlewares() []Middleware

func (*BareJob) NotifyStart

func (j *BareJob) NotifyStart()

func (*BareJob) NotifyStop

func (j *BareJob) NotifyStop()

func (*BareJob) ResetMiddlewares added in v0.9.0

func (c *BareJob) ResetMiddlewares(ms ...Middleware)

func (*BareJob) Run added in v0.10.1

func (j *BareJob) Run(ctx *Context) error

Run implements the Job interface - this is handled by jobWrapper

func (*BareJob) Running

func (j *BareJob) Running() int32

func (*BareJob) SetCronJobID

func (j *BareJob) SetCronJobID(id uint64)

func (*BareJob) SetLastRun added in v0.8.0

func (j *BareJob) SetLastRun(e *Execution)

SetLastRun stores the last executed run for the job.

func (*BareJob) ShouldRunOnStartup added in v0.18.0

func (j *BareJob) ShouldRunOnStartup() bool

ShouldRunOnStartup returns true if the job should run immediately when the scheduler starts.

func (*BareJob) Use

func (c *BareJob) Use(ms ...Middleware)

type Bulkhead added in v0.10.1

type Bulkhead struct {
	// contains filtered or unexported fields
}

Bulkhead implements the bulkhead pattern for resource isolation

func NewBulkhead added in v0.10.1

func NewBulkhead(name string, maxConcurrent int) *Bulkhead

NewBulkhead creates a new bulkhead

func (*Bulkhead) Execute added in v0.10.1

func (b *Bulkhead) Execute(ctx context.Context, fn func() error) error

Execute runs a function with bulkhead protection

func (*Bulkhead) GetMetrics added in v0.10.1

func (b *Bulkhead) GetMetrics() map[string]any

GetMetrics returns bulkhead metrics

type CircuitBreaker added in v0.10.1

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker added in v0.10.1

func NewCircuitBreaker(name string, maxFailures uint32, resetTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute added in v0.10.1

func (cb *CircuitBreaker) Execute(fn func() error) error

Execute runs a function through the circuit breaker

func (*CircuitBreaker) GetMetrics added in v0.10.1

func (cb *CircuitBreaker) GetMetrics() map[string]any

GetMetrics returns circuit breaker metrics

func (*CircuitBreaker) GetState added in v0.10.1

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current state of the circuit breaker

type CircuitBreakerState added in v0.10.1

type CircuitBreakerState int

CircuitBreakerState represents the state of a circuit breaker

const (
	StateClosed CircuitBreakerState = iota
	StateOpen
	StateHalfOpen
)

func (CircuitBreakerState) String added in v0.10.1

func (s CircuitBreakerState) String() string

type Clock added in v0.17.0

type Clock interface {
	Now() time.Time
	NewTicker(d time.Duration) Ticker
	NewTimer(d time.Duration) Timer
	After(d time.Duration) <-chan time.Time
	Sleep(d time.Duration)
}

func GetDefaultClock added in v0.17.0

func GetDefaultClock() Clock

func NewRealClock added in v0.17.0

func NewRealClock() Clock

type ComposeJob added in v0.9.0

type ComposeJob struct {
	BareJob `mapstructure:",squash"`
	File    string `default:"compose.yml" gcfg:"file" mapstructure:"file" hash:"true"`
	Service string `gcfg:"service" mapstructure:"service" hash:"true"`
	Exec    bool   `default:"false" gcfg:"exec" mapstructure:"exec" hash:"true"`
}

func NewComposeJob added in v0.9.0

func NewComposeJob() *ComposeJob

func (*ComposeJob) Middlewares added in v0.9.0

func (c *ComposeJob) Middlewares() []Middleware

func (*ComposeJob) ResetMiddlewares added in v0.9.0

func (c *ComposeJob) ResetMiddlewares(ms ...Middleware)

func (*ComposeJob) Run added in v0.9.0

func (j *ComposeJob) Run(ctx *Context) error

func (*ComposeJob) Use added in v0.9.0

func (c *ComposeJob) Use(ms ...Middleware)

type ContainerLogsOptions added in v0.12.0

type ContainerLogsOptions struct {
	ShowStdout bool
	ShowStderr bool
	Since      time.Time
	Tail       string
	Follow     bool
}

ContainerLogsOptions defines options for container log retrieval.

type Context

type Context struct {
	Scheduler *Scheduler
	Logger    *slog.Logger
	Job       Job
	Execution *Execution
	Ctx       context.Context //nolint:containedctx // intentional: propagates go-cron's per-entry context through middleware chain
	// contains filtered or unexported fields
}

func NewContext

func NewContext(s *Scheduler, j Job, e *Execution) *Context

func NewContextWithContext added in v0.20.0

func NewContextWithContext(ctx context.Context, s *Scheduler, j Job, e *Execution) *Context

NewContextWithContext creates a Context with a specific context.Context, typically the per-entry context provided by go-cron's JobWithContext interface.

func (*Context) Log

func (c *Context) Log(msg string)

func (*Context) Next

func (c *Context) Next() error

func (*Context) Start

func (c *Context) Start()

func (*Context) Stop

func (c *Context) Stop(err error)

func (*Context) Warn

func (c *Context) Warn(msg string)

type CronClock added in v0.17.0

type CronClock struct {
	*FakeClock
}

func NewCronClock added in v0.17.0

func NewCronClock(start time.Time) *CronClock

func (*CronClock) NewTimer added in v0.17.0

func (c *CronClock) NewTimer(d time.Duration) cron.Timer

type CronUtils

type CronUtils struct {
	Logger *slog.Logger
}

CronUtils implements the cron logger interface.

func NewCronUtils

func NewCronUtils(l *slog.Logger) *CronUtils

func (*CronUtils) Error

func (c *CronUtils) Error(err error, msg string, keysAndValues ...any)

func (*CronUtils) Info

func (c *CronUtils) Info(msg string, keysAndValues ...any)

type DependencyNode added in v0.10.1

type DependencyNode struct {
	Job          Job
	Dependencies []string // Job names this job depends on
	Dependents   []string // Job names that depend on this job
	OnSuccess    []string // Jobs to trigger on success
	OnFailure    []string // Jobs to trigger on failure
}

DependencyNode represents a job in the dependency graph

type DockerProvider added in v0.12.0

type DockerProvider interface {
	// Container operations
	CreateContainer(ctx context.Context, config *domain.ContainerConfig, name string) (string, error)
	StartContainer(ctx context.Context, containerID string) error
	StopContainer(ctx context.Context, containerID string, timeout *time.Duration) error
	RemoveContainer(ctx context.Context, containerID string, force bool) error
	InspectContainer(ctx context.Context, containerID string) (*domain.Container, error)
	ListContainers(ctx context.Context, opts domain.ListOptions) ([]domain.Container, error)
	WaitContainer(ctx context.Context, containerID string) (int64, error)
	GetContainerLogs(ctx context.Context, containerID string, opts ContainerLogsOptions) (io.ReadCloser, error)

	// Exec operations
	CreateExec(ctx context.Context, containerID string, config *domain.ExecConfig) (string, error)
	StartExec(ctx context.Context, execID string, opts domain.ExecStartOptions) (*domain.HijackedResponse, error)
	InspectExec(ctx context.Context, execID string) (*domain.ExecInspect, error)
	RunExec(ctx context.Context, containerID string, config *domain.ExecConfig, stdout, stderr io.Writer) (int, error)

	// Image operations
	PullImage(ctx context.Context, image string) error
	HasImageLocally(ctx context.Context, image string) (bool, error)
	EnsureImage(ctx context.Context, image string, forcePull bool) error

	// Network operations
	ConnectNetwork(ctx context.Context, networkID, containerID string) error
	FindNetworkByName(ctx context.Context, networkName string) ([]domain.Network, error)

	// Event operations
	SubscribeEvents(ctx context.Context, filter domain.EventFilter) (<-chan domain.Event, <-chan error)

	// Service operations (Swarm)
	CreateService(ctx context.Context, spec domain.ServiceSpec, opts domain.ServiceCreateOptions) (string, error)
	InspectService(ctx context.Context, serviceID string) (*domain.Service, error)
	ListTasks(ctx context.Context, opts domain.TaskListOptions) ([]domain.Task, error)
	RemoveService(ctx context.Context, serviceID string) error
	WaitForServiceTasks(ctx context.Context, serviceID string, timeout time.Duration) ([]domain.Task, error)

	// System operations
	Info(ctx context.Context) (*domain.SystemInfo, error)
	Ping(ctx context.Context) error

	// Lifecycle
	Close() error
}

DockerProvider defines the interface for Docker operations. The SDK adapter implements this interface.

type EnhancedBufferPool added in v0.11.0

type EnhancedBufferPool struct {
	// contains filtered or unexported fields
}

EnhancedBufferPool provides high-performance buffer management with adaptive sizing

func NewBufferPool added in v0.10.1

func NewBufferPool(minSize, defaultSize, maxSize int64) *EnhancedBufferPool

NewBufferPool is a compatibility wrapper for tests and old code. It returns the enhanced buffer pool with custom configuration.

func NewEnhancedBufferPool added in v0.11.0

func NewEnhancedBufferPool(config *EnhancedBufferPoolConfig, logger *slog.Logger) *EnhancedBufferPool

NewEnhancedBufferPool creates a new enhanced buffer pool with adaptive management

func (*EnhancedBufferPool) Get added in v0.11.0

func (ebp *EnhancedBufferPool) Get() (*circbuf.Buffer, error)

Get retrieves a buffer from the pool, optimized for high concurrency

func (*EnhancedBufferPool) GetSized added in v0.11.0

func (ebp *EnhancedBufferPool) GetSized(requestedSize int64) (*circbuf.Buffer, error)

GetSized retrieves a buffer with a specific size requirement, with intelligent size selection

func (*EnhancedBufferPool) GetStats added in v0.11.0

func (ebp *EnhancedBufferPool) GetStats() map[string]any

GetStats returns comprehensive performance statistics

func (*EnhancedBufferPool) Put added in v0.11.0

func (ebp *EnhancedBufferPool) Put(buf *circbuf.Buffer)

Put returns a buffer to the appropriate pool

func (*EnhancedBufferPool) Shutdown added in v0.11.0

func (ebp *EnhancedBufferPool) Shutdown()

Shutdown gracefully stops the enhanced buffer pool
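
The internals of the enhanced pool are not shown here. As a loose illustration of size-tiered pooling, the sketch below keeps one `sync.Pool` per buffer size and serves a request from the smallest tier that fits. It uses plain byte slices instead of `circbuf.Buffer`, and `tieredPool`, `getSized` and `put` are hypothetical names. It assumes at least one tier is configured.

```go
package main

import (
	"fmt"
	"sync"
)

// tieredPool is a hypothetical size-tiered pool of byte slices.
type tieredPool struct {
	sizes []int        // tier capacities in ascending order
	pools []*sync.Pool // one pool per tier
}

func newTieredPool(sizes ...int) *tieredPool {
	p := &tieredPool{sizes: sizes, pools: make([]*sync.Pool, len(sizes))}
	for i, size := range sizes {
		size := size // capture the per-tier capacity
		p.pools[i] = &sync.Pool{New: func() any { return make([]byte, 0, size) }}
	}
	return p
}

// getSized returns an empty buffer from the smallest tier that can hold n bytes.
func (p *tieredPool) getSized(n int) ([]byte, error) {
	for i, size := range p.sizes {
		if n <= size {
			return p.pools[i].Get().([]byte)[:0], nil
		}
	}
	return nil, fmt.Errorf("requested size %d exceeds largest tier %d", n, p.sizes[len(p.sizes)-1])
}

// put returns a buffer to the tier matching its capacity; other buffers are dropped.
func (p *tieredPool) put(buf []byte) {
	for i, size := range p.sizes {
		if cap(buf) == size {
			p.pools[i].Put(buf[:0])
			return
		}
	}
}

func main() {
	p := newTieredPool(1024, 4096)
	buf, err := p.getSized(2000)
	fmt.Println(cap(buf), err)
	p.put(buf)
}
```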

type EnhancedBufferPoolConfig added in v0.11.0

type EnhancedBufferPoolConfig struct {
	MinSize          int64         `json:"minSize"`          // Minimum buffer size
	DefaultSize      int64         `json:"defaultSize"`      // Default buffer size
	MaxSize          int64         `json:"maxSize"`          // Maximum buffer size
	PoolSize         int           `json:"poolSize"`         // Number of buffers to pre-allocate
	MaxPoolSize      int           `json:"maxPoolSize"`      // Maximum number of buffers in pool
	GrowthFactor     float64       `json:"growthFactor"`     // Factor to increase pool size when needed
	ShrinkThreshold  float64       `json:"shrinkThreshold"`  // Usage percentage below which to shrink
	ShrinkInterval   time.Duration `json:"shrinkInterval"`   // How often to check for shrinking
	EnableMetrics    bool          `json:"enableMetrics"`    // Enable performance metrics
	EnablePrewarming bool          `json:"enablePrewarming"` // Pre-allocate buffers on startup
}

EnhancedBufferPoolConfig holds configuration for the enhanced buffer pool

func DefaultEnhancedBufferPoolConfig added in v0.11.0

func DefaultEnhancedBufferPoolConfig() *EnhancedBufferPoolConfig

DefaultEnhancedBufferPoolConfig returns optimized defaults for high-concurrency scenarios

type ExecJob

type ExecJob struct {
	BareJob   `mapstructure:",squash"`
	Provider  DockerProvider `json:"-"` // SDK-based Docker provider
	Container string         `hash:"true"`
	// User specifies the user to run the command as.
	// If not set, uses the global default-user setting (default: "nobody").
	// Set to "default" to explicitly use the container's default user, overriding global setting.
	User        string   `hash:"true"`
	TTY         bool     `default:"false" hash:"true"`
	Environment []string `mapstructure:"environment" hash:"true"`
	WorkingDir  string   `mapstructure:"working-dir" hash:"true"`
}

func NewExecJob

func NewExecJob(provider DockerProvider) *ExecJob

func (*ExecJob) InitializeRuntimeFields added in v0.10.2

func (j *ExecJob) InitializeRuntimeFields()

InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.

func (*ExecJob) Middlewares

func (c *ExecJob) Middlewares() []Middleware

func (*ExecJob) ResetMiddlewares added in v0.9.0

func (c *ExecJob) ResetMiddlewares(ms ...Middleware)

func (*ExecJob) Run

func (j *ExecJob) Run(ctx *Context) error

func (*ExecJob) RunWithStreams added in v0.12.0

func (j *ExecJob) RunWithStreams(ctx context.Context, stdout, stderr io.Writer) (int, error)

RunWithStreams runs the exec job with custom output streams. This is useful for testing or when custom stream handling is needed.

func (*ExecJob) Use

func (c *ExecJob) Use(ms ...Middleware)

type Execution

type Execution struct {
	ID        string
	Date      time.Time
	Duration  time.Duration
	IsRunning bool
	Failed    bool
	Skipped   bool
	Error     error

	OutputStream, ErrorStream *circbuf.Buffer `json:"-"`

	// Captured output for persistence after buffer cleanup
	CapturedStdout, CapturedStderr string `json:"-"`
}

Execution contains all the information relative to a Job execution.

func NewExecution

func NewExecution() (*Execution, error)

NewExecution returns a new Execution, with a random ID

func (*Execution) Cleanup added in v0.10.1

func (e *Execution) Cleanup()

Cleanup returns execution buffers to the pool for reuse

func (*Execution) GetStderr added in v0.10.1

func (e *Execution) GetStderr() string

GetStderr returns stderr content, preferring live buffer if available

func (*Execution) GetStdout added in v0.10.1

func (e *Execution) GetStdout() string

GetStdout returns stdout content, preferring live buffer if available

func (*Execution) Start

func (e *Execution) Start()

Start starts the execution, setting the running flag and the start date.

func (*Execution) Stop

func (e *Execution) Stop(err error)

Stop halts the execution. If ErrSkippedExecution is given, the execution is marked as skipped; if any other error is given, the execution is marked as failed. It also sets IsRunning to false and records the duration.

type FakeClock added in v0.17.0

type FakeClock struct {
	// contains filtered or unexported fields
}

func NewFakeClock added in v0.17.0

func NewFakeClock(start time.Time) *FakeClock

func (*FakeClock) Advance added in v0.17.0

func (c *FakeClock) Advance(d time.Duration)

func (*FakeClock) After added in v0.17.0

func (c *FakeClock) After(d time.Duration) <-chan time.Time

func (*FakeClock) NewTicker added in v0.17.0

func (c *FakeClock) NewTicker(d time.Duration) Ticker

func (*FakeClock) NewTimer added in v0.17.0

func (c *FakeClock) NewTimer(d time.Duration) Timer

func (*FakeClock) Now added in v0.17.0

func (c *FakeClock) Now() time.Time

func (*FakeClock) Set added in v0.17.0

func (c *FakeClock) Set(t time.Time)

func (*FakeClock) Sleep added in v0.17.0

func (c *FakeClock) Sleep(d time.Duration)

func (*FakeClock) TickerCount added in v0.17.0

func (c *FakeClock) TickerCount() int

func (*FakeClock) WaitForAdvance added in v0.17.0

func (c *FakeClock) WaitForAdvance()

type GracefulScheduler added in v0.10.1

type GracefulScheduler struct {
	*Scheduler
	// contains filtered or unexported fields
}

GracefulScheduler wraps a scheduler with graceful shutdown

func NewGracefulScheduler added in v0.10.1

func NewGracefulScheduler(scheduler *Scheduler, shutdownManager *ShutdownManager) *GracefulScheduler

NewGracefulScheduler creates a scheduler with graceful shutdown support

func (GracefulScheduler) Middlewares added in v0.10.1

func (c GracefulScheduler) Middlewares() []Middleware

func (GracefulScheduler) ResetMiddlewares added in v0.10.1

func (c GracefulScheduler) ResetMiddlewares(ms ...Middleware)

func (*GracefulScheduler) RunJobWithTracking added in v0.10.1

func (gs *GracefulScheduler) RunJobWithTracking(job Job, ctx *Context) error

RunJobWithTracking runs a job with shutdown tracking

func (GracefulScheduler) Use added in v0.10.1

func (c GracefulScheduler) Use(ms ...Middleware)

type GracefulServer added in v0.10.1

type GracefulServer struct {
	// contains filtered or unexported fields
}

GracefulServer wraps an HTTP server with graceful shutdown

func NewGracefulServer added in v0.10.1

func NewGracefulServer(server *http.Server, shutdownManager *ShutdownManager, logger *slog.Logger) *GracefulServer

NewGracefulServer creates a server with graceful shutdown support

type Job

type Job interface {
	GetName() string
	GetSchedule() string
	GetCommand() string
	ShouldRunOnStartup() bool
	Middlewares() []Middleware
	Use(...Middleware)
	Run(*Context) error
	Running() int32
	NotifyStart()
	NotifyStop()
	GetCronJobID() uint64
	SetCronJobID(uint64)
	GetHistory() []*Execution
	Hash() (string, error)
}

type JobMetrics added in v0.11.0

type JobMetrics struct {
	ExecutionCount  int64
	TotalDuration   time.Duration
	AverageDuration time.Duration
	MinDuration     time.Duration
	MaxDuration     time.Duration
	SuccessCount    int64
	FailureCount    int64
	LastExecution   time.Time
	LastSuccess     time.Time
	LastFailure     time.Time
}

JobMetrics holds metrics for individual jobs

type JobMetricsRecorder added in v0.10.1

type JobMetricsRecorder interface {
	RecordMetric(name string, value any)
	RecordJobExecution(jobName string, success bool, duration time.Duration)
	RecordRetryAttempt(jobName string, attempt int, success bool)
}

JobMetricsRecorder interface for recording job-specific metrics

type LatencyTracker added in v0.11.0

type LatencyTracker struct {
	Count   int64
	Total   time.Duration
	Min     time.Duration
	Max     time.Duration
	Average time.Duration
	// contains filtered or unexported fields
}

LatencyTracker tracks latency statistics for operations

type LocalJob

type LocalJob struct {
	BareJob     `mapstructure:",squash"`
	Dir         string   `hash:"true"`
	Environment []string `mapstructure:"environment" hash:"true"`
}

func NewLocalJob

func NewLocalJob() *LocalJob

func (*LocalJob) Middlewares

func (c *LocalJob) Middlewares() []Middleware

func (*LocalJob) ResetMiddlewares added in v0.9.0

func (c *LocalJob) ResetMiddlewares(ms ...Middleware)

func (*LocalJob) Run

func (j *LocalJob) Run(ctx *Context) error

func (*LocalJob) Use

func (c *LocalJob) Use(ms ...Middleware)

type MetricsRecorder added in v0.10.1

type MetricsRecorder interface {
	RecordJobRetry(jobName string, attempt int, success bool)
	RecordContainerEvent()
	RecordContainerMonitorFallback()
	RecordContainerMonitorMethod(usingEvents bool)
	RecordContainerWaitDuration(seconds float64)
	RecordDockerOperation(operation string)
	RecordDockerError(operation string)
	// Job scheduling metrics (from go-cron ObservabilityHooks)
	RecordJobStart(jobName string)
	RecordJobComplete(jobName string, durationSeconds float64, panicked bool)
	RecordJobScheduled(jobName string)
}

MetricsRecorder interface for recording retry and monitoring metrics

type Middleware

type Middleware interface {
	// Run is called instead of the original `Job.Run`. You MUST call
	// `ctx.Next` inside the middleware's `Run` function; otherwise you will
	// break the Job workflow.
	Run(*Context) error
	// ContinueOnStop reports whether Run should be called even when the
	// execution has been stopped
	ContinueOnStop() bool
}

Middleware can wrap any job execution, allowing code to run before and/or after each `Job.Run`.
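
A minimal sketch of how such a chain can work, assuming each middleware hands control onward through the context's `Next` method. `chainCtx`, `middleware` and `tracer` are hypothetical, stripped-down stand-ins; `ContinueOnStop` and the stop handling of the real `Context` are left out.

```go
package main

import "fmt"

// middleware is a reduced form of the Middleware interface (Run only).
type middleware interface {
	Run(*chainCtx) error
}

// chainCtx is a hypothetical, stripped-down Context: it walks a list of
// middlewares and runs the job once the list is exhausted.
type chainCtx struct {
	middlewares []middleware
	job         func() error
	pos         int
	trace       []string
}

// Next runs the next middleware, or the job when none are left.
func (c *chainCtx) Next() error {
	if c.pos < len(c.middlewares) {
		m := c.middlewares[c.pos]
		c.pos++
		return m.Run(c)
	}
	return c.job()
}

// tracer records when it is entered and left.
type tracer struct{ name string }

func (t tracer) Run(c *chainCtx) error {
	c.trace = append(c.trace, "before "+t.name)
	err := c.Next() // omitting this call would stop the job from running
	c.trace = append(c.trace, "after "+t.name)
	return err
}

func main() {
	c := &chainCtx{middlewares: []middleware{tracer{"a"}, tracer{"b"}}}
	c.job = func() error {
		c.trace = append(c.trace, "job")
		return nil
	}
	err := c.Next()
	fmt.Println(c.trace, err)
}
```

Middlewares nest like function calls: the first one registered is entered first and left last.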

type NonZeroExitError added in v0.10.1

type NonZeroExitError struct {
	ExitCode int
}

NonZeroExitError represents a container exit with non-zero code

func (NonZeroExitError) Error added in v0.10.1

func (e NonZeroExitError) Error() string

type PerformanceMetrics added in v0.11.0

type PerformanceMetrics struct {
	// contains filtered or unexported fields
}

PerformanceMetrics implements comprehensive performance tracking

func NewPerformanceMetrics added in v0.11.0

func NewPerformanceMetrics() *PerformanceMetrics

NewPerformanceMetrics creates a new performance metrics recorder

func (*PerformanceMetrics) GetDockerMetrics added in v0.11.0

func (pm *PerformanceMetrics) GetDockerMetrics() map[string]any

GetDockerMetrics returns Docker-specific metrics

func (*PerformanceMetrics) GetJobMetrics added in v0.11.0

func (pm *PerformanceMetrics) GetJobMetrics() map[string]any

GetJobMetrics returns job execution metrics

func (*PerformanceMetrics) GetMetrics added in v0.11.0

func (pm *PerformanceMetrics) GetMetrics() map[string]any

GetMetrics returns all performance metrics

func (*PerformanceMetrics) GetSummaryReport added in v0.11.0

func (pm *PerformanceMetrics) GetSummaryReport() string

GetSummaryReport generates a human-readable performance summary

func (*PerformanceMetrics) RecordBufferPoolStats added in v0.11.0

func (pm *PerformanceMetrics) RecordBufferPoolStats(stats map[string]any)

RecordBufferPoolStats records buffer pool performance statistics

func (*PerformanceMetrics) RecordConcurrentJobs added in v0.11.0

func (pm *PerformanceMetrics) RecordConcurrentJobs(count int64)

RecordConcurrentJobs tracks the number of concurrent jobs

func (*PerformanceMetrics) RecordContainerEvent added in v0.11.0

func (pm *PerformanceMetrics) RecordContainerEvent()

RecordContainerEvent records container events

func (*PerformanceMetrics) RecordContainerMonitorFallback added in v0.11.0

func (pm *PerformanceMetrics) RecordContainerMonitorFallback()

RecordContainerMonitorFallback records container monitor fallbacks

func (*PerformanceMetrics) RecordContainerMonitorMethod added in v0.11.0

func (pm *PerformanceMetrics) RecordContainerMonitorMethod(usingEvents bool)

RecordContainerMonitorMethod records container monitor method usage

func (*PerformanceMetrics) RecordContainerWaitDuration added in v0.11.0

func (pm *PerformanceMetrics) RecordContainerWaitDuration(seconds float64)

RecordContainerWaitDuration records container wait durations

func (*PerformanceMetrics) RecordCustomMetric added in v0.11.0

func (pm *PerformanceMetrics) RecordCustomMetric(name string, value any)

RecordCustomMetric records a custom metric

func (*PerformanceMetrics) RecordDockerError added in v0.11.0

func (pm *PerformanceMetrics) RecordDockerError(operation string)

RecordDockerError records a Docker operation error

func (*PerformanceMetrics) RecordDockerLatency added in v0.11.0

func (pm *PerformanceMetrics) RecordDockerLatency(operation string, duration time.Duration)

RecordDockerLatency records the latency of a Docker operation

func (*PerformanceMetrics) RecordDockerOperation added in v0.11.0

func (pm *PerformanceMetrics) RecordDockerOperation(operation string)

RecordDockerOperation records a successful Docker operation

func (*PerformanceMetrics) RecordJobComplete added in v0.13.0

func (pm *PerformanceMetrics) RecordJobComplete(jobName string, durationSeconds float64, panicked bool)

RecordJobComplete records a job completing (from go-cron ObservabilityHooks)

func (*PerformanceMetrics) RecordJobExecution added in v0.11.0

func (pm *PerformanceMetrics) RecordJobExecution(jobName string, duration time.Duration, success bool)

RecordJobExecution records a job execution with timing and success status

func (*PerformanceMetrics) RecordJobRetry added in v0.11.0

func (pm *PerformanceMetrics) RecordJobRetry(jobName string, attempt int, success bool)

RecordJobRetry records job retry attempts

func (*PerformanceMetrics) RecordJobScheduled added in v0.11.0

func (pm *PerformanceMetrics) RecordJobScheduled(jobName string)

RecordJobScheduled records when a job is scheduled

func (*PerformanceMetrics) RecordJobSkipped added in v0.11.0

func (pm *PerformanceMetrics) RecordJobSkipped(jobName string, reason string)

RecordJobSkipped records when a job is skipped

func (*PerformanceMetrics) RecordJobStart added in v0.13.0

func (pm *PerformanceMetrics) RecordJobStart(jobName string)

RecordJobStart records a job start (from go-cron ObservabilityHooks)

func (*PerformanceMetrics) RecordMemoryUsage added in v0.11.0

func (pm *PerformanceMetrics) RecordMemoryUsage(bytes int64)

RecordMemoryUsage tracks memory usage

func (*PerformanceMetrics) Reset added in v0.11.0

func (pm *PerformanceMetrics) Reset()

Reset clears all metrics (useful for testing or periodic resets)

type PerformanceRecorder added in v0.11.0

type PerformanceRecorder interface {
	MetricsRecorder // Embed existing interface (includes RecordDockerError and job scheduling metrics)

	// Extended Docker operations
	RecordDockerLatency(operation string, duration time.Duration)

	// Job operations (extended beyond MetricsRecorder)
	RecordJobExecution(jobName string, duration time.Duration, success bool)
	RecordJobSkipped(jobName string, reason string)

	// System metrics
	RecordConcurrentJobs(count int64)
	RecordMemoryUsage(bytes int64)
	RecordBufferPoolStats(stats map[string]any)

	// Custom metrics
	RecordCustomMetric(name string, value any)

	// Retrieval
	GetMetrics() map[string]any
	GetDockerMetrics() map[string]any
	GetJobMetrics() map[string]any
	Reset()
}

PerformanceRecorder defines the interface for recording comprehensive performance metrics. It extends the existing MetricsRecorder interface with additional capabilities.

type RateLimiter added in v0.10.1

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements token bucket rate limiting

func NewRateLimiter added in v0.10.1

func NewRateLimiter(rate float64, capacity int) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow added in v0.10.1

func (rl *RateLimiter) Allow() bool

Allow checks if a request is allowed

func (*RateLimiter) AllowN added in v0.10.1

func (rl *RateLimiter) AllowN(n int) bool

AllowN checks if n requests are allowed

func (*RateLimiter) Wait added in v0.10.1

func (rl *RateLimiter) Wait(ctx context.Context) error

Wait blocks until a request is allowed

func (*RateLimiter) WaitN added in v0.10.1

func (rl *RateLimiter) WaitN(ctx context.Context, n int) error

WaitN blocks until n requests are allowed

type ResilientJobExecutor added in v0.10.1

type ResilientJobExecutor struct {
	// contains filtered or unexported fields
}

ResilientJobExecutor wraps job execution with resilience patterns

func NewResilientJobExecutor added in v0.10.1

func NewResilientJobExecutor(job Job) *ResilientJobExecutor

NewResilientJobExecutor creates a new resilient job executor

func (*ResilientJobExecutor) Execute added in v0.10.1

func (rje *ResilientJobExecutor) Execute(ctx *Context) error

Execute runs the job with resilience patterns

func (*ResilientJobExecutor) GetCircuitBreakerState added in v0.10.1

func (rje *ResilientJobExecutor) GetCircuitBreakerState() CircuitBreakerState

GetCircuitBreakerState returns the current state of the circuit breaker

func (*ResilientJobExecutor) ResetCircuitBreaker added in v0.10.1

func (rje *ResilientJobExecutor) ResetCircuitBreaker()

ResetCircuitBreaker manually resets the circuit breaker

func (*ResilientJobExecutor) SetBulkhead added in v0.10.1

func (rje *ResilientJobExecutor) SetBulkhead(bh *Bulkhead)

SetBulkhead updates the bulkhead

func (*ResilientJobExecutor) SetCircuitBreaker added in v0.10.1

func (rje *ResilientJobExecutor) SetCircuitBreaker(cb *CircuitBreaker)

SetCircuitBreaker updates the circuit breaker

func (*ResilientJobExecutor) SetMetricsRecorder added in v0.10.1

func (rje *ResilientJobExecutor) SetMetricsRecorder(metrics JobMetricsRecorder)

SetMetricsRecorder sets the metrics recorder

func (*ResilientJobExecutor) SetRateLimiter added in v0.10.1

func (rje *ResilientJobExecutor) SetRateLimiter(rl *RateLimiter)

SetRateLimiter updates the rate limiter

func (*ResilientJobExecutor) SetRetryPolicy added in v0.10.1

func (rje *ResilientJobExecutor) SetRetryPolicy(policy *RetryPolicy)

SetRetryPolicy updates the retry policy

type RetryConfig added in v0.10.1

type RetryConfig struct {
	MaxRetries       int
	RetryDelayMs     int
	RetryExponential bool
	RetryMaxDelayMs  int
}

RetryConfig contains retry configuration for a job

type RetryExecutor added in v0.10.1

type RetryExecutor struct {
	// contains filtered or unexported fields
}

RetryExecutor wraps job execution with retry logic

func NewRetryExecutor added in v0.10.1

func NewRetryExecutor(logger *slog.Logger) *RetryExecutor

NewRetryExecutor creates a new retry executor

func (*RetryExecutor) ExecuteWithRetry added in v0.10.1

func (re *RetryExecutor) ExecuteWithRetry(job Job, ctx *Context, runFunc func(*Context) error) error

ExecuteWithRetry executes a job with retry logic

func (*RetryExecutor) SetMetricsRecorder added in v0.10.1

func (re *RetryExecutor) SetMetricsRecorder(metrics MetricsRecorder)

SetMetricsRecorder sets the metrics recorder for the retry executor

type RetryMetrics added in v0.11.0

type RetryMetrics struct {
	TotalAttempts     int64
	SuccessfulRetries int64
	FailedRetries     int64
	LastRetry         time.Time
}

RetryMetrics holds retry-specific metrics

type RetryPolicy added in v0.10.1

type RetryPolicy struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	JitterFactor    float64
	RetryableErrors func(error) bool
}

RetryPolicy defines retry behavior

func DefaultRetryPolicy added in v0.10.1

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a default retry policy

type RetryStats added in v0.10.1

type RetryStats struct {
	JobName       string
	TotalAttempts int
	SuccessAfter  int // Number of retries before success (0 if first attempt succeeded)
	Failed        bool
	LastError     error
}

RetryStats tracks retry statistics for a job

type RetryableJob added in v0.10.1

type RetryableJob interface {
	Job
	GetRetryConfig() RetryConfig
}

RetryableJob interface for jobs that support retries

type RunJob

type RunJob struct {
	BareJob  `mapstructure:",squash"`
	Provider DockerProvider `json:"-"` // SDK-based Docker provider
	// User specifies the user to run the container as.
	// If not set, uses the global default-user setting (default: "nobody").
	// Set to "default" to explicitly use the container's default user, overriding global setting.
	User string `hash:"true"`

	// ContainerName specifies the name of the container to be created. If
	// nil, the job name will be used. If set to an empty string, Docker
	// will assign a random name.
	ContainerName *string `gcfg:"container-name" mapstructure:"container-name" hash:"true"`

	TTY bool `default:"false" hash:"true"`

	// do not use bool values with "default:true" because if
	// user would set it to "false" explicitly, it still will be
	// changed to "true" https://github.com/netresearch/ofelia/issues/135
	// so lets use strings here as workaround
	Delete string `default:"true" hash:"true"`
	Pull   string `default:"true" hash:"true"`

	Image       string   `hash:"true"`
	Network     string   `hash:"true"`
	Hostname    string   `hash:"true"`
	Entrypoint  *string  `hash:"true"`
	Container   string   `hash:"true"`
	Volume      []string `hash:"true"`
	VolumesFrom []string `gcfg:"volumes-from" mapstructure:"volumes-from," hash:"true"`
	Environment []string `mapstructure:"environment" hash:"true"`
	Annotations []string `mapstructure:"annotations" hash:"true"`

	MaxRuntime time.Duration `gcfg:"max-runtime" mapstructure:"max-runtime"`
	// contains filtered or unexported fields
}
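
The struct comments above describe two "unset versus explicitly set" cases: string-typed Delete and Pull options that default to "true", and a ContainerName pointer where nil and the empty string mean different things. The sketch below shows one way such values could be resolved. Both helpers are hypothetical; how the package itself parses these fields is not shown in this documentation.

```go
package main

import "strconv"

// boolOption interprets a string-typed option. An empty string means "not
// set" and yields def; anything else is parsed as a boolean. A plain bool
// field cannot make this distinction because its zero value is false.
func boolOption(raw string, def bool) (bool, error) {
	if raw == "" {
		return def, nil
	}
	return strconv.ParseBool(raw)
}

// containerName follows the documented ContainerName rules: nil falls back
// to the job name, while an empty string is passed through so that Docker
// can assign a random name.
func containerName(configured *string, jobName string) string {
	if configured == nil {
		return jobName
	}
	return *configured
}
```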

func NewRunJob

func NewRunJob(provider DockerProvider) *RunJob

func (*RunJob) InitializeRuntimeFields added in v0.10.1

func (j *RunJob) InitializeRuntimeFields()

InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.

func (*RunJob) Middlewares

func (c *RunJob) Middlewares() []Middleware

func (*RunJob) ResetMiddlewares added in v0.9.0

func (c *RunJob) ResetMiddlewares(ms ...Middleware)

func (*RunJob) Run

func (j *RunJob) Run(ctx *Context) error

func (*RunJob) Use

func (c *RunJob) Use(ms ...Middleware)

func (*RunJob) Validate added in v0.16.0

func (j *RunJob) Validate() error

Validate checks that the job configuration is valid. For job-run, either Image or Container must be specified.

type RunServiceJob

type RunServiceJob struct {
	BareJob  `mapstructure:",squash"`
	Provider DockerProvider `json:"-"` // SDK-based Docker provider
	// User specifies the user to run the service as.
	// If not set, uses the global default-user setting (default: "nobody").
	// Set to "default" to explicitly use the container's default user, overriding global setting.
	User string `hash:"true"`
	TTY  bool   `default:"false" hash:"true"`
	// do not use bool values with "default:true" because if
	// user would set it to "false" explicitly, it still will be
	// changed to "true" https://github.com/netresearch/ofelia/issues/135
	// so lets use strings here as workaround
	Delete      string        `default:"true" hash:"true"`
	Image       string        `hash:"true"`
	Network     string        `hash:"true"`
	Annotations []string      `mapstructure:"annotations" hash:"true"`
	MaxRuntime  time.Duration `gcfg:"max-runtime" mapstructure:"max-runtime"`
}

func NewRunServiceJob

func NewRunServiceJob(provider DockerProvider) *RunServiceJob

func (*RunServiceJob) InitializeRuntimeFields added in v0.12.0

func (j *RunServiceJob) InitializeRuntimeFields()

InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.

func (*RunServiceJob) Middlewares

func (c *RunServiceJob) Middlewares() []Middleware

func (*RunServiceJob) ResetMiddlewares added in v0.9.0

func (c *RunServiceJob) ResetMiddlewares(ms ...Middleware)

func (*RunServiceJob) Run

func (j *RunServiceJob) Run(ctx *Context) error

func (*RunServiceJob) Use

func (c *RunServiceJob) Use(ms ...Middleware)

func (*RunServiceJob) Validate added in v0.16.0

func (j *RunServiceJob) Validate() error

Validate checks that the job configuration is valid. For job-service-run, Image is required.

type SDKDockerProvider added in v0.12.0

type SDKDockerProvider struct {
	// contains filtered or unexported fields
}

SDKDockerProvider implements DockerProvider using the official Docker SDK.

func NewSDKDockerProvider added in v0.12.0

func NewSDKDockerProvider(cfg *SDKDockerProviderConfig) (*SDKDockerProvider, error)

NewSDKDockerProvider creates a new SDK-based Docker provider.

func NewSDKDockerProviderDefault added in v0.12.0

func NewSDKDockerProviderDefault() (*SDKDockerProvider, error)

NewSDKDockerProviderDefault creates a provider with default settings.

func NewSDKDockerProviderFromClient added in v0.12.0

func NewSDKDockerProviderFromClient(client ports.DockerClient, logger *slog.Logger, metricsRecorder MetricsRecorder) *SDKDockerProvider

NewSDKDockerProviderFromClient creates a provider from an existing client.

func (*SDKDockerProvider) Close added in v0.12.0

func (p *SDKDockerProvider) Close() error

Close closes the Docker client.

func (*SDKDockerProvider) ConnectNetwork added in v0.12.0

func (p *SDKDockerProvider) ConnectNetwork(ctx context.Context, networkID, containerID string) error

ConnectNetwork connects a container to a network.

func (*SDKDockerProvider) CreateContainer added in v0.12.0

func (p *SDKDockerProvider) CreateContainer(ctx context.Context, config *domain.ContainerConfig, name string) (string, error)

CreateContainer creates a new container.

func (*SDKDockerProvider) CreateExec added in v0.12.0

func (p *SDKDockerProvider) CreateExec(ctx context.Context, containerID string, config *domain.ExecConfig) (string, error)

CreateExec creates an exec instance.

func (*SDKDockerProvider) CreateService added in v0.12.0

CreateService creates a new Swarm service.

func (*SDKDockerProvider) EnsureImage added in v0.12.0

func (p *SDKDockerProvider) EnsureImage(ctx context.Context, image string, forcePull bool) error

EnsureImage ensures an image is available, pulling if necessary.

func (*SDKDockerProvider) FindNetworkByName added in v0.12.0

func (p *SDKDockerProvider) FindNetworkByName(ctx context.Context, networkName string) ([]domain.Network, error)

FindNetworkByName finds networks by name.

func (*SDKDockerProvider) GetContainerLogs added in v0.12.0

func (p *SDKDockerProvider) GetContainerLogs(ctx context.Context, containerID string, opts ContainerLogsOptions) (io.ReadCloser, error)

GetContainerLogs retrieves container logs.

func (*SDKDockerProvider) HasImageLocally added in v0.12.0

func (p *SDKDockerProvider) HasImageLocally(ctx context.Context, image string) (bool, error)

HasImageLocally checks if an image exists locally.

func (*SDKDockerProvider) Info added in v0.12.0

Info returns Docker system info.

func (*SDKDockerProvider) InspectContainer added in v0.12.0

func (p *SDKDockerProvider) InspectContainer(ctx context.Context, containerID string) (*domain.Container, error)

InspectContainer inspects a container.

func (*SDKDockerProvider) InspectExec added in v0.12.0

func (p *SDKDockerProvider) InspectExec(ctx context.Context, execID string) (*domain.ExecInspect, error)

InspectExec inspects an exec instance.

func (*SDKDockerProvider) InspectService added in v0.12.0

func (p *SDKDockerProvider) InspectService(ctx context.Context, serviceID string) (*domain.Service, error)

InspectService returns detailed information about a service.

func (*SDKDockerProvider) ListContainers added in v0.12.0

func (p *SDKDockerProvider) ListContainers(ctx context.Context, opts domain.ListOptions) ([]domain.Container, error)

ListContainers lists containers matching the options.

func (*SDKDockerProvider) ListTasks added in v0.12.0

func (p *SDKDockerProvider) ListTasks(ctx context.Context, opts domain.TaskListOptions) ([]domain.Task, error)

ListTasks lists tasks matching the filter options.

func (*SDKDockerProvider) Ping added in v0.12.0

func (p *SDKDockerProvider) Ping(ctx context.Context) error

Ping pings the Docker daemon.

func (*SDKDockerProvider) PullImage added in v0.12.0

func (p *SDKDockerProvider) PullImage(ctx context.Context, image string) error

PullImage pulls an image.

func (*SDKDockerProvider) RemoveContainer added in v0.12.0

func (p *SDKDockerProvider) RemoveContainer(ctx context.Context, containerID string, force bool) error

RemoveContainer removes a container.

func (*SDKDockerProvider) RemoveService added in v0.12.0

func (p *SDKDockerProvider) RemoveService(ctx context.Context, serviceID string) error

RemoveService removes a service.

func (*SDKDockerProvider) RunExec added in v0.12.0

func (p *SDKDockerProvider) RunExec(
	ctx context.Context, containerID string, config *domain.ExecConfig, stdout, stderr io.Writer,
) (int, error)

RunExec executes a command and waits for completion.

func (*SDKDockerProvider) StartContainer added in v0.12.0

func (p *SDKDockerProvider) StartContainer(ctx context.Context, containerID string) error

StartContainer starts a container.

func (*SDKDockerProvider) StartExec added in v0.12.0

StartExec starts an exec instance.

func (*SDKDockerProvider) StopContainer added in v0.12.0

func (p *SDKDockerProvider) StopContainer(ctx context.Context, containerID string, timeout *time.Duration) error

StopContainer stops a container.

func (*SDKDockerProvider) SubscribeEvents added in v0.12.0

func (p *SDKDockerProvider) SubscribeEvents(ctx context.Context, filter domain.EventFilter) (<-chan domain.Event, <-chan error)

SubscribeEvents subscribes to Docker events.

func (*SDKDockerProvider) WaitContainer added in v0.12.0

func (p *SDKDockerProvider) WaitContainer(ctx context.Context, containerID string) (int64, error)

WaitContainer waits for a container to exit.

func (*SDKDockerProvider) WaitForServiceTasks added in v0.12.0

func (p *SDKDockerProvider) WaitForServiceTasks(ctx context.Context, serviceID string, timeout time.Duration) ([]domain.Task, error)

WaitForServiceTasks waits for all tasks of a service to reach a terminal state.

type SDKDockerProviderConfig added in v0.12.0

type SDKDockerProviderConfig struct {
	// Host is the Docker host address (e.g., "unix:///var/run/docker.sock")
	Host string
	// Logger for operation logging
	Logger *slog.Logger
	// MetricsRecorder for metrics tracking
	MetricsRecorder MetricsRecorder
	// AuthProvider for registry authentication (optional)
	AuthProvider ports.AuthProvider
}

SDKDockerProviderConfig configures the SDK provider.

type Scheduler

type Scheduler struct {
	Jobs     []Job
	Removed  []Job
	Disabled []Job
	Logger   *slog.Logger
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(l *slog.Logger) *Scheduler

func NewSchedulerWithClock added in v0.17.0

func NewSchedulerWithClock(l *slog.Logger, cronClock *CronClock) *Scheduler

NewSchedulerWithClock creates a scheduler with a fake clock for testing. This allows tests to control time advancement without real waits.

func NewSchedulerWithMetrics added in v0.13.0

func NewSchedulerWithMetrics(l *slog.Logger, metricsRecorder MetricsRecorder) *Scheduler

NewSchedulerWithMetrics creates a scheduler with metrics. Deprecated: use NewSchedulerWithOptions instead.

func NewSchedulerWithOptions added in v0.17.0

func NewSchedulerWithOptions(l *slog.Logger, metricsRecorder MetricsRecorder, minEveryInterval time.Duration) *Scheduler

NewSchedulerWithOptions creates a scheduler with a configurable minimum interval. A minEveryInterval of 0 uses the library default (1s). Use a negative value to allow sub-second intervals.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(j Job) error

func (*Scheduler) AddJobWithTags added in v0.13.0

func (s *Scheduler) AddJobWithTags(j Job, tags ...string) error

AddJobWithTags adds a job with optional tags for categorization. Tags can be used to group, filter, and remove related jobs. Jobs with @triggered/@manual/@none schedules are stored but not scheduled in cron.

func (*Scheduler) DisableJob added in v0.8.0

func (s *Scheduler) DisableJob(name string) error

DisableJob stops scheduling the job but keeps it for later enabling.

func (*Scheduler) EnableJob added in v0.8.0

func (s *Scheduler) EnableJob(name string) error

EnableJob schedules a previously disabled job. Uses go-cron's UpsertJob for atomic create-or-update, eliminating the previous polling retry loop that waited for RemoveByName to take effect.

func (*Scheduler) Entries added in v0.8.0

func (s *Scheduler) Entries() []cron.Entry

Entries returns all scheduled cron entries.

func (*Scheduler) GetDisabledJob added in v0.8.0

func (s *Scheduler) GetDisabledJob(name string) Job

GetDisabledJob returns a disabled job by name.

func (*Scheduler) GetDisabledJobs added in v0.8.0

func (s *Scheduler) GetDisabledJobs() []Job

GetDisabledJobs returns a copy of all disabled jobs.

func (*Scheduler) GetJob added in v0.8.0

func (s *Scheduler) GetJob(name string) Job

GetJob returns an active job by name.

func (*Scheduler) GetJobsByTag added in v0.13.0

func (s *Scheduler) GetJobsByTag(tag string) []Job

GetJobsByTag returns all jobs with the specified tag.

func (*Scheduler) GetRemovedJobs added in v0.8.0

func (s *Scheduler) GetRemovedJobs() []Job

GetRemovedJobs returns a copy of all jobs that were removed from the scheduler.

func (*Scheduler) IsJobRunning added in v0.20.0

func (s *Scheduler) IsJobRunning(name string) bool

IsJobRunning reports whether the named job has any invocations currently in flight. Returns false if no job with the given name exists or the scheduler has no cron instance.

func (*Scheduler) IsRunning

func (s *Scheduler) IsRunning() bool

IsRunning returns true if the scheduler is active. Delegates to go-cron's IsRunning() which is the authoritative source.

func (*Scheduler) Middlewares

func (c *Scheduler) Middlewares() []Middleware

func (*Scheduler) RemoveJob

func (s *Scheduler) RemoveJob(j Job) error

func (*Scheduler) RemoveJobsByTag added in v0.13.0

func (s *Scheduler) RemoveJobsByTag(tag string) int

RemoveJobsByTag removes all jobs with the specified tag. Returns the number of jobs removed.

func (*Scheduler) ResetMiddlewares added in v0.9.0

func (c *Scheduler) ResetMiddlewares(ms ...Middleware)

func (*Scheduler) RunJob added in v0.8.0

func (s *Scheduler) RunJob(ctx context.Context, jobName string) error

RunJob manually triggers a job by name. The provided context is propagated to the job's RunWithContext method and is available via Context.Ctx.

func (*Scheduler) SetClock added in v0.17.0

func (s *Scheduler) SetClock(c Clock)

func (*Scheduler) SetMaxConcurrentJobs added in v0.10.1

func (s *Scheduler) SetMaxConcurrentJobs(maxJobs int)

SetMaxConcurrentJobs configures the maximum number of concurrent jobs

func (*Scheduler) SetMetricsRecorder added in v0.10.1

func (s *Scheduler) SetMetricsRecorder(recorder MetricsRecorder)

func (*Scheduler) SetOnJobComplete added in v0.17.0

func (s *Scheduler) SetOnJobComplete(callback func(jobName string, success bool))

func (*Scheduler) Start

func (s *Scheduler) Start() error

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

func (*Scheduler) StopAndWait added in v0.13.0

func (s *Scheduler) StopAndWait()

StopAndWait stops the scheduler and waits indefinitely for all jobs to complete.

func (*Scheduler) StopWithTimeout added in v0.13.0

func (s *Scheduler) StopWithTimeout(timeout time.Duration) error

StopWithTimeout stops the scheduler with a graceful shutdown timeout. It stops accepting new jobs, then waits up to the timeout for running jobs to complete. Returns nil if all jobs completed, or an error if the timeout was exceeded.

func (*Scheduler) UpdateJob added in v0.20.0

func (s *Scheduler) UpdateJob(name string, newSchedule string, newJob Job) error

UpdateJob atomically replaces the schedule and job implementation for an existing named entry using go-cron's UpdateEntryJobByName. The old job's in-flight invocations complete before the new schedule takes effect (because go-cron serializes entry mutations through the scheduler goroutine).

Returns ErrJobNotFound if no active job with the given name exists.

func (*Scheduler) Use

func (c *Scheduler) Use(ms ...Middleware)

type ShutdownHook added in v0.10.1

type ShutdownHook struct {
	Name     string
	Priority int // Lower values execute first
	Hook     func(context.Context) error
}

ShutdownHook describes a named, prioritized function to be called during shutdown.

type ShutdownManager added in v0.10.1

type ShutdownManager struct {
	// contains filtered or unexported fields
}

ShutdownManager handles graceful shutdown of the application

func NewShutdownManager added in v0.10.1

func NewShutdownManager(logger *slog.Logger, timeout time.Duration) *ShutdownManager

NewShutdownManager creates a new shutdown manager

func (*ShutdownManager) IsShuttingDown added in v0.10.1

func (sm *ShutdownManager) IsShuttingDown() bool

IsShuttingDown returns true if shutdown is in progress

func (*ShutdownManager) ListenForShutdown added in v0.10.1

func (sm *ShutdownManager) ListenForShutdown()

ListenForShutdown starts listening for shutdown signals

func (*ShutdownManager) RegisterHook added in v0.10.1

func (sm *ShutdownManager) RegisterHook(hook ShutdownHook)

RegisterHook registers a shutdown hook

func (*ShutdownManager) Shutdown added in v0.10.1

func (sm *ShutdownManager) Shutdown() error

Shutdown initiates graceful shutdown

func (*ShutdownManager) ShutdownChan added in v0.10.1

func (sm *ShutdownManager) ShutdownChan() <-chan struct{}

ShutdownChan returns a channel that's closed when shutdown starts
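
A channel that is closed when shutdown starts is a common Go broadcast pattern: every receiver unblocks at once, and sync.Once guards against closing twice. The shutdownSignal type below is a hypothetical sketch of that pattern and not the ShutdownManager implementation.

```go
package main

import "sync"

// shutdownSignal is a hypothetical close-once broadcast signal.
type shutdownSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newShutdownSignal() *shutdownSignal {
	return &shutdownSignal{ch: make(chan struct{})}
}

// trigger closes the channel exactly once, however often it is called.
func (s *shutdownSignal) trigger() {
	s.once.Do(func() { close(s.ch) })
}

// done returns a receive-only view of the channel for use in select.
func (s *shutdownSignal) done() <-chan struct{} {
	return s.ch
}

// isShuttingDown reports whether trigger has been called.
func (s *shutdownSignal) isShuttingDown() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}
```

Long-running goroutines can include the returned channel as a select case to stop picking up new work once shutdown begins.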

type SimpleMetricsRecorder added in v0.10.1

type SimpleMetricsRecorder struct {
	// contains filtered or unexported fields
}

SimpleMetricsRecorder provides a basic implementation of JobMetricsRecorder

func NewSimpleMetricsRecorder added in v0.10.1

func NewSimpleMetricsRecorder() *SimpleMetricsRecorder

NewSimpleMetricsRecorder creates a new simple metrics recorder

func (*SimpleMetricsRecorder) GetMetrics added in v0.10.1

func (smr *SimpleMetricsRecorder) GetMetrics() map[string]any

GetMetrics returns all recorded metrics

func (*SimpleMetricsRecorder) RecordJobExecution added in v0.10.1

func (smr *SimpleMetricsRecorder) RecordJobExecution(jobName string, success bool, duration time.Duration)

RecordJobExecution records job execution metrics

func (*SimpleMetricsRecorder) RecordMetric added in v0.10.1

func (smr *SimpleMetricsRecorder) RecordMetric(name string, value any)

RecordMetric records a generic metric

func (*SimpleMetricsRecorder) RecordRetryAttempt added in v0.10.1

func (smr *SimpleMetricsRecorder) RecordRetryAttempt(jobName string, attempt int, success bool)

RecordRetryAttempt records retry attempt metrics

type TestContainerMonitor added in v0.10.1

type TestContainerMonitor struct {
	// contains filtered or unexported fields
}

TestContainerMonitor provides a test interface to simulate container monitoring

func (*TestContainerMonitor) SetUseEventsAPI added in v0.10.1

func (t *TestContainerMonitor) SetUseEventsAPI(use bool)

func (*TestContainerMonitor) WaitForContainer added in v0.10.1

func (t *TestContainerMonitor) WaitForContainer(containerID string, maxRuntime time.Duration) (*domain.ContainerState, error)

type Ticker added in v0.17.0

type Ticker interface {
	C() <-chan time.Time
	Stop()
}

type Timer added in v0.17.0

type Timer interface {
	C() <-chan time.Time
	Stop() bool
	Reset(d time.Duration) bool
}

Timer represents a single event timer, compatible with go-cron's Timer interface. It provides the same operations as time.Timer.

type WorkflowExecution added in v0.10.1

type WorkflowExecution struct {
	ID            string
	StartTime     time.Time
	CompletedJobs map[string]bool
	FailedJobs    map[string]bool
	RunningJobs   map[string]bool
	// contains filtered or unexported fields
}

WorkflowExecution tracks the state of a workflow execution

type WorkflowOrchestrator added in v0.10.1

type WorkflowOrchestrator struct {
	// contains filtered or unexported fields
}

WorkflowOrchestrator manages job dependencies and workflow execution

func NewWorkflowOrchestrator added in v0.10.1

func NewWorkflowOrchestrator(scheduler *Scheduler, logger *slog.Logger) *WorkflowOrchestrator

NewWorkflowOrchestrator creates a new workflow orchestrator

func (*WorkflowOrchestrator) BuildDependencyGraph added in v0.10.1

func (wo *WorkflowOrchestrator) BuildDependencyGraph(jobs []Job) error

BuildDependencyGraph builds the dependency graph from jobs

func (*WorkflowOrchestrator) CanExecute added in v0.10.1

func (wo *WorkflowOrchestrator) CanExecute(jobName string, executionID string) bool

CanExecute checks if a job can execute based on its dependencies
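
A dependency check of this kind can be reduced to a lookup over completed jobs. The sketch below assumes a plain map from job name to its dependencies and a set of completed jobs. The canExecute function and both parameters are hypothetical, and the orchestrator may apply further rules (for example, around failed or already running jobs) that are not documented here.

```go
package main

// canExecute reports whether every dependency of job has completed.
// A job without dependencies can always run.
func canExecute(deps map[string][]string, completed map[string]bool, job string) bool {
	for _, dep := range deps[job] {
		if !completed[dep] {
			return false
		}
	}
	return true
}
```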

func (*WorkflowOrchestrator) CleanupOldExecutions added in v0.10.1

func (wo *WorkflowOrchestrator) CleanupOldExecutions(maxAge time.Duration)

CleanupOldExecutions removes old workflow executions

func (*WorkflowOrchestrator) GetWorkflowStatus added in v0.10.1

func (wo *WorkflowOrchestrator) GetWorkflowStatus(executionID string) map[string]any

GetWorkflowStatus returns the status of a workflow execution

func (*WorkflowOrchestrator) JobCompleted added in v0.10.1

func (wo *WorkflowOrchestrator) JobCompleted(ctx context.Context, jobName string, executionID string, success bool)

JobCompleted marks a job as completed and triggers dependent jobs

func (*WorkflowOrchestrator) JobStarted added in v0.10.1

func (wo *WorkflowOrchestrator) JobStarted(jobName string, executionID string)

JobStarted marks a job as started in the workflow

Directories

Path Synopsis
adapters
docker
Package docker provides an adapter for the official Docker SDK.
mock
Package mock provides mock implementations of the ports interfaces for testing.
Package domain contains SDK-agnostic domain models for Docker operations.
Package ports defines the port interfaces for Docker operations.
