Documentation
¶
Overview ¶
Package groupmq provides a Redis-backed per-group FIFO queue implementation. It is a Go port of the TypeScript GroupMQ library.
Index ¶
- Constants
- Variables
- func GetWorkersStatusWithAllJobs(workers []*Worker) []struct{ ... }
- func IsValidStatus(s string) bool
- func WaitForAllWorkersIdle(ctx context.Context, workers []*Worker, timeoutMs int64) bool
- func WaitForQueueToEmpty(ctx context.Context, queue *Queue, timeoutMs int64) (bool, error)
- func WaitForWorkerIdle(ctx context.Context, worker *Worker, timeoutMs int64) bool
- type AddOptions
- type BackoffOptions
- type CompletedJobInfo
- type DefaultLogger
- type EnqueueResult
- type ErrorInfo
- type FailedJobInfo
- type Job
- func (j *Job) ChangeDelay(ctx context.Context, newDelayMs int64) (bool, error)
- func (j *Job) GetData(v interface{}) error
- func (j *Job) GetState() Status
- func (j *Job) Promote(ctx context.Context) error
- func (j *Job) Remove(ctx context.Context) error
- func (j *Job) Retry(ctx context.Context) error
- func (j *Job) ToJSON() map[string]interface{}
- func (j *Job) UnmarshalData(v interface{}) error
- func (j *Job) Update(ctx context.Context, data interface{}) error
- func (j *Job) UpdateData(ctx context.Context, data interface{}) error
- type JobCounts
- type JobEventHandler
- type JobMeta
- type JobProgress
- type JobResult
- type JobResultMeta
- type Logger
- type Metrics
- type NoOpLogger
- type ProcessorFunc
- type Queue
- func (q *Queue) AcquireSchedulerLock(ctx context.Context, ttlMs int64) (bool, error)
- func (q *Queue) Add(ctx context.Context, opts AddOptions) (*Job, error)
- func (q *Queue) AddRepeatingJob(ctx context.Context, opts AddOptions) (*Job, error)
- func (q *Queue) ChangeDelay(ctx context.Context, jobId string, newDelayMs int64) (bool, error)
- func (q *Queue) CheckStalled(ctx context.Context, gracePeriodMs, maxStalledCount int64) ([]StalledJobResult, error)
- func (q *Queue) Clean(ctx context.Context, graceTimeMs int64, limit int, status Status) (int64, error)
- func (q *Queue) Cleanup(ctx context.Context) (int64, error)
- func (q *Queue) CleanupPoisonedGroup(ctx context.Context, groupId string) (string, error)
- func (q *Queue) Close(ctx context.Context) error
- func (q *Queue) Complete(ctx context.Context, jobId, groupId string) error
- func (q *Queue) CompleteAndReserveNext(ctx context.Context, completedJobId, groupId string, result interface{}, ...) (*ReservedJob, error)
- func (q *Queue) CompleteWithMetadata(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error
- func (q *Queue) DeadLetter(ctx context.Context, jobId, groupId string) error
- func (q *Queue) GetActiveCount(ctx context.Context) (int64, error)
- func (q *Queue) GetActiveJobs(ctx context.Context) ([]string, error)
- func (q *Queue) GetCompleted(ctx context.Context, limit int) ([]CompletedJobInfo, error)
- func (q *Queue) GetCompletedCount(ctx context.Context) (int64, error)
- func (q *Queue) GetCompletedJobs(ctx context.Context, limit int) ([]*Job, error)
- func (q *Queue) GetDelayedCount(ctx context.Context) (int64, error)
- func (q *Queue) GetDelayedJobs(ctx context.Context) ([]string, error)
- func (q *Queue) GetFailed(ctx context.Context, limit int) ([]FailedJobInfo, error)
- func (q *Queue) GetFailedCount(ctx context.Context) (int64, error)
- func (q *Queue) GetFailedJobs(ctx context.Context, limit int) ([]*Job, error)
- func (q *Queue) GetGroupJobCount(ctx context.Context, groupId string) (int64, error)
- func (q *Queue) GetJob(ctx context.Context, jobId string) (*Job, error)
- func (q *Queue) GetJobCounts(ctx context.Context) (*JobCounts, error)
- func (q *Queue) GetJobsByStatus(ctx context.Context, statuses []Status, start, end int) ([]*Job, error)
- func (q *Queue) GetUniqueGroups(ctx context.Context) ([]string, error)
- func (q *Queue) GetUniqueGroupsCount(ctx context.Context) (int64, error)
- func (q *Queue) GetWaitingCount(ctx context.Context) (int64, error)
- func (q *Queue) GetWaitingJobs(ctx context.Context) ([]string, error)
- func (q *Queue) Heartbeat(ctx context.Context, jobId, groupId string, extendMs int64) error
- func (q *Queue) IsEmpty(ctx context.Context) (bool, error)
- func (q *Queue) IsJobProcessing(ctx context.Context, jobId string) (bool, error)
- func (q *Queue) IsPaused(ctx context.Context) (bool, error)
- func (q *Queue) JobTimeoutMs() int64
- func (q *Queue) MaxAttempts() int
- func (q *Queue) Name() string
- func (q *Queue) Namespace() string
- func (q *Queue) Pause(ctx context.Context) error
- func (q *Queue) ProcessRepeatingJobsBounded(ctx context.Context, limit int, now int64) (int64, error)
- func (q *Queue) Promote(ctx context.Context, jobId string) error
- func (q *Queue) PromoteDelayedJobs(ctx context.Context) (int64, error)
- func (q *Queue) PromoteDelayedJobsBounded(ctx context.Context, limit int, now int64) (int64, error)
- func (q *Queue) PromoteStaged(ctx context.Context) (int64, error)
- func (q *Queue) PromoteStagedWithLimit(ctx context.Context, limit int) (int64, error)
- func (q *Queue) RawNamespace() string
- func (q *Queue) RecordAttemptFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error
- func (q *Queue) RecordCompleted(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error
- func (q *Queue) RecordFinalFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error
- func (q *Queue) Redis() redis.Cmdable
- func (q *Queue) ReleaseJob(ctx context.Context, jobId string) error
- func (q *Queue) Remove(ctx context.Context, jobId string) error
- func (q *Queue) RemoveRepeatingJob(ctx context.Context, groupId string, repeat *RepeatOptions) (bool, error)
- func (q *Queue) Reserve(ctx context.Context) (*ReservedJob, error)
- func (q *Queue) ReserveAtomic(ctx context.Context, groupId string) (*ReservedJob, error)
- func (q *Queue) ReserveBatch(ctx context.Context, maxBatch int) ([]*ReservedJob, error)
- func (q *Queue) ReserveBlocking(ctx context.Context, timeoutSec float64) (*ReservedJob, error)
- func (q *Queue) ReserveBlockingWithOptions(ctx context.Context, timeoutSec float64, blockUntil *int64, ...) (*ReservedJob, error)
- func (q *Queue) Resume(ctx context.Context) error
- func (q *Queue) Retry(ctx context.Context, jobId string, backoffMs int64) (int, error)
- func (q *Queue) RunSchedulerOnce(ctx context.Context) error
- func (q *Queue) StartPromoter(ctx context.Context) error
- func (q *Queue) StartPromoterWithInterval(ctx context.Context, intervalMs int64) error
- func (q *Queue) StopPromoter()
- func (q *Queue) UpdateData(ctx context.Context, jobId string, data interface{}) error
- func (q *Queue) WaitForEmpty(ctx context.Context, timeoutMs int64) (bool, error)
- type QueueEventHandler
- type QueueOption
- func WithAutoBatch(enabled bool) QueueOption
- func WithBatchMaxWaitMs(ms int64) QueueOption
- func WithBatchSize(size int) QueueOption
- func WithJobTimeoutMs(ms int64) QueueOption
- func WithKeepCompleted(count int) QueueOption
- func WithKeepFailed(count int) QueueOption
- func WithOrderingDelayMs(ms int64) QueueOption
- func WithSchedulerLockTtlMs(ms int64) QueueOption
- type QueueOptions
- type RepeatOptions
- type ReservedJob
- type ScriptLoader
- func (l *ScriptLoader) EvalScript(ctx context.Context, name ScriptName, keys []string, args ...interface{}) *redis.Cmd
- func (l *ScriptLoader) EvalScriptResult(ctx context.Context, name ScriptName, keys []string, args ...interface{}) (interface{}, error)
- func (l *ScriptLoader) LoadScript(ctx context.Context, name ScriptName) (string, error)
- func (l *ScriptLoader) PreloadScripts(ctx context.Context) error
- type ScriptName
- type StalledJobResult
- type Status
- type Worker
- func (w *Worker) Close(ctx context.Context) error
- func (w *Worker) GetCurrentJob() *ReservedJob
- func (w *Worker) GetCurrentJobs() []JobProgress
- func (w *Worker) GetWorkerMetrics() WorkerMetrics
- func (w *Worker) IsClosed() bool
- func (w *Worker) IsProcessing() bool
- func (w *Worker) Run(ctx context.Context) error
- func (w *Worker) Stop()
- func (w *Worker) TotalProcessed() int64
- func (w *Worker) Wait()
- type WorkerEventHandler
- type WorkerJobInfo
- type WorkerMetrics
- type WorkerOption
- func WithBackoff(opts BackoffOptions) WorkerOption
- func WithBlockingClient(client redis.Cmdable) WorkerOption
- func WithCleanupIntervalMs(ms int64) WorkerOption
- func WithCompletedHandler(fn func(job *Job, result interface{})) WorkerOption
- func WithConcurrency(n int) WorkerOption
- func WithEnableCleanup(enabled bool) WorkerOption
- func WithErrorHandler(fn func(err error, job *ReservedJob)) WorkerOption
- func WithFailedHandler(fn func(job *Job, err error)) WorkerOption
- func WithGracePeriod(ms int64) WorkerOption
- func WithHeartbeatMs(ms int64) WorkerOption
- func WithLogger(logger Logger) WorkerOption
- func WithMaxAttempts(n int) WorkerOption
- func WithMaxStalledCount(n int64) WorkerOption
- func WithName(name string) WorkerOption
- func WithSchedulerIntervalMs(ms int64) WorkerOption
- func WithStalledHandler(fn func(jobId, groupId string)) WorkerOption
- func WithStalledInterval(ms int64) WorkerOption
- type WorkerOptions
- type WorkerStatusInfo
- type WorkersStatusSummary
Constants ¶
const (
BackoffTypeExponential = "exponential"
BackoffTypeLinear = "linear"
BackoffTypeFixed = "fixed"
)
Backoff type constants
Variables ¶
var BaseEpoch = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
BaseEpoch is the reference timestamp for score calculations (2020-01-01 UTC)
Functions ¶
func GetWorkersStatusWithAllJobs ¶
func GetWorkersStatusWithAllJobs(workers []*Worker) []struct {
Index int
IsProcessing bool
Jobs []WorkerJobInfo
}
GetWorkersStatusWithAllJobs returns detailed status with all jobs per worker. This is an enhanced version that exposes Go's concurrent job processing capability.
func IsValidStatus ¶
IsValidStatus checks if a status string is a valid Status constant
func WaitForAllWorkersIdle ¶
WaitForAllWorkersIdle waits for all workers to become idle. Returns true if all workers became idle before the timeout, false otherwise.
func WaitForQueueToEmpty ¶
WaitForQueueToEmpty waits for a queue to become empty. This is a convenience wrapper around Queue.WaitForEmpty. Aligns with TypeScript's waitForQueueToEmpty() helper.
Types ¶
type AddOptions ¶
type AddOptions struct {
// GroupId for FIFO ordering - jobs in same group are processed sequentially
GroupId string
// Job data payload (will be JSON serialized)
Data any
// Custom job name (default: "groupmq")
Name string
// Timestamp in milliseconds for ordering within the group
// Jobs with lower orderMs are processed first (default: current time)
OrderMs int64
// Delay in milliseconds before the job becomes visible
Delay int64
// Absolute timestamp when the job should run (overrides Delay)
RunAt int64
// Custom job ID for idempotence - if provided and job exists, returns existing job
JobId string
// Maximum retry attempts for this job (overrides queue default)
MaxAttempts int
// Repeat options for recurring jobs
Repeat *RepeatOptions
}
AddOptions configures how a job is added to the queue
type BackoffOptions ¶
type BackoffOptions struct {
// Type of backoff: "exponential", "linear", "fixed"
Type string
// Initial delay in milliseconds (default: 1000)
Delay int64
// Maximum delay in milliseconds (default: 30000)
MaxDelay int64
// Multiplier for exponential backoff (default: 2)
Factor float64
// Whether to add random jitter to delays (default: true)
Jitter bool
}
BackoffOptions configures retry backoff behavior
func DefaultBackoffOptions ¶
func DefaultBackoffOptions() BackoffOptions
DefaultBackoffOptions returns the default backoff options
type CompletedJobInfo ¶
type CompletedJobInfo struct {
ID string
GroupId string
Data json.RawMessage
Returnvalue json.RawMessage
ProcessedOn int64
FinishedOn int64
Attempts int
MaxAttempts int
}
CompletedJobInfo represents a completed job with metadata
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
DefaultLogger is a simple logger implementation that can optionally be enabled or disabled. This aligns with TypeScript groupmq's Logger class.
func NewDefaultLogger ¶
func NewDefaultLogger(enabled bool, name string) *DefaultLogger
NewDefaultLogger creates a new default logger.
- enabled: whether logging is enabled
- name: logger name (e.g., "Queue", "Worker")
func (*DefaultLogger) Error ¶
func (l *DefaultLogger) Error(msg string)
Error logs an error message with an emoji prefix. Error logs are always printed regardless of the enabled flag.
func (*DefaultLogger) Warn ¶
func (l *DefaultLogger) Warn(msg string)
Warn logs a warning message with emoji prefix
type EnqueueResult ¶
type EnqueueResult struct {
JobId string
Status string // "added", "duplicate", "delayed", "staged"
Duplicate bool
}
EnqueueResult represents the result of adding a job
type ErrorInfo ¶
type ErrorInfo struct {
Message string `json:"message"`
Name string `json:"name"`
Stack string `json:"stack"`
}
ErrorInfo represents error information for failed jobs
type FailedJobInfo ¶
type FailedJobInfo struct {
ID string
GroupId string
Data json.RawMessage
FailedReason string
Stacktrace string
ProcessedOn int64
FinishedOn int64
Attempts int
MaxAttempts int
}
FailedJobInfo represents a failed job with metadata
type Job ¶
type Job struct {
// Core fields
ID string `json:"id"`
Name string `json:"name"`
GroupId string `json:"groupId"`
Data json.RawMessage `json:"data"`
Attempts int `json:"attemptsMade"`
MaxAttempts int `json:"maxAttempts"`
// Timing fields
Timestamp int64 `json:"timestamp"` // Creation timestamp in ms
OrderMs int64 `json:"orderMs"` // Ordering timestamp in ms
DelayUntil int64 `json:"delayUntil"` // Delay until timestamp in ms
ProcessedOn int64 `json:"processedOn"` // Processing start timestamp in ms
FinishedOn int64 `json:"finishedOn"` // Completion timestamp in ms
// Status fields
Status Status `json:"status"`
FailedReason string `json:"failedReason"`
Stacktrace string `json:"stacktrace"`
Returnvalue any `json:"returnvalue"`
// contains filtered or unexported fields
}
Job represents a job in the queue with full metadata
func FromRawHash ¶
FromRawHash creates a Job from raw Redis hash data with an optional known status. This avoids extra Redis lookups when the status is already known. Aligns with TypeScript's Job.fromRawHash().
func FromReserved ¶
func FromReserved(queue *Queue, reserved *ReservedJob, meta *JobResultMeta) *Job
FromReserved creates a Job from a ReservedJob
func FromStore ¶
FromStore loads a Job from Redis storage by ID. This performs additional Redis lookups to determine the current status. Aligns with TypeScript's Job.fromStore().
func (*Job) ChangeDelay ¶
ChangeDelay changes the delay of a delayed job
func (*Job) UnmarshalData ¶
UnmarshalData is an alias for GetData
type JobCounts ¶
type JobCounts struct {
Active int64 `json:"active"`
Waiting int64 `json:"waiting"`
Delayed int64 `json:"delayed"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
Paused int64 `json:"paused"`
WaitingChildren int64 `json:"waiting-children"`
Prioritized int64 `json:"prioritized"`
}
JobCounts represents counts of jobs by status
type JobEventHandler ¶
type JobEventHandler interface {
OnCompleted(job *Job, result any)
OnFailed(job *Job, err error)
OnStalled(job *Job)
OnProgress(job *Job, progress int)
}
JobEventHandler handles job lifecycle events
type JobProgress ¶
type JobProgress struct {
Job *ReservedJob
ProcessingTimeMs int64
}
JobProgress represents a job being processed
type JobResult ¶
type JobResult struct {
JobId string
GroupId string
Status Status
Returnvalue any
FinishedOn int64
Error error
}
JobResult represents the result of a completed job
type JobResultMeta ¶
type JobResultMeta struct {
ProcessedOn int64
FinishedOn int64
FailedReason string
Stacktrace string
Returnvalue any
Status Status
DelayMs int64
}
JobResultMeta contains optional metadata for job results
type Logger ¶
Logger is a generic logger interface that works with different logger implementations. This aligns with TypeScript groupmq's LoggerInterface.
type Metrics ¶
type Metrics struct {
WaitingCount int64
ActiveCount int64
DelayedCount int64
CompletedCount int64
FailedCount int64
GroupCount int64
ProcessingCount int64
}
Metrics provides queue statistics
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger is a logger that does nothing. Useful for testing or when logging is explicitly disabled.
type ProcessorFunc ¶
type ProcessorFunc func(ctx context.Context, job *ReservedJob) (interface{}, error)
ProcessorFunc is the function signature for job handlers
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a GroupMQ queue with per-group FIFO ordering
func NewQueue ¶
func NewQueue(client redis.Cmdable, opts QueueOptions) *Queue
NewQueue creates a new Queue instance
func (*Queue) AcquireSchedulerLock ¶
AcquireSchedulerLock acquires the scheduler lock
func (*Queue) AddRepeatingJob ¶
AddRepeatingJob adds a repeating job (cron or interval-based)
func (*Queue) ChangeDelay ¶
ChangeDelay changes the delay of a specific job
func (*Queue) CheckStalled ¶
func (q *Queue) CheckStalled(ctx context.Context, gracePeriodMs, maxStalledCount int64) ([]StalledJobResult, error)
CheckStalled checks for stalled jobs and recovers them
func (*Queue) Clean ¶
func (q *Queue) Clean(ctx context.Context, graceTimeMs int64, limit int, status Status) (int64, error)
Clean removes old completed or failed jobs.
- graceTimeMs: remove jobs with finishedOn <= now - graceTimeMs
- limit: max number of jobs to clean in one call
- status: "completed", "failed", or "delayed"
func (*Queue) CleanupPoisonedGroup ¶
CleanupPoisonedGroup removes problematic groups from ready queue
func (*Queue) CompleteAndReserveNext ¶
func (q *Queue) CompleteAndReserveNext(ctx context.Context, completedJobId, groupId string, result interface{}, meta JobMeta) (*ReservedJob, error)
CompleteAndReserveNext atomically completes a job and reserves the next one from the same group
func (*Queue) CompleteWithMetadata ¶
func (q *Queue) CompleteWithMetadata(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error
CompleteWithMetadata completes a job and records metadata atomically
func (*Queue) DeadLetter ¶
DeadLetter moves a job to the dead letter queue
func (*Queue) GetActiveCount ¶
GetActiveCount returns the number of jobs currently being processed
func (*Queue) GetActiveJobs ¶
GetActiveJobs returns list of active job IDs
func (*Queue) GetCompleted ¶
GetCompleted returns completed jobs with details
func (*Queue) GetCompletedCount ¶
GetCompletedCount returns the number of completed jobs
func (*Queue) GetCompletedJobs ¶
GetCompletedJobs returns completed jobs as Job entities
func (*Queue) GetDelayedCount ¶
GetDelayedCount returns the number of delayed jobs
func (*Queue) GetDelayedJobs ¶
GetDelayedJobs returns list of delayed job IDs
func (*Queue) GetFailedCount ¶
GetFailedCount returns the number of failed jobs
func (*Queue) GetFailedJobs ¶
GetFailedJobs returns failed jobs as Job entities
func (*Queue) GetGroupJobCount ¶
GetGroupJobCount returns the number of jobs in a specific group
func (*Queue) GetJobCounts ¶
GetJobCounts returns counts structured like BullBoard expects
func (*Queue) GetJobsByStatus ¶
func (q *Queue) GetJobsByStatus(ctx context.Context, statuses []Status, start, end int) ([]*Job, error)
GetJobsByStatus fetches jobs by statuses (like BullMQ's Queue.getJobs API)
func (*Queue) GetUniqueGroups ¶
GetUniqueGroups returns list of unique group IDs that have jobs
func (*Queue) GetUniqueGroupsCount ¶
GetUniqueGroupsCount returns the number of unique groups with jobs
func (*Queue) GetWaitingCount ¶
GetWaitingCount returns the number of jobs waiting to be processed
func (*Queue) GetWaitingJobs ¶
GetWaitingJobs returns list of waiting job IDs
func (*Queue) IsJobProcessing ¶
IsJobProcessing checks if a job is currently in processing state
func (*Queue) JobTimeoutMs ¶
JobTimeoutMs returns the job timeout in milliseconds
func (*Queue) MaxAttempts ¶
MaxAttempts returns the default max attempts
func (*Queue) ProcessRepeatingJobsBounded ¶
func (q *Queue) ProcessRepeatingJobsBounded(ctx context.Context, limit int, now int64) (int64, error)
ProcessRepeatingJobsBounded processes up to limit repeating jobs
func (*Queue) PromoteDelayedJobs ¶
PromoteDelayedJobs promotes delayed jobs that are ready
func (*Queue) PromoteDelayedJobsBounded ¶
PromoteDelayedJobsBounded promotes up to limit delayed jobs
func (*Queue) PromoteStaged ¶
PromoteStaged promotes staged jobs (for orderingDelayMs). It promotes at most 100 jobs in one batch (the default limit, matching TypeScript); use PromoteStagedWithLimit to specify a custom limit.
func (*Queue) PromoteStagedWithLimit ¶
PromoteStagedWithLimit promotes staged jobs with custom limit
func (*Queue) RawNamespace ¶
RawNamespace returns the raw namespace without prefix
func (*Queue) RecordAttemptFailure ¶
func (q *Queue) RecordAttemptFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error
RecordAttemptFailure records a failure attempt (non-final)
func (*Queue) RecordCompleted ¶
func (q *Queue) RecordCompleted(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error
RecordCompleted records a successful completion for retention
func (*Queue) RecordFinalFailure ¶
func (q *Queue) RecordFinalFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error
RecordFinalFailure records a final failure (dead-lettered)
func (*Queue) ReleaseJob ¶
ReleaseJob releases a job that was reserved but not processed (e.g., worker stopped). This removes the job from the processing and active lists, and puts it back in the group queue.
func (*Queue) RemoveRepeatingJob ¶
func (q *Queue) RemoveRepeatingJob(ctx context.Context, groupId string, repeat *RepeatOptions) (bool, error)
RemoveRepeatingJob removes a repeating job
func (*Queue) Reserve ¶
func (q *Queue) Reserve(ctx context.Context) (*ReservedJob, error)
Reserve reserves the next available job for processing
func (*Queue) ReserveAtomic ¶
ReserveAtomic reserves a job from a specific group atomically
func (*Queue) ReserveBatch ¶
ReserveBatch reserves multiple jobs at once
func (*Queue) ReserveBlocking ¶
ReserveBlocking blocks until a job is available or the timeout elapses. This is the high-performance blocking reserve, similar to the TypeScript version.
func (*Queue) ReserveBlockingWithOptions ¶
func (q *Queue) ReserveBlockingWithOptions(ctx context.Context, timeoutSec float64, blockUntil *int64, blockingClient redis.Cmdable) (*ReservedJob, error)
ReserveBlockingWithOptions blocks with an adaptive timeout and an optional separate client.
- blockUntil: optional timestamp for adaptive timeout calculation (can be nil)
- blockingClient: optional separate Redis client for blocking operations (can be nil)
func (*Queue) RunSchedulerOnce ¶
RunSchedulerOnce runs the scheduler once
func (*Queue) StartPromoter ¶
StartPromoter starts the promoter service with the default interval (100ms). This matches TypeScript's startPromoter() with its 100ms polling interval.
func (*Queue) StartPromoterWithInterval ¶
StartPromoterWithInterval starts the promoter service with a custom interval. This is the full-featured version, matching TypeScript's internal implementation with distributed locking.
func (*Queue) UpdateData ¶
UpdateData updates a job's data payload
type QueueEventHandler ¶
type QueueEventHandler interface {
OnError(err error)
OnReady()
OnPaused()
OnResumed()
OnDrained()
}
QueueEventHandler handles queue lifecycle events
type QueueOption ¶
type QueueOption func(*QueueOptions)
QueueOption is a functional option for queue configuration
func WithAutoBatch ¶
func WithAutoBatch(enabled bool) QueueOption
WithAutoBatch enables auto-batching
func WithBatchMaxWaitMs ¶
func WithBatchMaxWaitMs(ms int64) QueueOption
WithBatchMaxWaitMs sets the maximum wait time before flushing a batch
func WithBatchSize ¶
func WithBatchSize(size int) QueueOption
WithBatchSize sets the maximum jobs per batch
func WithJobTimeoutMs ¶
func WithJobTimeoutMs(ms int64) QueueOption
WithJobTimeoutMs sets the job timeout
func WithKeepCompleted ¶
func WithKeepCompleted(count int) QueueOption
WithKeepCompleted sets the number of completed jobs to keep
func WithKeepFailed ¶
func WithKeepFailed(count int) QueueOption
WithKeepFailed sets the number of failed jobs to keep
func WithOrderingDelayMs ¶
func WithOrderingDelayMs(ms int64) QueueOption
WithOrderingDelayMs sets the ordering delay
func WithSchedulerLockTtlMs ¶
func WithSchedulerLockTtlMs(ms int64) QueueOption
WithSchedulerLockTtlMs sets the scheduler lock TTL
type QueueOptions ¶
type QueueOptions struct {
// Namespace prefix for all Redis keys (default: "gmq")
Namespace string
// Job visibility timeout in milliseconds - how long before a reserved job
// is considered stalled and can be picked up by another worker (default: 30000)
JobTimeoutMs int64
// Delay in milliseconds before newly added jobs become visible (default: 0)
// Useful for ensuring FIFO ordering when multiple producers add jobs concurrently
OrderingDelayMs int64
// Enable auto-batching for high throughput (default: false)
AutoBatch bool
// Maximum jobs per batch when AutoBatch is enabled (default: 10)
AutoBatchMaxJobs int
// Maximum wait time in milliseconds before flushing a batch (default: 10)
AutoBatchMaxWaitMs int64
// Default maximum retry attempts for jobs (default: 3)
DefaultMaxAttempts int
// Whether to keep completed jobs for inspection (default: false)
KeepCompleted bool
// How many completed jobs to keep (default: 1000)
KeepCompletedCount int
// How long to keep completed jobs in seconds (default: 86400 = 1 day)
KeepCompletedAge int64
// Whether to keep failed jobs for inspection (default: true)
KeepFailed bool
// How many failed jobs to keep (default: 1000)
KeepFailedCount int
// How long to keep failed jobs in seconds (default: 604800 = 7 days)
KeepFailedAge int64
// Scheduler lock TTL in milliseconds (default: 1500)
// Controls how long the scheduler lock is held, preventing concurrent scheduler runs
SchedulerLockTtlMs int64
// Logger for queue operations and debugging
// Can be nil (no logging), true (default logger), or a custom Logger instance
Logger interface{} // nil, bool, or Logger
}
QueueOptions configures a Queue instance
func DefaultQueueOptions ¶
func DefaultQueueOptions() QueueOptions
DefaultQueueOptions returns the default queue options
type RepeatOptions ¶
type RepeatOptions struct {
// Cron expression for scheduling (e.g., "0 * * * *" for every hour)
Cron string
// Repeat every N milliseconds
Every int64
// Maximum number of repetitions
Limit int
// End date for repetition
EndDate time.Time
}
RepeatOptions configures job repetition
type ReservedJob ¶
type ReservedJob struct {
ID string
GroupId string
Data []byte // Raw JSON data
Attempts int
MaxAttempts int
Seq int64
Timestamp int64 // Job creation timestamp in ms
OrderMs int64 // Ordering timestamp in ms
Score int64 // Calculated score for ordering
Deadline int64 // Processing deadline timestamp in ms
}
ReservedJob represents a job that has been reserved for processing
func (*ReservedJob) UnmarshalData ¶
func (r *ReservedJob) UnmarshalData(v interface{}) error
UnmarshalData unmarshals the job data into the provided interface
type ScriptLoader ¶
type ScriptLoader struct {
// contains filtered or unexported fields
}
ScriptLoader manages loading and caching of Lua scripts
func NewScriptLoader ¶
func NewScriptLoader(client redis.Cmdable) *ScriptLoader
NewScriptLoader creates a new script loader
func (*ScriptLoader) EvalScript ¶
func (l *ScriptLoader) EvalScript(ctx context.Context, name ScriptName, keys []string, args ...interface{}) *redis.Cmd
EvalScript evaluates a Lua script with the given arguments
func (*ScriptLoader) EvalScriptResult ¶
func (l *ScriptLoader) EvalScriptResult(ctx context.Context, name ScriptName, keys []string, args ...interface{}) (interface{}, error)
EvalScriptResult evaluates a Lua script and returns the result
func (*ScriptLoader) LoadScript ¶
func (l *ScriptLoader) LoadScript(ctx context.Context, name ScriptName) (string, error)
LoadScript loads a Lua script and returns its SHA
func (*ScriptLoader) PreloadScripts ¶
func (l *ScriptLoader) PreloadScripts(ctx context.Context) error
PreloadScripts preloads all scripts into Redis cache
type ScriptName ¶
type ScriptName string
ScriptName represents the name of a Lua script
const (
ScriptEnqueue ScriptName = "enqueue"
ScriptEnqueueBatch ScriptName = "enqueue-batch"
ScriptReserve ScriptName = "reserve"
ScriptReserveBatch ScriptName = "reserve-batch"
ScriptReserveAtomic ScriptName = "reserve-atomic"
ScriptComplete ScriptName = "complete"
ScriptCompleteAndReserveNextWithMeta ScriptName = "complete-and-reserve-next-with-metadata"
ScriptCompleteWithMetadata ScriptName = "complete-with-metadata"
ScriptRetry ScriptName = "retry"
ScriptReleaseJob ScriptName = "release-job"
ScriptHeartbeat ScriptName = "heartbeat"
ScriptCleanup ScriptName = "cleanup"
ScriptPromoteDelayedJobs ScriptName = "promote-delayed-jobs"
ScriptPromoteDelayedOne ScriptName = "promote-delayed-one"
ScriptPromoteStaged ScriptName = "promote-staged"
ScriptChangeDelay ScriptName = "change-delay"
ScriptGetActiveCount ScriptName = "get-active-count"
ScriptGetWaitingCount ScriptName = "get-waiting-count"
ScriptGetDelayedCount ScriptName = "get-delayed-count"
ScriptGetCompletedCount ScriptName = "get-completed-count"
ScriptGetFailedCount ScriptName = "get-failed-count"
ScriptGetActiveJobs ScriptName = "get-active-jobs"
ScriptGetWaitingJobs ScriptName = "get-waiting-jobs"
ScriptGetDelayedJobs ScriptName = "get-delayed-jobs"
ScriptGetUniqueGroups ScriptName = "get-unique-groups"
ScriptGetUniqueGroupsCount ScriptName = "get-unique-groups-count"
ScriptCleanupPoisonedGroup ScriptName = "cleanup-poisoned-group"
ScriptRemove ScriptName = "remove"
ScriptCleanStatus ScriptName = "clean-status"
ScriptIsEmpty ScriptName = "is-empty"
ScriptDeadLetter ScriptName = "dead-letter"
ScriptRecordJobResult ScriptName = "record-job-result"
ScriptCheckStalled ScriptName = "check-stalled"
)
type StalledJobResult ¶
type StalledJobResult struct {
JobId string
GroupId string
Action string // "recovered" or "failed"
}
StalledJobResult represents the result of stalled job detection
type Status ¶
type Status string
Status represents the state of a job in the queue system. This aligns with TypeScript groupmq's status.ts for consistency.
const (
// StatusLatest represents the most recent jobs (BullBoard compatibility)
StatusLatest Status = "latest"
// StatusActive represents jobs currently being processed
StatusActive Status = "active"
// StatusWaiting represents jobs waiting to be processed
StatusWaiting Status = "waiting"
// StatusWaitingChildren represents jobs waiting for child jobs to complete
StatusWaitingChildren Status = "waiting-children"
// StatusPrioritized represents high-priority waiting jobs
StatusPrioritized Status = "prioritized"
// StatusCompleted represents successfully completed jobs
StatusCompleted Status = "completed"
// StatusFailed represents jobs that failed after all retry attempts
StatusFailed Status = "failed"
// StatusDelayed represents jobs scheduled to run in the future
StatusDelayed Status = "delayed"
// StatusPaused represents jobs in a paused queue
StatusPaused Status = "paused"
// StatusProcessing is an alias for StatusActive (internal use)
StatusProcessing Status = "processing"
// StatusUnknown represents jobs in an unknown or invalid state
StatusUnknown Status = "unknown"
)
Job status constants - matches TypeScript STATUS enum
func AllStatuses ¶
func AllStatuses() []Status
AllStatuses returns all valid job statuses. Useful for iteration and validation.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes jobs from a Queue
func NewWorker ¶
func NewWorker(queue *Queue, handler ProcessorFunc, opts ...WorkerOption) *Worker
NewWorker creates a new Worker instance
func (*Worker) Close ¶
Close gracefully closes the worker with a timeout. This is a unified API similar to TypeScript's close() method.
func (*Worker) GetCurrentJob ¶
func (w *Worker) GetCurrentJob() *ReservedJob
GetCurrentJob returns the current job being processed by the worker (for single concurrency)
func (*Worker) GetCurrentJobs ¶
func (w *Worker) GetCurrentJobs() []JobProgress
GetCurrentJobs returns all currently processing jobs
func (*Worker) GetWorkerMetrics ¶
func (w *Worker) GetWorkerMetrics() WorkerMetrics
GetWorkerMetrics returns comprehensive worker performance metrics
func (*Worker) IsProcessing ¶
IsProcessing returns true if the worker is currently processing jobs
func (*Worker) TotalProcessed ¶
TotalProcessed returns the total number of jobs processed
type WorkerEventHandler ¶
// WorkerEventHandler is the set of callbacks for worker lifecycle events.
// NOTE(review): when each callback fires is inferred from the method names
// only; confirm against the Worker implementation.
type WorkerEventHandler interface {
// OnReady takes no arguments; presumably invoked once the worker is ready to process jobs.
OnReady()
// OnClosed takes no arguments; presumably invoked after the worker has been closed.
OnClosed()
// OnError receives the error being reported.
OnError(err error)
// OnStalled receives a job ID and a group ID; presumably those of a job detected as stalled.
OnStalled(jobId string, groupId string)
}
WorkerEventHandler handles worker lifecycle events
type WorkerJobInfo ¶
// WorkerJobInfo contains information about a job being processed by a worker.
// It is the element type of WorkerStatusInfo.CurrentJob and carries JSON tags
// for serialization.
type WorkerJobInfo struct {
// JobID is serialized under the JSON key "jobId".
JobID string `json:"jobId"`
// GroupID is serialized under the JSON key "groupId".
GroupID string `json:"groupId"`
// ProcessingTimeMs is serialized as "processingTimeMs"; presumably the time
// spent on the job so far, in milliseconds (per the Ms suffix) — confirm.
ProcessingTimeMs int64 `json:"processingTimeMs"`
}
WorkerJobInfo contains information about a job being processed by a worker
type WorkerMetrics ¶
// WorkerMetrics contains worker performance metrics. It is the value returned
// by (*Worker).GetWorkerMetrics.
// NOTE(review): the field descriptions below are inferred from names and
// types; confirm against the code that populates them.
type WorkerMetrics struct {
// Name is presumably the worker's name.
Name string
// TotalJobsProcessed is presumably the number of jobs this worker has processed.
TotalJobsProcessed int64
// LastJobPickupTime is presumably when the worker last picked up a job.
LastJobPickupTime time.Time
// TimeSinceLastJobMs is presumably the time since the last job, in milliseconds (per the Ms suffix).
TimeSinceLastJobMs int64
// IsProcessing presumably mirrors (*Worker).IsProcessing.
IsProcessing bool
// JobsInProgressCount is presumably the length of JobsInProgress.
JobsInProgressCount int
// ConsecutiveEmptyReserves is presumably the number of reserve attempts in a row that returned no job.
ConsecutiveEmptyReserves int
// LastActivityTime is presumably the time of the worker's most recent activity.
LastActivityTime time.Time
// JobsInProgress holds one JobProgress per job, presumably those currently being processed.
JobsInProgress []JobProgress
}
WorkerMetrics contains worker performance metrics
type WorkerOption ¶
// WorkerOption is a function that receives a *Worker. NewWorker accepts any
// number of them, and the With* functions of this package return them.
type WorkerOption func(*Worker)
WorkerOption configures a Worker
func WithBackoff ¶
func WithBackoff(opts BackoffOptions) WorkerOption
WithBackoff sets the backoff strategy
func WithBlockingClient ¶
func WithBlockingClient(client redis.Cmdable) WorkerOption
WithBlockingClient sets a dedicated Redis client for blocking operations
func WithCleanupIntervalMs ¶
func WithCleanupIntervalMs(ms int64) WorkerOption
WithCleanupIntervalMs sets the cleanup interval
func WithCompletedHandler ¶
func WithCompletedHandler(fn func(job *Job, result interface{})) WorkerOption
WithCompletedHandler sets the completion handler
func WithConcurrency ¶
func WithConcurrency(n int) WorkerOption
WithConcurrency sets the number of concurrent job processors
func WithEnableCleanup ¶
func WithEnableCleanup(enabled bool) WorkerOption
WithEnableCleanup enables or disables automatic cleanup
func WithErrorHandler ¶
func WithErrorHandler(fn func(err error, job *ReservedJob)) WorkerOption
WithErrorHandler sets the error handler
func WithFailedHandler ¶
func WithFailedHandler(fn func(job *Job, err error)) WorkerOption
WithFailedHandler sets the failure handler
func WithGracePeriod ¶
func WithGracePeriod(ms int64) WorkerOption
WithGracePeriod sets the grace period before considering a job stalled
func WithHeartbeatMs ¶
func WithHeartbeatMs(ms int64) WorkerOption
WithHeartbeatMs sets the heartbeat interval
func WithMaxAttempts ¶
func WithMaxAttempts(n int) WorkerOption
WithMaxAttempts sets the maximum retry attempts
func WithMaxStalledCount ¶
func WithMaxStalledCount(n int64) WorkerOption
WithMaxStalledCount sets the max stalled count before failing
func WithSchedulerIntervalMs ¶
func WithSchedulerIntervalMs(ms int64) WorkerOption
WithSchedulerIntervalMs sets the scheduler interval in milliseconds
func WithStalledHandler ¶
func WithStalledHandler(fn func(jobId, groupId string)) WorkerOption
WithStalledHandler sets the stalled job handler
func WithStalledInterval ¶
func WithStalledInterval(ms int64) WorkerOption
WithStalledInterval sets the stalled check interval
type WorkerOptions ¶
// WorkerOptions configures a Worker instance.
//
// The defaults quoted on each field are not the Go zero values; they are
// presumably what DefaultWorkerOptions returns — confirm. Units are mixed:
// fields ending in Ms / Interval / GracePeriod are milliseconds, while
// BlockingTimeoutSec is seconds.
//
// NOTE(review): NewWorker takes WorkerOption functions rather than this
// struct; how WorkerOptions is consumed is not visible here.
type WorkerOptions struct {
// Number of concurrent job processors (default: 1)
Concurrency int
// Heartbeat interval in milliseconds to extend job visibility (default: 5000)
HeartbeatMs int64
// Backoff strategy for retries
Backoff BackoffOptions
// Interval in milliseconds for checking stalled jobs (default: 30000)
StalledInterval int64
// Maximum times a job can be stalled before failing (default: 1)
// NOTE(review): this field is int, whereas WithMaxStalledCount takes an
// int64; a conversion is needed when moving between the two.
MaxStalledCount int
// Grace period in milliseconds before considering a job stalled (default: 0)
StalledGracePeriod int64
// Blocking timeout in seconds for BRPOP-style waiting (default: 5)
BlockingTimeoutSec int
// Whether to auto-run the worker on creation (default: true)
// The zero value of this field is false, so a WorkerOptions{} literal does
// not carry the documented default.
AutoRun bool
// Batch size for reserving jobs (default: 1, max: concurrency)
BatchSize int
}
WorkerOptions configures a Worker instance
func DefaultWorkerOptions ¶
func DefaultWorkerOptions() WorkerOptions
DefaultWorkerOptions returns the default worker options
type WorkerStatusInfo ¶
// WorkerStatusInfo contains status information for a single worker. It is the
// element type of WorkersStatusSummary.Workers.
type WorkerStatusInfo struct {
// Index is serialized as "index"; presumably the worker's position in the input slice — confirm.
Index int `json:"index"`
// IsProcessing is serialized as "isProcessing".
IsProcessing bool `json:"isProcessing"`
// CurrentJob is serialized as "currentJob" and, because of omitempty, is
// left out of the JSON output when the pointer is nil.
CurrentJob *WorkerJobInfo `json:"currentJob,omitempty"`
}
WorkerStatusInfo contains status information for a single worker
type WorkersStatusSummary ¶
// WorkersStatusSummary contains aggregated status for multiple workers. It is
// the value returned by GetWorkersStatus.
type WorkersStatusSummary struct {
// Total is serialized as "total"; presumably the number of workers inspected.
Total int `json:"total"`
// Processing is serialized as "processing"; presumably the count of workers currently processing.
Processing int `json:"processing"`
// Idle is serialized as "idle"; presumably Total minus Processing — confirm.
Idle int `json:"idle"`
// Workers is serialized as "workers" and holds one WorkerStatusInfo per entry.
Workers []WorkerStatusInfo `json:"workers"`
}
WorkersStatusSummary contains aggregated status for multiple workers
func GetWorkersStatus ¶
func GetWorkersStatus(workers []*Worker) WorkersStatusSummary
GetWorkersStatus returns status information for all workers. This aligns with TypeScript's getWorkersStatus() helper.