queue

package
v0.0.0-...-d7e357c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultAQServerConfigFunc returns a freshly built asynq.Config on every
	// call: concurrency equal to the number of CPUs, three named queues
	// ("critical", "default", "low") with values 6, 3 and 1, a zap-backed
	// logger named "asynq" at warn level, and a one-second delayed-task
	// check interval.
	DefaultAQServerConfigFunc = func() asynq.Config {
		var cfg asynq.Config

		cfg.Concurrency = runtime.NumCPU()
		cfg.Queues = map[string]int{"critical": 6, "default": 3, "low": 1}
		cfg.Logger = util.ZapLogger("asynq")
		cfg.LogLevel = asynq.WarnLevel
		cfg.DelayedTaskCheckInterval = time.Second

		return cfg
	}
)
View Source
var (
	// DefaultGQOptionFunc returns a freshly built GQueueOption on every call:
	// MaxConcurrent equal to the number of CPUs, the package default retry
	// limit and batch size, a zap-backed logger named "queue", and three
	// groups (1, 2, 3) with values 6, 3 and 1.
	DefaultGQOptionFunc = func() GQueueOption {
		var opt GQueueOption

		opt.MaxConcurrent = runtime.NumCPU()
		opt.MaxRetry = defaultQueueMaxRetry
		opt.Logger = util.ZapLogger("queue")
		opt.Groups = map[int]int{1: 6, 2: 3, 3: 1}
		opt.BatchSize = defaultBatchSize

		return opt
	}
)

Functions

func AQHandle

func AQHandle(mux *asynq.ServeMux, pattern string, handler asynq.Handler)

func AQHandleFunc

func AQHandleFunc(mux *asynq.ServeMux, pattern string, handler asynq.HandlerFunc)

func EnqueueGq

func EnqueueGq(task string, payload interface{}, opt ...GQueueTaskOpt)

EnqueueGq adds a task to the global queue.

func LoadRedisClientOpt

func LoadRedisClientOpt() asynq.RedisClientOpt

func NewAQClient

func NewAQClient(c ...asynq.RedisClientOpt) *asynq.Client

func NewAQServer

func NewAQServer(c ...asynq.Config) *asynq.Server

func NewAQServerMux

func NewAQServerMux(hm AQHandlerMap) *asynq.ServeMux

func NewAQTask

func NewAQTask(typ string, data interface{}, opts ...asynq.Option) *asynq.Task

func Pattern

func Pattern(typ string) string

func SetAppName

func SetAppName(name string)

SetAppName sets the app name.

func StartGQServer

func StartGQServer(hMap GQHandlerMap)

StartGQServer registers the given handlers in bulk and starts the server.

func WrapAQHandler

func WrapAQHandler(fn any) asynq.HandlerFunc

Types

type AQHandlerMap

// AQHandlerMap maps a string key to an asynq.HandlerFunc; it is the argument
// type of NewAQServerMux.
// NOTE(review): the key is presumably the task type/pattern — confirm.
type AQHandlerMap map[string]asynq.HandlerFunc

type GQHandlerMap

// GQHandlerMap maps a string key to a GQueueHandler; it is the argument type
// of StartGQServer.
// NOTE(review): the key is presumably the task name — confirm.
type GQHandlerMap map[string]GQueueHandler

type GQueue

// GQueue is the queue type returned by NewGQueue and GetGQServer.
// It exposes no exported fields; interact with it through its methods.
type GQueue struct {
	// contains filtered or unexported fields
}

func GetGQServer

func GetGQServer(opts ...GQueueOption) *GQueue

GetGQServer returns the singleton GQueue instance.

func NewGQueue

func NewGQueue(opt1 GQueueOption) *GQueue

func (*GQueue) Enqueue

func (g *GQueue) Enqueue(task GQueueTask)

func (*GQueue) GetStats

func (g *GQueue) GetStats() map[string]interface{}

func (*GQueue) IsClosed

func (g *GQueue) IsClosed() bool

IsClosed reports whether the queue has been closed.

func (*GQueue) Register

func (g *GQueue) Register(task string, handler GQueueHandler)

func (*GQueue) StartServer

func (g *GQueue) StartServer()

func (*GQueue) Stop

func (g *GQueue) Stop()

func (*GQueue) StopAndWait

func (g *GQueue) StopAndWait()

StopAndWait stops the queue and waits for all tasks to finish.

func (*GQueue) Wait

func (g *GQueue) Wait()

type GQueueArray

// GQueueArray is a queue of interface{} values created with NewGQueueArray.
// It embeds sync.RWMutex, so the mutex methods (Lock, RLock, ...) are part of
// its exported method set.
// NOTE(review): whether the queue methods take the lock themselves is not
// visible here — confirm before locking externally.
type GQueueArray struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewGQueueArray

func NewGQueueArray() *GQueueArray

func (*GQueueArray) Clear

func (q *GQueueArray) Clear()

func (*GQueueArray) Dequeue

func (q *GQueueArray) Dequeue() (value interface{}, ok bool)

func (*GQueueArray) Empty

func (q *GQueueArray) Empty() bool

func (*GQueueArray) Enqueue

func (q *GQueueArray) Enqueue(value interface{})

func (*GQueueArray) Peek

func (q *GQueueArray) Peek() (value interface{}, ok bool)

func (*GQueueArray) Size

func (q *GQueueArray) Size() int

func (*GQueueArray) String

func (q *GQueueArray) String() string

func (*GQueueArray) Values

func (q *GQueueArray) Values() []interface{}

type GQueueHandler

// GQueueHandler is the handler signature for queue tasks: it receives a
// pointer to the task and returns an error. Handlers are supplied through
// (*GQueue).Register or a GQHandlerMap.
type GQueueHandler func(task *GQueueTask) (err error)

func WrapGQHandler

func WrapGQHandler(fn any) GQueueHandler

WrapGQHandler wraps an arbitrary function as a queue handler.

type GQueueOption

// GQueueOption holds the settings accepted by NewGQueue and GetGQServer.
// DefaultGQOptionFunc produces a populated default value.
type GQueueOption struct {
	// MaxConcurrent defaults to runtime.NumCPU() in DefaultGQOptionFunc.
	// NOTE(review): presumably the worker/concurrency limit — confirm.
	MaxConcurrent int            `json:"max_concurrent"`
	// MaxRetry is the queue-wide retry limit (a task may carry its own MaxRetry).
	MaxRetry      int            `json:"max_retry"`
	// Logger is excluded from JSON encoding.
	Logger        util.ItfLogger `json:"-"`
	// Groups maps a group number to an int value; the default is {1: 6, 2: 3, 3: 1}.
	// NOTE(review): presumably group -> weight/priority — confirm.
	Groups        map[int]int    `json:"groups"`
	BatchSize     int            `json:"batch_size"`
}

type GQueueTask

// GQueueTask describes a single unit of work for a GQueue. Values are built
// with NewGQTask or NewGQueueTaskBytes and customised through GQueueTaskOpt
// functions.
type GQueueTask struct {
	// ID can be set with WithGQueueTaskID.
	ID             string                          `json:"id"`
	// Name is the task name given to NewGQTask / NewGQueueTaskBytes.
	Name           string                          `json:"name"`
	// Data is the task payload as raw bytes.
	Data           []byte                          `json:"data"`
	// NOTE(review): presumably the number of retries performed so far — confirm.
	Retried        int                             `json:"retried"`
	// Group can be set with WithGQueueTaskGroup.
	Group          int                             `json:"group"`
	CreatedAt      time.Time                       `json:"created_at"`
	MaxRetry       int                             `json:"max_retry"` // -1 means use the global setting
	// RetryDelayFunc receives the retry count and returns a duration; it is
	// excluded from JSON encoding.
	RetryDelayFunc func(retried int) time.Duration `json:"-"`
	// RunAt can be set with WithGQueueTaskRunAt.
	RunAt          time.Time                       `json:"run_at"`
}

func NewGQTask

func NewGQTask(task string, payload interface{}, opt ...GQueueTaskOpt) GQueueTask

func NewGQueueTaskBytes

func NewGQueueTaskBytes(name string, data []byte, opt ...GQueueTaskOpt) GQueueTask

func (GQueueTask) Copy

func (t GQueueTask) Copy() *GQueueTask

func (GQueueTask) IsExpired

func (t GQueueTask) IsExpired(timeout time.Duration) bool

func (GQueueTask) String

func (t GQueueTask) String() string

type GQueueTaskOpt

// GQueueTaskOpt is a functional option that modifies a GQueueTask in place.
// It is accepted by NewGQTask, NewGQueueTaskBytes and EnqueueGq, and produced
// by the WithGQueueTask* constructors.
type GQueueTaskOpt func(task *GQueueTask)

func WithGQueueTaskGroup

func WithGQueueTaskGroup(group int) GQueueTaskOpt

func WithGQueueTaskID

func WithGQueueTaskID(id string) GQueueTaskOpt

func WithGQueueTaskMaxRetry

func WithGQueueTaskMaxRetry(maxRetry int) GQueueTaskOpt

func WithGQueueTaskRetryDelayFunc

func WithGQueueTaskRetryDelayFunc(retryDelayFunc func(retried int) time.Duration) GQueueTaskOpt

func WithGQueueTaskRunAt

func WithGQueueTaskRunAt(runAt time.Time) GQueueTaskOpt

type Job

// Job is a function that takes no arguments and returns nothing.
type Job func()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL