queue

Version: v0.15.0
Published: Feb 20, 2026 | License: MIT | Imports: 7 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HandlerFunc

type HandlerFunc func(ctx context.Context, storage Storage, queueNum int, tasks []Task) (HandlerStats, error)

HandlerFunc is the function signature for processing tasks from a queue. It receives a context, the storage, the queue number, and a batch of tasks, and it returns processing statistics along with an error. The handler is responsible for calling DoneTask or RetryTask on the storage for the tasks it receives.
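A minimal sketch of a handler with this shape follows. Because the package's import path is not shown on this page, the example declares local stand-ins for Task, RetryInfo, and HandlerStats, and uses a narrower interface (taskStore, a hypothetical name) in place of Storage. The process function, the recorder test double, and the fixed one-minute retry delay are all assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local stand-ins mirroring the declarations on this page; real code
// would use the package's own Task, RetryInfo, HandlerStats and Storage.
type Task struct {
	ID   int64
	Data []byte
}

type RetryInfo struct {
	TaskID        int64
	NextStartTime time.Time
}

type HandlerStats struct {
	Processed, Errors, Retried int
	Duration                   time.Duration
	LastError                  error
}

// taskStore is the subset of Storage this handler calls (hypothetical name).
type taskStore interface {
	DoneTask(ctx context.Context, taskIDs []int64) error
	RetryTask(ctx context.Context, retries []RetryInfo) error
}

// process is a hypothetical unit of work that rejects empty payloads.
func process(t Task) error {
	if len(t.Data) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// handleBatch has the shape of HandlerFunc, with taskStore standing in for Storage.
func handleBatch(ctx context.Context, s taskStore, queueNum int, tasks []Task) (HandlerStats, error) {
	start := time.Now()
	var stats HandlerStats
	var done []int64
	var retries []RetryInfo
	for _, t := range tasks {
		if err := process(t); err != nil {
			stats.Errors++
			stats.LastError = err
			// Fixed one-minute retry delay, chosen only for illustration.
			retries = append(retries, RetryInfo{TaskID: t.ID, NextStartTime: time.Now().Add(time.Minute)})
			continue
		}
		stats.Processed++
		done = append(done, t.ID)
	}
	if len(done) > 0 {
		if err := s.DoneTask(ctx, done); err != nil {
			return stats, err
		}
	}
	if len(retries) > 0 {
		if err := s.RetryTask(ctx, retries); err != nil {
			return stats, err
		}
		stats.Retried = len(retries)
	}
	stats.Duration = time.Since(start)
	return stats, nil
}

// recorder is a test double that records which task IDs were acknowledged.
type recorder struct{ done, retried []int64 }

func (r *recorder) DoneTask(_ context.Context, ids []int64) error {
	r.done = append(r.done, ids...)
	return nil
}

func (r *recorder) RetryTask(_ context.Context, retries []RetryInfo) error {
	for _, ri := range retries {
		r.retried = append(r.retried, ri.TaskID)
	}
	return nil
}

// runDemo feeds three tasks (one with an empty payload) through handleBatch.
func runDemo() (HandlerStats, []int64, []int64, error) {
	rec := &recorder{}
	tasks := []Task{{ID: 1, Data: []byte("a")}, {ID: 2}, {ID: 3, Data: []byte("c")}}
	stats, err := handleBatch(context.Background(), rec, 0, tasks)
	return stats, rec.done, rec.retried, err
}

func main() {
	stats, done, retried, err := runDemo()
	fmt.Println(stats.Processed, stats.Errors, stats.Retried, err)
	fmt.Println(done, retried)
}
```

Acknowledging successes and retries in two batched calls keeps the number of storage round trips per batch constant, which is one reasonable choice when the storage accepts slices of IDs.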

type HandlerStats

type HandlerStats struct {
	Processed int
	Errors    int
	Retried   int
	Duration  time.Duration
	LastError error
}

HandlerStats contains processing statistics returned by a handler.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is an in-memory implementation of Storage. It is intended for development and testing. Data is lost on restart.

func NewMemoryStorage

func NewMemoryStorage(opts ...Option) *MemoryStorage

NewMemoryStorage creates a new in-memory storage.

func (*MemoryStorage) Close

func (s *MemoryStorage) Close()

Close stops the background goroutine that returns expired tasks.

func (*MemoryStorage) DoneTask

func (s *MemoryStorage) DoneTask(_ context.Context, taskIDs []int64) error

func (*MemoryStorage) GetStat

func (s *MemoryStorage) GetStat(_ context.Context) (*Stats, error)

func (*MemoryStorage) GetTask

func (s *MemoryStorage) GetTask(_ context.Context, queueNum int, count int) ([]Task, error)

func (*MemoryStorage) PutTask

func (s *MemoryStorage) PutTask(_ context.Context, req PutTaskRequest) error

func (*MemoryStorage) RetryTask

func (s *MemoryStorage) RetryTask(_ context.Context, retries []RetryInfo) error

type Option

type Option func(*MemoryStorage)

Option configures MemoryStorage.

func WithVisibilityTimeout

func WithVisibilityTimeout(d time.Duration) Option

WithVisibilityTimeout sets the visibility timeout for taken tasks. If a task is not completed or retried within this duration, it is automatically returned to the queue with Attempts incremented.
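The behavior described above can be sketched as a pure function. This is not MemoryStorage's implementation, whose fields are unexported. The inFlight type, the reclaim function, and the choice to treat a task as expired when the elapsed time is greater than or equal to the timeout are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// inFlight is a hypothetical record of a task that a consumer has taken.
type inFlight struct {
	ID       int64
	Attempts int
	TakenAt  time.Time
}

// reclaim splits taken tasks into those still inside the visibility timeout
// and those whose timeout has expired. Expired tasks come back with
// Attempts incremented, as the option's documentation describes.
func reclaim(taken []inFlight, now time.Time, timeout time.Duration) (still, expired []inFlight) {
	for _, t := range taken {
		if now.Sub(t.TakenAt) >= timeout {
			t.Attempts++ // t is a copy, so the input slice is not modified
			expired = append(expired, t)
			continue
		}
		still = append(still, t)
	}
	return still, expired
}

// runDemo applies a 30s timeout to two tasks taken 40s and 5s ago.
func runDemo() (stillIDs, expiredIDs []int64, expiredAttempts int) {
	now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	taken := []inFlight{
		{ID: 1, TakenAt: now.Add(-40 * time.Second)},
		{ID: 2, TakenAt: now.Add(-5 * time.Second)},
	}
	still, expired := reclaim(taken, now, 30*time.Second)
	for _, t := range still {
		stillIDs = append(stillIDs, t.ID)
	}
	for _, t := range expired {
		expiredIDs = append(expiredIDs, t.ID)
		expiredAttempts += t.Attempts
	}
	return stillIDs, expiredIDs, expiredAttempts
}

func main() {
	still, expired, attempts := runDemo()
	fmt.Println("still taken:", still)
	fmt.Println("returned to queue:", expired, "attempts:", attempts)
}
```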

type PutTaskRequest

type PutTaskRequest struct {
	QueueNum  int
	Data      []byte
	StartTime *time.Time
	Delay     time.Duration
}

PutTaskRequest is a request to add a new task to the queue.

type QueueStat

type QueueStat struct {
	QueueNum     int
	TotalTasks   int64
	ReadyTasks   int64
	TakenTasks   int64
	DelayedTasks int64
}

QueueStat contains statistics for a single queue.

type QueueWorker

type QueueWorker struct {
	// contains filtered or unexported fields
}

QueueWorker manages a pool of JobProcessors and distributes queue assignments using round-robin scheduling.
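Round-robin assignment in general can be sketched as a cursor that cycles through the queue IDs. The following is an illustration of the technique only, not QueueWorker's scheduler, whose internals are not documented here. The roundRobin type and the assign helper are hypothetical names.

```go
package main

import "fmt"

// roundRobin hands out queue IDs in a repeating cycle.
type roundRobin struct {
	ids  []int
	next int
}

func newRoundRobin(ids []int) *roundRobin {
	// Copy so later changes to the caller's slice do not affect the cycle.
	return &roundRobin{ids: append([]int(nil), ids...)}
}

// Next returns the next queue ID, or false if there are no queues.
func (r *roundRobin) Next() (int, bool) {
	if len(r.ids) == 0 {
		return 0, false
	}
	id := r.ids[r.next]
	r.next = (r.next + 1) % len(r.ids)
	return id, true
}

// assign gives each of n processor slots the next queue in the cycle.
func assign(r *roundRobin, n int) []int {
	if n <= 0 {
		return nil
	}
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		id, ok := r.Next()
		if !ok {
			break
		}
		out = append(out, id)
	}
	return out
}

func main() {
	rr := newRoundRobin([]int{3, 5, 8})
	fmt.Println(assign(rr, 4)) // four slots over three queues wraps around
	fmt.Println(assign(rr, 2)) // the cursor continues where it left off
}
```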

func NewQueueWorker

func NewQueueWorker(storage Storage, queueIDs []int, handler HandlerFunc, opts ...WorkerOption) *QueueWorker

NewQueueWorker creates a new QueueWorker.

func (*QueueWorker) GracefulStop

func (w *QueueWorker) GracefulStop(_ context.Context) (<-chan struct{}, error)

GracefulStop signals the worker to stop after completing its current work. It returns a channel that is closed when all processors have finished.
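One common way to use such a channel is to wait on it with a deadline and fall back to Shutdown if the deadline passes. The sketch below assumes only the GracefulStop and Shutdown signatures shown on this page. The stopper interface, the stopWithin helper, and the fakeWorker test double are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stopper mirrors the two shutdown methods documented for QueueWorker.
type stopper interface {
	GracefulStop(ctx context.Context) (<-chan struct{}, error)
	Shutdown(ctx context.Context) error
}

// stopWithin asks w to stop gracefully and waits up to timeout.
// It reports true if the graceful stop finished in time and false
// if it had to fall back to Shutdown.
func stopWithin(w stopper, timeout time.Duration) (bool, error) {
	ctx := context.Background()
	done, err := w.GracefulStop(ctx)
	if err != nil {
		return false, err
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true, nil
	case <-timer.C:
		return false, w.Shutdown(ctx)
	}
}

// fakeWorker is a test double that closes its channel after drain elapses.
type fakeWorker struct {
	drain time.Duration
}

func (f *fakeWorker) GracefulStop(context.Context) (<-chan struct{}, error) {
	ch := make(chan struct{})
	go func() {
		time.Sleep(f.drain)
		close(ch)
	}()
	return ch, nil
}

func (f *fakeWorker) Shutdown(context.Context) error { return nil }

// runDemo exercises both paths: a worker that drains quickly and one
// that takes longer than the allowed timeout.
func runDemo() (fast, slow bool) {
	fast, _ = stopWithin(&fakeWorker{drain: time.Millisecond}, time.Second)
	slow, _ = stopWithin(&fakeWorker{drain: time.Second}, 10*time.Millisecond)
	return fast, slow
}

func main() {
	fast, slow := runDemo()
	fmt.Println("fast worker stopped gracefully:", fast)
	fmt.Println("slow worker stopped gracefully:", slow)
}
```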

func (*QueueWorker) Run

func (w *QueueWorker) Run(ctx context.Context, errGr *errgroup.Group)

Run starts the worker. It launches JobProcessors and schedules queue assignments.

func (*QueueWorker) Shutdown

func (w *QueueWorker) Shutdown(_ context.Context) error

Shutdown performs a hard shutdown of the worker.

type RetryInfo

type RetryInfo struct {
	TaskID        int64
	NextStartTime time.Time
}

RetryInfo contains information for retrying a task.
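The page does not prescribe how NextStartTime should be chosen. One common policy is capped exponential backoff based on the task's attempt count, sketched below. The backoff and retryFor helpers, the one-second base, and the one-minute cap are assumptions; RetryInfo is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// RetryInfo is a local stand-in with the same fields as the documented type.
type RetryInfo struct {
	TaskID        int64
	NextStartTime time.Time
}

// backoff returns base doubled once per attempt, capped at max.
// A non-positive base yields zero delay.
func backoff(attempts int, base, max time.Duration) time.Duration {
	if base <= 0 {
		return 0
	}
	d := base
	for i := 0; i < attempts; i++ {
		d *= 2
		if d >= max {
			return max // stop early so d cannot grow without bound
		}
	}
	if d > max {
		return max
	}
	return d
}

// retryFor builds a RetryInfo whose start time is now plus the backoff
// for the given attempt count (1s base, 1m cap, both illustrative).
func retryFor(taskID int64, attempts int, now time.Time) RetryInfo {
	return RetryInfo{
		TaskID:        taskID,
		NextStartTime: now.Add(backoff(attempts, time.Second, time.Minute)),
	}
}

func main() {
	now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	for attempts := 0; attempts <= 7; attempts++ {
		ri := retryFor(42, attempts, now)
		fmt.Printf("attempts=%d delay=%s\n", attempts, ri.NextStartTime.Sub(now))
	}
}
```

The resulting slice of RetryInfo values would then be passed to RetryTask.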

type Stats

type Stats struct {
	QueueStats map[int]QueueStat
}

Stats contains statistics for all queues.
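A value returned by GetStat could be summarized as in the sketch below. Stats and QueueStat are redeclared locally with the documented fields. The totalReady and busiest helpers are hypothetical, and the example assumes the map keys of QueueStats are queue numbers, which this page does not state explicitly.

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins with the same fields as the documented types.
type QueueStat struct {
	QueueNum     int
	TotalTasks   int64
	ReadyTasks   int64
	TakenTasks   int64
	DelayedTasks int64
}

type Stats struct {
	QueueStats map[int]QueueStat
}

// totalReady sums ReadyTasks across all queues; a nil Stats counts as empty.
func totalReady(s *Stats) int64 {
	if s == nil {
		return 0
	}
	var n int64
	for _, q := range s.QueueStats {
		n += q.ReadyTasks
	}
	return n
}

// busiest returns map keys ordered by ReadyTasks, largest first.
// Ties are broken by key so the result is deterministic.
func busiest(s *Stats) []int {
	if s == nil {
		return nil
	}
	nums := make([]int, 0, len(s.QueueStats))
	for num := range s.QueueStats {
		nums = append(nums, num)
	}
	sort.Slice(nums, func(i, j int) bool {
		a, b := s.QueueStats[nums[i]], s.QueueStats[nums[j]]
		if a.ReadyTasks != b.ReadyTasks {
			return a.ReadyTasks > b.ReadyTasks
		}
		return nums[i] < nums[j]
	})
	return nums
}

func main() {
	s := &Stats{QueueStats: map[int]QueueStat{
		0: {QueueNum: 0, ReadyTasks: 2, TakenTasks: 1},
		1: {QueueNum: 1, ReadyTasks: 7},
		2: {QueueNum: 2, ReadyTasks: 2, DelayedTasks: 4},
	}}
	fmt.Println("ready:", totalReady(s))
	fmt.Println("busiest first:", busiest(s))
}
```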

type Storage

type Storage interface {
	// GetTask atomically takes up to count tasks from the given queue.
	// Taken tasks are marked as "in progress" and will not be returned
	// to other consumers until the visibility timeout expires.
	GetTask(ctx context.Context, queueNum int, count int) ([]Task, error)

	// DoneTask marks tasks as completed. They are removed from storage.
	DoneTask(ctx context.Context, taskIDs []int64) error

	// RetryTask returns tasks to the queue with the specified next start time.
	// Attempts is incremented, PrevStartTime is set to the current StartTime.
	RetryTask(ctx context.Context, retries []RetryInfo) error

	// PutTask adds a new task to the queue.
	PutTask(ctx context.Context, req PutTaskRequest) error

	// GetStat returns statistics for all queues.
	GetStat(ctx context.Context) (*Stats, error)
}

Storage is the interface for queue storage implementations. It provides atomic operations for task management with guaranteed delivery.
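A consumer that talks to a Storage directly, without QueueWorker, might take tasks in batches and acknowledge each batch, as in the sketch below. The taker interface is a hypothetical subset of Storage, sliceStore is a test double rather than MemoryStorage, and treating an empty result from GetTask as "queue drained" is an assumption that this page does not confirm.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a local stand-in carrying only the fields this example needs.
type Task struct {
	ID   int64
	Data []byte
}

// taker is the subset of Storage used by drain (hypothetical name).
type taker interface {
	GetTask(ctx context.Context, queueNum int, count int) ([]Task, error)
	DoneTask(ctx context.Context, taskIDs []int64) error
}

// drain takes tasks in batches until GetTask returns none, marking each
// batch as done. It returns the number of tasks acknowledged.
func drain(ctx context.Context, s taker, queueNum, batch int) (int, error) {
	total := 0
	for {
		if err := ctx.Err(); err != nil {
			return total, err
		}
		tasks, err := s.GetTask(ctx, queueNum, batch)
		if err != nil {
			return total, err
		}
		if len(tasks) == 0 {
			return total, nil
		}
		ids := make([]int64, 0, len(tasks))
		for _, t := range tasks {
			ids = append(ids, t.ID)
		}
		if err := s.DoneTask(ctx, ids); err != nil {
			return total, err
		}
		total += len(tasks)
	}
}

// sliceStore is a test double backed by a slice; it ignores queueNum.
type sliceStore struct {
	ready []Task
	done  int
	calls int
}

func (s *sliceStore) GetTask(_ context.Context, _ int, count int) ([]Task, error) {
	s.calls++
	if count > len(s.ready) {
		count = len(s.ready)
	}
	out := s.ready[:count]
	s.ready = s.ready[count:]
	return out, nil
}

func (s *sliceStore) DoneTask(_ context.Context, ids []int64) error {
	s.done += len(ids)
	return nil
}

// runDemo drains five tasks in batches of two.
func runDemo() (total, done, calls int, err error) {
	st := &sliceStore{}
	for i := int64(1); i <= 5; i++ {
		st.ready = append(st.ready, Task{ID: i})
	}
	total, err = drain(context.Background(), st, 0, 2)
	return total, st.done, st.calls, err
}

func main() {
	total, done, calls, err := runDemo()
	fmt.Println(total, done, calls, err)
}
```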

type Task

type Task struct {
	ID            int64
	QueueNum      int
	Data          []byte
	Attempts      int
	CreatedAt     time.Time
	StartTime     time.Time
	PrevStartTime time.Time
}

Task is a task retrieved from storage.

type WorkerOption

type WorkerOption func(*QueueWorker)

WorkerOption configures QueueWorker.

func WithBatchSize

func WithBatchSize(n int) WorkerOption

WithBatchSize sets the number of tasks fetched per GetTask call.

func WithMaxProcessors

func WithMaxProcessors(n int) WorkerOption

WithMaxProcessors sets the maximum number of concurrent JobProcessors.

func WithMetrics

func WithMetrics(registry *prometheus.Registry, namespace string) WorkerOption

WithMetrics enables Prometheus metrics with the given registry and namespace.
