queue

package
v0.4.6 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 21, 2026 License: Apache-2.0 Imports: 6 Imported by: 7

README

queue

A simple distributed message queue with swappable back ends.

Documentation

Overview

Package queue provides a task queue with support for persistent storage and concurrent processing

Constants

const ResultStatusError = "ERROR"

ResultStatusError represents a task that experienced an error but CAN be retried

const ResultStatusFailure = "FAILURE"

ResultStatusFailure represents a task that experienced an error and CANNOT be retried

const ResultStatusIgnored = "IGNORED"

ResultStatusIgnored represents a task that was not processed because the consumer does not recognize it. The task will be passed to another consumer, or error out permanently.

const ResultStatusRequeue = "REQUEUE"

ResultStatusRequeue represents a task that has completed successfully and should be re-executed at its same priority level. This is useful for long series of tasks that need to execute over multiple records.

const ResultStatusSuccess = "SUCCESS"

ResultStatusSuccess represents a task that was completed successfully

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer func(name string, args map[string]any) Result

Consumer is a function that processes a task from the queue.
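A minimal sketch of a Consumer, for illustration only. To keep it runnable on its own, `Result`, `Success`, `Failure`, and `Ignored` are local stand-ins that mirror the signatures documented on this page rather than imports from the package. The task name `send-email` and the `to` argument are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Result is a local stand-in mirroring the documented Result type
// (the Delay field is omitted for brevity).
type Result struct {
	Status string
	Error  error
}

// Stand-ins for the documented constructors.
func Success() Result          { return Result{Status: "SUCCESS"} }
func Ignored() Result          { return Result{Status: "IGNORED"} }
func Failure(err error) Result { return Result{Status: "FAILURE", Error: err} }

// emailConsumer has the documented Consumer signature. It handles only the
// hypothetical "send-email" task and ignores every other task name.
func emailConsumer(name string, args map[string]any) Result {
	if name != "send-email" {
		return Ignored() // let another consumer try this task
	}
	to, ok := args["to"].(string)
	if !ok || to == "" {
		// Retrying cannot fix a missing argument, so fail permanently.
		return Failure(errors.New("missing 'to' argument"))
	}
	fmt.Println("sending email to", to)
	return Success()
}

func main() {
	fmt.Println(emailConsumer("send-email", map[string]any{"to": "a@example.com"}).Status)
	fmt.Println(emailConsumer("resize-image", nil).Status)
}
```

Returning `Ignored()` for unknown names is what allows several consumers to share one Queue, each handling its own task names.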

type Option added in v0.4.6

type Option func(*Queue)

Option is a functional option that modifies a Queue object

func WithBufferSize

func WithBufferSize(bufferSize int) Option

WithBufferSize sets the number of tasks to lock in a single transaction

func WithConsumers

func WithConsumers(consumers ...Consumer) Option

WithConsumers adds one or more consumers to process tasks from the Queue

func WithDefaultPriority added in v0.0.2

func WithDefaultPriority(defaultPriority int) Option

WithDefaultPriority sets the default priority for new tasks

func WithDefaultRetryMax added in v0.0.2

func WithDefaultRetryMax(defaultRetryMax int) Option

WithDefaultRetryMax sets the default number of times to retry a task before giving up

func WithPollStorage

func WithPollStorage(pollStorage bool) Option

WithPollStorage sets whether the queue should poll the storage for new tasks

func WithPreProcessor added in v0.4.3

func WithPreProcessor(preProcessor PreProcessor) Option

WithPreProcessor applies a global PreProcessor that is executed on every task before it is published to the Queue

func WithRunImmediatePriority added in v0.2.0

func WithRunImmediatePriority(runImmediatePriority int) Option

WithRunImmediatePriority sets the maximum priority level for tasks that will run immediately

func WithStorage

func WithStorage(storage Storage) Option

WithStorage sets the Storage provider for the Queue

func WithWorkerCount

func WithWorkerCount(workerCount int) Option

WithWorkerCount sets the number of concurrent processes to run

type PreProcessor added in v0.4.3

type PreProcessor func(*Task) error

PreProcessor is a custom function that can be added to the Queue. This function is executed on tasks BEFORE they are published to the Queue, and can be used to modify task properties, or to reject tasks before they are executed. If this function returns an error, then the task is rejected with a `queue.Failure` return type, and will not be retried.

PreProcessor is useful for defining centralized rules that apply to all tasks, for instance: by setting global prioritization or timing properties.
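As a rough illustration of such a centralized rule, the sketch below rejects unnamed tasks and caps the retry limit. `Task` here is a trimmed local stand-in with only the fields the example touches, and the function name `globalRules` and the cap of 5 are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a trimmed stand-in for the documented Task struct.
type Task struct {
	Name     string
	Priority int
	RetryMax int
}

// PreProcessor mirrors the documented type.
type PreProcessor func(*Task) error

// globalRules is a hypothetical pre-processor. It rejects tasks without a
// name and caps RetryMax at 5 for every task.
func globalRules(task *Task) error {
	if task.Name == "" {
		return errors.New("task name is required")
	}
	if task.RetryMax > 5 {
		task.RetryMax = 5
	}
	return nil
}

// Compile-time check that globalRules has the PreProcessor signature.
var _ PreProcessor = globalRules

func main() {
	task := Task{Name: "send-email", RetryMax: 20}
	if err := globalRules(&task); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("accepted with RetryMax", task.RetryMax)
}
```

Because the function receives a pointer, any change it makes to the task is visible to whatever runs after it.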

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a task queue with support for persistent storage and concurrent processing

func New

func New(options ...Option) *Queue

New returns a fully initialized Queue object, with all options applied
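The Option functions above follow the functional-options pattern. The sketch below shows the general shape of that pattern using a local stand-in for Queue. The real Queue's fields are unexported, so the field names and default values here are assumptions, not the package's actual implementation.

```go
package main

import "fmt"

// Queue is a local stand-in. The field names and defaults are assumptions
// made for illustration only.
type Queue struct {
	workerCount int
	bufferSize  int
}

// Option mirrors the documented type: a function that modifies a Queue.
type Option func(*Queue)

// WithWorkerCount returns an Option that sets the (assumed) workerCount field.
func WithWorkerCount(workerCount int) Option {
	return func(q *Queue) { q.workerCount = workerCount }
}

// WithBufferSize returns an Option that sets the (assumed) bufferSize field.
func WithBufferSize(bufferSize int) Option {
	return func(q *Queue) { q.bufferSize = bufferSize }
}

// New sets the assumed defaults, then applies each option in order, so a
// later option overrides an earlier one that touches the same field.
func New(options ...Option) *Queue {
	q := &Queue{workerCount: 1, bufferSize: 1}
	for _, option := range options {
		option(q)
	}
	return q
}

func main() {
	q := New(WithWorkerCount(8), WithBufferSize(32))
	fmt.Println(q.workerCount, q.bufferSize)
}
```

Callers pass only the options they care about, and new options can be added later without changing the signature of New.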

func (*Queue) Delete added in v0.4.1

func (q *Queue) Delete(signature string) error

Delete removes a task from the queue by its signature

func (*Queue) NewTask added in v0.4.1

func (q *Queue) NewTask(name string, args map[string]any, options ...TaskOption)

NewTask pushes a new task to the Queue and swallows any errors that are generated.

func (*Queue) Publish

func (q *Queue) Publish(task Task) error

Publish adds a Task to the Queue

func (*Queue) Schedule

func (q *Queue) Schedule(task Task, delay time.Duration) error

Schedule adds a Task to the Queue to be executed after a delay

func (*Queue) Stop

func (q *Queue) Stop()

Stop closes the queue and stops all workers (after they complete their current task)

type Result added in v0.2.0

type Result struct {
	Status string
	Error  error
	Delay  time.Duration
}

Result is the return value from a task function

func Error added in v0.2.0

func Error(err error) Result

Error returns a Result object with a status of "ERROR"

func Failure added in v0.2.0

func Failure(err error) Result

Failure returns a Result object with a status of "FAILURE"

func Ignored added in v0.2.0

func Ignored() Result

Ignored returns a Result object that has been "IGNORED". This happens when a consumer does not recognize the task name.

func Requeue added in v0.4.4

func Requeue(delay time.Duration) Result

Requeue returns a Result object that will be "REQUEUED" which marks THIS task as successful, and also pushes a copy of this task back onto the queue to run again.
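A sketch of the batch-processing use case that Requeue is meant for. `Result`, `Success`, and `Requeue` are local stand-ins mirroring the documented signatures, and storing the delay in `Result.Delay` is an assumption. The `pending` backlog, `batchSize`, and the `process-records` task name are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Result is a local stand-in mirroring the documented Result type.
type Result struct {
	Status string
	Error  error
	Delay  time.Duration
}

// Stand-ins for the documented constructors.
func Success() Result { return Result{Status: "SUCCESS"} }
func Requeue(delay time.Duration) Result {
	return Result{Status: "REQUEUE", Delay: delay}
}

// pending is a hypothetical backlog of records waiting to be processed.
var pending = []string{"a", "b", "c", "d", "e"}

const batchSize = 2

// processBatch handles up to batchSize records per run. While records
// remain, it asks to be run again instead of looping inside one task.
func processBatch(name string, args map[string]any) Result {
	n := batchSize
	if len(pending) < n {
		n = len(pending)
	}
	for _, record := range pending[:n] {
		fmt.Println("processing", record)
	}
	pending = pending[n:]

	if len(pending) > 0 {
		return Requeue(time.Second)
	}
	return Success()
}

func main() {
	// Simulate the queue re-running the task until it stops requeueing.
	for {
		result := processBatch("process-records", nil)
		fmt.Println(result.Status)
		if result.Status != "REQUEUE" {
			break
		}
	}
}
```

Because the requeued task is a copy of the original, progress has to be tracked outside the task arguments, as the `pending` slice does here.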

func Success added in v0.2.0

func Success() Result

Success returns a Result object with a status of "SUCCESS"

func (Result) IsSuccessful added in v0.2.0

func (result Result) IsSuccessful() bool

IsSuccessful returns TRUE if the Result is a "SUCCESS" or "REQUEUE"

func (Result) NotSuccessful added in v0.2.0

func (result Result) NotSuccessful() bool

NotSuccessful returns TRUE if the Result is NOT a "SUCCESS"

type Storage

type Storage interface {

	// GetTasks retrieves a batch of Tasks from the Storage provider
	GetTasks() ([]Task, error)

	// SaveTask saves a Task to the Storage provider
	SaveTask(task Task) error

	// DeleteTask removes a Task from the Storage provider
	DeleteTask(taskID string) error

	// DeleteTaskBySignature removes a Task from the Storage provider using its signature
	DeleteTaskBySignature(signature string) error

	// LogFailure writes a Task to the error log
	LogFailure(task Task) error
}

Storage is the interface for persisting Tasks outside of memory
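To show what satisfying this interface involves, here is a hypothetical in-memory provider. It is a sketch only: `Task` is a trimmed local stand-in, the interface is repeated locally so the example compiles on its own, and the provider does not lock tasks, order them by priority, or honor start dates as a production provider presumably would.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a trimmed stand-in with only the fields this sketch needs.
type Task struct {
	TaskID    string
	Name      string
	Signature string
}

// Storage repeats the documented interface.
type Storage interface {
	GetTasks() ([]Task, error)
	SaveTask(task Task) error
	DeleteTask(taskID string) error
	DeleteTaskBySignature(signature string) error
	LogFailure(task Task) error
}

// memoryStorage is a hypothetical provider that keeps everything in a map.
type memoryStorage struct {
	mutex    sync.Mutex
	tasks    map[string]Task // keyed by TaskID
	failures []Task
}

func newMemoryStorage() *memoryStorage {
	return &memoryStorage{tasks: make(map[string]Task)}
}

// GetTasks returns every stored task, in no particular order.
func (s *memoryStorage) GetTasks() ([]Task, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	batch := make([]Task, 0, len(s.tasks))
	for _, task := range s.tasks {
		batch = append(batch, task)
	}
	return batch, nil
}

// SaveTask inserts the task, or overwrites one with the same TaskID.
func (s *memoryStorage) SaveTask(task Task) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.tasks[task.TaskID] = task
	return nil
}

func (s *memoryStorage) DeleteTask(taskID string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	delete(s.tasks, taskID)
	return nil
}

// DeleteTaskBySignature removes every task carrying the given signature.
func (s *memoryStorage) DeleteTaskBySignature(signature string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	for id, task := range s.tasks {
		if task.Signature == signature {
			delete(s.tasks, id)
		}
	}
	return nil
}

// LogFailure appends the task to an in-memory error log.
func (s *memoryStorage) LogFailure(task Task) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.failures = append(s.failures, task)
	return nil
}

// Compile-time check that memoryStorage satisfies Storage.
var _ Storage = (*memoryStorage)(nil)

func main() {
	storage := newMemoryStorage()
	storage.SaveTask(Task{TaskID: "1", Name: "send-email", Signature: "welcome"})
	storage.SaveTask(Task{TaskID: "2", Name: "resize-image"})
	storage.DeleteTaskBySignature("welcome")
	tasks, _ := storage.GetTasks()
	fmt.Println(len(tasks), "task(s) remaining")
}
```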

type Task

type Task struct {
	TaskID      string    `bson:"taskId"`              // Unique identifier for this task
	LockID      string    `bson:"lockId,omitempty"`    // Unique identifier for the worker that is currently processing this task
	Name        string    `bson:"name"`                // Name of the task (used to identify the handler function)
	Arguments   mapof.Any `bson:"arguments"`           // Data required to execute this task (marshalled as a map)
	CreateDate  int64     `bson:"createDate"`          // Unix epoch seconds when this task was created
	StartDate   int64     `bson:"startDate"`           // Unix epoch seconds when this task is scheduled to execute
	TimeoutDate int64     `bson:"timeoutDate"`         // Unix epoch seconds when this task will "time out" and can be reclaimed by another process
	Priority    int       `bson:"priority"`            // Priority of the handler, determines the order that tasks are executed in.
	Signature   string    `bson:"signature,omitempty"` // Signature of the task.  If a signature is present, then no other tasks will be allowed with this signature.
	RetryCount  int       `bson:"retryCount"`          // Number of times that this task has already been retried
	RetryMax    int       `bson:"retryMax"`            // Maximum number of times that this task can be retried
	Error       string    `bson:"error,omitempty"`     // Error (if any) from the last execution
	AsyncDelay  int       `bson:"-"`                   // If non-zero, then the `Publish` method will execute in a separate goroutine, and will sleep for this many milliseconds before publishing the Task.
}

Task represents a unit of work, together with the metadata required to track its runs and retries.

func NewTask

func NewTask(name string, arguments map[string]any, options ...TaskOption) Task

NewTask returns a new Task with the given name and arguments, with all TaskOptions applied. The resulting Task can be saved to a Storage provider.

func (*Task) Delay

func (task *Task) Delay(delay time.Duration)

Delay sets the time.Duration before the task is executed

type TaskOption

type TaskOption func(*Task)

TaskOption is a functional option that modifies a Task

func WithAsyncDelay added in v0.4.3

func WithAsyncDelay(miliseconds int) TaskOption

WithAsyncDelay sets a number of milliseconds to wait before publishing this task onto the queue. If this value is non-zero then a new goroutine will be launched that will sleep, then re-publish the task.

func WithDelayHours

func WithDelayHours(delayHours int) TaskOption

WithDelayHours sets the number of hours before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.

func WithDelayMinutes

func WithDelayMinutes(delayMinutes int) TaskOption

WithDelayMinutes sets the number of minutes before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.

func WithDelaySeconds

func WithDelaySeconds(delaySeconds int) TaskOption

WithDelaySeconds sets the number of seconds before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority sets the priority of the task

func WithRetryMax

func WithRetryMax(retryMax int) TaskOption

WithRetryMax sets the maximum number of times that a task can be retried

func WithSignature added in v0.3.0

func WithSignature(signature string) TaskOption

WithSignature sets the signature of the task. Only one task with a given signature can be active at a time. Duplicates are dropped silently.

func WithStartTime added in v0.3.0

func WithStartTime(timestamp time.Time) TaskOption

WithStartTime sets the absolute start time of the task. This differs from WithDelayXXX options, which set a start time relative to the current clock.
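The difference between relative and absolute start times can be sketched as follows. The option bodies are assumptions about how such options might set `StartDate` (Unix epoch seconds, as in the documented Task struct), not the package's actual code. `Task` is a trimmed local stand-in and `fixedStart` is an arbitrary timestamp chosen for the demonstration.

```go
package main

import (
	"fmt"
	"time"
)

// Task is a trimmed stand-in. Dates are Unix epoch seconds.
type Task struct {
	Name       string
	Arguments  map[string]any
	CreateDate int64
	StartDate  int64
}

// TaskOption mirrors the documented type.
type TaskOption func(*Task)

// WithDelaySeconds (assumed behavior) schedules the task relative to the
// clock at the moment the option is applied.
func WithDelaySeconds(delaySeconds int) TaskOption {
	return func(task *Task) {
		delay := time.Duration(delaySeconds) * time.Second
		task.StartDate = time.Now().Add(delay).Unix()
	}
}

// WithStartTime (assumed behavior) schedules the task at an absolute time,
// regardless of when the option is applied.
func WithStartTime(timestamp time.Time) TaskOption {
	return func(task *Task) { task.StartDate = timestamp.Unix() }
}

// NewTask stamps the creation time, defaults to starting immediately, then
// applies the options in order.
func NewTask(name string, arguments map[string]any, options ...TaskOption) Task {
	now := time.Now().Unix()
	task := Task{Name: name, Arguments: arguments, CreateDate: now, StartDate: now}
	for _, option := range options {
		option(&task)
	}
	return task
}

// fixedStart is an arbitrary absolute timestamp used for the demonstration.
var fixedStart = time.Unix(1700000000, 0)

func main() {
	relative := NewTask("cleanup", nil, WithDelaySeconds(90))
	absolute := NewTask("report", nil, WithStartTime(fixedStart))

	fmt.Println("relative offset (seconds):", relative.StartDate-relative.CreateDate)
	fmt.Println("absolute start (epoch seconds):", absolute.StartDate)
}
```

A relative delay moves with the clock each time the task is built, while an absolute start time stays fixed no matter when the task is created.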
