command

package
v0.0.0-...-18e15ae Latest
Warning

This package is not in the latest version of its module.

Published: Mar 2, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package command provides an in-process CommandBus with a single-writer guarantee per stream.

Commands are routed to registered handlers and executed in order within each stream. A dedicated goroutine per active stream ensures that no two writes to the same stream run concurrently, which eliminates the need for optimistic concurrency control.
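The single-writer idea can be illustrated with a small standalone sketch. It does not use this package and is not its implementation; the dispatcher and job types are hypothetical, and the sketch only assumes that each stream ID maps to one lazily started goroutine fed by a buffered channel.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a unit of work bound to one stream (hypothetical type for this sketch).
type job struct {
	streamID string
	run      func()
}

// dispatcher lazily starts one goroutine per stream ID and feeds it jobs in order.
type dispatcher struct {
	mu      sync.Mutex
	workers map[string]chan job
	wg      sync.WaitGroup
}

func newDispatcher() *dispatcher {
	return &dispatcher{workers: make(map[string]chan job)}
}

// send enqueues a job on its stream's channel, creating the worker on first use.
func (d *dispatcher) send(j job) {
	d.mu.Lock()
	ch, ok := d.workers[j.streamID]
	if !ok {
		ch = make(chan job, 64)
		d.workers[j.streamID] = ch
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for next := range ch { // one goroutine, so jobs run one at a time
				next.run()
			}
		}()
	}
	d.mu.Unlock()
	ch <- j
}

// close stops all workers and waits for queued jobs to finish.
// In this sketch it must not be called concurrently with send.
func (d *dispatcher) close() {
	d.mu.Lock()
	for _, ch := range d.workers {
		close(ch)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// appendInOrder sends n jobs to one stream and returns the order they ran in.
func appendInOrder(n int) []int {
	d := newDispatcher()
	var got []int // written only by the stream's single worker goroutine
	for i := 0; i < n; i++ {
		i := i
		d.send(job{streamID: "order-1", run: func() { got = append(got, i) }})
	}
	d.close()
	return got
}

func main() {
	fmt.Println(appendInOrder(5)) // prints [0 1 2 3 4]
}
```

Because only one goroutine ever touches a given stream's state, the slice in appendInOrder needs no lock of its own.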

The bus supports:

  • Fire-and-forget (Send) and synchronous (SendAndWait) dispatch
  • Lazy stream workers with idle timeout
  • Composable middleware (logging, metrics, validation)
  • Graceful shutdown with in-flight drain
  • Integration with eskit.Decider for domain logic

The package also provides multi-command registration helpers (Register2 through Register5).

These reduce boilerplate when registering multiple commands that share the same aggregate state type, evolve function, and event store.

Example:

command.Register3[OrderState](
    bus, store,
    func() OrderState { return OrderState{} },
    evolveOrder,
    command.Cmd[CreateOrder, OrderState](decideCreate, command.WithHint(command.HintNewStream)),
    command.Cmd[AddItem, OrderState](decideAddItem),
    command.Cmd[SubmitOrder, OrderState](decideSubmit, command.WithHint(command.HintExisting)),
)

Constants

This section is empty.

Variables

var (
	// ErrBusClosed is returned when sending to a closed bus.
	ErrBusClosed = errors.New("command: bus is closed")

	// ErrUnknownCommand is returned when no handler is registered for a command type.
	ErrUnknownCommand = errors.New("command: unknown command type")

	// ErrTimeout is returned when SendAndWait exceeds the timeout.
	ErrTimeout = errors.New("command: timeout waiting for result")
)
var ErrStreamNotFound = errors.New("command: stream not found")

ErrStreamNotFound is returned when HintExisting is used and the stream has no events.

Functions

func CommandTypeName

func CommandTypeName(t reflect.Type) string

CommandTypeName derives "{package}.{TypeName}" from a command's reflect.Type. Used by middleware and external packages for logging/error messages. Returns "unknown" for nil types. Always PascalCase.

func NameOf

func NameOf(cmd Command) string

NameOf returns the derived type name for a command value. Useful in middleware and logging where you need the command name. Returns "unknown" for nil commands.
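One plausible way to derive such a name is shown below. This is a sketch, not the package's implementation: the typeName and nameOf helpers are hypothetical, and unwrapping pointer types before formatting is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// CreateOrder is a sample command type used only in this sketch.
type CreateOrder struct{ ID string }

// typeName returns a "{package}.{TypeName}" string, or "unknown" for a nil type.
// Pointer types are unwrapped first (an assumption of this sketch).
func typeName(t reflect.Type) string {
	if t == nil {
		return "unknown"
	}
	for t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	return t.String()
}

// nameOf derives the name from a value; reflect.TypeOf(nil) yields a nil type.
func nameOf(v any) string {
	return typeName(reflect.TypeOf(v))
}

func main() {
	fmt.Println(nameOf(CreateOrder{}))
	fmt.Println(nameOf(&CreateOrder{}))
	fmt.Println(nameOf(nil))
}
```

In this sketch the value and pointer forms of a command produce the same name, which keeps log output stable regardless of how the command was passed.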

func Register

func Register[S any, C Command, E any](
	bus *InProcessBus,
	store eskit.EventStore[E],
	decider eskit.Decider[S, C, E],
	opts ...RegisterOption,
)

Register wires a typed command handler into the bus using a Decider. The handler receives the current state of the command's target stream and returns events. This is the generic, type-safe registration: zero type assertions at runtime.

Options:

  • WithHint(HintNewStream): skip load, use initial state, expectedVersion=0
  • WithHint(HintExisting): load, fast-fail with ErrStreamNotFound if empty
  • default (no hint or HintAny): load → decide → append (current behavior)

Type parameters:

  • S: aggregate state type
  • C: concrete command type (must implement Command)
  • E: event type
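The default load → decide → append cycle can be sketched with plain functions. The example below is standalone and makes no claim about eskit.Decider's actual fields; it only reuses the function shapes that appear in this package's signatures (initial func() S, evolve func(S, E) S, decide func(S, C) ([]E, error)). The counter domain and the handle helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type counterState struct{ Total int }
type added struct{ N int } // event
type add struct{ N int }   // command

func initial() counterState { return counterState{} }

func evolve(s counterState, e added) counterState {
	s.Total += e.N
	return s
}

func decide(s counterState, c add) ([]added, error) {
	if c.N <= 0 {
		return nil, errors.New("n must be positive")
	}
	return []added{{N: c.N}}, nil
}

// handle rebuilds state from history, asks decide for new events, and
// returns the history with those events appended. On error the history
// is returned unchanged.
func handle[S, C, E any](
	history []E,
	initial func() S,
	evolve func(S, E) S,
	decide func(S, C) ([]E, error),
	cmd C,
) ([]E, error) {
	state := initial()
	for _, e := range history {
		state = evolve(state, e)
	}
	events, err := decide(state, cmd)
	if err != nil {
		return history, err
	}
	// The full slice expression forces a copy so callers never share backing arrays.
	return append(history[:len(history):len(history)], events...), nil
}

func main() {
	var history []added
	history, _ = handle(history, initial, evolve, decide, add{N: 2})
	history, _ = handle(history, initial, evolve, decide, add{N: 3})
	_, err := handle(history, initial, evolve, decide, add{N: -1})
	fmt.Println(len(history), err) // prints 2 n must be positive
}
```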

func Register2

func Register2[S any, E any](
	bus *InProcessBus,
	store eskit.EventStore[E],
	initial func() S,
	evolve func(S, E) S,
	spec1, spec2 CmdSpec[S, E],
)

Register2 registers 2 command handlers sharing the same state, evolve, and store.

func Register3

func Register3[S any, E any](
	bus *InProcessBus,
	store eskit.EventStore[E],
	initial func() S,
	evolve func(S, E) S,
	spec1, spec2, spec3 CmdSpec[S, E],
)

Register3 registers 3 command handlers sharing the same state, evolve, and store.

func Register4

func Register4[S any, E any](
	bus *InProcessBus,
	store eskit.EventStore[E],
	initial func() S,
	evolve func(S, E) S,
	spec1, spec2, spec3, spec4 CmdSpec[S, E],
)

Register4 registers 4 command handlers sharing the same state, evolve, and store.

func Register5

func Register5[S any, E any](
	bus *InProcessBus,
	store eskit.EventStore[E],
	initial func() S,
	evolve func(S, E) S,
	spec1, spec2, spec3, spec4, spec5 CmdSpec[S, E],
)

Register5 registers 5 command handlers sharing the same state, evolve, and store.

func RegisterFunc

func RegisterFunc[C Command](
	bus *InProcessBus,
	fn func(ctx context.Context, cmd C) (*Result, error),
)

RegisterFunc wires a raw handler function (without a Decider) into the bus. Useful when you want custom handling logic that doesn't fit the Decider pattern.

Types

type BusOption

type BusOption func(*busConfig)

BusOption configures the InProcessBus.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) BusOption

WithIdleTimeout sets how long an idle stream worker stays alive before shutting down. This reclaims resources for inactive streams. Default: 30s.

func WithMaxConcurrentStreams

func WithMaxConcurrentStreams(max int) BusOption

WithMaxConcurrentStreams limits the number of active stream workers. When the limit is reached, new streams block until a worker shuts down. Default: 10000.

func WithWorkerBufferSize

func WithWorkerBufferSize(size int) BusOption

WithWorkerBufferSize sets the channel buffer size per stream worker. Larger buffers absorb bursts but use more memory. Default: 64.

type CmdSpec

type CmdSpec[S any, E any] struct {
	// contains filtered or unexported fields
}

CmdSpec captures a single command's decide function and options for bulk registration.

func Cmd

func Cmd[C Command, S any, E any](
	decide func(S, C) ([]E, error),
	opts ...RegisterOption,
) CmdSpec[S, E]

Cmd creates a CmdSpec for use with Register2–Register5.

type Command

type Command interface {
	// StreamID returns the target stream for this command.
	// Must not return empty string.
	StreamID() string
}

Command represents a domain command targeting a specific stream. The bus routes commands by Go type (reflect.Type), not by string name.

type HandlerFunc

type HandlerFunc[S any, C Command, E any] func(ctx context.Context, state S, cmd C) ([]E, error)

HandlerFunc is a typed command handler function. It takes a context, the current state, and a command, and returns the events to persist.

type InProcessBus

type InProcessBus struct {
	// contains filtered or unexported fields
}

InProcessBus is a single-writer command bus. Commands targeting the same stream are serialized through a dedicated goroutine, guaranteeing ordering and eliminating concurrency conflicts.

func NewBus

func NewBus(opts ...BusOption) *InProcessBus

NewBus creates a new InProcessBus with the given options.

func (*InProcessBus) ActiveWorkers

func (b *InProcessBus) ActiveWorkers() int

ActiveWorkers returns the number of currently active stream workers. Useful for testing and monitoring.

func (*InProcessBus) Close

func (b *InProcessBus) Close() error

Close gracefully shuts down the bus. In-flight commands complete; new commands are rejected. Safe to call multiple times.

func (*InProcessBus) HandleType

func (b *InProcessBus) HandleType(cmdType reflect.Type, h handler)

HandleType registers a type-erased handler for a command type. Prefer using the generic Register function instead.

func (*InProcessBus) Send

func (b *InProcessBus) Send(ctx context.Context, cmd Command) error

Send dispatches a command without waiting for the result (fire-and-forget). Returns an error immediately if the command type is unknown or the bus is closed.

func (*InProcessBus) SendAndWait

func (b *InProcessBus) SendAndWait(ctx context.Context, cmd Command, timeout time.Duration) (*Result, error)

SendAndWait dispatches a command and blocks until the result is available or the timeout elapses, in which case it returns ErrTimeout.

func (*InProcessBus) Use

func (b *InProcessBus) Use(mw Middleware)

Use adds middleware to the bus. Middleware executes in the order it was added (first = outermost).

type Middleware

type Middleware func(ctx context.Context, cmd Command, next func(ctx context.Context, cmd Command) (*Result, error)) (*Result, error)

Middleware intercepts command dispatch. It can inspect/modify the command, measure timing, enforce validation, etc. Call next to continue the chain.
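The "first = outermost" ordering can be seen in a small standalone sketch. The command, result, handlerFunc, and middleware types below are local stand-ins that mirror the shapes on this page; chain and tracer are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the shapes of Command, Result, and Middleware.
type command interface{ StreamID() string }
type result struct{ StreamVersion int }

type handlerFunc func(ctx context.Context, cmd command) (*result, error)
type middleware func(ctx context.Context, cmd command, next handlerFunc) (*result, error)

type ping struct{}

func (ping) StreamID() string { return "ping-1" }

// chain wraps final so that mws[0] runs outermost.
func chain(final handlerFunc, mws ...middleware) handlerFunc {
	h := final
	for i := len(mws) - 1; i >= 0; i-- {
		mw, inner := mws[i], h
		h = func(ctx context.Context, cmd command) (*result, error) {
			return mw(ctx, cmd, inner)
		}
	}
	return h
}

// tracer returns a middleware that records when it is entered and left.
func tracer(name string, trace *[]string) middleware {
	return func(ctx context.Context, cmd command, next handlerFunc) (*result, error) {
		*trace = append(*trace, name+":in")
		res, err := next(ctx, cmd)
		*trace = append(*trace, name+":out")
		return res, err
	}
}

// runChain executes two tracers around a trivial handler and returns the trace.
func runChain() []string {
	var trace []string
	final := func(ctx context.Context, cmd command) (*result, error) {
		trace = append(trace, "handler")
		return &result{StreamVersion: 1}, nil
	}
	h := chain(final, tracer("first", &trace), tracer("second", &trace))
	if _, err := h(context.Background(), ping{}); err != nil {
		panic(err)
	}
	return trace
}

func main() {
	fmt.Println(runChain()) // prints [first:in second:in handler second:out first:out]
}
```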

func DispatchMiddleware

func DispatchMiddleware(d *eskit.EventDispatcher[any]) Middleware

DispatchMiddleware returns command bus middleware that dispatches newly persisted events to all matching subscriptions after each successful command.

This is the unified replacement for both ProjectionMiddleware and ReactionMiddleware. The EventDispatcher handles both read-model projections and side-effect reactions through the same dispatch mechanism.

Usage:

dispatcher := eskit.NewEventDispatcher[any]()
bus.Use(command.DispatchMiddleware(dispatcher))

type RegisterOption

type RegisterOption func(*registerConfig)

RegisterOption configures the Register call.

func WithHint

func WithHint(h StreamHint) RegisterOption

WithHint sets the stream existence hint for the registered command. Default is HintAny (load → decide → append).

func WithSnapshot

func WithSnapshot(n int, snapshotStore interface{}) RegisterOption

WithSnapshot enables snapshot support. After every `n` events, the current state is persisted to the snapshot store. On load, the snapshot is used as a starting point and only delta events are replayed.

The snapshotStore must be an eskit.SnapshotStore[S] matching the state type of the Decider passed to Register.
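The snapshot-plus-delta idea can be sketched without any store at all. In the standalone example below, events are plain integers and state is their sum; snapshot, rebuild, and shouldSnapshot are hypothetical names, and treating "after every n events" as "whenever the stream version crosses a multiple of n" is an assumption.

```go
package main

import "fmt"

// snapshot captures the state as of a given stream version.
type snapshot struct {
	State   int
	Version int
}

// rebuild starts from the snapshot (if any) and replays only later events.
// It returns the rebuilt state and the resulting stream version.
func rebuild(snap *snapshot, events []int) (int, int) {
	state, from := 0, 0
	if snap != nil {
		state, from = snap.State, snap.Version
	}
	for _, e := range events[from:] {
		state += e
	}
	return state, len(events)
}

// shouldSnapshot reports whether an append moved the stream across a multiple of n.
func shouldSnapshot(oldVersion, newVersion, n int) bool {
	return n > 0 && newVersion/n > oldVersion/n
}

func main() {
	events := []int{1, 2, 3, 4, 5}

	full, v := rebuild(nil, events)
	fmt.Println(full, v) // prints 15 5

	// A snapshot taken at version 3 already holds 1+2+3 = 6.
	fast, v := rebuild(&snapshot{State: 6, Version: 3}, events)
	fmt.Println(fast, v) // prints 15 5

	fmt.Println(shouldSnapshot(9, 10, 5), shouldSnapshot(10, 11, 5)) // prints true false
}
```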

type Result

type Result struct {
	// Events contains the raw produced events (as []any since the bus is type-erased at dispatch).
	Events []any

	// StreamVersion is the version of the stream after the command executed.
	StreamVersion int

	// Duration is how long the handler took to execute.
	Duration time.Duration
}

Result holds the outcome of a command execution.

type StreamHint

type StreamHint int

StreamHint tells the command bus how to optimize the load/append cycle.

const (
	// HintAny is the default: load events, rebuild state, decide, append.
	HintAny StreamHint = iota

	// HintNewStream skips the store.Load() call entirely. The decider receives
	// the initial state and append uses expectedVersion=0. If the stream already
	// exists the store returns ErrConcurrencyConflict — giving you a cheap
	// "create-only" guard without reading.
	HintNewStream

	// HintExisting loads events and fast-fails with ErrStreamNotFound if the
	// stream has no events. Useful for update/delete commands that should never
	// operate on a non-existent stream.
	HintExisting
)
