Documentation
¶
Overview ¶
Package queue provides a simple interface to interact with a queue.
Index ¶
- Constants
- Variables
- func Flatten2D[T any](data [][]T) []T
- func ParseToStruct(from, to any) error
- func Publish(ctx context.Context, s IQueue, queueName string, msg *Message, ...) error
- func PublishMany(ctx context.Context, q IQueue, queueName string, items []*Message, ...) error
- func PublishToMany(ctx context.Context, m Map, queueName string, msg *Message, prm *PublishParams, ...) error
- type CallbackFunc
- type HookFunc
- type IQueue
- type Map
- type Message
- type Mock
- func (m *Mock) GetClient() any
- func (m *Mock) GetCounterPublished() *expvar.Int
- func (m *Mock) GetCounterPublishedFailed() *expvar.Int
- func (m *Mock) GetCounterReceived() *expvar.Int
- func (m *Mock) GetCounterReceivedFailed() *expvar.Int
- func (m *Mock) GetCounterSubscribed() *expvar.Int
- func (m *Mock) GetCounterSubscribedFailed() *expvar.Int
- func (m *Mock) GetLogger() sypl.ISypl
- func (m *Mock) GetName() string
- func (m *Mock) GetType() string
- func (m *Mock) Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, ...) error
- func (m *Mock) Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, ...) error
- type Operation
- type Options
- type OptionsFunc
- type PublishParams
- type Queue
- func (s *Queue) GetCounterPublished() *expvar.Int
- func (s *Queue) GetCounterPublishedFailed() *expvar.Int
- func (s *Queue) GetCounterReceived() *expvar.Int
- func (s *Queue) GetCounterReceivedFailed() *expvar.Int
- func (s *Queue) GetCounterSubscribed() *expvar.Int
- func (s *Queue) GetCounterSubscribedFailed() *expvar.Int
- func (s *Queue) GetLogger() sypl.ISypl
- func (s *Queue) GetName() string
- func (s *Queue) GetType() string
- type SubscribeParams
Constants ¶
const ( DefaultMetricCounterLabel = "counter" Type = "queue" )
Type is the type of the entity regarding the framework. It is used to for example, to identify the entity in the logs, metrics, and for tracing.
Variables ¶
var ( // ErrRequiredPostHook is the error returned when the post-hook function is // missing. ErrRequiredPostHook = customerror.NewRequiredError("post-hook function", customerror.WithErrorCode("ERR_REQUIRED_POST_HOOK")) // ErrRequiredPreHook is the error returned when the pre-hook function is // missing. ErrRequiredPreHook = customerror.NewRequiredError("pre-hook function", customerror.WithErrorCode("ERR_REQUIRED_PRE_HOOK")) )
Functions ¶
func Flatten2D ¶
func Flatten2D[T any](data [][]T) []T
Flatten2D takes a 2D slice and returns a 1D slice containing all the elements.
func ParseToStruct ¶
ParseToStruct parses the given JSON (`from`) to struct (`to`).
func Publish ¶
func Publish(ctx context.Context, s IQueue, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error
Publish data.
NOTE: Use `prm.Any` to set implementation-specific parameters.
func PublishMany ¶
func PublishMany( ctx context.Context, q IQueue, queueName string, items []*Message, prm *PublishParams, options ...OptionsFunc, ) error
PublishMany publish `items` concurrently, against the specified Queue.
func PublishToMany ¶
func PublishToMany( ctx context.Context, m Map, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc, ) error
PublishToMany publishes `msg` concurrently, against all Queues in `m`.
Types ¶
type CallbackFunc ¶
CallbackFunc is the function that will be once a message is received on a subscribed channel.
type IQueue ¶
type IQueue interface {
// Publish data.
//
// NOTE: Use `prm.Any` to set implementation-specific parameters.
Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error
// Subscribe to channel.
//
// NOTE: Use `prm.Any` to set implementation-specific parameters.
Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error
// GetType returns its type.
GetType() string
// GetClient returns the queue client. Use that to interact with the
// underlying queue client.
GetClient() any
// GetLogger returns the logger.
GetLogger() sypl.ISypl
// GetName returns the queue name.
GetName() string
// GetCounterPublished returns the metric.
GetCounterPublished() *expvar.Int
// GetCounterPublishedFailed returns the metric.
GetCounterPublishedFailed() *expvar.Int
// GetCounterReceived returns the metric.
GetCounterReceived() *expvar.Int
// GetCounterReceivedFailed returns the metric.
GetCounterReceivedFailed() *expvar.Int
// GetCounterSubscribed returns the metric.
GetCounterSubscribed() *expvar.Int
// GetCounterSubscribedFailed returns the metric.
GetCounterSubscribedFailed() *expvar.Int
}
IQueue defines the queue abstraction layer - interface.
type Map ¶ added in v0.0.3
Map is a map of strgs
type Message ¶
type Message struct {
// Body is the raw message payload in bytes.
Body []byte
// MessageID uniquely identifies the message across a queue system.
// Example: Message ID in SQS, Delivery Tag in RabbitMQ, Offset in Kafka, etc.
MessageID string
// Metadata holds specific attributes that don't fit into standard fields.
Metadata map[string]interface{}
// Timestamp indicates when the message was published to the queue.
Timestamp time.Time
}
Message represents a generic queue message
func NewMessage ¶
NewMessage returns a new Message with a unique ID and current timestamp.
func NewMessageFromStruct ¶
NewMessageFromStruct creates a new Message from a struct.
func NewMustMessageFromStruct ¶
NewMustMessageFromStruct creates a new Message from a struct, panicking on error.
type Mock ¶
type Mock struct {
// Publish data.
MockPublish func(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error
// Subscribe to channel.
MockSubscribe func(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error
// GetType returns its type.
MockGetType func() string
// GetClient returns the queue client. Use that to interact with the underlying queue client.
MockGetClient func() any
// GetLogger returns the logger.
MockGetLogger func() sypl.ISypl
// GetName returns the queue name.
MockGetName func() string
// GetCounterPublished returns the metric.
MockGetCounterPublished func() *expvar.Int
// GetCounterPublishedFailed returns the metric.
MockGetCounterPublishedFailed func() *expvar.Int
// GetCounterReceived returns the metric.
MockGetCounterReceived func() *expvar.Int
// GetCounterReceivedFailed returns the metric.
MockGetCounterReceivedFailed func() *expvar.Int
// GetCounterSubscribed returns the metric.
MockGetCounterSubscribed func() *expvar.Int
// GetCounterSubscribedFailed returns the metric.
MockGetCounterSubscribedFailed func() *expvar.Int
}
Mock is a struct which satisfies the queue.IQueue interface.
func (*Mock) GetClient ¶
GetClient returns the queue client. Use that to interact with the underlying queue client.
func (*Mock) GetCounterPublished ¶
GetCounterPublished returns the metric.
func (*Mock) GetCounterPublishedFailed ¶
GetCounterPublishedFailed returns the metric.
func (*Mock) GetCounterReceived ¶
GetCounterReceived returns the metric.
func (*Mock) GetCounterReceivedFailed ¶
GetCounterReceivedFailed returns the metric.
func (*Mock) GetCounterSubscribed ¶
GetCounterSubscribed returns the metric.
func (*Mock) GetCounterSubscribedFailed ¶
GetCounterSubscribedFailed returns the metric.
func (*Mock) Publish ¶
func (m *Mock) Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error
Publish data.
func (*Mock) Subscribe ¶
func (m *Mock) Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error
Subscribe to channel.
type Options ¶
type Options struct {
// QueueName name.
QueueName string `json:"queueName"`
// PreHookFunc is the function which runs before the operation.
PreHookFunc HookFunc `json:"-"`
// PostHookFunc is the function which runs after the operation.
PostHookFunc HookFunc `json:"-"`
}
Options for operations.
type OptionsFunc ¶
OptionsFunc allows to set options.
func WithPostHook ¶
func WithPostHook(fn HookFunc) OptionsFunc
WithPostHook set the post-hook function.
func WithQueueName ¶
func WithQueueName(queueName string) OptionsFunc
WithQueueName sets the queue name.
type PublishParams ¶
type PublishParams struct {
// Any is a placeholder for Queue-specific implementation specific fields.
Any any `json:"-"`
// Tags attach categorization metadata to the message.
// Example: Message Tags in SQS, Message Properties in RabbitMQ, Record Headers in Kafka, etc.
Tags []string
// Route specifies where to publish the message.
// Example: Routing Key in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
Route string
// DelaySeconds postpones message delivery by specified seconds.
// Example: Message Timer in SQS, Delayed Exchange in RabbitMQ, etc.
DelaySeconds int
// MessageGroupID ensures ordering for related messages.
// Example: Group ID in FIFO queues (SQS), Partition Key in Kafka, etc.
MessageGroupID string
// Priority influences message delivery order where supported.
// Example: Message Priority in RabbitMQ, custom implementation in other queues, etc.
Priority int
// Metadata holds queue-specific attributes for publishing.
// Example: System Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
Metadata map[string]interface{}
}
PublishParams defines the parameters for publishing a message to a queue.
type Queue ¶
type Queue struct {
// Logger.
Logger sypl.ISypl `json:"-" validate:"required"`
// Name of the queue type.
Name string `json:"name" validate:"required,lowercase,gte=1"`
// contains filtered or unexported fields
}
Queue definition.
func (*Queue) GetCounterPublished ¶
GetCounterPublished returns the counterPublished.
func (*Queue) GetCounterPublishedFailed ¶
GetCounterPublishedFailed returns the counterPublishedFailed.
func (*Queue) GetCounterReceived ¶
GetCounterReceived returns the counterReceived.
func (*Queue) GetCounterReceivedFailed ¶
GetCounterReceivedFailed returns the counterReceivedFailed.
func (*Queue) GetCounterSubscribed ¶
GetCounterSubscribed returns the counterSubscribed.
func (*Queue) GetCounterSubscribedFailed ¶
GetCounterSubscribedFailed returns the counterSubscribedFailed.
type SubscribeParams ¶
type SubscribeParams struct {
// Any is a placeholder for Queue-specific implementation specific fields.
Any any `json:"-"`
// GroupID identifies a group of consumers that work as one unit.
// Example: Consumer Group in Kafka/SQS, Consumer Tag prefix in RabbitMQ, etc.
GroupID string
// Tags filter which messages this subscriber receives.
// Example: Message Filtering in SQS, Binding Keys in RabbitMQ, Topic Subscriptions in Kafka, etc.
Tags []string
// Route specifies which route/topic to subscribe to.
// Example: Binding Pattern in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
Route string
// BatchSize defines how many messages to process in a single batch.
// This helps optimize throughput vs. latency across all queue systems.
BatchSize int
// MaxMessages sets the maximum number of messages to receive in total.
// Useful for controlling message flow and preventing overwhelming consumers.
MaxMessages int
// WaitTimeout specifies how long to wait for messages when none are immediately available.
// Example: Long Polling in SQS, Consumer Timeout in Kafka, etc.
WaitTimeout time.Duration
// Metadata holds queue-specific attributes that don't fit into standard fields.
// Example: Message Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
Metadata map[string]interface{}
// ContextTimeout specifies how long to wait for the callback to finish.
ContextTimeout time.Duration
}
SubscribeParams defines the parameters for subscribing to a queue.