Documentation ¶
Index ¶
- type HandlerFunc
- type HandlerStats
- type MemoryStorage
- func (s *MemoryStorage) Close()
- func (s *MemoryStorage) DoneTask(_ context.Context, taskIDs []int64) error
- func (s *MemoryStorage) GetStat(_ context.Context) (*Stats, error)
- func (s *MemoryStorage) GetTask(_ context.Context, queueNum int, count int) ([]Task, error)
- func (s *MemoryStorage) PutTask(_ context.Context, req PutTaskRequest) error
- func (s *MemoryStorage) RetryTask(_ context.Context, retries []RetryInfo) error
- type Option
- type PutTaskRequest
- type QueueStat
- type QueueWorker
- type RetryInfo
- type Stats
- type Storage
- type Task
- type WorkerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, storage Storage, queueNum int, tasks []Task) (HandlerStats, error)
HandlerFunc is the function signature for processing tasks from a queue. It receives the storage, the queue number, and a batch of tasks. The handler is responsible for calling DoneTask or RetryTask on the storage for the tasks it receives.
type HandlerStats ¶
type HandlerStats struct {
	Processed int
	Errors    int
	Retried   int
	Duration  time.Duration
	LastError error
}
HandlerStats contains processing statistics returned by a handler.
type MemoryStorage ¶
type MemoryStorage struct {
	// contains filtered or unexported fields
}
MemoryStorage is an in-memory implementation of Storage. It is intended for development and testing. Data is lost on restart.
func NewMemoryStorage ¶
func NewMemoryStorage(opts ...Option) *MemoryStorage
NewMemoryStorage creates a new in-memory storage.
func (*MemoryStorage) Close ¶
func (s *MemoryStorage) Close()
Close stops the background goroutine that returns expired tasks.
func (*MemoryStorage) DoneTask ¶
func (s *MemoryStorage) DoneTask(_ context.Context, taskIDs []int64) error
func (*MemoryStorage) PutTask ¶
func (s *MemoryStorage) PutTask(_ context.Context, req PutTaskRequest) error
type Option ¶
type Option func(*MemoryStorage)
Option configures MemoryStorage.
func WithVisibilityTimeout ¶
WithVisibilityTimeout sets the visibility timeout for taken tasks. If a task is not completed or retried within this duration, it is automatically returned to the queue with Attempts incremented.
type PutTaskRequest ¶
PutTaskRequest is a request to add a new task to the queue.
type QueueStat ¶
type QueueStat struct {
	QueueNum     int
	TotalTasks   int64
	ReadyTasks   int64
	TakenTasks   int64
	DelayedTasks int64
}
QueueStat contains statistics for a single queue.
type QueueWorker ¶
type QueueWorker struct {
	// contains filtered or unexported fields
}
QueueWorker manages a pool of JobProcessors and distributes queue assignments using round-robin scheduling.
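One plausible reading of "round-robin" is that queue IDs are dealt out to processors in turn. The sketch below shows only that idea; assignRoundRobin is a hypothetical helper, and the real QueueWorker may rotate assignments over time rather than fix them once.

```go
package main

import "fmt"

// assignRoundRobin deals queue IDs out to n processors in turn, so that
// processor i receives the IDs at positions i, i+n, i+2n, and so on.
func assignRoundRobin(queueIDs []int, n int) [][]int {
	if n <= 0 {
		return nil
	}
	out := make([][]int, n)
	for i, q := range queueIDs {
		out[i%n] = append(out[i%n], q)
	}
	return out
}

func main() {
	fmt.Println(assignRoundRobin([]int{10, 11, 12, 13, 14}, 2))
}
```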
func NewQueueWorker ¶
func NewQueueWorker(storage Storage, queueIDs []int, handler HandlerFunc, opts ...WorkerOption) *QueueWorker
NewQueueWorker creates a new QueueWorker.
func (*QueueWorker) GracefulStop ¶
func (w *QueueWorker) GracefulStop(_ context.Context) (<-chan struct{}, error)
GracefulStop signals the worker to stop after completing its current work. It returns a channel that is closed when all processors have finished.
type Storage ¶
type Storage interface {
	// GetTask atomically takes up to count tasks from the given queue.
	// Taken tasks are marked as "in progress" and will not be returned
	// to other consumers until the visibility timeout expires.
	GetTask(ctx context.Context, queueNum int, count int) ([]Task, error)

	// DoneTask marks tasks as completed. They are removed from storage.
	DoneTask(ctx context.Context, taskIDs []int64) error

	// RetryTask returns tasks to the queue with the specified next start time.
	// Attempts is incremented, PrevStartTime is set to the current StartTime.
	RetryTask(ctx context.Context, retries []RetryInfo) error

	// PutTask adds a new task to the queue.
	PutTask(ctx context.Context, req PutTaskRequest) error

	// GetStat returns statistics for all queues.
	GetStat(ctx context.Context) (*Stats, error)
}
Storage is the interface for queue storage implementations. It provides atomic operations for task management with guaranteed delivery.
type Task ¶
type Task struct {
	ID            int64
	QueueNum      int
	Data          []byte
	Attempts      int
	CreatedAt     time.Time
	StartTime     time.Time
	PrevStartTime time.Time
}
Task is a task retrieved from storage.
type WorkerOption ¶
type WorkerOption func(*QueueWorker)
WorkerOption configures QueueWorker.
func WithBatchSize ¶
func WithBatchSize(n int) WorkerOption
WithBatchSize sets the number of tasks fetched per GetTask call.
func WithMaxProcessors ¶
func WithMaxProcessors(n int) WorkerOption
WithMaxProcessors sets the maximum number of concurrent JobProcessors.
func WithMetrics ¶
func WithMetrics(registry *prometheus.Registry, namespace string) WorkerOption
WithMetrics enables Prometheus metrics with the given registry and namespace.