Documentation ¶
Overview ¶
Package queue provides a task queue with support for persistent storage and concurrent processing
Index ¶
- Constants
- type Consumer
- type Option
- func WithBufferSize(bufferSize int) Option
- func WithConsumers(consumers ...Consumer) Option
- func WithDefaultPriority(defaultPriority int) Option
- func WithDefaultRetryMax(defaultRetryMax int) Option
- func WithPollStorage(pollStorage bool) Option
- func WithPreProcessor(preProcessor PreProcessor) Option
- func WithRunImmediatePriority(runImmediatePriority int) Option
- func WithStorage(storage Storage) Option
- func WithWorkerCount(workerCount int) Option
- type PreProcessor
- type Queue
- type Result
- type Storage
- type Task
- type TaskOption
- func WithAsyncDelay(miliseconds int) TaskOption
- func WithDelayHours(delayHours int) TaskOption
- func WithDelayMinutes(delayMinutes int) TaskOption
- func WithDelaySeconds(delaySeconds int) TaskOption
- func WithPriority(priority int) TaskOption
- func WithRetryMax(retryMax int) TaskOption
- func WithSignature(signature string) TaskOption
- func WithStartTime(timestamp time.Time) TaskOption
Constants ¶
const ResultStatusError = "ERROR"
ResultStatusError represents a task that experienced an error, but CAN be retried
const ResultStatusFailure = "FAILURE"
ResultStatusFailure represents a task that experienced an error, and CANNOT be retried
const ResultStatusIgnored = "IGNORED"
ResultStatusIgnored represents a task that was not processed because the consumer does not recognize it. The task will be passed to another consumer, or error out permanently.
const ResultStatusRequeue = "REQUEUE"
ResultStatusRequeue represents a task that has completed successfully and should be re-executed at its same priority level. This is useful for long series of tasks that need to execute over multiple records.
const ResultStatusSuccess = "SUCCESS"
ResultStatusSuccess represents a task that was completed successfully
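To make the difference between the five statuses concrete, the following is a minimal, self-contained sketch of how a dispatcher might react to each one. The constants are local copies of the values above. The `nextAction` helper, its action strings, and the `retryCount < retryMax` comparison are assumptions made for illustration and are not part of the package.

```go
package main

import "fmt"

// Local copies of the documented status values.
const (
	ResultStatusError   = "ERROR"
	ResultStatusFailure = "FAILURE"
	ResultStatusIgnored = "IGNORED"
	ResultStatusRequeue = "REQUEUE"
	ResultStatusSuccess = "SUCCESS"
)

// nextAction is a HYPOTHETICAL helper that maps a status to a follow-up action.
// retryCount and retryMax correspond to the Task fields of the same name.
func nextAction(status string, retryCount, retryMax int) string {
	switch status {
	case ResultStatusSuccess:
		return "delete"
	case ResultStatusRequeue:
		return "run-again"
	case ResultStatusError:
		// Assumed rule: retry until the retry budget is used up.
		if retryCount < retryMax {
			return "retry"
		}
		return "log-failure"
	case ResultStatusFailure:
		return "log-failure"
	case ResultStatusIgnored:
		return "try-next-consumer"
	default:
		// Unknown statuses are treated as permanent failures in this sketch.
		return "log-failure"
	}
}

func main() {
	fmt.Println(nextAction(ResultStatusError, 0, 3))   // retry
	fmt.Println(nextAction(ResultStatusError, 3, 3))   // log-failure
	fmt.Println(nextAction(ResultStatusRequeue, 0, 3)) // run-again
}
```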
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶ added in v0.4.6
type Option func(*Queue)
Option is a functional option that modifies a Queue object
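The functional-option pattern behind `Option` can be sketched as follows. Because the fields of `Queue` are unexported, this example uses a local stand-in struct. The `queueSettings` type, the `newSettings` constructor, and the default values are assumptions for illustration only; they do not describe the package's actual constructor.

```go
package main

import "fmt"

// queueSettings is a local stand-in for Queue, whose real fields are unexported.
type queueSettings struct {
	workerCount int
	bufferSize  int
	pollStorage bool
}

// option mirrors the documented shape `func(*Queue)`.
type option func(*queueSettings)

func withWorkerCount(workerCount int) option {
	return func(q *queueSettings) { q.workerCount = workerCount }
}

func withBufferSize(bufferSize int) option {
	return func(q *queueSettings) { q.bufferSize = bufferSize }
}

func withPollStorage(pollStorage bool) option {
	return func(q *queueSettings) { q.pollStorage = pollStorage }
}

// newSettings is a HYPOTHETICAL constructor. The defaults below are assumed.
func newSettings(options ...option) queueSettings {
	q := queueSettings{workerCount: 1, bufferSize: 1, pollStorage: true}
	for _, apply := range options {
		apply(&q) // each option mutates the value in order
	}
	return q
}

func main() {
	q := newSettings(withWorkerCount(8), withPollStorage(false))
	fmt.Printf("%+v\n", q)
}
```

Options are applied in the order given, so a later option overrides an earlier one that touches the same field.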
func WithBufferSize ¶
WithBufferSize sets the number of tasks to lock in a single transaction
func WithConsumers ¶
WithConsumers adds one or more consumers to process tasks from the Queue
func WithDefaultPriority ¶ added in v0.0.2
WithDefaultPriority sets the default priority for new tasks
func WithDefaultRetryMax ¶ added in v0.0.2
WithDefaultRetryMax sets the default number of times to retry a task before giving up
func WithPollStorage ¶
WithPollStorage sets whether the queue should poll the storage for new tasks
func WithPreProcessor ¶ added in v0.4.3
func WithPreProcessor(preProcessor PreProcessor) Option
WithPreProcessor sets a global PreProcessor that is applied to tasks before they are published to the Queue
func WithRunImmediatePriority ¶ added in v0.2.0
WithRunImmediatePriority sets the maximum priority level for tasks that will run immediately
func WithStorage ¶
WithStorage sets the storage and unmarshaller for the Queue
func WithWorkerCount ¶
WithWorkerCount sets the number of concurrent processes to run
type PreProcessor ¶ added in v0.4.3
PreProcessor is a custom function that can be added to the Queue. This function is executed on tasks BEFORE they are published to the Queue, and can be used to modify task properties, or to reject tasks before they are executed. If this function returns an error, then the task is rejected with a `queue.Failure` return type, and will not be retried.
PreProcessor is useful for defining centralized rules that apply to all tasks, for instance, setting global prioritization or timing properties.
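As a rough illustration of the behavior described above, here is a self-contained sketch of a pre-processing step that can both modify and reject tasks. The `preProcessor` signature `func(*task) error` is an assumption, since the real type definition is not shown here. The `task` struct, `centralRules`, and `publish` are likewise hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a trimmed local stand-in for the documented Task struct.
type task struct {
	Name     string
	Priority int
}

// preProcessor uses an ASSUMED signature; the real PreProcessor type may differ.
type preProcessor func(t *task) error

// centralRules is a hypothetical rule set: reject unnamed tasks and
// cap the priority value at 100.
func centralRules(t *task) error {
	if t.Name == "" {
		return errors.New("task name is required")
	}
	if t.Priority > 100 {
		t.Priority = 100
	}
	return nil
}

// publish runs the pre-processor before a task joins the pending list.
// A non-nil error rejects the task, so it is never queued or retried.
func publish(pre preProcessor, pending []task, t task) ([]task, error) {
	if pre != nil {
		if err := pre(&t); err != nil {
			return pending, err
		}
	}
	return append(pending, t), nil
}

func main() {
	var pending []task
	pending, err := publish(centralRules, pending, task{Name: "resize-image", Priority: 500})
	fmt.Println(pending, err)
	pending, err = publish(centralRules, pending, task{})
	fmt.Println(len(pending), err)
}
```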
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a task queue with support for persistent storage and concurrent processing
func (*Queue) NewTask ¶ added in v0.4.1
func (q *Queue) NewTask(name string, args map[string]any, options ...TaskOption)
NewTask pushes a new task to the Queue and swallows any errors that are generated.
type Result ¶ added in v0.2.0
Result is the return value from a task function
func Ignored ¶ added in v0.2.0
func Ignored() Result
Ignored returns a Result object that has been "IGNORED". This happens when a consumer does not recognize the task name.
func Requeue ¶ added in v0.4.4
Requeue returns a Result object that will be "REQUEUED" which marks THIS task as successful, and also pushes a copy of this task back onto the queue to run again.
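The requeue pattern suits work that is processed in batches. The sketch below simulates it without the real package: a handler processes a few records per run and reports "REQUEUE" while records remain. The `recordStore` type, `processBatch`, and `runUntilDone` are hypothetical; because a requeued task is a copy of the original, this sketch assumes progress is tracked in the data store rather than in the task arguments.

```go
package main

import "fmt"

// Local copies of the two documented status values used here.
const (
	statusSuccess = "SUCCESS"
	statusRequeue = "REQUEUE"
)

// recordStore is a HYPOTHETICAL data source that tracks its own progress.
type recordStore struct {
	unprocessed []string
	processed   []string
}

// processBatch handles up to batchSize records, then reports whether
// the task should run again.
func processBatch(store *recordStore, batchSize int) string {
	if batchSize < 1 {
		batchSize = 1 // guard so every run makes progress
	}
	n := batchSize
	if n > len(store.unprocessed) {
		n = len(store.unprocessed)
	}
	store.processed = append(store.processed, store.unprocessed[:n]...)
	store.unprocessed = store.unprocessed[n:]
	if len(store.unprocessed) > 0 {
		return statusRequeue
	}
	return statusSuccess
}

// runUntilDone stands in for the queue re-executing a requeued task.
// It returns the number of runs that were needed.
func runUntilDone(store *recordStore, batchSize int) int {
	runs := 0
	for {
		runs++
		if processBatch(store, batchSize) != statusRequeue {
			return runs
		}
	}
}

func main() {
	store := &recordStore{unprocessed: []string{"a", "b", "c", "d", "e"}}
	fmt.Println(runUntilDone(store, 2)) // 3
	fmt.Println(store.processed)        // [a b c d e]
}
```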
func Success ¶ added in v0.2.0
func Success() Result
Success returns a Result object with a status of "SUCCESS"
func (Result) IsSuccessful ¶ added in v0.2.0
IsSuccessful returns TRUE if the Result is a "SUCCESS" or "REQUEUE"
func (Result) NotSuccessful ¶ added in v0.2.0
NotSuccessful returns TRUE if the Result is NOT a "SUCCESS"
type Storage ¶
type Storage interface {
// GetTasks retrieves a batch of Tasks from the Storage provider
GetTasks() ([]Task, error)
// SaveTask saves a Task to the Storage provider
SaveTask(task Task) error
// DeleteTask removes a Task from the Storage provider
DeleteTask(taskID string) error
// DeleteTaskBySignature removes a Task from the Storage provider using its signature
DeleteTaskBySignature(signature string) error
// LogFailure writes a Task to the error log
LogFailure(task Task) error
}
Storage is the interface for persisting Tasks outside of memory
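As a sketch of what a Storage provider could look like, here is an in-memory implementation of the five methods listed above. It is not the package's own provider. The `Task` struct is a trimmed local copy, `memoryStorage` and `newMemoryStorage` are hypothetical names, and returning tasks in ascending Priority order is an assumption made only to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Task is a trimmed local copy of the documented struct.
type Task struct {
	TaskID    string
	Name      string
	Priority  int
	Signature string
}

// Storage repeats the documented interface.
type Storage interface {
	GetTasks() ([]Task, error)
	SaveTask(task Task) error
	DeleteTask(taskID string) error
	DeleteTaskBySignature(signature string) error
	LogFailure(task Task) error
}

// memoryStorage is a HYPOTHETICAL provider that keeps everything in a map.
type memoryStorage struct {
	mutex    sync.Mutex
	tasks    map[string]Task
	failures []Task
}

// Compile-time check that memoryStorage satisfies Storage.
var _ Storage = (*memoryStorage)(nil)

func newMemoryStorage() *memoryStorage {
	return &memoryStorage{tasks: make(map[string]Task)}
}

// GetTasks returns all tasks, sorted by Priority and then TaskID (assumed order).
func (s *memoryStorage) GetTasks() ([]Task, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	result := make([]Task, 0, len(s.tasks))
	for _, task := range s.tasks {
		result = append(result, task)
	}
	sort.Slice(result, func(i, j int) bool {
		if result[i].Priority != result[j].Priority {
			return result[i].Priority < result[j].Priority
		}
		return result[i].TaskID < result[j].TaskID
	})
	return result, nil
}

// SaveTask inserts or overwrites the task stored under its TaskID.
func (s *memoryStorage) SaveTask(task Task) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.tasks[task.TaskID] = task
	return nil
}

func (s *memoryStorage) DeleteTask(taskID string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	delete(s.tasks, taskID)
	return nil
}

// DeleteTaskBySignature removes every task with a matching, non-empty signature.
func (s *memoryStorage) DeleteTaskBySignature(signature string) error {
	if signature == "" {
		return nil
	}
	s.mutex.Lock()
	defer s.mutex.Unlock()
	for taskID, task := range s.tasks {
		if task.Signature == signature {
			delete(s.tasks, taskID)
		}
	}
	return nil
}

func (s *memoryStorage) LogFailure(task Task) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.failures = append(s.failures, task)
	return nil
}

func main() {
	store := newMemoryStorage()
	store.SaveTask(Task{TaskID: "1", Name: "send-email", Priority: 20})
	store.SaveTask(Task{TaskID: "2", Name: "reindex", Priority: 10, Signature: "reindex:all"})
	tasks, _ := store.GetTasks()
	fmt.Println(tasks)
}
```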
type Task ¶
type Task struct {
TaskID string `bson:"taskId"` // Unique identifier for this task
LockID string `bson:"lockId,omitempty"` // Unique identifier for the worker that is currently processing this task
Name string `bson:"name"` // Name of the task (used to identify the handler function)
Arguments mapof.Any `bson:"arguments"` // Data required to execute this task (marshalled as a map)
CreateDate int64 `bson:"createDate"` // Unix epoch seconds when this task was created
StartDate int64 `bson:"startDate"` // Unix epoch seconds when this task is scheduled to execute
TimeoutDate int64 `bson:"timeoutDate"` // Unix epoch seconds when this task will "time out" and can be reclaimed by another process
Priority int `bson:"priority"` // Priority of the handler, determines the order that tasks are executed in.
Signature string `bson:"signature,omitempty"` // Signature of the task. If a signature is present, then no other tasks will be allowed with this signature.
RetryCount int `bson:"retryCount"` // Number of times that this task has already been retried
RetryMax int `bson:"retryMax"` // Maximum number of times that this task can be retried
Error string `bson:"error,omitempty"` // Error (if any) from the last execution
AsyncDelay int `bson:"-"` // If non-zero, then the `Publish` method will execute in a separate goroutine, and will sleep for this many milliseconds before publishing the Task.
}
Task wraps a unit of work with the metadata required to track its runs and retries.
type TaskOption ¶
type TaskOption func(*Task)
TaskOption is a functional option that modifies a Task
func WithAsyncDelay ¶ added in v0.4.3
func WithAsyncDelay(miliseconds int) TaskOption
WithAsyncDelay sets the number of milliseconds to wait before publishing this task onto the queue. If this value is non-zero, then a new goroutine is launched that sleeps, then re-publishes the task.
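The sleep-then-republish behavior can be sketched with a goroutine and a channel. This is an illustration under stated assumptions rather than the package's implementation: the `task` struct is a trimmed stand-in, the `publish` function and its output channel are hypothetical, and clearing `AsyncDelay` before the second publish is an assumed way to avoid looping.

```go
package main

import (
	"fmt"
	"time"
)

// task is a trimmed local stand-in for the documented Task struct.
type task struct {
	Name       string
	AsyncDelay int // milliseconds to wait before publishing
}

// publish is a HYPOTHETICAL publisher. When AsyncDelay is non-zero it returns
// immediately and delivers the task later from a separate goroutine.
func publish(t task, out chan<- task) {
	if t.AsyncDelay > 0 {
		delay := time.Duration(t.AsyncDelay) * time.Millisecond
		t.AsyncDelay = 0 // cleared so the re-publish below is immediate
		go func() {
			time.Sleep(delay)
			publish(t, out)
		}()
		return
	}
	out <- t
}

func main() {
	out := make(chan task, 1)
	start := time.Now()
	publish(task{Name: "send-email", AsyncDelay: 20}, out)
	received := <-out // blocks until the delayed goroutine delivers the task
	fmt.Println(received.Name, time.Since(start) >= 20*time.Millisecond)
}
```

The caller is not blocked during the delay, which is the main difference from the WithDelayXXX options that schedule a later start date on the task itself.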
func WithDelayHours ¶
func WithDelayHours(delayHours int) TaskOption
WithDelayHours sets the number of hours before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithDelayMinutes ¶
func WithDelayMinutes(delayMinutes int) TaskOption
WithDelayMinutes sets the number of minutes before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithDelaySeconds ¶
func WithDelaySeconds(delaySeconds int) TaskOption
WithDelaySeconds sets the number of seconds before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithPriority ¶
func WithPriority(priority int) TaskOption
WithPriority sets the priority of the task
func WithRetryMax ¶
func WithRetryMax(retryMax int) TaskOption
WithRetryMax sets the maximum number of times that a task can be retried
func WithSignature ¶ added in v0.3.0
func WithSignature(signature string) TaskOption
WithSignature sets the signature of the task. Only one task with a given signature can be active at a time. Duplicates are dropped silently.
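A minimal sketch of signature-based de-duplication follows. The `pendingTasks` type and its `add` method are hypothetical. Unlike the package, which drops duplicates silently, `add` returns a boolean here so the example can show what happened.

```go
package main

import "fmt"

// task is a trimmed local stand-in for the documented Task struct.
type task struct {
	Name      string
	Signature string
}

// pendingTasks is a HYPOTHETICAL in-memory list that enforces unique signatures.
type pendingTasks struct {
	signatures map[string]bool
	tasks      []task
}

func newPendingTasks() *pendingTasks {
	return &pendingTasks{signatures: make(map[string]bool)}
}

// add appends the task unless another task with the same non-empty
// signature is already pending. It reports whether the task was kept.
func (p *pendingTasks) add(t task) bool {
	if t.Signature != "" {
		if p.signatures[t.Signature] {
			return false // duplicate: dropped
		}
		p.signatures[t.Signature] = true
	}
	p.tasks = append(p.tasks, t)
	return true
}

func main() {
	p := newPendingTasks()
	fmt.Println(p.add(task{Name: "reindex", Signature: "reindex:42"})) // true
	fmt.Println(p.add(task{Name: "reindex", Signature: "reindex:42"})) // false
	fmt.Println(p.add(task{Name: "send-email"}))                       // true
	fmt.Println(len(p.tasks))                                          // 2
}
```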
func WithStartTime ¶ added in v0.3.0
func WithStartTime(timestamp time.Time) TaskOption
WithStartTime sets the absolute start time of the task. This differs from WithDelayXXX options, which set a start time relative to the current clock.