actor

package
v0.35.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package actor 提供轻量级 Actor 模型实现 设计原则:

  • 最小化依赖,仅使用标准库
  • 与现有 Agent/EventBus 架构兼容
  • 支持本地 Actor,预留分布式扩展接口

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Actor

type Actor interface {
	// Receive 处理接收到的消息
	// ctx 提供 Actor 上下文,msg 为接收到的消息
	Receive(ctx *Context, msg Message)
}

Actor 实现此接口即可成为 Actor

type ActorFunc

type ActorFunc func(ctx *Context, msg Message)

ActorFunc 函数式 Actor,便于快速创建简单 Actor

func (ActorFunc) Receive

func (f ActorFunc) Receive(ctx *Context, msg Message)

Receive 实现 Actor 接口

type AllForOneStrategy

type AllForOneStrategy struct {
	MaxRestarts    int
	WithinDuration time.Duration
	Decider        Decider
	// contains filtered or unexported fields
}

AllForOneStrategy 全部重启策略 当一个子 Actor 失败时,重启所有子 Actor

func NewAllForOneStrategy

func NewAllForOneStrategy(maxRestarts int, within time.Duration, decider Decider) *AllForOneStrategy

NewAllForOneStrategy 创建全部重启策略

func (*AllForOneStrategy) HandleFailure

func (s *AllForOneStrategy) HandleFailure(system *System, child *PID, msg Message, err any) any

HandleFailure 实现 SupervisorStrategy

type ChildSpec

type ChildSpec struct {
	Name    string
	Factory func() Actor
	Props   *Props
}

ChildSpec 子 Actor 规格

type CompositeStrategy

type CompositeStrategy struct {
	// contains filtered or unexported fields
}

CompositeStrategy 组合策略 根据错误类型选择不同的策略

func NewCompositeStrategy

func NewCompositeStrategy(fallback SupervisorStrategy) *CompositeStrategy

NewCompositeStrategy 创建组合策略

func (*CompositeStrategy) HandleFailure

func (s *CompositeStrategy) HandleFailure(system *System, child *PID, msg Message, err any) any

HandleFailure 实现 SupervisorStrategy

func (*CompositeStrategy) RegisterStrategy

func (s *CompositeStrategy) RegisterStrategy(errType string, strategy SupervisorStrategy)

RegisterStrategy 注册特定错误类型的策略

type Context

type Context struct {
	// Self 当前 Actor 的 PID
	Self *PID
	// Sender 消息发送者的 PID(如果有)
	Sender *PID
	// Parent 父 Actor 的 PID(如果有)
	Parent *PID
	// Children 子 Actor 列表
	Children []*PID
	// contains filtered or unexported fields
}

Context Actor 执行上下文 提供 Actor 执行时所需的环境信息和操作方法

func (*Context) Context

func (c *Context) Context() context.Context

Context 获取 Go context

func (*Context) Forward

func (c *Context) Forward(target *PID)

Forward 转发当前消息到另一个 Actor

func (*Context) Message

func (c *Context) Message() Message

Message 获取当前正在处理的消息

func (*Context) Reply

func (c *Context) Reply(msg Message)

Reply 回复消息给发送者 如果是 Request/Response 模式,通过 channel 返回响应 如果有 Sender,通过消息发送响应

func (*Context) Spawn

func (c *Context) Spawn(actor Actor, name string) *PID

Spawn 创建子 Actor

func (*Context) SpawnWithProps

func (c *Context) SpawnWithProps(actor Actor, props *Props) *PID

SpawnWithProps 使用属性创建子 Actor

func (*Context) Stop

func (c *Context) Stop(pid *PID)

Stop 停止指定 Actor

func (*Context) StopSelf

func (c *Context) StopSelf()

StopSelf 停止当前 Actor

type Decider

type Decider func(err any) Directive

Decider 决策函数类型

type Directive

type Directive int

Directive 监督指令

const (
	// DirectiveResume 恢复 Actor,继续处理消息
	DirectiveResume Directive = iota
	// DirectiveRestart 重启 Actor
	DirectiveRestart
	// DirectiveStop 停止 Actor
	DirectiveStop
	// DirectiveEscalate 上报给父 Actor 处理
	DirectiveEscalate
	// DirectiveRestartAfter 延迟重启 Actor
	DirectiveRestartAfter
)

func DefaultDecider

func DefaultDecider(err any) Directive

DefaultDecider 默认决策器 对所有错误采取重启策略

func EscalatingDecider

func EscalatingDecider(err any) Directive

EscalatingDecider 上报决策器 对所有错误采取上报策略

func ResumingDecider

func ResumingDecider(err any) Directive

ResumingDecider 恢复决策器 对所有错误采取恢复策略(忽略错误继续运行)

func StoppingDecider

func StoppingDecider(err any) Directive

StoppingDecider 停止决策器 对所有错误采取停止策略

func (Directive) String

func (d Directive) String() string

String 返回指令名称

type DirectiveWithDelay

type DirectiveWithDelay struct {
	Directive Directive
	Delay     time.Duration
}

DirectiveWithDelay 带延迟的指令

type DispatcherType

type DispatcherType int

DispatcherType 调度器类型

const (
	// DispatcherDefault 默认调度器(每个 Actor 一个 goroutine)
	DispatcherDefault DispatcherType = iota
	// DispatcherShared 共享调度器(多个 Actor 共享 goroutine 池)
	DispatcherShared
)

type ExponentialBackoffStrategy

type ExponentialBackoffStrategy struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	MaxRestarts  int
	Decider      Decider
	// contains filtered or unexported fields
}

ExponentialBackoffStrategy 指数退避策略 重启间隔逐渐增加

func NewExponentialBackoffStrategy

func NewExponentialBackoffStrategy(initialDelay, maxDelay time.Duration, maxRestarts int, decider Decider) *ExponentialBackoffStrategy

NewExponentialBackoffStrategy 创建指数退避策略

func (*ExponentialBackoffStrategy) HandleFailure

func (s *ExponentialBackoffStrategy) HandleFailure(system *System, child *PID, msg Message, err any) any

HandleFailure 实现 SupervisorStrategy

func (*ExponentialBackoffStrategy) Reset

func (s *ExponentialBackoffStrategy) Reset()

Reset 重置退避状态

type Message

type Message interface {
	// Kind 返回消息类型标识,用于路由和监控
	Kind() string
}

Message Actor 消息接口 所有 Actor 间传递的消息都必须实现此接口

type OneForOneStrategy

type OneForOneStrategy struct {
	MaxRestarts    int           // 最大重启次数
	WithinDuration time.Duration // 时间窗口
	Decider        Decider       // 决策函数
	// contains filtered or unexported fields
}

OneForOneStrategy 一对一策略 只重启失败的 Actor,不影响其他子 Actor

func NewOneForOneStrategy

func NewOneForOneStrategy(maxRestarts int, within time.Duration, decider Decider) *OneForOneStrategy

NewOneForOneStrategy 创建一对一策略

func (*OneForOneStrategy) HandleFailure

func (s *OneForOneStrategy) HandleFailure(system *System, child *PID, msg Message, err any) any

HandleFailure 实现 SupervisorStrategy

type PID

type PID struct {
	// ID Actor 唯一标识(本地)
	ID string
	// Address 网络地址,本地 Actor 为空
	// 格式: "host:port" 用于未来分布式扩展
	Address string
	// contains filtered or unexported fields
}

PID (Process ID) Actor 进程标识符 类似 Erlang 的 PID,是 Actor 的唯一寻址方式

func (*PID) Request

func (p *PID) Request(msg Message, timeout time.Duration) (Message, error)

Request 发送请求并等待响应(同步调用)

func (*PID) String

func (p *PID) String() string

String 返回 PID 的字符串表示

func (*PID) Tell

func (p *PID) Tell(msg Message)

Tell 发送消息(fire-and-forget)

type PoisonPill

type PoisonPill struct{}

PoisonPill 毒丸消息,优雅停止 Actor

func (*PoisonPill) Kind

func (p *PoisonPill) Kind() string

type Props

type Props struct {
	// Name Actor 名称
	Name string
	// MailboxSize 邮箱大小
	MailboxSize int
	// Dispatcher 调度器类型
	Dispatcher DispatcherType
	// SupervisorStrategy 监督策略
	SupervisorStrategy SupervisorStrategy
}

Props Actor 属性配置

func DefaultProps

func DefaultProps(name string) *Props

DefaultProps 默认属性

type ResponseTimeout

type ResponseTimeout struct {
	Target  *PID
	Timeout time.Duration
}

ResponseTimeout 响应超时错误

func (*ResponseTimeout) Error

func (r *ResponseTimeout) Error() string

func (*ResponseTimeout) Kind

func (r *ResponseTimeout) Kind() string

type Restarting

type Restarting struct{}

Restarting Actor 正在重启消息

func (*Restarting) Kind

func (r *Restarting) Kind() string

type Started

type Started struct{}

Started Actor 启动完成消息

func (*Started) Kind

func (s *Started) Kind() string

type Stopped

type Stopped struct{}

Stopped Actor 已停止消息

func (*Stopped) Kind

func (s *Stopped) Kind() string

type Stopping

type Stopping struct{}

Stopping Actor 正在停止消息

func (*Stopping) Kind

func (s *Stopping) Kind() string

type SupervisorActor

type SupervisorActor struct {
	// contains filtered or unexported fields
}

SupervisorActor 监督者 Actor 用于构建监督树

func NewSupervisorActor

func NewSupervisorActor(config *SupervisorConfig) *SupervisorActor

NewSupervisorActor 创建监督者 Actor

func (*SupervisorActor) GetChild

func (s *SupervisorActor) GetChild(name string) *PID

GetChild 获取子 Actor

func (*SupervisorActor) Receive

func (s *SupervisorActor) Receive(ctx *Context, msg Message)

Receive 处理消息

type SupervisorConfig

type SupervisorConfig struct {
	Strategy SupervisorStrategy
	Children []ChildSpec
}

SupervisorConfig 监督配置

type SupervisorStrategy

type SupervisorStrategy interface {
	// HandleFailure 处理 Actor 失败
	// 返回应该采取的指令,可以是 Directive 或 DirectiveWithDelay
	HandleFailure(system *System, child *PID, msg Message, err any) any
}

SupervisorStrategy 监督策略接口

func DefaultSupervisorStrategy

func DefaultSupervisorStrategy() SupervisorStrategy

DefaultSupervisorStrategy 默认监督策略 允许 3 次重启在 1 分钟内

func LenientSupervisorStrategy

func LenientSupervisorStrategy() SupervisorStrategy

LenientSupervisorStrategy 宽松监督策略 允许更多重启

func StrictSupervisorStrategy

func StrictSupervisorStrategy() SupervisorStrategy

StrictSupervisorStrategy 严格监督策略 任何失败都停止 Actor

type System

type System struct {
	// contains filtered or unexported fields
}

System Actor 系统 管理所有 Actor 的生命周期、消息路由和监督

func NewSystem

func NewSystem(name string) *System

NewSystem 创建新的 Actor 系统

func NewSystemWithConfig

func NewSystemWithConfig(name string, config *SystemConfig) *System

NewSystemWithConfig 使用配置创建 Actor 系统

func (*System) GetActor

func (s *System) GetActor(name string) (*PID, bool)

GetActor 获取 Actor(用于调试)

func (*System) ListActors

func (s *System) ListActors() []*PID

ListActors 列出所有 Actor(用于调试)

func (*System) Name

func (s *System) Name() string

Name 返回系统名称

func (*System) Request

func (s *System) Request(target *PID, msg Message, timeout time.Duration) (Message, error)

Request 同步请求(等待响应)

func (*System) Send

func (s *System) Send(target *PID, msg Message)

Send 发送消息(无发送者)

func (*System) SendWithSender

func (s *System) SendWithSender(target *PID, msg Message, sender *PID)

SendWithSender 发送消息(带发送者)

func (*System) Shutdown

func (s *System) Shutdown()

Shutdown 关闭整个 Actor 系统

func (*System) ShutdownWithTimeout

func (s *System) ShutdownWithTimeout(timeout time.Duration)

ShutdownWithTimeout 带超时的关闭

func (*System) Spawn

func (s *System) Spawn(actor Actor, name string) *PID

Spawn 创建 Actor

func (*System) SpawnWithProps

func (s *System) SpawnWithProps(actor Actor, props *Props) *PID

SpawnWithProps 使用属性创建 Actor

func (*System) Stats

func (s *System) Stats() *SystemStats

Stats 获取统计信息

func (*System) Stop

func (s *System) Stop(pid *PID)

Stop 停止 Actor

func (*System) StopGracefully

func (s *System) StopGracefully(pid *PID, timeout time.Duration) error

StopGracefully 优雅停止 Actor(等待处理完当前消息)

type SystemConfig

type SystemConfig struct {
	// MailboxSize 全局邮箱大小
	MailboxSize int
	// DeadLetterSize 死信队列大小
	DeadLetterSize int
	// DefaultActorMailboxSize 默认 Actor 邮箱大小
	DefaultActorMailboxSize int
	// EnableDeadLetterLogging 是否记录死信
	EnableDeadLetterLogging bool
	// PanicHandler panic 处理函数
	PanicHandler func(actor *PID, msg Message, err any)
}

SystemConfig 系统配置

func DefaultSystemConfig

func DefaultSystemConfig() *SystemConfig

DefaultSystemConfig 默认系统配置

type SystemStats

type SystemStats struct {
	TotalActors    int64
	TotalMessages  int64
	DeadLetters    int64
	ProcessedMsgs  int64
	AverageLatency time.Duration
	StartTime      time.Time
}

SystemStats 系统统计

type Terminated

type Terminated struct {
	Who *PID
}

Terminated Actor 终止通知

func (*Terminated) Kind

func (t *Terminated) Kind() string

type Unwatch

type Unwatch struct {
	Watcher *PID
}

Unwatch 取消监控

func (*Unwatch) Kind

func (u *Unwatch) Kind() string

type Watch

type Watch struct {
	Watcher *PID
}

Watch 监控请求

func (*Watch) Kind

func (w *Watch) Kind() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL