outbox

package
v2.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package outbox provides transactional outbox primitives.

It includes an event model, repository contracts, a generic dispatcher with retry controls, and PostgreSQL adapters under the postgres subpackage.

Index

Constants

View Source
// Outbox event status strings, followed by the default payload size limit.
const (
	OutboxStatusPending    = "PENDING"
	OutboxStatusProcessing = "PROCESSING"
	OutboxStatusPublished  = "PUBLISHED"
	OutboxStatusFailed     = "FAILED"
	OutboxStatusInvalid    = "INVALID"
	// DefaultMaxPayloadBytes is 1 << 20 bytes (1 MiB).
	// NOTE(review): presumably the default limit behind
	// ErrOutboxEventPayloadTooLarge — confirm against the payload validation code.
	DefaultMaxPayloadBytes = 1 << 20
)
View Source
// TenantIDContextKey is the context key for the tenant id used by outbox
// multi-tenant operations. It has the unexported type tenantIDContextKey rather
// than plain string, so it does not compare equal to a string key with the same text.
// NOTE(review): presumably the key used by ContextWithTenantID and
// TenantIDFromContext — confirm.
const TenantIDContextKey tenantIDContextKey = "outbox.tenant_id"

TenantIDContextKey stores tenant id used by outbox multi-tenant operations.

Variables

View Source
// Sentinel errors exposed by the outbox package. Each is a plain errors.New
// value, so callers can match them with errors.Is. The set covers missing
// dependencies (event, repository, dispatcher, handler registry), payload
// validation, handler registration, tenant id, and status/transition validation.
var (
	ErrOutboxEventRequired        = errors.New("outbox event is required")
	ErrOutboxRepositoryRequired   = errors.New("outbox repository is required")
	ErrOutboxDispatcherRequired   = errors.New("outbox dispatcher is required")
	ErrOutboxDispatcherRunning    = errors.New("outbox dispatcher is already running")
	ErrOutboxEventPayloadRequired = errors.New("outbox event payload is required")
	ErrOutboxEventPayloadTooLarge = errors.New("outbox event payload exceeds maximum allowed size")
	ErrOutboxEventPayloadNotJSON  = errors.New("outbox event payload must be valid JSON (stored as JSONB)")
	ErrHandlerRegistryRequired    = errors.New("handler registry is required")
	ErrEventTypeRequired          = errors.New("event type is required")
	ErrEventHandlerRequired       = errors.New("event handler is required")
	ErrHandlerAlreadyRegistered   = errors.New("event handler already registered")
	ErrHandlerNotRegistered       = errors.New("event handler is not registered")
	ErrTenantIDRequired           = errors.New("tenant id is required")
	ErrOutboxStatusInvalid        = errors.New("invalid outbox status")
	ErrOutboxTransitionInvalid    = errors.New("invalid outbox status transition")
)

Functions

func ContextWithTenantID

func ContextWithTenantID(ctx context.Context, tenantID string) context.Context

ContextWithTenantID returns a context carrying tenantID.

func SanitizeErrorMessageForStorage

func SanitizeErrorMessageForStorage(msg string) string

SanitizeErrorMessageForStorage redacts sensitive values and enforces a bounded length.

func TenantIDFromContext

func TenantIDFromContext(ctx context.Context) (string, bool)

TenantIDFromContext reads tenant id from context.

func ValidateOutboxTransition

func ValidateOutboxTransition(fromRaw, toRaw string) error

ValidateOutboxTransition validates a status transition using typed lifecycle rules.

Types

type DispatchResult

// DispatchResult captures the outcome of one dispatch cycle as integer counters.
//
// NOTE(review): the field meanings below are inferred from the names only and
// should be confirmed against the dispatcher implementation: Processed is
// presumably the number of events handled in the cycle, Published and Failed
// the split by outcome, and StateUpdateFailed the number of events whose
// resulting status could not be persisted.
type DispatchResult struct {
	Processed         int
	Published         int
	Failed            int
	StateUpdateFailed int
}

DispatchResult captures one dispatch cycle outcome.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher handles publishing outbox events through registered handlers.

func NewDispatcher

func NewDispatcher(
	repo OutboxRepository,
	handlers *HandlerRegistry,
	logger libLog.Logger,
	tracer trace.Tracer,
	opts ...DispatcherOption,
) (*Dispatcher, error)

NewDispatcher creates a generic outbox dispatcher.

func (*Dispatcher) DispatchOnce

func (dispatcher *Dispatcher) DispatchOnce(ctx context.Context) int

DispatchOnce processes one tenant-scoped dispatch cycle.

func (*Dispatcher) DispatchOnceResult

func (dispatcher *Dispatcher) DispatchOnceResult(ctx context.Context) DispatchResult

DispatchOnceResult processes one tenant-scoped dispatch cycle and returns counters.

func (*Dispatcher) Run

func (dispatcher *Dispatcher) Run(launcher *libCommons.Launcher) error

Run starts the dispatcher loop, which continues until Stop is called.

func (*Dispatcher) RunContext

func (dispatcher *Dispatcher) RunContext(parentCtx context.Context, launcher *libCommons.Launcher) error

RunContext starts the dispatcher loop, which continues until Stop is called or parentCtx is cancelled.

func (*Dispatcher) Shutdown

func (dispatcher *Dispatcher) Shutdown(ctx context.Context) error

Shutdown waits for in-flight dispatch cycle completion.

func (*Dispatcher) Stop

func (dispatcher *Dispatcher) Stop()

Stop signals the dispatcher loop to stop.

type DispatcherConfig

// DispatcherConfig controls dispatcher polling, retry, and metric behavior.
// The fields fall into four groups: cycle pacing (DispatchInterval, BatchSize),
// publish retries (PublishMaxAttempts, PublishBackoff), reclamation of failed
// or stuck events (RetryWindow, MaxDispatchAttempts, ProcessingTimeout,
// MaxFailedPerBatch), and priority selection plus metrics.
type DispatcherConfig struct {
	// DispatchInterval is the periodic interval between dispatch cycles.
	DispatchInterval time.Duration
	// BatchSize is the max number of events processed per cycle.
	BatchSize int
	// PublishMaxAttempts is the max publish attempts for one event.
	PublishMaxAttempts int
	// PublishBackoff is the base backoff between publish retries.
	PublishBackoff time.Duration
	// ListPendingFailureThreshold emits an error log once repeated list failures reach this count.
	ListPendingFailureThreshold int
	// RetryWindow is the minimum age for failed events to become retry-eligible.
	RetryWindow time.Duration
	// MaxDispatchAttempts is the max total dispatch attempts before invalidation.
	MaxDispatchAttempts int
	// ProcessingTimeout is the age threshold for reclaiming stuck processing events.
	ProcessingTimeout time.Duration
	// PriorityBudget limits how many events can be selected via priority lists per cycle.
	PriorityBudget int
	// MaxFailedPerBatch limits how many failed events are reclaimed in one cycle.
	MaxFailedPerBatch int
	// PriorityEventTypes defines ordered event types to pull first each cycle.
	PriorityEventTypes []string
	// IncludeTenantMetrics enables tenant metric attributes and can increase cardinality.
	IncludeTenantMetrics bool
	// MaxTenantMetricDimensions caps unique tenant labels before falling back to an overflow label.
	MaxTenantMetricDimensions int
	// MaxTrackedListPendingFailureTenants caps in-memory tenant counters for ListPending failures.
	MaxTrackedListPendingFailureTenants int
	// MeterProvider overrides the default global meter provider when set.
	MeterProvider metric.MeterProvider
}

DispatcherConfig controls dispatcher polling, retry, and metric behavior.

func DefaultDispatcherConfig

func DefaultDispatcherConfig() DispatcherConfig

DefaultDispatcherConfig returns the baseline dispatcher configuration.

type DispatcherOption

// DispatcherOption mutates dispatcher configuration at construction.
// It is a function over *Dispatcher; NewDispatcher accepts any number of them
// through its variadic opts parameter.
type DispatcherOption func(*Dispatcher)

DispatcherOption mutates dispatcher configuration at construction.

func WithBatchSize

func WithBatchSize(size int) DispatcherOption

WithBatchSize sets the maximum events processed in one dispatch cycle.

func WithDispatchInterval

func WithDispatchInterval(interval time.Duration) DispatcherOption

WithDispatchInterval sets the dispatch polling interval.

func WithListPendingFailureThreshold

func WithListPendingFailureThreshold(threshold int) DispatcherOption

WithListPendingFailureThreshold sets the log threshold for repeated list failures.

func WithMaxDispatchAttempts

func WithMaxDispatchAttempts(attempts int) DispatcherOption

WithMaxDispatchAttempts sets max dispatch attempts before invalidation.

func WithMaxFailedPerBatch

func WithMaxFailedPerBatch(maxFailed int) DispatcherOption

WithMaxFailedPerBatch sets max failed events reclaimed each cycle.

func WithMaxTenantMetricDimensions

func WithMaxTenantMetricDimensions(maxDimensions int) DispatcherOption

WithMaxTenantMetricDimensions sets the maximum unique tenant labels used in metrics.

func WithMaxTrackedListPendingFailureTenants

func WithMaxTrackedListPendingFailureTenants(maxTenants int) DispatcherOption

WithMaxTrackedListPendingFailureTenants sets the in-memory cap for tenant-specific ListPending failure counters.

func WithMeterProvider

func WithMeterProvider(provider metric.MeterProvider) DispatcherOption

WithMeterProvider injects a custom meter provider for dispatcher metrics. Passing nil keeps the default global OpenTelemetry meter provider.

func WithPriorityBudget

func WithPriorityBudget(budget int) DispatcherOption

WithPriorityBudget sets the per-cycle priority selection budget.

func WithPriorityEventTypes

func WithPriorityEventTypes(eventTypes ...string) DispatcherOption

WithPriorityEventTypes sets the ordered event types selected before generic pending events.

func WithProcessingTimeout

func WithProcessingTimeout(timeout time.Duration) DispatcherOption

WithProcessingTimeout sets the timeout used to reclaim stuck processing events.

func WithPublishBackoff

func WithPublishBackoff(backoff time.Duration) DispatcherOption

WithPublishBackoff sets base backoff for publish retry attempts.

func WithPublishMaxAttempts

func WithPublishMaxAttempts(maxAttempts int) DispatcherOption

WithPublishMaxAttempts sets max publish attempts per event.

func WithRetryClassifier

func WithRetryClassifier(classifier RetryClassifier) DispatcherOption

WithRetryClassifier sets the non-retryable error classifier.

func WithRetryWindow

func WithRetryWindow(retryWindow time.Duration) DispatcherOption

WithRetryWindow sets failed-event cooldown before retry reclamation.

func WithTenantMetricAttributes

func WithTenantMetricAttributes(enabled bool) DispatcherOption

WithTenantMetricAttributes toggles tenant attributes for dispatcher metrics.

type EventHandler

// EventHandler handles one outbox event.
// NOTE(review): a non-nil error presumably marks the publish attempt as failed
// and feeds the dispatcher's retry logic — confirm against the dispatcher.
type EventHandler func(ctx context.Context, event *OutboxEvent) error

EventHandler handles one outbox event.

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry stores event handlers by event type.

func NewHandlerRegistry

func NewHandlerRegistry() *HandlerRegistry

func (*HandlerRegistry) Handle

func (registry *HandlerRegistry) Handle(ctx context.Context, event *OutboxEvent) error

func (*HandlerRegistry) Register

func (registry *HandlerRegistry) Register(eventType string, handler EventHandler) error

type OutboxEvent

// OutboxEvent is an event stored in the outbox for reliable delivery.
//
// Payload is raw bytes; per ErrOutboxEventPayloadNotJSON it must be valid JSON
// because it is stored as JSONB. PublishedAt is a pointer and may therefore be nil.
// NOTE(review): Status presumably holds one of the OutboxStatus* strings, and
// PublishedAt presumably stays nil until the event is published — confirm
// against the repository implementation.
type OutboxEvent struct {
	ID          uuid.UUID
	EventType   string
	AggregateID uuid.UUID
	Payload     []byte
	Status      string
	Attempts    int
	PublishedAt *time.Time
	LastError   string
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

OutboxEvent is an event stored in the outbox for reliable delivery.

func NewOutboxEvent

func NewOutboxEvent(
	ctx context.Context,
	eventType string,
	aggregateID uuid.UUID,
	payload []byte,
) (*OutboxEvent, error)

NewOutboxEvent creates a valid outbox event initialized as pending.

func NewOutboxEventWithID

func NewOutboxEventWithID(
	ctx context.Context,
	eventID uuid.UUID,
	eventType string,
	aggregateID uuid.UUID,
	payload []byte,
) (*OutboxEvent, error)

NewOutboxEventWithID creates a valid outbox event initialized as pending using a caller-provided ID.

type OutboxEventStatus

// OutboxEventStatus represents a valid outbox event lifecycle state.
// It is a string-based type; ParseOutboxEventStatus validates and converts a
// raw string into it.
type OutboxEventStatus string

OutboxEventStatus represents a valid outbox event lifecycle state.

func ParseOutboxEventStatus

func ParseOutboxEventStatus(raw string) (OutboxEventStatus, error)

ParseOutboxEventStatus validates and converts a raw string status.

func (OutboxEventStatus) CanTransitionTo

func (status OutboxEventStatus) CanTransitionTo(next OutboxEventStatus) bool

CanTransitionTo reports whether a transition from status to next is allowed.

func (OutboxEventStatus) IsValid

func (status OutboxEventStatus) IsValid() bool

IsValid reports whether the status is part of the outbox lifecycle.

func (OutboxEventStatus) String

func (status OutboxEventStatus) String() string

type OutboxRepository

// OutboxRepository defines persistence operations for outbox events.
//
// CreateWithTx takes a Tx (an alias of *sql.Tx) so the insert can run inside a
// transaction supplied by the caller.
// NOTE(review): the maxAttempts parameters presumably correspond to
// DispatcherConfig.MaxDispatchAttempts, and failedBefore/processingBefore to
// cutoffs derived from RetryWindow/ProcessingTimeout — confirm against the
// dispatcher and the postgres adapter.
type OutboxRepository interface {
	Create(ctx context.Context, event *OutboxEvent) (*OutboxEvent, error)
	CreateWithTx(ctx context.Context, tx Tx, event *OutboxEvent) (*OutboxEvent, error)
	ListPending(ctx context.Context, limit int) ([]*OutboxEvent, error)
	ListPendingByType(ctx context.Context, eventType string, limit int) ([]*OutboxEvent, error)
	ListTenants(ctx context.Context) ([]string, error)
	GetByID(ctx context.Context, id uuid.UUID) (*OutboxEvent, error)
	MarkPublished(ctx context.Context, id uuid.UUID, publishedAt time.Time) error
	MarkFailed(ctx context.Context, id uuid.UUID, errMsg string, maxAttempts int) error
	ListFailedForRetry(ctx context.Context, limit int, failedBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
	ResetForRetry(ctx context.Context, limit int, failedBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
	ResetStuckProcessing(ctx context.Context, limit int, processingBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
	MarkInvalid(ctx context.Context, id uuid.UUID, errMsg string) error
}

OutboxRepository defines persistence operations for outbox events.

type RetryClassifier

// RetryClassifier determines whether an error should not be retried.
type RetryClassifier interface {
	// IsNonRetryable reports whether err should not be retried.
	IsNonRetryable(err error) bool
}

RetryClassifier determines whether an error should not be retried.

type RetryClassifierFunc

// RetryClassifierFunc is a function type with an IsNonRetryable method, which
// lets a plain func(error) bool be used wherever a RetryClassifier is expected.
type RetryClassifierFunc func(err error) bool

func (RetryClassifierFunc) IsNonRetryable

func (fn RetryClassifierFunc) IsNonRetryable(err error) bool

type TenantDiscoverer

// TenantDiscoverer lists tenant identifiers to dispatch events for.
type TenantDiscoverer interface {
	// DiscoverTenants returns the tenant identifiers, or an error.
	DiscoverTenants(ctx context.Context) ([]string, error)
}

TenantDiscoverer lists tenant identifiers to dispatch events for.

type TenantResolver

// TenantResolver applies tenant-scoping rules for a transaction.
type TenantResolver interface {
	// ApplyTenant applies the scoping for tenantID to tx. It receives the
	// *sql.Tx itself, so the scoping runs inside the caller's transaction.
	ApplyTenant(ctx context.Context, tx *sql.Tx, tenantID string) error
}

TenantResolver applies tenant-scoping rules for a transaction.

type Tx

// Tx is the transactional handle used by CreateWithTx.
// It is a type alias (note the "="), not a new named type, so a *sql.Tx can be
// passed wherever a Tx is expected without conversion.
type Tx = *sql.Tx

Tx is the transactional handle used by CreateWithTx.

It intentionally aliases *sql.Tx to keep the repository contract compatible with existing database/sql transaction orchestration and tenant resolvers. This avoids hidden adapter layers in write paths where tenant scoping runs inside the caller's transaction.

Directories

Path Synopsis
postgres: Package postgres provides PostgreSQL adapters for outbox repository contracts.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL