Documentation
¶
Overview ¶
Package command provides an in-process CommandBus with single-writer guarantee per stream.
Commands are routed to registered handlers and executed with ordering guarantees within each stream. A dedicated goroutine per active stream ensures no concurrent writes — eliminating the need for optimistic concurrency control.
The bus supports:
- Fire-and-forget (Send) and synchronous (SendAndWait) dispatch
- Lazy stream workers with idle timeout
- Composable middleware (logging, metrics, validation)
- Graceful shutdown with in-flight drain
- Integration with eskit.Decider for domain logic
Multi-command registration helpers.
These reduce boilerplate when registering multiple commands that share the same aggregate state type, evolve function, and event store.
Example:
command.Register3[OrderState](
bus, store,
func() OrderState { return OrderState{} },
evolveOrder,
command.Cmd[CreateOrder, OrderState](decideCreate, command.WithHint(command.HintNewStream)),
command.Cmd[AddItem, OrderState](decideAddItem),
command.Cmd[SubmitOrder, OrderState](decideSubmit, command.WithHint(command.HintExisting)),
)
Index ¶
- Variables
- func CommandTypeName(t reflect.Type) string
- func NameOf(cmd Command) string
- func Register[S any, C Command, E any](bus *InProcessBus, store eskit.EventStore[E], decider eskit.Decider[S, C, E], ...)
- func Register2[S any, E any](bus *InProcessBus, store eskit.EventStore[E], initial func() S, ...)
- func Register3[S any, E any](bus *InProcessBus, store eskit.EventStore[E], initial func() S, ...)
- func Register4[S any, E any](bus *InProcessBus, store eskit.EventStore[E], initial func() S, ...)
- func Register5[S any, E any](bus *InProcessBus, store eskit.EventStore[E], initial func() S, ...)
- func RegisterFunc[C Command](bus *InProcessBus, fn func(ctx context.Context, cmd C) (*Result, error))
- type BusOption
- type CmdSpec
- type Command
- type HandlerFunc
- type InProcessBus
- func (b *InProcessBus) ActiveWorkers() int
- func (b *InProcessBus) Close() error
- func (b *InProcessBus) HandleType(cmdType reflect.Type, h handler)
- func (b *InProcessBus) Send(ctx context.Context, cmd Command) error
- func (b *InProcessBus) SendAndWait(ctx context.Context, cmd Command, timeout time.Duration) (*Result, error)
- func (b *InProcessBus) Use(mw Middleware)
- type Middleware
- type RegisterOption
- type Result
- type StreamHint
Constants ¶
This section is empty.
Variables ¶
var ( // ErrBusClosed is returned when sending to a closed bus. ErrBusClosed = errors.New("command: bus is closed") // ErrUnknownCommand is returned when no handler is registered for a command type. ErrUnknownCommand = errors.New("command: unknown command type") // ErrTimeout is returned when SendAndWait exceeds the timeout. ErrTimeout = errors.New("command: timeout waiting for result") )
var ErrStreamNotFound = errors.New("command: stream not found")
ErrStreamNotFound is returned when HintExisting is used and the stream has no events.
Functions ¶
func CommandTypeName ¶
CommandTypeName derives "{package}.{TypeName}" from a command's reflect.Type. Used by middleware and external packages for logging/error messages. Returns "unknown" for nil types. Always PascalCase.
func NameOf ¶
NameOf returns the derived type name for a command value. Useful in middleware and logging where you need the command name. Returns "unknown" for nil commands.
func Register ¶
func Register[S any, C Command, E any]( bus *InProcessBus, store eskit.EventStore[E], decider eskit.Decider[S, C, E], opts ...RegisterOption, )
Register wires a typed command handler into the bus using a Decider. The handler receives the command's current stream state and returns events. This is the generic, type-safe registration — zero type assertions at runtime.
Options:
- WithHint(HintNewStream): skip load, use initial state, expectedVersion=0
- WithHint(HintExisting): load, fast-fail with ErrStreamNotFound if empty
- default (no hint or HintAny): load → decide → append (current behavior)
Type parameters:
- S: aggregate state type
- C: concrete command type (must implement Command)
- E: event type
func Register2 ¶
func Register2[S any, E any]( bus *InProcessBus, store eskit.EventStore[E], initial func() S, evolve func(S, E) S, spec1, spec2 CmdSpec[S, E], )
Register2 registers 2 command handlers sharing the same state, evolve, and store.
func Register3 ¶
func Register3[S any, E any]( bus *InProcessBus, store eskit.EventStore[E], initial func() S, evolve func(S, E) S, spec1, spec2, spec3 CmdSpec[S, E], )
Register3 registers 3 command handlers sharing the same state, evolve, and store.
func Register4 ¶
func Register4[S any, E any]( bus *InProcessBus, store eskit.EventStore[E], initial func() S, evolve func(S, E) S, spec1, spec2, spec3, spec4 CmdSpec[S, E], )
Register4 registers 4 command handlers sharing the same state, evolve, and store.
func Register5 ¶
func Register5[S any, E any]( bus *InProcessBus, store eskit.EventStore[E], initial func() S, evolve func(S, E) S, spec1, spec2, spec3, spec4, spec5 CmdSpec[S, E], )
Register5 registers 5 command handlers sharing the same state, evolve, and store.
func RegisterFunc ¶
func RegisterFunc[C Command]( bus *InProcessBus, fn func(ctx context.Context, cmd C) (*Result, error), )
RegisterFunc wires a raw handler function (without a Decider) into the bus. Useful when you want custom handling logic that doesn't fit the Decider pattern.
Types ¶
type BusOption ¶
type BusOption func(*busConfig)
BusOption configures the InProcessBus.
func WithIdleTimeout ¶
WithIdleTimeout sets how long an idle stream worker stays alive before shutting down. This reclaims resources for inactive streams. Default: 30s.
func WithMaxConcurrentStreams ¶
WithMaxConcurrentStreams limits the number of active stream workers. When the limit is reached, new streams block until a worker shuts down. Default: 10000.
func WithWorkerBufferSize ¶
WithWorkerBufferSize sets the channel buffer size per stream worker. Larger buffers absorb bursts but use more memory. Default: 64.
type CmdSpec ¶
CmdSpec captures a single command's decide function and options for bulk registration.
type Command ¶
type Command interface {
// StreamID returns the target stream for this command.
// Must not return empty string.
StreamID() string
}
Command represents a domain command targeting a specific stream. The bus routes commands by Go type (reflect.Type), not by string name.
type HandlerFunc ¶
HandlerFunc is a typed command handler function. It takes the current state and a command, returning events to persist.
type InProcessBus ¶
type InProcessBus struct {
// contains filtered or unexported fields
}
InProcessBus is a single-writer command bus. Commands targeting the same stream are serialized through a dedicated goroutine, guaranteeing ordering and eliminating concurrency conflicts.
func NewBus ¶
func NewBus(opts ...BusOption) *InProcessBus
NewBus creates a new InProcessBus with the given options.
func (*InProcessBus) ActiveWorkers ¶
func (b *InProcessBus) ActiveWorkers() int
ActiveWorkers returns the number of currently active stream workers. Useful for testing and monitoring.
func (*InProcessBus) Close ¶
func (b *InProcessBus) Close() error
Close gracefully shuts down the bus. In-flight commands complete; new commands are rejected. Safe to call multiple times.
func (*InProcessBus) HandleType ¶
func (b *InProcessBus) HandleType(cmdType reflect.Type, h handler)
HandleType registers a type-erased handler for a command type. Prefer using the generic Register function instead.
func (*InProcessBus) Send ¶
func (b *InProcessBus) Send(ctx context.Context, cmd Command) error
Send dispatches a command without waiting for the result (fire-and-forget). Returns an error immediately if the command type is unknown or the bus is closed.
func (*InProcessBus) SendAndWait ¶
func (b *InProcessBus) SendAndWait(ctx context.Context, cmd Command, timeout time.Duration) (*Result, error)
SendAndWait dispatches a command and waits for the result or timeout.
func (*InProcessBus) Use ¶
func (b *InProcessBus) Use(mw Middleware)
Use adds middleware to the bus. Middleware executes in order added (first = outermost).
type Middleware ¶
type Middleware func(ctx context.Context, cmd Command, next func(ctx context.Context, cmd Command) (*Result, error)) (*Result, error)
Middleware intercepts command dispatch. It can inspect/modify the command, measure timing, enforce validation, etc. Call next to continue the chain.
func DispatchMiddleware ¶
func DispatchMiddleware(d *eskit.EventDispatcher[any]) Middleware
DispatchMiddleware returns command bus middleware that dispatches newly persisted events to all matching subscriptions after each successful command.
This is the unified replacement for both ProjectionMiddleware and ReactionMiddleware. The EventDispatcher handles both read-model projections and side-effect reactions through the same dispatch mechanism.
Usage:
dispatcher := eskit.NewEventDispatcher[any]() bus.Use(command.DispatchMiddleware(dispatcher))
type RegisterOption ¶
type RegisterOption func(*registerConfig)
RegisterOption configures the Register call.
func WithHint ¶
func WithHint(h StreamHint) RegisterOption
WithHint sets the stream existence hint for the registered command. Default is HintAny (load → decide → append).
func WithSnapshot ¶
func WithSnapshot(n int, snapshotStore interface{}) RegisterOption
WithSnapshot enables snapshot support. After every `n` events, the current state is persisted to the snapshot store. On load, the snapshot is used as a starting point and only delta events are replayed.
The snapshotStore must be an eskit.SnapshotStore[S] matching the state type of the Decider passed to Register.
type Result ¶
type Result struct {
// Events contains the raw produced events (as []any since the bus is type-erased at dispatch).
Events []any
// StreamVersion is the version of the stream after the command executed.
StreamVersion int
// Duration is how long the handler took to execute.
Duration time.Duration
}
Result holds the outcome of a command execution.
type StreamHint ¶
type StreamHint int
StreamHint tells the command bus how to optimize the load/append cycle.
const ( // HintAny is the default: load events, rebuild state, decide, append. HintAny StreamHint = iota // HintNewStream skips the store.Load() call entirely. The decider receives // the initial state and append uses expectedVersion=0. If the stream already // exists the store returns ErrConcurrencyConflict — giving you a cheap // "create-only" guard without reading. HintNewStream // HintExisting loads events and fast-fails with ErrStreamNotFound if the // stream has no events. Useful for update/delete commands that should never // operate on a non-existent stream. HintExisting )