queue

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package queue provides a simple interface to interact with a queue.

Index

Constants

View Source
const (
	DefaultMetricCounterLabel = "counter"
	Type                      = "queue"
)

Type is the type of the entity regarding the framework. It is used to for example, to identify the entity in the logs, metrics, and for tracing.

Variables

View Source
var (
	// ErrRequiredPostHook is the error returned when the post-hook function is
	// missing.
	ErrRequiredPostHook = customerror.NewRequiredError("post-hook function", customerror.WithErrorCode("ERR_REQUIRED_POST_HOOK"))

	// ErrRequiredPreHook is the error returned when the pre-hook function is
	// missing.
	ErrRequiredPreHook = customerror.NewRequiredError("pre-hook function", customerror.WithErrorCode("ERR_REQUIRED_PRE_HOOK"))
)

Functions

func Flatten2D

func Flatten2D[T any](data [][]T) []T

Flatten2D takes a 2D slice and returns a 1D slice containing all the elements.

func ParseToStruct

func ParseToStruct(from, to any) error

ParseToStruct parses the given JSON (`from`) to struct (`to`).

func Publish

func Publish(ctx context.Context, s IQueue, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error

Publish data.

NOTE: Use `prm.Any` to set implementation-specific parameters.

func PublishMany

func PublishMany(
	ctx context.Context,
	q IQueue,
	queueName string,
	items []*Message,
	prm *PublishParams,
	options ...OptionsFunc,
) error

PublishMany publish `items` concurrently, against the specified Queue.

func PublishToMany

func PublishToMany(
	ctx context.Context,
	m Map,
	queueName string,
	msg *Message,
	prm *PublishParams,
	options ...OptionsFunc,
) error

PublishToMany publishes `msg` concurrently, against all Queues in `m`.

Types

type CallbackFunc

type CallbackFunc func(ctx context.Context, msg *Message) error

CallbackFunc is the function that will be once a message is received on a subscribed channel.

type HookFunc

type HookFunc func(ctx context.Context, q IQueue, queueName string, m *Message) error

HookFunc specifies the function that will be called before and after the operation.

type IQueue

type IQueue interface {
	// Publish data.
	//
	// NOTE: Use `prm.Any` to set implementation-specific parameters.
	Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error

	// Subscribe to channel.
	//
	// NOTE: Use `prm.Any` to set implementation-specific parameters.
	Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error

	// GetType returns its type.
	GetType() string

	// GetClient returns the queue client. Use that to interact with the
	// underlying queue client.
	GetClient() any

	// GetLogger returns the logger.
	GetLogger() sypl.ISypl

	// GetName returns the queue name.
	GetName() string

	// GetCounterPublished returns the metric.
	GetCounterPublished() *expvar.Int

	// GetCounterPublishedFailed returns the metric.
	GetCounterPublishedFailed() *expvar.Int

	// GetCounterReceived returns the metric.
	GetCounterReceived() *expvar.Int

	// GetCounterReceivedFailed returns the metric.
	GetCounterReceivedFailed() *expvar.Int

	// GetCounterSubscribed returns the metric.
	GetCounterSubscribed() *expvar.Int

	// GetCounterSubscribedFailed returns the metric.
	GetCounterSubscribedFailed() *expvar.Int
}

IQueue defines the queue abstraction layer - interface.

type Map added in v0.0.3

type Map map[string]IQueue

Map is a map of strgs

func (Map) String added in v0.0.3

func (m Map) String() string

String implements the Stringer interface.

func (Map) ToSlice added in v0.0.3

func (m Map) ToSlice() []IQueue

ToSlice converts Map to Slice of IQueue.

type Message

type Message struct {
	// Body is the raw message payload in bytes.
	Body []byte

	// MessageID uniquely identifies the message across a queue system.
	// Example: Message ID in SQS, Delivery Tag in RabbitMQ, Offset in Kafka, etc.
	MessageID string

	// Metadata holds specific attributes that don't fit into standard fields.
	Metadata map[string]interface{}

	// Timestamp indicates when the message was published to the queue.
	Timestamp time.Time
}

Message represents a generic queue message

func NewMessage

func NewMessage(body []byte) *Message

NewMessage returns a new Message with a unique ID and current timestamp.

func NewMessageFromStruct

func NewMessageFromStruct(body any) (*Message, error)

NewMessageFromStruct creates a new Message from a struct.

func NewMustMessageFromStruct

func NewMustMessageFromStruct(body any) *Message

NewMustMessageFromStruct creates a new Message from a struct, panicking on error.

type Mock

type Mock struct {

	// Publish data.
	MockPublish func(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error

	// Subscribe to channel.
	MockSubscribe func(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error

	// GetType returns its type.
	MockGetType func() string

	// GetClient returns the queue client. Use that to interact with the underlying queue client.
	MockGetClient func() any

	// GetLogger returns the logger.
	MockGetLogger func() sypl.ISypl

	// GetName returns the queue name.
	MockGetName func() string

	// GetCounterPublished returns the metric.
	MockGetCounterPublished func() *expvar.Int

	// GetCounterPublishedFailed returns the metric.
	MockGetCounterPublishedFailed func() *expvar.Int

	// GetCounterReceived returns the metric.
	MockGetCounterReceived func() *expvar.Int

	// GetCounterReceivedFailed returns the metric.
	MockGetCounterReceivedFailed func() *expvar.Int

	// GetCounterSubscribed returns the metric.
	MockGetCounterSubscribed func() *expvar.Int

	// GetCounterSubscribedFailed returns the metric.
	MockGetCounterSubscribedFailed func() *expvar.Int
}

Mock is a struct which satisfies the queue.IQueue interface.

func (*Mock) GetClient

func (m *Mock) GetClient() any

GetClient returns the queue client. Use that to interact with the underlying queue client.

func (*Mock) GetCounterPublished

func (m *Mock) GetCounterPublished() *expvar.Int

GetCounterPublished returns the metric.

func (*Mock) GetCounterPublishedFailed

func (m *Mock) GetCounterPublishedFailed() *expvar.Int

GetCounterPublishedFailed returns the metric.

func (*Mock) GetCounterReceived

func (m *Mock) GetCounterReceived() *expvar.Int

GetCounterReceived returns the metric.

func (*Mock) GetCounterReceivedFailed

func (m *Mock) GetCounterReceivedFailed() *expvar.Int

GetCounterReceivedFailed returns the metric.

func (*Mock) GetCounterSubscribed

func (m *Mock) GetCounterSubscribed() *expvar.Int

GetCounterSubscribed returns the metric.

func (*Mock) GetCounterSubscribedFailed

func (m *Mock) GetCounterSubscribedFailed() *expvar.Int

GetCounterSubscribedFailed returns the metric.

func (*Mock) GetLogger

func (m *Mock) GetLogger() sypl.ISypl

GetLogger returns the logger.

func (*Mock) GetName

func (m *Mock) GetName() string

GetName returns the queue name.

func (*Mock) GetType

func (m *Mock) GetType() string

GetType returns its type.

func (*Mock) Publish

func (m *Mock) Publish(ctx context.Context, queueName string, msg *Message, prm *PublishParams, options ...OptionsFunc) error

Publish data.

func (*Mock) Subscribe

func (m *Mock) Subscribe(ctx context.Context, queueName string, cb CallbackFunc, prm *SubscribeParams, options ...OptionsFunc) error

Subscribe to channel.

type Operation

type Operation string

Operation is the operation name.

const (
	OperationPublished  Operation = "published"
	OperationSubscribed Operation = "subscribed"
	OperationReceived   Operation = "Received"
)

func (Operation) String

func (o Operation) String() string

String implements the Stringer interface.

type Options

type Options struct {
	// QueueName name.
	QueueName string `json:"queueName"`

	// PreHookFunc is the function which runs before the operation.
	PreHookFunc HookFunc `json:"-"`

	// PostHookFunc is the function which runs after the operation.
	PostHookFunc HookFunc `json:"-"`
}

Options for operations.

func NewOptions

func NewOptions() (*Options, error)

NewOptions creates Options.

type OptionsFunc

type OptionsFunc func(o *Options) error

OptionsFunc allows to set options.

func WithPostHook

func WithPostHook(fn HookFunc) OptionsFunc

WithPostHook set the post-hook function.

func WithPreHook

func WithPreHook(fn HookFunc) OptionsFunc

WithPreHook set the pre-hook function.

func WithQueueName

func WithQueueName(queueName string) OptionsFunc

WithQueueName sets the queue name.

type PublishParams

type PublishParams struct {
	// Any is a placeholder for Queue-specific implementation specific fields.
	Any any `json:"-"`

	// Tags attach categorization metadata to the message.
	// Example: Message Tags in SQS, Message Properties in RabbitMQ, Record Headers in Kafka, etc.
	Tags []string

	// Route specifies where to publish the message.
	// Example: Routing Key in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
	Route string

	// DelaySeconds postpones message delivery by specified seconds.
	// Example: Message Timer in SQS, Delayed Exchange in RabbitMQ, etc.
	DelaySeconds int

	// MessageGroupID ensures ordering for related messages.
	// Example: Group ID in FIFO queues (SQS), Partition Key in Kafka, etc.
	MessageGroupID string

	// Priority influences message delivery order where supported.
	// Example: Message Priority in RabbitMQ, custom implementation in other queues, etc.
	Priority int

	// Metadata holds queue-specific attributes for publishing.
	// Example: System Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
	Metadata map[string]interface{}
}

PublishParams defines the parameters for publishing a message to a queue.

type Queue

type Queue struct {
	// Logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the queue type.
	Name string `json:"name" validate:"required,lowercase,gte=1"`
	// contains filtered or unexported fields
}

Queue definition.

func New

func New(ctx context.Context, name string) (*Queue, error)

New returns a new Queue.

func (*Queue) GetCounterPublished

func (s *Queue) GetCounterPublished() *expvar.Int

GetCounterPublished returns the counterPublished.

func (*Queue) GetCounterPublishedFailed

func (s *Queue) GetCounterPublishedFailed() *expvar.Int

GetCounterPublishedFailed returns the counterPublishedFailed.

func (*Queue) GetCounterReceived

func (s *Queue) GetCounterReceived() *expvar.Int

GetCounterReceived returns the counterReceived.

func (*Queue) GetCounterReceivedFailed

func (s *Queue) GetCounterReceivedFailed() *expvar.Int

GetCounterReceivedFailed returns the counterReceivedFailed.

func (*Queue) GetCounterSubscribed

func (s *Queue) GetCounterSubscribed() *expvar.Int

GetCounterSubscribed returns the counterSubscribed.

func (*Queue) GetCounterSubscribedFailed

func (s *Queue) GetCounterSubscribedFailed() *expvar.Int

GetCounterSubscribedFailed returns the counterSubscribedFailed.

func (*Queue) GetLogger

func (s *Queue) GetLogger() sypl.ISypl

GetLogger returns the logger.

func (*Queue) GetName

func (s *Queue) GetName() string

GetName returns the queue name.

func (*Queue) GetType

func (s *Queue) GetType() string

GetType returns its type.

type SubscribeParams

type SubscribeParams struct {
	// Any is a placeholder for Queue-specific implementation specific fields.
	Any any `json:"-"`

	// GroupID identifies a group of consumers that work as one unit.
	// Example: Consumer Group in Kafka/SQS, Consumer Tag prefix in RabbitMQ, etc.
	GroupID string

	// Tags filter which messages this subscriber receives.
	// Example: Message Filtering in SQS, Binding Keys in RabbitMQ, Topic Subscriptions in Kafka, etc.
	Tags []string

	// Route specifies which route/topic to subscribe to.
	// Example: Binding Pattern in RabbitMQ, Topic in Kafka, unused in standard SQS, etc.
	Route string

	// BatchSize defines how many messages to process in a single batch.
	// This helps optimize throughput vs. latency across all queue systems.
	BatchSize int

	// MaxMessages sets the maximum number of messages to receive in total.
	// Useful for controlling message flow and preventing overwhelming consumers.
	MaxMessages int

	// WaitTimeout specifies how long to wait for messages when none are immediately available.
	// Example: Long Polling in SQS, Consumer Timeout in Kafka, etc.
	WaitTimeout time.Duration

	// Metadata holds queue-specific attributes that don't fit into standard fields.
	// Example: Message Attributes in SQS, Headers in RabbitMQ/Kafka, etc.
	Metadata map[string]interface{}

	// ContextTimeout specifies how long to wait for the callback to finish.
	ContextTimeout time.Duration
}

SubscribeParams defines the parameters for subscribing to a queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL