Documentation
¶
Index ¶
- Constants
- Variables
- func GetHash(t reflect.Type, v reflect.Value, hash *string) error
- func IsNonZeroExitError(err error) bool
- func IsRetryableError(err error) bool
- func IsTriggeredSchedule(schedule string) bool
- func Retry(ctx context.Context, policy *RetryPolicy, fn func() error) error
- func SetDefaultClock(c Clock)
- func SetGlobalBufferPoolLogger(logger *slog.Logger)
- func WrapContainerError(op string, containerID string, err error) error
- func WrapImageError(op string, image string, err error) error
- func WrapJobError(op string, jobName string, err error) error
- func WrapServiceError(op string, serviceID string, err error) error
- type BareJob
- func (j *BareJob) GetCommand() string
- func (j *BareJob) GetCronJobID() uint64
- func (j *BareJob) GetHistory() []*Execution
- func (j *BareJob) GetLastRun() *Execution
- func (j *BareJob) GetName() string
- func (j *BareJob) GetRetryConfig() RetryConfig
- func (j *BareJob) GetSchedule() string
- func (j *BareJob) Hash() (string, error)
- func (c *BareJob) Middlewares() []Middleware
- func (j *BareJob) NotifyStart()
- func (j *BareJob) NotifyStop()
- func (c *BareJob) ResetMiddlewares(ms ...Middleware)
- func (j *BareJob) Run(ctx *Context) error
- func (j *BareJob) Running() int32
- func (j *BareJob) SetCronJobID(id uint64)
- func (j *BareJob) SetLastRun(e *Execution)
- func (j *BareJob) ShouldRunOnStartup() bool
- func (c *BareJob) Use(ms ...Middleware)
- type Bulkhead
- type CircuitBreaker
- type CircuitBreakerState
- type Clock
- type ComposeJob
- type ContainerLogsOptions
- type Context
- type CronClock
- type CronUtils
- type DependencyNode
- type DockerProvider
- type EnhancedBufferPool
- func (ebp *EnhancedBufferPool) Get() (*circbuf.Buffer, error)
- func (ebp *EnhancedBufferPool) GetSized(requestedSize int64) (*circbuf.Buffer, error)
- func (ebp *EnhancedBufferPool) GetStats() map[string]any
- func (ebp *EnhancedBufferPool) Put(buf *circbuf.Buffer)
- func (ebp *EnhancedBufferPool) Shutdown()
- type EnhancedBufferPoolConfig
- type ExecJob
- func (j *ExecJob) InitializeRuntimeFields()
- func (c *ExecJob) Middlewares() []Middleware
- func (c *ExecJob) ResetMiddlewares(ms ...Middleware)
- func (j *ExecJob) Run(ctx *Context) error
- func (j *ExecJob) RunWithStreams(ctx context.Context, stdout, stderr io.Writer) (int, error)
- func (c *ExecJob) Use(ms ...Middleware)
- type Execution
- type FakeClock
- func (c *FakeClock) Advance(d time.Duration)
- func (c *FakeClock) After(d time.Duration) <-chan time.Time
- func (c *FakeClock) NewTicker(d time.Duration) Ticker
- func (c *FakeClock) NewTimer(d time.Duration) Timer
- func (c *FakeClock) Now() time.Time
- func (c *FakeClock) Set(t time.Time)
- func (c *FakeClock) Sleep(d time.Duration)
- func (c *FakeClock) TickerCount() int
- func (c *FakeClock) WaitForAdvance()
- type GracefulScheduler
- type GracefulServer
- type Job
- type JobMetrics
- type JobMetricsRecorder
- type LatencyTracker
- type LocalJob
- type MetricsRecorder
- type Middleware
- type NonZeroExitError
- type PerformanceMetrics
- func (pm *PerformanceMetrics) GetDockerMetrics() map[string]any
- func (pm *PerformanceMetrics) GetJobMetrics() map[string]any
- func (pm *PerformanceMetrics) GetMetrics() map[string]any
- func (pm *PerformanceMetrics) GetSummaryReport() string
- func (pm *PerformanceMetrics) RecordBufferPoolStats(stats map[string]any)
- func (pm *PerformanceMetrics) RecordConcurrentJobs(count int64)
- func (pm *PerformanceMetrics) RecordContainerEvent()
- func (pm *PerformanceMetrics) RecordContainerMonitorFallback()
- func (pm *PerformanceMetrics) RecordContainerMonitorMethod(usingEvents bool)
- func (pm *PerformanceMetrics) RecordContainerWaitDuration(seconds float64)
- func (pm *PerformanceMetrics) RecordCustomMetric(name string, value any)
- func (pm *PerformanceMetrics) RecordDockerError(operation string)
- func (pm *PerformanceMetrics) RecordDockerLatency(operation string, duration time.Duration)
- func (pm *PerformanceMetrics) RecordDockerOperation(operation string)
- func (pm *PerformanceMetrics) RecordJobComplete(jobName string, durationSeconds float64, panicked bool)
- func (pm *PerformanceMetrics) RecordJobExecution(jobName string, duration time.Duration, success bool)
- func (pm *PerformanceMetrics) RecordJobRetry(jobName string, attempt int, success bool)
- func (pm *PerformanceMetrics) RecordJobScheduled(jobName string)
- func (pm *PerformanceMetrics) RecordJobSkipped(jobName string, reason string)
- func (pm *PerformanceMetrics) RecordJobStart(jobName string)
- func (pm *PerformanceMetrics) RecordMemoryUsage(bytes int64)
- func (pm *PerformanceMetrics) Reset()
- type PerformanceRecorder
- type RateLimiter
- type ResilientJobExecutor
- func (rje *ResilientJobExecutor) Execute(ctx *Context) error
- func (rje *ResilientJobExecutor) GetCircuitBreakerState() CircuitBreakerState
- func (rje *ResilientJobExecutor) ResetCircuitBreaker()
- func (rje *ResilientJobExecutor) SetBulkhead(bh *Bulkhead)
- func (rje *ResilientJobExecutor) SetCircuitBreaker(cb *CircuitBreaker)
- func (rje *ResilientJobExecutor) SetMetricsRecorder(metrics JobMetricsRecorder)
- func (rje *ResilientJobExecutor) SetRateLimiter(rl *RateLimiter)
- func (rje *ResilientJobExecutor) SetRetryPolicy(policy *RetryPolicy)
- type RetryConfig
- type RetryExecutor
- type RetryMetrics
- type RetryPolicy
- type RetryStats
- type RetryableJob
- type RunJob
- type RunServiceJob
- type SDKDockerProvider
- func (p *SDKDockerProvider) Close() error
- func (p *SDKDockerProvider) ConnectNetwork(ctx context.Context, networkID, containerID string) error
- func (p *SDKDockerProvider) CreateContainer(ctx context.Context, config *domain.ContainerConfig, name string) (string, error)
- func (p *SDKDockerProvider) CreateExec(ctx context.Context, containerID string, config *domain.ExecConfig) (string, error)
- func (p *SDKDockerProvider) CreateService(ctx context.Context, spec domain.ServiceSpec, opts domain.ServiceCreateOptions) (string, error)
- func (p *SDKDockerProvider) EnsureImage(ctx context.Context, image string, forcePull bool) error
- func (p *SDKDockerProvider) FindNetworkByName(ctx context.Context, networkName string) ([]domain.Network, error)
- func (p *SDKDockerProvider) GetContainerLogs(ctx context.Context, containerID string, opts ContainerLogsOptions) (io.ReadCloser, error)
- func (p *SDKDockerProvider) HasImageLocally(ctx context.Context, image string) (bool, error)
- func (p *SDKDockerProvider) Info(ctx context.Context) (*domain.SystemInfo, error)
- func (p *SDKDockerProvider) InspectContainer(ctx context.Context, containerID string) (*domain.Container, error)
- func (p *SDKDockerProvider) InspectExec(ctx context.Context, execID string) (*domain.ExecInspect, error)
- func (p *SDKDockerProvider) InspectService(ctx context.Context, serviceID string) (*domain.Service, error)
- func (p *SDKDockerProvider) ListContainers(ctx context.Context, opts domain.ListOptions) ([]domain.Container, error)
- func (p *SDKDockerProvider) ListTasks(ctx context.Context, opts domain.TaskListOptions) ([]domain.Task, error)
- func (p *SDKDockerProvider) Ping(ctx context.Context) error
- func (p *SDKDockerProvider) PullImage(ctx context.Context, image string) error
- func (p *SDKDockerProvider) RemoveContainer(ctx context.Context, containerID string, force bool) error
- func (p *SDKDockerProvider) RemoveService(ctx context.Context, serviceID string) error
- func (p *SDKDockerProvider) RunExec(ctx context.Context, containerID string, config *domain.ExecConfig, ...) (int, error)
- func (p *SDKDockerProvider) StartContainer(ctx context.Context, containerID string) error
- func (p *SDKDockerProvider) StartExec(ctx context.Context, execID string, opts domain.ExecStartOptions) (*domain.HijackedResponse, error)
- func (p *SDKDockerProvider) StopContainer(ctx context.Context, containerID string, timeout *time.Duration) error
- func (p *SDKDockerProvider) SubscribeEvents(ctx context.Context, filter domain.EventFilter) (<-chan domain.Event, <-chan error)
- func (p *SDKDockerProvider) WaitContainer(ctx context.Context, containerID string) (int64, error)
- func (p *SDKDockerProvider) WaitForServiceTasks(ctx context.Context, serviceID string, timeout time.Duration) ([]domain.Task, error)
- type SDKDockerProviderConfig
- type Scheduler
- func NewScheduler(l *slog.Logger) *Scheduler
- func NewSchedulerWithClock(l *slog.Logger, cronClock *CronClock) *Scheduler
- func NewSchedulerWithMetrics(l *slog.Logger, metricsRecorder MetricsRecorder) *Scheduler
- func NewSchedulerWithOptions(l *slog.Logger, metricsRecorder MetricsRecorder, ...) *Scheduler
- func (s *Scheduler) AddJob(j Job) error
- func (s *Scheduler) AddJobWithTags(j Job, tags ...string) error
- func (s *Scheduler) DisableJob(name string) error
- func (s *Scheduler) EnableJob(name string) error
- func (s *Scheduler) Entries() []cron.Entry
- func (s *Scheduler) GetDisabledJob(name string) Job
- func (s *Scheduler) GetDisabledJobs() []Job
- func (s *Scheduler) GetJob(name string) Job
- func (s *Scheduler) GetJobsByTag(tag string) []Job
- func (s *Scheduler) GetRemovedJobs() []Job
- func (s *Scheduler) IsJobRunning(name string) bool
- func (s *Scheduler) IsRunning() bool
- func (c *Scheduler) Middlewares() []Middleware
- func (s *Scheduler) RemoveJob(j Job) error
- func (s *Scheduler) RemoveJobsByTag(tag string) int
- func (c *Scheduler) ResetMiddlewares(ms ...Middleware)
- func (s *Scheduler) RunJob(ctx context.Context, jobName string) error
- func (s *Scheduler) SetClock(c Clock)
- func (s *Scheduler) SetMaxConcurrentJobs(maxJobs int)
- func (s *Scheduler) SetMetricsRecorder(recorder MetricsRecorder)
- func (s *Scheduler) SetOnJobComplete(callback func(jobName string, success bool))
- func (s *Scheduler) Start() error
- func (s *Scheduler) Stop() error
- func (s *Scheduler) StopAndWait()
- func (s *Scheduler) StopWithTimeout(timeout time.Duration) error
- func (s *Scheduler) UpdateJob(name string, newSchedule string, newJob Job) error
- func (c *Scheduler) Use(ms ...Middleware)
- type ShutdownHook
- type ShutdownManager
- type SimpleMetricsRecorder
- func (smr *SimpleMetricsRecorder) GetMetrics() map[string]any
- func (smr *SimpleMetricsRecorder) RecordJobExecution(jobName string, success bool, duration time.Duration)
- func (smr *SimpleMetricsRecorder) RecordMetric(name string, value any)
- func (smr *SimpleMetricsRecorder) RecordRetryAttempt(jobName string, attempt int, success bool)
- type TestContainerMonitor
- type Ticker
- type Timer
- type WorkflowExecution
- type WorkflowOrchestrator
- func (wo *WorkflowOrchestrator) BuildDependencyGraph(jobs []Job) error
- func (wo *WorkflowOrchestrator) CanExecute(jobName string, executionID string) bool
- func (wo *WorkflowOrchestrator) CleanupOldExecutions(maxAge time.Duration)
- func (wo *WorkflowOrchestrator) GetWorkflowStatus(executionID string) map[string]any
- func (wo *WorkflowOrchestrator) JobCompleted(ctx context.Context, jobName string, executionID string, success bool)
- func (wo *WorkflowOrchestrator) JobStarted(jobName string, executionID string)
Constants ¶
const (
// Exit codes for swarm service execution states
// These are Ofelia-specific codes, not from Docker Swarm API
// They indicate failure modes that don't map to container exit codes
ExitCodeSwarmError = -999 // Swarm orchestration error (task not found, service unavailable)
ExitCodeTimeout = -998 // Max runtime exceeded before task completion
)
const DefaultStopTimeout = 30 * time.Second
DefaultStopTimeout is the default timeout for graceful shutdown.
const HashmeTagName = "hash"
const TriggeredSchedule = "@triggered"
TriggeredSchedule is a special schedule keyword for jobs that should only run when triggered by another job's on-success/on-failure, or manually via RunJob(). Jobs with this schedule are not added to the cron scheduler.
Variables ¶
var (
// Container errors
ErrContainerNotFound = errors.New("container not found")
ErrContainerStartFailed = errors.New("failed to start container")
ErrContainerCreateFailed = errors.New("failed to create container")
ErrContainerStopFailed = errors.New("failed to stop container")
ErrContainerRemoveFailed = errors.New("failed to remove container")
// Image errors
ErrImageNotFound = errors.New("image not found")
ErrImagePullFailed = errors.New("failed to pull image")
ErrLocalImageNotFound = errors.New("local image not found")
// Service errors
ErrServiceNotFound = errors.New("service not found")
ErrServiceCreateFailed = errors.New("failed to create service")
ErrServiceStartFailed = errors.New("failed to start service")
ErrServiceRemoveFailed = errors.New("failed to remove service")
// Job errors
ErrJobNotFound = errors.New("job not found")
ErrJobAlreadyExists = errors.New("job already exists")
ErrJobExecution = errors.New("job execution failed")
ErrMaxTimeRunning = errors.New("max runtime exceeded")
ErrUnexpected = errors.New("unexpected error")
// Workflow errors
ErrCircularDependency = errors.New("circular dependency detected")
ErrDependencyNotMet = errors.New("job dependencies not met")
ErrWorkflowInvalid = errors.New("invalid workflow configuration")
// Validation errors
ErrEmptyCommand = errors.New("command cannot be empty")
ErrUnsupportedFieldType = errors.New("unsupported field type")
ErrImageOrContainer = errors.New("job-run requires either 'image' or 'container'")
ErrImageRequired = errors.New("job-service-run requires 'image' to create a new swarm service")
// Scheduler errors
ErrSchedulerTimeout = errors.New("scheduler stop timed out")
// Shutdown errors
ErrShutdownInProgress = errors.New("shutdown already in progress")
ErrShutdownTimeout = errors.New("shutdown timed out")
ErrJobCanceled = errors.New("job canceled: shutdown in progress")
ErrCannotStartJob = errors.New("cannot start job during shutdown")
ErrWaitTimeout = errors.New("timeout waiting for jobs to complete")
// Resilience errors
ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
ErrCircuitBreakerHalfOpen = errors.New("circuit breaker is half-open but max calls reached")
ErrCircuitBreakerUnknown = errors.New("circuit breaker is in unknown state")
ErrRateLimitExceeded = errors.New("rate limit exceeded")
ErrTokensExceedCapacity = errors.New("requested tokens exceed capacity")
ErrBulkheadFull = errors.New("bulkhead is full")
// Docker SDK errors
ErrResponseChannelClosed = errors.New("response channel closed unexpectedly")
)
Common errors used across the package
var (
ErrEmptyScheduler = errors.New("unable to start an empty scheduler")
ErrEmptySchedule = errors.New("unable to add a job with an empty schedule")
)
var (
// DefaultBufferPool provides enhanced performance for job execution
// This replaces the simple buffer pool with multi-tier adaptive pooling
// Note: ShrinkInterval is set to 0 to prevent background goroutine at package init
DefaultBufferPool = func() *EnhancedBufferPool {
cfg := DefaultEnhancedBufferPoolConfig()
cfg.ShrinkInterval = 0
cfg.EnablePrewarming = false
cfg.PoolSize = 0
return NewEnhancedBufferPool(cfg, nil)
}()
)
Global enhanced buffer pool instance
var ErrSkippedExecution = errors.New("skipped execution")
ErrSkippedExecution is the error to pass to `Execution.Stop` when you wish to mark the execution as skipped.
var GlobalPerformanceMetrics = NewPerformanceMetrics()
Global enhanced metrics instance
var Version = "dev"
Version is the Ofelia version, set via ldflags during build. Defaults to "dev" if not set.
Functions ¶
func IsNonZeroExitError ¶ added in v0.10.1
IsNonZeroExitError checks if the error is a non-zero exit code error
func IsRetryableError ¶ added in v0.10.1
IsRetryableError checks if an error should trigger a retry
func IsTriggeredSchedule ¶ added in v0.14.0
IsTriggeredSchedule returns true if the schedule indicates the job should only run when triggered (not on a time-based schedule).
func Retry ¶ added in v0.10.1
func Retry(ctx context.Context, policy *RetryPolicy, fn func() error) error
Retry executes a function with retry logic
func SetDefaultClock ¶ added in v0.17.0
func SetDefaultClock(c Clock)
func SetGlobalBufferPoolLogger ¶ added in v0.11.0
SetGlobalBufferPoolLogger sets the logger for the global buffer pool
func WrapContainerError ¶ added in v0.10.1
WrapContainerError wraps a container-related error with context
func WrapImageError ¶ added in v0.10.1
WrapImageError wraps an image-related error with context
func WrapJobError ¶ added in v0.10.1
WrapJobError wraps a job-related error with context
Types ¶
type BareJob ¶
type BareJob struct {
Schedule string `hash:"true"`
Name string `hash:"true"`
Command string `hash:"true"`
// RunOnStartup controls whether the job is executed once immediately when the scheduler starts,
// before regular cron-based scheduling begins. This is a boolean flag with a default value of false.
// Startup executions are dispatched in non-blocking goroutines so they do not delay scheduler startup.
RunOnStartup bool `default:"false" gcfg:"run-on-startup" mapstructure:"run-on-startup" hash:"true"`
HistoryLimit int `default:"10"`
MaxRetries int `default:"0"` // Maximum number of retry attempts (0 = no retries)
RetryDelayMs int `default:"1000"` // Initial retry delay in milliseconds
RetryExponential bool `default:"true"` // Use exponential backoff for retries
RetryMaxDelayMs int `default:"60000"` // Maximum retry delay in milliseconds (1 minute)
Dependencies []string `gcfg:"depends-on" mapstructure:"depends-on,"` // Jobs that must complete first
OnSuccess []string `gcfg:"on-success" mapstructure:"on-success,"` // Jobs to trigger on success
OnFailure []string `gcfg:"on-failure" mapstructure:"on-failure,"` // Jobs to trigger on failure
AllowParallel bool `default:"true"` // Allow job to run in parallel with others
// contains filtered or unexported fields
}
func (*BareJob) GetCommand ¶
func (*BareJob) GetCronJobID ¶
func (*BareJob) GetHistory ¶ added in v0.8.0
GetHistory returns a copy of the job's execution history.
func (*BareJob) GetLastRun ¶ added in v0.8.0
GetLastRun returns the last execution of the job, if any.
func (*BareJob) GetRetryConfig ¶ added in v0.10.1
func (j *BareJob) GetRetryConfig() RetryConfig
GetRetryConfig returns the retry configuration for the job
func (*BareJob) GetSchedule ¶
func (*BareJob) Middlewares ¶
func (c *BareJob) Middlewares() []Middleware
func (*BareJob) NotifyStart ¶
func (j *BareJob) NotifyStart()
func (*BareJob) NotifyStop ¶
func (j *BareJob) NotifyStop()
func (*BareJob) ResetMiddlewares ¶ added in v0.9.0
func (c *BareJob) ResetMiddlewares(ms ...Middleware)
func (*BareJob) Run ¶ added in v0.10.1
Run implements the Job interface - this is handled by jobWrapper
func (*BareJob) SetCronJobID ¶
func (*BareJob) SetLastRun ¶ added in v0.8.0
SetLastRun stores the last executed run for the job.
func (*BareJob) ShouldRunOnStartup ¶ added in v0.18.0
ShouldRunOnStartup returns true if the job should run immediately when the scheduler starts.
func (*BareJob) Use ¶
func (c *BareJob) Use(ms ...Middleware)
type Bulkhead ¶ added in v0.10.1
type Bulkhead struct {
// contains filtered or unexported fields
}
Bulkhead implements the bulkhead pattern for resource isolation
func NewBulkhead ¶ added in v0.10.1
NewBulkhead creates a new bulkhead
func (*Bulkhead) GetMetrics ¶ added in v0.10.1
GetMetrics returns bulkhead metrics
type CircuitBreaker ¶ added in v0.10.1
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶ added in v0.10.1
func NewCircuitBreaker(name string, maxFailures uint32, resetTimeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Execute ¶ added in v0.10.1
func (cb *CircuitBreaker) Execute(fn func() error) error
Execute runs a function through the circuit breaker
func (*CircuitBreaker) GetMetrics ¶ added in v0.10.1
func (cb *CircuitBreaker) GetMetrics() map[string]any
GetMetrics returns circuit breaker metrics
func (*CircuitBreaker) GetState ¶ added in v0.10.1
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current state of the circuit breaker
type CircuitBreakerState ¶ added in v0.10.1
type CircuitBreakerState int
CircuitBreakerState represents the state of a circuit breaker
const (
StateClosed CircuitBreakerState = iota
StateOpen
StateHalfOpen
)
func (CircuitBreakerState) String ¶ added in v0.10.1
func (s CircuitBreakerState) String() string
type Clock ¶ added in v0.17.0
type Clock interface {
Now() time.Time
NewTicker(d time.Duration) Ticker
NewTimer(d time.Duration) Timer
After(d time.Duration) <-chan time.Time
Sleep(d time.Duration)
}
func GetDefaultClock ¶ added in v0.17.0
func GetDefaultClock() Clock
func NewRealClock ¶ added in v0.17.0
func NewRealClock() Clock
type ComposeJob ¶ added in v0.9.0
type ComposeJob struct {
BareJob `mapstructure:",squash"`
File string `default:"compose.yml" gcfg:"file" mapstructure:"file" hash:"true"`
Service string `gcfg:"service" mapstructure:"service" hash:"true"`
Exec bool `default:"false" gcfg:"exec" mapstructure:"exec" hash:"true"`
}
func NewComposeJob ¶ added in v0.9.0
func NewComposeJob() *ComposeJob
func (*ComposeJob) Middlewares ¶ added in v0.9.0
func (c *ComposeJob) Middlewares() []Middleware
func (*ComposeJob) ResetMiddlewares ¶ added in v0.9.0
func (c *ComposeJob) ResetMiddlewares(ms ...Middleware)
func (*ComposeJob) Run ¶ added in v0.9.0
func (j *ComposeJob) Run(ctx *Context) error
func (*ComposeJob) Use ¶ added in v0.9.0
func (c *ComposeJob) Use(ms ...Middleware)
type ContainerLogsOptions ¶ added in v0.12.0
type ContainerLogsOptions struct {
ShowStdout bool
ShowStderr bool
Since time.Time
Tail string
Follow bool
}
ContainerLogsOptions defines options for container log retrieval.
type Context ¶
type Context struct {
Scheduler *Scheduler
Logger *slog.Logger
Job Job
Execution *Execution
Ctx context.Context //nolint:containedctx // intentional: propagates go-cron's per-entry context through middleware chain
// contains filtered or unexported fields
}
func NewContextWithContext ¶ added in v0.20.0
NewContextWithContext creates a Context with a specific context.Context, typically the per-entry context provided by go-cron's JobWithContext interface.
type CronClock ¶ added in v0.17.0
type CronClock struct {
*FakeClock
}
func NewCronClock ¶ added in v0.17.0
type DependencyNode ¶ added in v0.10.1
type DependencyNode struct {
Job Job
Dependencies []string // Job names this job depends on
Dependents []string // Job names that depend on this job
OnSuccess []string // Jobs to trigger on success
OnFailure []string // Jobs to trigger on failure
}
DependencyNode represents a job in the dependency graph
type DockerProvider ¶ added in v0.12.0
type DockerProvider interface {
// Container operations
CreateContainer(ctx context.Context, config *domain.ContainerConfig, name string) (string, error)
StartContainer(ctx context.Context, containerID string) error
StopContainer(ctx context.Context, containerID string, timeout *time.Duration) error
RemoveContainer(ctx context.Context, containerID string, force bool) error
InspectContainer(ctx context.Context, containerID string) (*domain.Container, error)
ListContainers(ctx context.Context, opts domain.ListOptions) ([]domain.Container, error)
WaitContainer(ctx context.Context, containerID string) (int64, error)
GetContainerLogs(ctx context.Context, containerID string, opts ContainerLogsOptions) (io.ReadCloser, error)
// Exec operations
CreateExec(ctx context.Context, containerID string, config *domain.ExecConfig) (string, error)
StartExec(ctx context.Context, execID string, opts domain.ExecStartOptions) (*domain.HijackedResponse, error)
InspectExec(ctx context.Context, execID string) (*domain.ExecInspect, error)
RunExec(ctx context.Context, containerID string, config *domain.ExecConfig, stdout, stderr io.Writer) (int, error)
// Image operations
PullImage(ctx context.Context, image string) error
HasImageLocally(ctx context.Context, image string) (bool, error)
EnsureImage(ctx context.Context, image string, forcePull bool) error
// Network operations
ConnectNetwork(ctx context.Context, networkID, containerID string) error
FindNetworkByName(ctx context.Context, networkName string) ([]domain.Network, error)
// Event operations
SubscribeEvents(ctx context.Context, filter domain.EventFilter) (<-chan domain.Event, <-chan error)
// Service operations (Swarm)
CreateService(ctx context.Context, spec domain.ServiceSpec, opts domain.ServiceCreateOptions) (string, error)
InspectService(ctx context.Context, serviceID string) (*domain.Service, error)
ListTasks(ctx context.Context, opts domain.TaskListOptions) ([]domain.Task, error)
RemoveService(ctx context.Context, serviceID string) error
WaitForServiceTasks(ctx context.Context, serviceID string, timeout time.Duration) ([]domain.Task, error)
// System operations
Info(ctx context.Context) (*domain.SystemInfo, error)
Ping(ctx context.Context) error
// Lifecycle
Close() error
}
DockerProvider defines the interface for Docker operations. The SDK adapter implements this interface.
type EnhancedBufferPool ¶ added in v0.11.0
type EnhancedBufferPool struct {
// contains filtered or unexported fields
}
EnhancedBufferPool provides high-performance buffer management with adaptive sizing
func NewBufferPool ¶ added in v0.10.1
func NewBufferPool(minSize, defaultSize, maxSize int64) *EnhancedBufferPool
NewBufferPool is a compatibility wrapper for tests and old code. It returns the enhanced buffer pool with custom configuration.
func NewEnhancedBufferPool ¶ added in v0.11.0
func NewEnhancedBufferPool(config *EnhancedBufferPoolConfig, logger *slog.Logger) *EnhancedBufferPool
NewEnhancedBufferPool creates a new enhanced buffer pool with adaptive management
func (*EnhancedBufferPool) Get ¶ added in v0.11.0
func (ebp *EnhancedBufferPool) Get() (*circbuf.Buffer, error)
Get retrieves a buffer from the pool, optimized for high concurrency
func (*EnhancedBufferPool) GetSized ¶ added in v0.11.0
func (ebp *EnhancedBufferPool) GetSized(requestedSize int64) (*circbuf.Buffer, error)
GetSized retrieves a buffer with a specific size requirement, with intelligent size selection
func (*EnhancedBufferPool) GetStats ¶ added in v0.11.0
func (ebp *EnhancedBufferPool) GetStats() map[string]any
GetStats returns comprehensive performance statistics
func (*EnhancedBufferPool) Put ¶ added in v0.11.0
func (ebp *EnhancedBufferPool) Put(buf *circbuf.Buffer)
Put returns a buffer to the appropriate pool
func (*EnhancedBufferPool) Shutdown ¶ added in v0.11.0
func (ebp *EnhancedBufferPool) Shutdown()
Shutdown gracefully stops the enhanced buffer pool
type EnhancedBufferPoolConfig ¶ added in v0.11.0
type EnhancedBufferPoolConfig struct {
MinSize int64 `json:"minSize"` // Minimum buffer size
DefaultSize int64 `json:"defaultSize"` // Default buffer size
MaxSize int64 `json:"maxSize"` // Maximum buffer size
PoolSize int `json:"poolSize"` // Number of buffers to pre-allocate
MaxPoolSize int `json:"maxPoolSize"` // Maximum number of buffers in pool
GrowthFactor float64 `json:"growthFactor"` // Factor to increase pool size when needed
ShrinkThreshold float64 `json:"shrinkThreshold"` // Usage percentage below which to shrink
ShrinkInterval time.Duration `json:"shrinkInterval"` // How often to check for shrinking
EnableMetrics bool `json:"enableMetrics"` // Enable performance metrics
EnablePrewarming bool `json:"enablePrewarming"` // Pre-allocate buffers on startup
}
EnhancedBufferPoolConfig holds configuration for the enhanced buffer pool
func DefaultEnhancedBufferPoolConfig ¶ added in v0.11.0
func DefaultEnhancedBufferPoolConfig() *EnhancedBufferPoolConfig
DefaultEnhancedBufferPoolConfig returns optimized defaults for high-concurrency scenarios
type ExecJob ¶
type ExecJob struct {
BareJob `mapstructure:",squash"`
Provider DockerProvider `json:"-"` // SDK-based Docker provider
Container string `hash:"true"`
// User specifies the user to run the command as.
// If not set, uses the global default-user setting (default: "nobody").
// Set to "default" to explicitly use the container's default user, overriding global setting.
User string `hash:"true"`
TTY bool `default:"false" hash:"true"`
Environment []string `mapstructure:"environment" hash:"true"`
WorkingDir string `mapstructure:"working-dir" hash:"true"`
}
func NewExecJob ¶
func NewExecJob(provider DockerProvider) *ExecJob
func (*ExecJob) InitializeRuntimeFields ¶ added in v0.10.2
func (j *ExecJob) InitializeRuntimeFields()
InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.
func (*ExecJob) Middlewares ¶
func (c *ExecJob) Middlewares() []Middleware
func (*ExecJob) ResetMiddlewares ¶ added in v0.9.0
func (c *ExecJob) ResetMiddlewares(ms ...Middleware)
func (*ExecJob) RunWithStreams ¶ added in v0.12.0
RunWithStreams runs the exec job with custom output streams. This is useful for testing or when custom stream handling is needed.
func (*ExecJob) Use ¶
func (c *ExecJob) Use(ms ...Middleware)
type Execution ¶
type Execution struct {
ID string
Date time.Time
Duration time.Duration
IsRunning bool
Failed bool
Skipped bool
Error error
OutputStream, ErrorStream *circbuf.Buffer `json:"-"`
// Captured output for persistence after buffer cleanup
CapturedStdout, CapturedStderr string `json:"-"`
}
Execution contains all the information relative to a Job execution.
func NewExecution ¶
NewExecution returns a new Execution, with a random ID
func (*Execution) Cleanup ¶ added in v0.10.1
func (e *Execution) Cleanup()
Cleanup returns execution buffers to the pool for reuse
func (*Execution) GetStderr ¶ added in v0.10.1
GetStderr returns stderr content, preferring live buffer if available
func (*Execution) GetStdout ¶ added in v0.10.1
GetStdout returns stdout content, preferring live buffer if available
type FakeClock ¶ added in v0.17.0
type FakeClock struct {
// contains filtered or unexported fields
}
func NewFakeClock ¶ added in v0.17.0
func (*FakeClock) TickerCount ¶ added in v0.17.0
func (*FakeClock) WaitForAdvance ¶ added in v0.17.0
func (c *FakeClock) WaitForAdvance()
type GracefulScheduler ¶ added in v0.10.1
type GracefulScheduler struct {
*Scheduler
// contains filtered or unexported fields
}
GracefulScheduler wraps a scheduler with graceful shutdown
func NewGracefulScheduler ¶ added in v0.10.1
func NewGracefulScheduler(scheduler *Scheduler, shutdownManager *ShutdownManager) *GracefulScheduler
NewGracefulScheduler creates a scheduler with graceful shutdown support
func (GracefulScheduler) Middlewares ¶ added in v0.10.1
func (c GracefulScheduler) Middlewares() []Middleware
func (GracefulScheduler) ResetMiddlewares ¶ added in v0.10.1
func (c GracefulScheduler) ResetMiddlewares(ms ...Middleware)
func (*GracefulScheduler) RunJobWithTracking ¶ added in v0.10.1
func (gs *GracefulScheduler) RunJobWithTracking(job Job, ctx *Context) error
RunJobWithTracking runs a job with shutdown tracking
func (GracefulScheduler) Use ¶ added in v0.10.1
func (c GracefulScheduler) Use(ms ...Middleware)
type GracefulServer ¶ added in v0.10.1
type GracefulServer struct {
// contains filtered or unexported fields
}
GracefulServer wraps an HTTP server with graceful shutdown
func NewGracefulServer ¶ added in v0.10.1
func NewGracefulServer(server *http.Server, shutdownManager *ShutdownManager, logger *slog.Logger) *GracefulServer
NewGracefulServer creates a server with graceful shutdown support
type Job ¶
type Job interface {
GetName() string
GetSchedule() string
GetCommand() string
ShouldRunOnStartup() bool
Middlewares() []Middleware
Use(...Middleware)
Run(*Context) error
Running() int32
NotifyStart()
NotifyStop()
GetCronJobID() uint64
SetCronJobID(uint64)
GetHistory() []*Execution
Hash() (string, error)
}
type JobMetrics ¶ added in v0.11.0
type JobMetrics struct {
ExecutionCount int64
TotalDuration time.Duration
AverageDuration time.Duration
MinDuration time.Duration
MaxDuration time.Duration
SuccessCount int64
FailureCount int64
LastExecution time.Time
LastSuccess time.Time
LastFailure time.Time
}
JobMetrics holds metrics for individual jobs
type JobMetricsRecorder ¶ added in v0.10.1
type JobMetricsRecorder interface {
RecordMetric(name string, value any)
RecordJobExecution(jobName string, success bool, duration time.Duration)
RecordRetryAttempt(jobName string, attempt int, success bool)
}
JobMetricsRecorder interface for recording job-specific metrics
type LatencyTracker ¶ added in v0.11.0
type LatencyTracker struct {
Count int64
Total time.Duration
Min time.Duration
Max time.Duration
Average time.Duration
// contains filtered or unexported fields
}
LatencyTracker tracks latency statistics for operations
type LocalJob ¶
type LocalJob struct {
BareJob `mapstructure:",squash"`
Dir string `hash:"true"`
Environment []string `mapstructure:"environment" hash:"true"`
}
func NewLocalJob ¶
func NewLocalJob() *LocalJob
func (*LocalJob) Middlewares ¶
func (c *LocalJob) Middlewares() []Middleware
func (*LocalJob) ResetMiddlewares ¶ added in v0.9.0
func (c *LocalJob) ResetMiddlewares(ms ...Middleware)
func (*LocalJob) Use ¶
func (c *LocalJob) Use(ms ...Middleware)
type MetricsRecorder ¶ added in v0.10.1
type MetricsRecorder interface {
RecordJobRetry(jobName string, attempt int, success bool)
RecordContainerEvent()
RecordContainerMonitorFallback()
RecordContainerMonitorMethod(usingEvents bool)
RecordContainerWaitDuration(seconds float64)
RecordDockerOperation(operation string)
RecordDockerError(operation string)
// Job scheduling metrics (from go-cron ObservabilityHooks)
RecordJobStart(jobName string)
RecordJobComplete(jobName string, durationSeconds float64, panicked bool)
RecordJobScheduled(jobName string)
}
MetricsRecorder interface for recording retry and monitoring metrics
type Middleware ¶
type Middleware interface {
// Run is called instead of the original `Job.Run`; you MUST call `ctx.Run`
// inside the middleware `Run` function, otherwise you will break the
// Job workflow.
Run(*Context) error
// ContinueOnStop reports whether Run should be called even when the
// execution has been stopped
ContinueOnStop() bool
}
Middleware can wrap any job execution, allowing code to be executed before and/or after each `Job.Run`.
type NonZeroExitError ¶ added in v0.10.1
type NonZeroExitError struct {
ExitCode int
}
NonZeroExitError represents a container exit with non-zero code
func (NonZeroExitError) Error ¶ added in v0.10.1
func (e NonZeroExitError) Error() string
type PerformanceMetrics ¶ added in v0.11.0
type PerformanceMetrics struct {
// contains filtered or unexported fields
}
PerformanceMetrics implements comprehensive performance tracking
func NewPerformanceMetrics ¶ added in v0.11.0
func NewPerformanceMetrics() *PerformanceMetrics
NewPerformanceMetrics creates a new performance metrics recorder
func (*PerformanceMetrics) GetDockerMetrics ¶ added in v0.11.0
func (pm *PerformanceMetrics) GetDockerMetrics() map[string]any
GetDockerMetrics returns Docker-specific metrics
func (*PerformanceMetrics) GetJobMetrics ¶ added in v0.11.0
func (pm *PerformanceMetrics) GetJobMetrics() map[string]any
GetJobMetrics returns job execution metrics
func (*PerformanceMetrics) GetMetrics ¶ added in v0.11.0
func (pm *PerformanceMetrics) GetMetrics() map[string]any
GetMetrics returns all performance metrics
func (*PerformanceMetrics) GetSummaryReport ¶ added in v0.11.0
func (pm *PerformanceMetrics) GetSummaryReport() string
GetSummaryReport generates a human-readable performance summary
func (*PerformanceMetrics) RecordBufferPoolStats ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordBufferPoolStats(stats map[string]any)
RecordBufferPoolStats records buffer pool performance statistics
func (*PerformanceMetrics) RecordConcurrentJobs ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordConcurrentJobs(count int64)
RecordConcurrentJobs tracks the number of concurrent jobs
func (*PerformanceMetrics) RecordContainerEvent ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordContainerEvent()
RecordContainerEvent records container events
func (*PerformanceMetrics) RecordContainerMonitorFallback ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordContainerMonitorFallback()
RecordContainerMonitorFallback records container monitor fallbacks
func (*PerformanceMetrics) RecordContainerMonitorMethod ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordContainerMonitorMethod(usingEvents bool)
RecordContainerMonitorMethod records container monitor method usage
func (*PerformanceMetrics) RecordContainerWaitDuration ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordContainerWaitDuration(seconds float64)
RecordContainerWaitDuration records container wait durations
func (*PerformanceMetrics) RecordCustomMetric ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordCustomMetric(name string, value any)
RecordCustomMetric records a custom metric
func (*PerformanceMetrics) RecordDockerError ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordDockerError(operation string)
RecordDockerError records a Docker operation error
func (*PerformanceMetrics) RecordDockerLatency ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordDockerLatency(operation string, duration time.Duration)
RecordDockerLatency records the latency of a Docker operation
func (*PerformanceMetrics) RecordDockerOperation ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordDockerOperation(operation string)
RecordDockerOperation records a successful Docker operation
func (*PerformanceMetrics) RecordJobComplete ¶ added in v0.13.0
func (pm *PerformanceMetrics) RecordJobComplete(jobName string, durationSeconds float64, panicked bool)
RecordJobComplete records a job completing (from go-cron ObservabilityHooks)
func (*PerformanceMetrics) RecordJobExecution ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordJobExecution(jobName string, duration time.Duration, success bool)
RecordJobExecution records a job execution with timing and success status
func (*PerformanceMetrics) RecordJobRetry ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordJobRetry(jobName string, attempt int, success bool)
RecordJobRetry records job retry attempts
func (*PerformanceMetrics) RecordJobScheduled ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordJobScheduled(jobName string)
RecordJobScheduled records when a job is scheduled
func (*PerformanceMetrics) RecordJobSkipped ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordJobSkipped(jobName string, reason string)
RecordJobSkipped records when a job is skipped
func (*PerformanceMetrics) RecordJobStart ¶ added in v0.13.0
func (pm *PerformanceMetrics) RecordJobStart(jobName string)
RecordJobStart records a job start (from go-cron ObservabilityHooks)
func (*PerformanceMetrics) RecordMemoryUsage ¶ added in v0.11.0
func (pm *PerformanceMetrics) RecordMemoryUsage(bytes int64)
RecordMemoryUsage tracks memory usage
func (*PerformanceMetrics) Reset ¶ added in v0.11.0
func (pm *PerformanceMetrics) Reset()
Reset clears all metrics (useful for testing or periodic resets)
type PerformanceRecorder ¶ added in v0.11.0
type PerformanceRecorder interface {
MetricsRecorder // Embed existing interface (includes RecordDockerError and job scheduling metrics)
// Extended Docker operations
RecordDockerLatency(operation string, duration time.Duration)
// Job operations (extended beyond MetricsRecorder)
RecordJobExecution(jobName string, duration time.Duration, success bool)
RecordJobSkipped(jobName string, reason string)
// System metrics
RecordConcurrentJobs(count int64)
RecordMemoryUsage(bytes int64)
RecordBufferPoolStats(stats map[string]any)
// Custom metrics
RecordCustomMetric(name string, value any)
// Retrieval
GetMetrics() map[string]any
GetDockerMetrics() map[string]any
GetJobMetrics() map[string]any
Reset()
}
PerformanceRecorder defines the interface for recording comprehensive performance metrics. This extends the existing MetricsRecorder interface with additional capabilities.
type RateLimiter ¶ added in v0.10.1
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements token bucket rate limiting
func NewRateLimiter ¶ added in v0.10.1
func NewRateLimiter(rate float64, capacity int) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶ added in v0.10.1
func (rl *RateLimiter) Allow() bool
Allow checks if a request is allowed
func (*RateLimiter) AllowN ¶ added in v0.10.1
func (rl *RateLimiter) AllowN(n int) bool
AllowN checks if n requests are allowed
type ResilientJobExecutor ¶ added in v0.10.1
type ResilientJobExecutor struct {
// contains filtered or unexported fields
}
ResilientJobExecutor wraps job execution with resilience patterns
func NewResilientJobExecutor ¶ added in v0.10.1
func NewResilientJobExecutor(job Job) *ResilientJobExecutor
NewResilientJobExecutor creates a new resilient job executor
func (*ResilientJobExecutor) Execute ¶ added in v0.10.1
func (rje *ResilientJobExecutor) Execute(ctx *Context) error
Execute runs the job with resilience patterns
func (*ResilientJobExecutor) GetCircuitBreakerState ¶ added in v0.10.1
func (rje *ResilientJobExecutor) GetCircuitBreakerState() CircuitBreakerState
GetCircuitBreakerState returns the current state of the circuit breaker
func (*ResilientJobExecutor) ResetCircuitBreaker ¶ added in v0.10.1
func (rje *ResilientJobExecutor) ResetCircuitBreaker()
ResetCircuitBreaker manually resets the circuit breaker
func (*ResilientJobExecutor) SetBulkhead ¶ added in v0.10.1
func (rje *ResilientJobExecutor) SetBulkhead(bh *Bulkhead)
SetBulkhead updates the bulkhead
func (*ResilientJobExecutor) SetCircuitBreaker ¶ added in v0.10.1
func (rje *ResilientJobExecutor) SetCircuitBreaker(cb *CircuitBreaker)
SetCircuitBreaker updates the circuit breaker
func (*ResilientJobExecutor) SetMetricsRecorder ¶ added in v0.10.1
func (rje *ResilientJobExecutor) SetMetricsRecorder(metrics JobMetricsRecorder)
SetMetricsRecorder sets the metrics recorder
func (*ResilientJobExecutor) SetRateLimiter ¶ added in v0.10.1
func (rje *ResilientJobExecutor) SetRateLimiter(rl *RateLimiter)
SetRateLimiter updates the rate limiter
func (*ResilientJobExecutor) SetRetryPolicy ¶ added in v0.10.1
func (rje *ResilientJobExecutor) SetRetryPolicy(policy *RetryPolicy)
SetRetryPolicy updates the retry policy
type RetryConfig ¶ added in v0.10.1
type RetryConfig struct {
MaxRetries int
RetryDelayMs int
RetryExponential bool
RetryMaxDelayMs int
}
RetryConfig contains retry configuration for a job
type RetryExecutor ¶ added in v0.10.1
type RetryExecutor struct {
// contains filtered or unexported fields
}
RetryExecutor wraps job execution with retry logic
func NewRetryExecutor ¶ added in v0.10.1
func NewRetryExecutor(logger *slog.Logger) *RetryExecutor
NewRetryExecutor creates a new retry executor
func (*RetryExecutor) ExecuteWithRetry ¶ added in v0.10.1
func (re *RetryExecutor) ExecuteWithRetry(job Job, ctx *Context, runFunc func(*Context) error) error
ExecuteWithRetry executes a job with retry logic
func (*RetryExecutor) SetMetricsRecorder ¶ added in v0.10.1
func (re *RetryExecutor) SetMetricsRecorder(metrics MetricsRecorder)
SetMetricsRecorder sets the metrics recorder for the retry executor
type RetryMetrics ¶ added in v0.11.0
type RetryMetrics struct {
TotalAttempts int64
SuccessfulRetries int64
FailedRetries int64
LastRetry time.Time
}
RetryMetrics holds retry-specific metrics
type RetryPolicy ¶ added in v0.10.1
type RetryPolicy struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
JitterFactor float64
RetryableErrors func(error) bool
}
RetryPolicy defines retry behavior
func DefaultRetryPolicy ¶ added in v0.10.1
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a default retry policy
type RetryStats ¶ added in v0.10.1
type RetryStats struct {
JobName string
TotalAttempts int
SuccessAfter int // Number of retries before success (0 if first attempt succeeded)
Failed bool
LastError error
}
RetryStats tracks retry statistics for a job
type RetryableJob ¶ added in v0.10.1
type RetryableJob interface {
Job
GetRetryConfig() RetryConfig
}
RetryableJob interface for jobs that support retries
type RunJob ¶
type RunJob struct {
BareJob `mapstructure:",squash"`
Provider DockerProvider `json:"-"` // SDK-based Docker provider
// User specifies the user to run the container as.
// If not set, uses the global default-user setting (default: "nobody").
// Set to "default" to explicitly use the container's default user, overriding global setting.
User string `hash:"true"`
// ContainerName specifies the name of the container to be created. If
// nil, the job name will be used. If set to an empty string, Docker
// will assign a random name.
ContainerName *string `gcfg:"container-name" mapstructure:"container-name" hash:"true"`
TTY bool `default:"false" hash:"true"`
// Do not use bool values with "default:true": if the user
// explicitly sets the value to "false", it will still be
// changed to "true" (https://github.com/netresearch/ofelia/issues/135),
// so let's use strings here as a workaround.
Delete string `default:"true" hash:"true"`
Pull string `default:"true" hash:"true"`
Image string `hash:"true"`
Network string `hash:"true"`
Hostname string `hash:"true"`
Entrypoint *string `hash:"true"`
Container string `hash:"true"`
Volume []string `hash:"true"`
VolumesFrom []string `gcfg:"volumes-from" mapstructure:"volumes-from," hash:"true"`
Environment []string `mapstructure:"environment" hash:"true"`
Annotations []string `mapstructure:"annotations" hash:"true"`
MaxRuntime time.Duration `gcfg:"max-runtime" mapstructure:"max-runtime"`
// contains filtered or unexported fields
}
func NewRunJob ¶
func NewRunJob(provider DockerProvider) *RunJob
func (*RunJob) InitializeRuntimeFields ¶ added in v0.10.1
func (j *RunJob) InitializeRuntimeFields()
InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.
func (*RunJob) Middlewares ¶
func (c *RunJob) Middlewares() []Middleware
func (*RunJob) ResetMiddlewares ¶ added in v0.9.0
func (c *RunJob) ResetMiddlewares(ms ...Middleware)
func (*RunJob) Use ¶
func (c *RunJob) Use(ms ...Middleware)
type RunServiceJob ¶
type RunServiceJob struct {
BareJob `mapstructure:",squash"`
Provider DockerProvider `json:"-"` // SDK-based Docker provider
// User specifies the user to run the service as.
// If not set, uses the global default-user setting (default: "nobody").
// Set to "default" to explicitly use the container's default user, overriding global setting.
User string `hash:"true"`
TTY bool `default:"false" hash:"true"`
// Do not use bool values with "default:true": if the user
// explicitly sets the value to "false", it will still be
// changed to "true" (https://github.com/netresearch/ofelia/issues/135),
// so let's use strings here as a workaround.
Delete string `default:"true" hash:"true"`
Image string `hash:"true"`
Network string `hash:"true"`
Annotations []string `mapstructure:"annotations" hash:"true"`
MaxRuntime time.Duration `gcfg:"max-runtime" mapstructure:"max-runtime"`
}
func NewRunServiceJob ¶
func NewRunServiceJob(provider DockerProvider) *RunServiceJob
func (*RunServiceJob) InitializeRuntimeFields ¶ added in v0.12.0
func (j *RunServiceJob) InitializeRuntimeFields()
InitializeRuntimeFields initializes fields that depend on the Docker provider. This should be called after the Provider field is set.
func (*RunServiceJob) Middlewares ¶
func (c *RunServiceJob) Middlewares() []Middleware
func (*RunServiceJob) ResetMiddlewares ¶ added in v0.9.0
func (c *RunServiceJob) ResetMiddlewares(ms ...Middleware)
func (*RunServiceJob) Run ¶
func (j *RunServiceJob) Run(ctx *Context) error
func (*RunServiceJob) Use ¶
func (c *RunServiceJob) Use(ms ...Middleware)
func (*RunServiceJob) Validate ¶ added in v0.16.0
func (j *RunServiceJob) Validate() error
Validate checks that the job configuration is valid. For job-service-run, Image is required.
type SDKDockerProvider ¶ added in v0.12.0
type SDKDockerProvider struct {
// contains filtered or unexported fields
}
SDKDockerProvider implements DockerProvider using the official Docker SDK.
func NewSDKDockerProvider ¶ added in v0.12.0
func NewSDKDockerProvider(cfg *SDKDockerProviderConfig) (*SDKDockerProvider, error)
NewSDKDockerProvider creates a new SDK-based Docker provider.
func NewSDKDockerProviderDefault ¶ added in v0.12.0
func NewSDKDockerProviderDefault() (*SDKDockerProvider, error)
NewSDKDockerProviderDefault creates a provider with default settings.
func NewSDKDockerProviderFromClient ¶ added in v0.12.0
func NewSDKDockerProviderFromClient(client ports.DockerClient, logger *slog.Logger, metricsRecorder MetricsRecorder) *SDKDockerProvider
NewSDKDockerProviderFromClient creates a provider from an existing client.
func (*SDKDockerProvider) Close ¶ added in v0.12.0
func (p *SDKDockerProvider) Close() error
Close closes the Docker client.
func (*SDKDockerProvider) ConnectNetwork ¶ added in v0.12.0
func (p *SDKDockerProvider) ConnectNetwork(ctx context.Context, networkID, containerID string) error
ConnectNetwork connects a container to a network.
func (*SDKDockerProvider) CreateContainer ¶ added in v0.12.0
func (p *SDKDockerProvider) CreateContainer(ctx context.Context, config *domain.ContainerConfig, name string) (string, error)
CreateContainer creates a new container.
func (*SDKDockerProvider) CreateExec ¶ added in v0.12.0
func (p *SDKDockerProvider) CreateExec(ctx context.Context, containerID string, config *domain.ExecConfig) (string, error)
CreateExec creates an exec instance.
func (*SDKDockerProvider) CreateService ¶ added in v0.12.0
func (p *SDKDockerProvider) CreateService(ctx context.Context, spec domain.ServiceSpec, opts domain.ServiceCreateOptions) (string, error)
CreateService creates a new Swarm service.
func (*SDKDockerProvider) EnsureImage ¶ added in v0.12.0
EnsureImage ensures an image is available, pulling if necessary.
func (*SDKDockerProvider) FindNetworkByName ¶ added in v0.12.0
func (p *SDKDockerProvider) FindNetworkByName(ctx context.Context, networkName string) ([]domain.Network, error)
FindNetworkByName finds networks by name.
func (*SDKDockerProvider) GetContainerLogs ¶ added in v0.12.0
func (p *SDKDockerProvider) GetContainerLogs(ctx context.Context, containerID string, opts ContainerLogsOptions) (io.ReadCloser, error)
GetContainerLogs retrieves container logs.
func (*SDKDockerProvider) HasImageLocally ¶ added in v0.12.0
HasImageLocally checks if an image exists locally.
func (*SDKDockerProvider) Info ¶ added in v0.12.0
func (p *SDKDockerProvider) Info(ctx context.Context) (*domain.SystemInfo, error)
Info returns Docker system info.
func (*SDKDockerProvider) InspectContainer ¶ added in v0.12.0
func (p *SDKDockerProvider) InspectContainer(ctx context.Context, containerID string) (*domain.Container, error)
InspectContainer inspects a container.
func (*SDKDockerProvider) InspectExec ¶ added in v0.12.0
func (p *SDKDockerProvider) InspectExec(ctx context.Context, execID string) (*domain.ExecInspect, error)
InspectExec inspects an exec instance.
func (*SDKDockerProvider) InspectService ¶ added in v0.12.0
func (p *SDKDockerProvider) InspectService(ctx context.Context, serviceID string) (*domain.Service, error)
InspectService returns detailed information about a service.
func (*SDKDockerProvider) ListContainers ¶ added in v0.12.0
func (p *SDKDockerProvider) ListContainers(ctx context.Context, opts domain.ListOptions) ([]domain.Container, error)
ListContainers lists containers matching the options.
func (*SDKDockerProvider) ListTasks ¶ added in v0.12.0
func (p *SDKDockerProvider) ListTasks(ctx context.Context, opts domain.TaskListOptions) ([]domain.Task, error)
ListTasks lists tasks matching the filter options.
func (*SDKDockerProvider) Ping ¶ added in v0.12.0
func (p *SDKDockerProvider) Ping(ctx context.Context) error
Ping pings the Docker daemon.
func (*SDKDockerProvider) PullImage ¶ added in v0.12.0
func (p *SDKDockerProvider) PullImage(ctx context.Context, image string) error
PullImage pulls an image.
func (*SDKDockerProvider) RemoveContainer ¶ added in v0.12.0
func (p *SDKDockerProvider) RemoveContainer(ctx context.Context, containerID string, force bool) error
RemoveContainer removes a container.
func (*SDKDockerProvider) RemoveService ¶ added in v0.12.0
func (p *SDKDockerProvider) RemoveService(ctx context.Context, serviceID string) error
RemoveService removes a service.
func (*SDKDockerProvider) RunExec ¶ added in v0.12.0
func (p *SDKDockerProvider) RunExec( ctx context.Context, containerID string, config *domain.ExecConfig, stdout, stderr io.Writer, ) (int, error)
RunExec executes a command and waits for completion.
func (*SDKDockerProvider) StartContainer ¶ added in v0.12.0
func (p *SDKDockerProvider) StartContainer(ctx context.Context, containerID string) error
StartContainer starts a container.
func (*SDKDockerProvider) StartExec ¶ added in v0.12.0
func (p *SDKDockerProvider) StartExec(ctx context.Context, execID string, opts domain.ExecStartOptions) (*domain.HijackedResponse, error)
StartExec starts an exec instance.
func (*SDKDockerProvider) StopContainer ¶ added in v0.12.0
func (p *SDKDockerProvider) StopContainer(ctx context.Context, containerID string, timeout *time.Duration) error
StopContainer stops a container.
func (*SDKDockerProvider) SubscribeEvents ¶ added in v0.12.0
func (p *SDKDockerProvider) SubscribeEvents(ctx context.Context, filter domain.EventFilter) (<-chan domain.Event, <-chan error)
SubscribeEvents subscribes to Docker events.
func (*SDKDockerProvider) WaitContainer ¶ added in v0.12.0
WaitContainer waits for a container to exit.
type SDKDockerProviderConfig ¶ added in v0.12.0
type SDKDockerProviderConfig struct {
// Host is the Docker host address (e.g., "unix:///var/run/docker.sock")
Host string
// Logger for operation logging
Logger *slog.Logger
// MetricsRecorder for metrics tracking
MetricsRecorder MetricsRecorder
// AuthProvider for registry authentication (optional)
AuthProvider ports.AuthProvider
}
SDKDockerProviderConfig configures the SDK provider.
type Scheduler ¶
type Scheduler struct {
Jobs []Job
Removed []Job
Disabled []Job
Logger *slog.Logger
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewSchedulerWithClock ¶ added in v0.17.0
NewSchedulerWithClock creates a scheduler with a fake clock for testing. This allows tests to control time advancement without real waits.
func NewSchedulerWithMetrics ¶ added in v0.13.0
func NewSchedulerWithMetrics(l *slog.Logger, metricsRecorder MetricsRecorder) *Scheduler
NewSchedulerWithMetrics creates a scheduler with metrics (deprecated: use NewSchedulerWithOptions)
func NewSchedulerWithOptions ¶ added in v0.17.0
func NewSchedulerWithOptions(l *slog.Logger, metricsRecorder MetricsRecorder, minEveryInterval time.Duration) *Scheduler
NewSchedulerWithOptions creates a scheduler with a configurable minimum interval. A minEveryInterval of 0 uses the library default (1s). Use a negative value to allow sub-second intervals.
func (*Scheduler) AddJobWithTags ¶ added in v0.13.0
AddJobWithTags adds a job with optional tags for categorization. Tags can be used to group, filter, and remove related jobs. Jobs with @triggered/@manual/@none schedules are stored but not scheduled in cron.
func (*Scheduler) DisableJob ¶ added in v0.8.0
DisableJob stops scheduling the job but keeps it for later enabling.
func (*Scheduler) EnableJob ¶ added in v0.8.0
EnableJob schedules a previously disabled job. Uses go-cron's UpsertJob for atomic create-or-update, eliminating the previous polling retry loop that waited for RemoveByName to take effect.
func (*Scheduler) GetDisabledJob ¶ added in v0.8.0
GetDisabledJob returns a disabled job by name.
func (*Scheduler) GetDisabledJobs ¶ added in v0.8.0
GetDisabledJobs returns a copy of all disabled jobs.
func (*Scheduler) GetJobsByTag ¶ added in v0.13.0
GetJobsByTag returns all jobs with the specified tag.
func (*Scheduler) GetRemovedJobs ¶ added in v0.8.0
GetRemovedJobs returns a copy of all jobs that were removed from the scheduler.
func (*Scheduler) IsJobRunning ¶ added in v0.20.0
IsJobRunning reports whether the named job has any invocations currently in flight. Returns false if no job with the given name exists or the scheduler has no cron instance.
func (*Scheduler) IsRunning ¶
IsRunning returns true if the scheduler is active. Delegates to go-cron's IsRunning() which is the authoritative source.
func (*Scheduler) Middlewares ¶
func (c *Scheduler) Middlewares() []Middleware
func (*Scheduler) RemoveJobsByTag ¶ added in v0.13.0
RemoveJobsByTag removes all jobs with the specified tag. Returns the number of jobs removed.
func (*Scheduler) ResetMiddlewares ¶ added in v0.9.0
func (c *Scheduler) ResetMiddlewares(ms ...Middleware)
func (*Scheduler) RunJob ¶ added in v0.8.0
RunJob manually triggers a job by name. The provided context is propagated to the job's RunWithContext method and is available via Context.Ctx.
func (*Scheduler) SetMaxConcurrentJobs ¶ added in v0.10.1
SetMaxConcurrentJobs configures the maximum number of concurrent jobs
func (*Scheduler) SetMetricsRecorder ¶ added in v0.10.1
func (s *Scheduler) SetMetricsRecorder(recorder MetricsRecorder)
func (*Scheduler) SetOnJobComplete ¶ added in v0.17.0
func (*Scheduler) StopAndWait ¶ added in v0.13.0
func (s *Scheduler) StopAndWait()
StopAndWait stops the scheduler and waits indefinitely for all jobs to complete.
func (*Scheduler) StopWithTimeout ¶ added in v0.13.0
StopWithTimeout stops the scheduler with a graceful shutdown timeout. It stops accepting new jobs, then waits up to the timeout for running jobs to complete. Returns nil if all jobs completed, or an error if the timeout was exceeded.
func (*Scheduler) UpdateJob ¶ added in v0.20.0
UpdateJob atomically replaces the schedule and job implementation for an existing named entry using go-cron's UpdateEntryJobByName. The old job's in-flight invocations complete before the new schedule takes effect (because go-cron serializes entry mutations through the scheduler goroutine).
Returns ErrJobNotFound if no active job with the given name exists.
func (*Scheduler) Use ¶
func (c *Scheduler) Use(ms ...Middleware)
type ShutdownHook ¶ added in v0.10.1
type ShutdownHook struct {
Name string
Priority int // Lower values execute first
Hook func(context.Context) error
}
ShutdownHook describes a named, prioritized function to be called during shutdown
type ShutdownManager ¶ added in v0.10.1
type ShutdownManager struct {
// contains filtered or unexported fields
}
ShutdownManager handles graceful shutdown of the application
func NewShutdownManager ¶ added in v0.10.1
func NewShutdownManager(logger *slog.Logger, timeout time.Duration) *ShutdownManager
NewShutdownManager creates a new shutdown manager
func (*ShutdownManager) IsShuttingDown ¶ added in v0.10.1
func (sm *ShutdownManager) IsShuttingDown() bool
IsShuttingDown returns true if shutdown is in progress
func (*ShutdownManager) ListenForShutdown ¶ added in v0.10.1
func (sm *ShutdownManager) ListenForShutdown()
ListenForShutdown starts listening for shutdown signals
func (*ShutdownManager) RegisterHook ¶ added in v0.10.1
func (sm *ShutdownManager) RegisterHook(hook ShutdownHook)
RegisterHook registers a shutdown hook
func (*ShutdownManager) Shutdown ¶ added in v0.10.1
func (sm *ShutdownManager) Shutdown() error
Shutdown initiates graceful shutdown
func (*ShutdownManager) ShutdownChan ¶ added in v0.10.1
func (sm *ShutdownManager) ShutdownChan() <-chan struct{}
ShutdownChan returns a channel that's closed when shutdown starts
type SimpleMetricsRecorder ¶ added in v0.10.1
type SimpleMetricsRecorder struct {
// contains filtered or unexported fields
}
SimpleMetricsRecorder provides a basic implementation of JobMetricsRecorder
func NewSimpleMetricsRecorder ¶ added in v0.10.1
func NewSimpleMetricsRecorder() *SimpleMetricsRecorder
NewSimpleMetricsRecorder creates a new simple metrics recorder
func (*SimpleMetricsRecorder) GetMetrics ¶ added in v0.10.1
func (smr *SimpleMetricsRecorder) GetMetrics() map[string]any
GetMetrics returns all recorded metrics
func (*SimpleMetricsRecorder) RecordJobExecution ¶ added in v0.10.1
func (smr *SimpleMetricsRecorder) RecordJobExecution(jobName string, success bool, duration time.Duration)
RecordJobExecution records job execution metrics
func (*SimpleMetricsRecorder) RecordMetric ¶ added in v0.10.1
func (smr *SimpleMetricsRecorder) RecordMetric(name string, value any)
RecordMetric records a generic metric
func (*SimpleMetricsRecorder) RecordRetryAttempt ¶ added in v0.10.1
func (smr *SimpleMetricsRecorder) RecordRetryAttempt(jobName string, attempt int, success bool)
RecordRetryAttempt records retry attempt metrics
type TestContainerMonitor ¶ added in v0.10.1
type TestContainerMonitor struct {
// contains filtered or unexported fields
}
TestContainerMonitor provides a test interface to simulate container monitoring
func (*TestContainerMonitor) SetUseEventsAPI ¶ added in v0.10.1
func (t *TestContainerMonitor) SetUseEventsAPI(use bool)
func (*TestContainerMonitor) WaitForContainer ¶ added in v0.10.1
func (t *TestContainerMonitor) WaitForContainer(containerID string, maxRuntime time.Duration) (*domain.ContainerState, error)
type Timer ¶ added in v0.17.0
Timer represents a single event timer, compatible with go-cron's Timer interface. It provides the same operations as time.Timer.
type WorkflowExecution ¶ added in v0.10.1
type WorkflowExecution struct {
ID string
StartTime time.Time
CompletedJobs map[string]bool
FailedJobs map[string]bool
RunningJobs map[string]bool
// contains filtered or unexported fields
}
WorkflowExecution tracks the state of a workflow execution
type WorkflowOrchestrator ¶ added in v0.10.1
type WorkflowOrchestrator struct {
// contains filtered or unexported fields
}
WorkflowOrchestrator manages job dependencies and workflow execution
func NewWorkflowOrchestrator ¶ added in v0.10.1
func NewWorkflowOrchestrator(scheduler *Scheduler, logger *slog.Logger) *WorkflowOrchestrator
NewWorkflowOrchestrator creates a new workflow orchestrator
func (*WorkflowOrchestrator) BuildDependencyGraph ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) BuildDependencyGraph(jobs []Job) error
BuildDependencyGraph builds the dependency graph from jobs
func (*WorkflowOrchestrator) CanExecute ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) CanExecute(jobName string, executionID string) bool
CanExecute checks if a job can execute based on its dependencies
func (*WorkflowOrchestrator) CleanupOldExecutions ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) CleanupOldExecutions(maxAge time.Duration)
CleanupOldExecutions removes old workflow executions
func (*WorkflowOrchestrator) GetWorkflowStatus ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) GetWorkflowStatus(executionID string) map[string]any
GetWorkflowStatus returns the status of a workflow execution
func (*WorkflowOrchestrator) JobCompleted ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) JobCompleted(ctx context.Context, jobName string, executionID string, success bool)
JobCompleted marks a job as completed and triggers dependent jobs
func (*WorkflowOrchestrator) JobStarted ¶ added in v0.10.1
func (wo *WorkflowOrchestrator) JobStarted(jobName string, executionID string)
JobStarted marks a job as started in the workflow
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| adapters | |
| docker | Package docker provides an adapter for the official Docker SDK. |
| mock | Package mock provides mock implementations of the ports interfaces for testing. |
| domain | Package domain contains SDK-agnostic domain models for Docker operations. |
| ports | Package ports defines the port interfaces for Docker operations. |