Documentation
¶
Overview ¶
Package outbox provides transactional outbox primitives.
It includes an event model, repository contracts, a generic dispatcher with retry controls, and PostgreSQL adapters under the postgres subpackage.
Index ¶
- Constants
- Variables
- func ContextWithTenantID(ctx context.Context, tenantID string) context.Context
- func SanitizeErrorMessageForStorage(msg string) string
- func TenantIDFromContext(ctx context.Context) (string, bool)
- func ValidateOutboxTransition(fromRaw, toRaw string) error
- type DispatchResult
- type Dispatcher
- func (dispatcher *Dispatcher) DispatchOnce(ctx context.Context) int
- func (dispatcher *Dispatcher) DispatchOnceResult(ctx context.Context) DispatchResult
- func (dispatcher *Dispatcher) Run(launcher *libCommons.Launcher) error
- func (dispatcher *Dispatcher) RunContext(parentCtx context.Context, launcher *libCommons.Launcher) error
- func (dispatcher *Dispatcher) Shutdown(ctx context.Context) error
- func (dispatcher *Dispatcher) Stop()
- type DispatcherConfig
- type DispatcherOption
- func WithBatchSize(size int) DispatcherOption
- func WithDispatchInterval(interval time.Duration) DispatcherOption
- func WithListPendingFailureThreshold(threshold int) DispatcherOption
- func WithMaxDispatchAttempts(attempts int) DispatcherOption
- func WithMaxFailedPerBatch(maxFailed int) DispatcherOption
- func WithMaxTenantMetricDimensions(maxDimensions int) DispatcherOption
- func WithMaxTrackedListPendingFailureTenants(maxTenants int) DispatcherOption
- func WithMeterProvider(provider metric.MeterProvider) DispatcherOption
- func WithPriorityBudget(budget int) DispatcherOption
- func WithPriorityEventTypes(eventTypes ...string) DispatcherOption
- func WithProcessingTimeout(timeout time.Duration) DispatcherOption
- func WithPublishBackoff(backoff time.Duration) DispatcherOption
- func WithPublishMaxAttempts(maxAttempts int) DispatcherOption
- func WithRetryClassifier(classifier RetryClassifier) DispatcherOption
- func WithRetryWindow(retryWindow time.Duration) DispatcherOption
- func WithTenantMetricAttributes(enabled bool) DispatcherOption
- type EventHandler
- type HandlerRegistry
- type OutboxEvent
- type OutboxEventStatus
- type OutboxRepository
- type RetryClassifier
- type RetryClassifierFunc
- type TenantDiscoverer
- type TenantResolver
- type Tx
Constants ¶
const ( OutboxStatusPending = "PENDING" OutboxStatusProcessing = "PROCESSING" OutboxStatusPublished = "PUBLISHED" OutboxStatusFailed = "FAILED" OutboxStatusInvalid = "INVALID" DefaultMaxPayloadBytes = 1 << 20 )
const TenantIDContextKey tenantIDContextKey = "outbox.tenant_id"
TenantIDContextKey stores tenant id used by outbox multi-tenant operations.
Variables ¶
var ( ErrOutboxEventRequired = errors.New("outbox event is required") ErrOutboxRepositoryRequired = errors.New("outbox repository is required") ErrOutboxDispatcherRequired = errors.New("outbox dispatcher is required") ErrOutboxDispatcherRunning = errors.New("outbox dispatcher is already running") ErrOutboxEventPayloadRequired = errors.New("outbox event payload is required") ErrOutboxEventPayloadTooLarge = errors.New("outbox event payload exceeds maximum allowed size") ErrOutboxEventPayloadNotJSON = errors.New("outbox event payload must be valid JSON (stored as JSONB)") ErrHandlerRegistryRequired = errors.New("handler registry is required") ErrEventTypeRequired = errors.New("event type is required") ErrEventHandlerRequired = errors.New("event handler is required") ErrHandlerAlreadyRegistered = errors.New("event handler already registered") ErrHandlerNotRegistered = errors.New("event handler is not registered") ErrTenantIDRequired = errors.New("tenant id is required") ErrOutboxStatusInvalid = errors.New("invalid outbox status") ErrOutboxTransitionInvalid = errors.New("invalid outbox status transition") )
Functions ¶
func ContextWithTenantID ¶
ContextWithTenantID returns a context carrying tenantID.
func SanitizeErrorMessageForStorage ¶
SanitizeErrorMessageForStorage redacts sensitive values and enforces a bounded length.
func TenantIDFromContext ¶
TenantIDFromContext reads tenant id from context.
func ValidateOutboxTransition ¶
ValidateOutboxTransition validates a status transition using typed lifecycle rules.
Types ¶
type DispatchResult ¶
DispatchResult captures one dispatch cycle outcome.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher handles publishing outbox events through registered handlers.
func NewDispatcher ¶
func NewDispatcher( repo OutboxRepository, handlers *HandlerRegistry, logger libLog.Logger, tracer trace.Tracer, opts ...DispatcherOption, ) (*Dispatcher, error)
NewDispatcher creates a generic outbox dispatcher.
func (*Dispatcher) DispatchOnce ¶
func (dispatcher *Dispatcher) DispatchOnce(ctx context.Context) int
DispatchOnce processes one tenant-scoped dispatch cycle.
func (*Dispatcher) DispatchOnceResult ¶
func (dispatcher *Dispatcher) DispatchOnceResult(ctx context.Context) DispatchResult
DispatchOnceResult processes one tenant-scoped dispatch cycle and returns counters.
func (*Dispatcher) Run ¶
func (dispatcher *Dispatcher) Run(launcher *libCommons.Launcher) error
Run starts the dispatcher loop until Stop is called.
func (*Dispatcher) RunContext ¶
func (dispatcher *Dispatcher) RunContext(parentCtx context.Context, launcher *libCommons.Launcher) error
RunContext starts the dispatcher loop until Stop is called or ctx is cancelled.
func (*Dispatcher) Shutdown ¶
func (dispatcher *Dispatcher) Shutdown(ctx context.Context) error
Shutdown waits for in-flight dispatch cycle completion.
func (*Dispatcher) Stop ¶
func (dispatcher *Dispatcher) Stop()
Stop signals the dispatcher loop to stop.
type DispatcherConfig ¶
type DispatcherConfig struct {
// DispatchInterval is the periodic interval between dispatch cycles.
DispatchInterval time.Duration
// BatchSize is the max number of events processed per cycle.
BatchSize int
// PublishMaxAttempts is the max publish attempts for one event.
PublishMaxAttempts int
// PublishBackoff is the base backoff between publish retries.
PublishBackoff time.Duration
// ListPendingFailureThreshold emits an error log once repeated list failures reach this count.
ListPendingFailureThreshold int
// RetryWindow is the minimum age for failed events to become retry-eligible.
RetryWindow time.Duration
// MaxDispatchAttempts is the max total dispatch attempts before invalidation.
MaxDispatchAttempts int
// ProcessingTimeout is the age threshold for reclaiming stuck processing events.
ProcessingTimeout time.Duration
// PriorityBudget limits how many events can be selected via priority lists per cycle.
PriorityBudget int
// MaxFailedPerBatch limits how many failed events are reclaimed in one cycle.
MaxFailedPerBatch int
// PriorityEventTypes defines ordered event types to pull first each cycle.
PriorityEventTypes []string
// IncludeTenantMetrics enables tenant metric attributes and can increase cardinality.
IncludeTenantMetrics bool
// MaxTenantMetricDimensions caps unique tenant labels before falling back to an overflow label.
MaxTenantMetricDimensions int
// MaxTrackedListPendingFailureTenants caps in-memory tenant counters for ListPending failures.
MaxTrackedListPendingFailureTenants int
// MeterProvider overrides the default global meter provider when set.
MeterProvider metric.MeterProvider
}
DispatcherConfig controls dispatcher polling, retry, and metric behavior.
func DefaultDispatcherConfig ¶
func DefaultDispatcherConfig() DispatcherConfig
DefaultDispatcherConfig returns the baseline dispatcher configuration.
type DispatcherOption ¶
type DispatcherOption func(*Dispatcher)
DispatcherOption mutates dispatcher configuration at construction.
func WithBatchSize ¶
func WithBatchSize(size int) DispatcherOption
WithBatchSize sets the maximum events processed in one dispatch cycle.
func WithDispatchInterval ¶
func WithDispatchInterval(interval time.Duration) DispatcherOption
WithDispatchInterval sets the dispatch polling interval.
func WithListPendingFailureThreshold ¶
func WithListPendingFailureThreshold(threshold int) DispatcherOption
WithListPendingFailureThreshold sets the log threshold for repeated list failures.
func WithMaxDispatchAttempts ¶
func WithMaxDispatchAttempts(attempts int) DispatcherOption
WithMaxDispatchAttempts sets max dispatch attempts before invalidation.
func WithMaxFailedPerBatch ¶
func WithMaxFailedPerBatch(maxFailed int) DispatcherOption
WithMaxFailedPerBatch sets max failed events reclaimed each cycle.
func WithMaxTenantMetricDimensions ¶
func WithMaxTenantMetricDimensions(maxDimensions int) DispatcherOption
WithMaxTenantMetricDimensions sets the maximum unique tenant labels used in metrics.
func WithMaxTrackedListPendingFailureTenants ¶
func WithMaxTrackedListPendingFailureTenants(maxTenants int) DispatcherOption
WithMaxTrackedListPendingFailureTenants sets the in-memory cap for tenant-specific ListPending failure counters.
func WithMeterProvider ¶
func WithMeterProvider(provider metric.MeterProvider) DispatcherOption
WithMeterProvider injects a custom meter provider for dispatcher metrics. Passing nil keeps the default global OpenTelemetry meter provider.
func WithPriorityBudget ¶
func WithPriorityBudget(budget int) DispatcherOption
WithPriorityBudget sets the per-cycle priority selection budget.
func WithPriorityEventTypes ¶
func WithPriorityEventTypes(eventTypes ...string) DispatcherOption
WithPriorityEventTypes sets the ordered event types selected before generic pending events.
func WithProcessingTimeout ¶
func WithProcessingTimeout(timeout time.Duration) DispatcherOption
WithProcessingTimeout sets the timeout used to reclaim stuck processing events.
func WithPublishBackoff ¶
func WithPublishBackoff(backoff time.Duration) DispatcherOption
WithPublishBackoff sets base backoff for publish retry attempts.
func WithPublishMaxAttempts ¶
func WithPublishMaxAttempts(maxAttempts int) DispatcherOption
WithPublishMaxAttempts sets max publish attempts per event.
func WithRetryClassifier ¶
func WithRetryClassifier(classifier RetryClassifier) DispatcherOption
WithRetryClassifier sets the non-retryable error classifier.
func WithRetryWindow ¶
func WithRetryWindow(retryWindow time.Duration) DispatcherOption
WithRetryWindow sets failed-event cooldown before retry reclamation.
func WithTenantMetricAttributes ¶
func WithTenantMetricAttributes(enabled bool) DispatcherOption
WithTenantMetricAttributes toggles tenant attributes for dispatcher metrics.
type EventHandler ¶
type EventHandler func(ctx context.Context, event *OutboxEvent) error
EventHandler handles one outbox event.
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry stores event handlers by event type.
func NewHandlerRegistry ¶
func NewHandlerRegistry() *HandlerRegistry
func (*HandlerRegistry) Handle ¶
func (registry *HandlerRegistry) Handle(ctx context.Context, event *OutboxEvent) error
func (*HandlerRegistry) Register ¶
func (registry *HandlerRegistry) Register(eventType string, handler EventHandler) error
type OutboxEvent ¶
type OutboxEvent struct {
ID uuid.UUID
EventType string
AggregateID uuid.UUID
Payload []byte
Status string
Attempts int
PublishedAt *time.Time
LastError string
CreatedAt time.Time
UpdatedAt time.Time
}
OutboxEvent is an event stored in the outbox for reliable delivery.
func NewOutboxEvent ¶
func NewOutboxEvent( ctx context.Context, eventType string, aggregateID uuid.UUID, payload []byte, ) (*OutboxEvent, error)
NewOutboxEvent creates a valid outbox event initialized as pending.
type OutboxEventStatus ¶
type OutboxEventStatus string
OutboxEventStatus represents a valid outbox event lifecycle state.
const ( StatusPending OutboxEventStatus = OutboxStatusPending StatusProcessing OutboxEventStatus = OutboxStatusProcessing StatusPublished OutboxEventStatus = OutboxStatusPublished StatusFailed OutboxEventStatus = OutboxStatusFailed StatusInvalid OutboxEventStatus = OutboxStatusInvalid )
func ParseOutboxEventStatus ¶
func ParseOutboxEventStatus(raw string) (OutboxEventStatus, error)
ParseOutboxEventStatus validates and converts a raw string status.
func (OutboxEventStatus) CanTransitionTo ¶
func (status OutboxEventStatus) CanTransitionTo(next OutboxEventStatus) bool
CanTransitionTo reports whether a transition from status to next is allowed.
func (OutboxEventStatus) IsValid ¶
func (status OutboxEventStatus) IsValid() bool
IsValid reports whether the status is part of the outbox lifecycle.
func (OutboxEventStatus) String ¶
func (status OutboxEventStatus) String() string
type OutboxRepository ¶
type OutboxRepository interface {
Create(ctx context.Context, event *OutboxEvent) (*OutboxEvent, error)
CreateWithTx(ctx context.Context, tx Tx, event *OutboxEvent) (*OutboxEvent, error)
ListPending(ctx context.Context, limit int) ([]*OutboxEvent, error)
ListPendingByType(ctx context.Context, eventType string, limit int) ([]*OutboxEvent, error)
ListTenants(ctx context.Context) ([]string, error)
GetByID(ctx context.Context, id uuid.UUID) (*OutboxEvent, error)
MarkPublished(ctx context.Context, id uuid.UUID, publishedAt time.Time) error
MarkFailed(ctx context.Context, id uuid.UUID, errMsg string, maxAttempts int) error
ListFailedForRetry(ctx context.Context, limit int, failedBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
ResetForRetry(ctx context.Context, limit int, failedBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
ResetStuckProcessing(ctx context.Context, limit int, processingBefore time.Time, maxAttempts int) ([]*OutboxEvent, error)
MarkInvalid(ctx context.Context, id uuid.UUID, errMsg string) error
}
OutboxRepository defines persistence operations for outbox events.
type RetryClassifier ¶
RetryClassifier determines whether an error should not be retried.
type RetryClassifierFunc ¶
func (RetryClassifierFunc) IsNonRetryable ¶
func (fn RetryClassifierFunc) IsNonRetryable(err error) bool
type TenantDiscoverer ¶
TenantDiscoverer lists tenant identifiers to dispatch events for.
type TenantResolver ¶
type TenantResolver interface {
ApplyTenant(ctx context.Context, tx *sql.Tx, tenantID string) error
}
TenantResolver applies tenant-scoping rules for a transaction.
type Tx ¶
Tx is the transactional handle used by CreateWithTx.
It intentionally aliases *sql.Tx to keep the repository contract compatible with existing database/sql transaction orchestration and tenant resolvers. This avoids hidden adapter layers in write paths where tenant scoping runs inside the caller's transaction.