Documentation ¶
Overview ¶
Package graph implements a directed acyclic graph (DAG) pattern for orchestrating multi-step LLM workflows. Each node in the graph represents a distinct processing step that can use its own AI provider, tools, and configuration.
The graph executes nodes in topological order, running independent nodes at the same level in parallel. Results flow from upstream nodes to downstream nodes via NodeInput.UpstreamResults, and a thread-safe SharedState allows cross-node data sharing.
Graph[T] is generic over the final output type T: the last node (or a designated output node) produces the result, which is parsed into T using parse.ParseStringAs[T].
The main entry points are NewGraphBuilder to construct a graph, Graph.Execute to run it synchronously, and Graph.ExecuteStream to run it with real-time event streaming. Use NewInMemoryStateProvider for in-process workflows, or implement StateProvider for persistent or distributed execution.
Key features:
- Topological execution with automatic parallelism per level
- Per-node client and tool override (each node can use a different LLM provider)
- Conditional edges with EdgeCondition functions
- Configurable error strategy (fail-fast or continue-on-error)
- Graph-level and node-level timeouts
- Full observability integration (spans, counters, histograms)
- Pluggable state persistence via StateProvider interface
- Cost tracking aggregated across all nodes
- Streaming execution with multiplexed per-node events via GraphStream
Example (synchronous):
type FinalReport struct {
    Summary string `json:"summary"`
    Score   int    `json:"score"`
}

g, err := graph.NewGraphBuilder[FinalReport](defaultClient).
    AddNode("analyze", analyzeExecutor).
    AddNode("summarize", summarizeExecutor).
    AddEdge("analyze", "summarize").
    Build()
if err != nil {
    log.Fatal(err)
}

result, err := g.Execute(ctx, map[string]any{"input": "data"})
if err != nil {
    log.Fatal(err)
}
fmt.Println(result.Data.Summary)
Example (streaming):

stream, err := g.ExecuteStream(ctx, map[string]any{"input": "data"})
if err != nil {
    log.Fatal(err)
}
for event, err := range stream.Iter() {
    if err != nil {
        log.Fatal(err)
    }
    if event.Type == graph.GraphEventNodeContent {
        fmt.Printf("[%s] %s", event.NodeID, event.Content)
    }
}
TODO: Future enhancements:
- Cycle support with WithAllowCycles() and max iterations per node
- SubGraph / nesting (nested graphs as nodes)
- ForEach / dynamic spawn (runtime node creation)
- Automatic retry per node
- Additional StateProvider implementations (PostgreSQL, Redis, etc.)
Index ¶
- type EdgeCondition
- type EdgeOption
- type ErrorStrategy
- type Graph
- func (graph *Graph[T]) Execute(ctx context.Context, initialState map[string]any) (*overview.StructuredOverview[T], error)
- func (graph *Graph[T]) ExecuteStream(ctx context.Context, initialState map[string]any) (*GraphStream[T], error)
- func (graph *Graph[T]) Reset(ctx context.Context, initialState map[string]any) error
- type GraphBuilder
- type GraphEvent
- type GraphEventType
- type GraphStream
- type InMemoryStateProvider
- func (provider *InMemoryStateProvider) Get(_ context.Context, key string) (any, bool, error)
- func (provider *InMemoryStateProvider) GetAll(_ context.Context) (map[string]any, error)
- func (provider *InMemoryStateProvider) GetNodeResult(_ context.Context, nodeID string) (*NodeResult, error)
- func (provider *InMemoryStateProvider) GetNodeStatus(_ context.Context, nodeID string) (NodeStatus, error)
- func (provider *InMemoryStateProvider) Set(_ context.Context, key string, value any) error
- func (provider *InMemoryStateProvider) SetNodeResult(_ context.Context, nodeID string, result *NodeResult) error
- func (provider *InMemoryStateProvider) SetNodeStatus(_ context.Context, nodeID string, status NodeStatus) error
- type NodeExecutor
- type NodeExecutorFunc
- type NodeInput
- type NodeOption
- type NodeResult
- type NodeStatus
- type NodeStream
- type Option
- type StateProvider
- type StreamExecutor
- type StreamNodeExecutorFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EdgeCondition ¶
type EdgeCondition func(ctx context.Context, result *NodeResult, state StateProvider) bool
EdgeCondition is a function that determines whether an edge should be traversed during execution. It receives the execution context, the result of the source node, and the current shared state. If the condition returns false, the target node may be skipped (if all its incoming edges have false conditions).
The context carries cancellation signals and deadlines from the graph execution, and should be passed to any StateProvider calls within the condition.
A nil EdgeCondition means the edge is always traversed.
type EdgeOption ¶
type EdgeOption func(*edge)
EdgeOption is a functional option for configuring individual edge behavior. Edge options are applied via GraphBuilder.AddEdge.
func WithEdgeCondition ¶
func WithEdgeCondition(condition EdgeCondition) EdgeOption
WithEdgeCondition sets a condition function on an edge. The condition is evaluated after the source node completes, using its result and the current shared state. If the condition returns false, the edge is not traversed.
A node is skipped if ALL of its incoming edges have conditions that evaluate to false. If at least one incoming edge condition is true (or has no condition), the node executes.
Example:
builder.AddEdge("check", "premium_analysis",
    graph.WithEdgeCondition(func(ctx context.Context, result *graph.NodeResult, state graph.StateProvider) bool {
        value, found, err := state.Get(ctx, "quality_score")
        if err != nil || !found {
            return false
        }
        score, ok := value.(float64)
        return ok && score > 0.8
    }),
)
type ErrorStrategy ¶
type ErrorStrategy string
ErrorStrategy defines how the graph handles errors when nodes fail during parallel execution within the same level.
const (
    // ErrorStrategyFailFast cancels all running nodes and stops graph execution
    // as soon as any node fails. This is the default strategy.
    ErrorStrategyFailFast ErrorStrategy = "fail_fast"

    // ErrorStrategyContinueOnError allows other nodes to continue executing
    // when one fails. Downstream nodes that depend on the failed node are
    // automatically skipped.
    ErrorStrategyContinueOnError ErrorStrategy = "continue_on_error"
)
type Graph ¶
type Graph[T any] struct {
    // contains filtered or unexported fields
}
Graph represents a validated, executable directed acyclic graph of LLM processing steps. It is generic over T, the type of the final output produced by the designated output node.
A Graph is created via GraphBuilder[T].Build(), which validates the graph structure (cycle detection, edge validation) and computes the topological ordering.
The Graph is safe for sequential use but not for concurrent Execute() calls on the same instance, because node statuses are mutated during execution. Create separate Graph instances for concurrent workflows.
func (*Graph[T]) Execute ¶
func (graph *Graph[T]) Execute(ctx context.Context, initialState map[string]any) (*overview.StructuredOverview[T], error)
Execute runs the graph by executing nodes in topological order, with nodes at the same level running in parallel (subject to maxConcurrency).
The execution proceeds as follows:
- Initialize state provider with initialState and set all nodes to NodePending
- Create an Overview in the context for cost tracking
- Start observability root span
- For each topological level, launch ready nodes as goroutines
- Collect results, handle errors per the configured strategy
- Parse the output node's result as type T
- Return StructuredOverview[T] with the parsed result and execution statistics
The initialState map is loaded into the StateProvider's shared state before execution begins. Nodes can read and write shared state during execution.
Execute is NOT safe for concurrent use on the same Graph instance. Create separate Graph instances for concurrent workflows.
func (*Graph[T]) ExecuteStream ¶ added in v0.4.0
func (graph *Graph[T]) ExecuteStream(ctx context.Context, initialState map[string]any) (*GraphStream[T], error)
ExecuteStream starts the graph execution with streaming output. Events from all nodes (including parallel nodes within the same level) are multiplexed onto a single stream, identified by NodeID.
The stream must be consumed to avoid resource leaks. Respects the graph's maxConcurrency setting — parallel node launches within a level are throttled by the same semaphore mechanism used by Execute().
ExecuteStream is NOT safe for concurrent use on the same Graph instance. Create separate Graph instances for concurrent workflows.
func (*Graph[T]) Reset ¶
func (graph *Graph[T]) Reset(ctx context.Context, initialState map[string]any) error
Reset clears the graph's execution state, allowing it to be re-executed. All node statuses are reset to NodePending and node results are cleared. Shared state is preserved unless a new initialState is provided.
This is useful for re-running a graph with different initial state without rebuilding it.
type GraphBuilder ¶
type GraphBuilder[T any] struct {
    // contains filtered or unexported fields
}
GraphBuilder constructs a validated Graph[T] using a fluent API. Nodes and edges are added incrementally, and Build() performs structural validation including cycle detection via Kahn's algorithm.
The builder enforces the following constraints:
- Node IDs must be unique
- Edge endpoints must reference existing nodes
- The graph must be acyclic (DAG)
- If specified, the output node must exist
Example:
g, err := graph.NewGraphBuilder[FinalReport](defaultClient).
    AddNode("fetch", fetchExecutor).
    AddNode("analyze", analyzeExecutor).
    AddNode("summarize", summarizeExecutor).
    AddEdge("fetch", "analyze").
    AddEdge("analyze", "summarize").
    Build()
func NewGraphBuilder ¶
func NewGraphBuilder[T any](defaultClient *client.Client, opts ...Option) *GraphBuilder[T]
NewGraphBuilder creates a new GraphBuilder for constructing a Graph[T]. The defaultClient is the LLM client used by all nodes that do not have a node-specific client override.
Graph-level options (WithMaxConcurrency, WithExecutionTimeout, etc.) are applied here. Node and edge options are applied via AddNode and AddEdge.
Example:
builder := graph.NewGraphBuilder[MyOutput](defaultClient,
graph.WithMaxConcurrency(5),
graph.WithExecutionTimeout(10 * time.Minute),
)
func (*GraphBuilder[T]) AddEdge ¶
func (builder *GraphBuilder[T]) AddEdge(from, to string, opts ...EdgeOption) *GraphBuilder[T]
AddEdge creates a directed edge from one node to another, indicating that the source node must complete before the target node can execute.
Edge options (WithEdgeCondition) can make the edge conditional.
Returns the builder for method chaining. If either endpoint does not exist, a build error is recorded and reported at Build() time.
Example:
builder.AddEdge("fetch", "analyze")
builder.AddEdge("check", "premium_path",
graph.WithEdgeCondition(isPremiumUser),
)
func (*GraphBuilder[T]) AddNode ¶
func (builder *GraphBuilder[T]) AddNode(nodeID string, executor NodeExecutor, opts ...NodeOption) *GraphBuilder[T]
AddNode registers a processing node in the graph with the given unique ID and executor. Node options (WithNodeClient, WithNodeTools, WithNodeParams, WithNodeTimeout) can customize individual node behavior.
Returns the builder for method chaining. If a node with the same ID already exists, a build error is recorded and reported at Build() time.
Example:
builder.AddNode("analyze", analyzeExecutor,
graph.WithNodeClient(customClient),
graph.WithNodeTimeout(30 * time.Second),
)
func (*GraphBuilder[T]) Build ¶
func (builder *GraphBuilder[T]) Build() (*Graph[T], error)
Build validates the graph structure and produces an executable Graph[T]. It performs the following validations:
- No accumulated build errors from AddNode/AddEdge
- At least one node exists
- All edge endpoints reference existing nodes
- No duplicate edges
- The graph is acyclic (validated via Kahn's algorithm)
- If specified, the output node exists
On success, it computes the topological ordering and level assignment. On failure, it returns a descriptive error.
type GraphEvent ¶ added in v0.4.0
type GraphEvent struct {
    // Type identifies what kind of event this is.
    Type GraphEventType `json:"type"`

    // Level is the topological execution level (0-based) that produced this event.
    Level int `json:"level"`

    // NodeID identifies which node produced this event.
    // Empty for level-scoped events (LevelStart, LevelComplete, Done).
    NodeID string `json:"node_id,omitempty"`

    // Content carries a text delta for GraphEventNodeContent events.
    Content string `json:"content,omitempty"`

    // Reasoning carries a reasoning delta for GraphEventNodeReasoning events.
    Reasoning string `json:"reasoning,omitempty"`

    // ToolName is the name of the tool being called or returning a result.
    // Populated for GraphEventNodeToolCall and GraphEventNodeToolResult.
    ToolName string `json:"tool_name,omitempty"`

    // ToolInput is the JSON-encoded arguments passed to the tool.
    // Populated for GraphEventNodeToolCall only.
    ToolInput string `json:"tool_input,omitempty"`

    // ToolOutput is the string result returned by the tool.
    // Populated for GraphEventNodeToolResult only.
    ToolOutput string `json:"tool_output,omitempty"`

    // NodeResult is the final result of a completed node.
    // Populated only for GraphEventNodeComplete events.
    NodeResult *NodeResult `json:"node_result,omitempty"`

    // NodeIDs lists the node IDs at a level.
    // Populated only for GraphEventLevelStart events.
    NodeIDs []string `json:"node_ids,omitempty"`

    // Error contains the error description for GraphEventNodeError events.
    Error string `json:"error,omitempty"`
}
GraphEvent represents a single event from the graph execution pipeline. Each event carries exactly one type of payload, identified by the Type field. The Level and NodeID fields provide context about which part of the DAG produced the event.
type GraphEventType ¶ added in v0.4.0
type GraphEventType string
GraphEventType identifies what happened during graph execution. Each event in the stream carries exactly one type.
const (
    // GraphEventLevelStart signals that a new execution level has begun.
    // The Level field contains the level number (0-based), and NodeIDs lists
    // the node IDs about to execute at this level.
    GraphEventLevelStart GraphEventType = "level_start"

    // GraphEventNodeStart signals that a specific node has begun executing.
    // The NodeID field identifies the node.
    GraphEventNodeStart GraphEventType = "node_start"

    // GraphEventNodeContent carries a content delta from a node's LLM call.
    // The Content field contains the text fragment.
    GraphEventNodeContent GraphEventType = "node_content"

    // GraphEventNodeReasoning carries a reasoning delta from a node's LLM call.
    // The Reasoning field contains the reasoning fragment.
    GraphEventNodeReasoning GraphEventType = "node_reasoning"

    // GraphEventNodeToolCall indicates a node's LLM decided to call a tool.
    // The ToolName and ToolInput fields are populated.
    GraphEventNodeToolCall GraphEventType = "node_tool_call"

    // GraphEventNodeToolResult indicates a tool execution completed for a node.
    // The ToolName and ToolOutput fields are populated.
    GraphEventNodeToolResult GraphEventType = "node_tool_result"

    // GraphEventNodeComplete signals that a node has finished executing.
    // The NodeResult field contains the node's final result.
    GraphEventNodeComplete GraphEventType = "node_complete"

    // GraphEventNodeError signals that a node encountered an error.
    // The Error field contains the error description.
    GraphEventNodeError GraphEventType = "node_error"

    // GraphEventLevelComplete signals that all nodes in a level have finished.
    // The Level field contains the level number.
    GraphEventLevelComplete GraphEventType = "level_complete"

    // GraphEventDone signals that the entire graph has completed execution.
    GraphEventDone GraphEventType = "done"
)
type GraphStream ¶ added in v0.4.0
type GraphStream[T any] struct {
    // contains filtered or unexported fields
}
GraphStream wraps the streaming graph execution pipeline. Events from parallel nodes are multiplexed onto a single stream, identified by their NodeID field. The type parameter T matches the Graph[T] output type, ensuring that Collect() returns the same type-safe result as Execute().
The stream must be consumed either via Iter() or Collect() to avoid resource leaks. Breaking out of an Iter() range loop early is safe: Go's range-over-func mechanism terminates the underlying iterator cleanly.
func (*GraphStream[T]) Collect ¶ added in v0.4.0
func (stream *GraphStream[T]) Collect() (*overview.StructuredOverview[T], error)
Collect consumes the entire stream and returns the final execution result as a StructuredOverview[T], equivalent to what Execute() returns. Any mid-stream error terminates collection and returns that error.
Use this when you want streaming transport (lower time-to-first-byte) but do not need to process intermediate events — for example, to benefit from streaming backpressure semantics while still receiving a single final result.
func (*GraphStream[T]) Iter ¶ added in v0.4.0
func (stream *GraphStream[T]) Iter() iter.Seq2[GraphEvent, error]
Iter returns the underlying iterator for range-over-func consumption.
Example:
stream, err := pipeline.ExecuteStream(ctx, initialState)
if err != nil {
    log.Fatal(err)
}
for event, err := range stream.Iter() {
    if err != nil {
        log.Fatal(err)
    }
    switch event.Type {
    case graph.GraphEventNodeContent:
        fmt.Printf("[%s] %s", event.NodeID, event.Content)
    case graph.GraphEventNodeComplete:
        fmt.Printf("\nNode %s complete\n", event.NodeID)
    case graph.GraphEventDone:
        fmt.Println("Pipeline finished!")
    }
}
type InMemoryStateProvider ¶
type InMemoryStateProvider struct {
    // contains filtered or unexported fields
}
InMemoryStateProvider is the default StateProvider implementation. It stores all state in memory using sync.RWMutex for thread safety. State is lost when the process exits.
This provider is suitable for single-process, non-persistent workflows. For persistent or distributed workflows, implement a custom StateProvider backed by a database or distributed cache.
Example:
provider := graph.NewInMemoryStateProvider(map[string]any{
    "profile_id": "user-123",
    "language":   "en",
})
func NewInMemoryStateProvider ¶
func NewInMemoryStateProvider(initial map[string]any) *InMemoryStateProvider
NewInMemoryStateProvider creates a new in-memory state provider with optional initial shared state. If initial is nil, an empty state is created.
func (*InMemoryStateProvider) Get ¶
func (provider *InMemoryStateProvider) Get(_ context.Context, key string) (any, bool, error)
Get retrieves a value from the shared state by key. It returns the value, true if the key exists, and a nil error (the in-memory provider never fails).
func (*InMemoryStateProvider) GetAll ¶
func (provider *InMemoryStateProvider) GetAll(_ context.Context) (map[string]any, error)
GetAll retrieves the entire shared state as a copy of the internal map. The returned map is safe to modify without affecting the provider's state.
func (*InMemoryStateProvider) GetNodeResult ¶
func (provider *InMemoryStateProvider) GetNodeResult(_ context.Context, nodeID string) (*NodeResult, error)
GetNodeResult retrieves the execution result of a node. Returns nil if no result has been stored for this node.
func (*InMemoryStateProvider) GetNodeStatus ¶
func (provider *InMemoryStateProvider) GetNodeStatus(_ context.Context, nodeID string) (NodeStatus, error)
GetNodeStatus retrieves the execution status of a node. Returns NodePending if the node has not been registered.
func (*InMemoryStateProvider) Set ¶
func (provider *InMemoryStateProvider) Set(_ context.Context, key string, value any) error
Set writes a value to the shared state under the given key. It always returns a nil error (the in-memory provider never fails).
func (*InMemoryStateProvider) SetNodeResult ¶
func (provider *InMemoryStateProvider) SetNodeResult(_ context.Context, nodeID string, result *NodeResult) error
SetNodeResult stores the execution result of a node.
func (*InMemoryStateProvider) SetNodeStatus ¶
func (provider *InMemoryStateProvider) SetNodeStatus(_ context.Context, nodeID string, status NodeStatus) error
SetNodeStatus updates the execution status of a node.
type NodeExecutor ¶
type NodeExecutor interface {
    Execute(ctx context.Context, input *NodeInput) (*NodeResult, error)
}
NodeExecutor is the interface that every graph node must implement. It defines the processing logic for a single step in the workflow.
Implementations should:
- Use input.Client for LLM interactions
- Read upstream results from input.UpstreamResults
- Use input.SharedState for cross-node data sharing
- Return a NodeResult with the Output field populated on success
- Return an error if the execution fails
Example:
type AnalyzeExecutor struct{}

func (e *AnalyzeExecutor) Execute(ctx context.Context, input *NodeInput) (*NodeResult, error) {
    response, err := input.Client.SendMessage(ctx, "Analyze this data")
    if err != nil {
        return nil, fmt.Errorf("failed to analyze: %w", err)
    }
    return &NodeResult{Output: response.Content}, nil
}
type NodeExecutorFunc ¶
type NodeExecutorFunc func(ctx context.Context, input *NodeInput) (*NodeResult, error)
NodeExecutorFunc is an adapter that allows using an ordinary function as a NodeExecutor. If f is a function with the appropriate signature, NodeExecutorFunc(f) is a NodeExecutor that calls f.
func (NodeExecutorFunc) Execute ¶
func (executorFunc NodeExecutorFunc) Execute(ctx context.Context, input *NodeInput) (*NodeResult, error)
Execute calls the underlying function, satisfying the NodeExecutor interface.
type NodeInput ¶
type NodeInput struct {
    // UpstreamResults maps each upstream node ID to its execution result.
    // Only completed upstream nodes appear in this map.
    UpstreamResults map[string]*NodeResult

    // SharedState is the thread-safe state provider shared by all nodes
    // in the graph.
    SharedState StateProvider

    // Params contains node-specific parameters set at construction time
    // via WithNodeParams.
    Params map[string]any

    // Client is the LLM client for this node. It is either the node-specific
    // client set via WithNodeClient, or the graph's default client.
    Client *client.Client
}
NodeInput contains all the data available to a node during execution. It provides access to upstream results, shared state, node-specific parameters, and the LLM client configured for this node.
type NodeOption ¶
type NodeOption func(*node)
NodeOption is a functional option for configuring individual node behavior. Node options are applied via GraphBuilder.AddNode.
func WithNodeClient ¶
func WithNodeClient(nodeClient *client.Client) NodeOption
WithNodeClient sets a node-specific LLM client that overrides the graph's default client. Use this when a node needs a different AI provider, model, system prompt, or tool configuration.
Example:
analysisClient, _ := client.New(geminiProvider,
    client.WithSystemPrompt("You are a data analyst."),
    client.WithTools(chartTool),
)

builder.AddNode("analyze", analyzeExecutor,
    graph.WithNodeClient(analysisClient),
)
func WithNodeParams ¶
func WithNodeParams(params map[string]any) NodeOption
WithNodeParams sets key-value parameters that are passed to the node during execution via NodeInput.Params. Use this to configure node-specific behavior without modifying the executor implementation.
Example:
builder.AddNode("analyze_person", analyzeExecutor,
    graph.WithNodeParams(map[string]any{
        "entity_type": "person",
        "entity_id":   "user-123",
    }),
)
func WithNodeTimeout ¶
func WithNodeTimeout(timeout time.Duration) NodeOption
WithNodeTimeout sets the maximum duration for this node's execution. If the timeout is exceeded, the node's context is canceled and the node fails with a context deadline exceeded error.
A value of 0 (default) means no node-specific timeout. The graph-level execution timeout (WithExecutionTimeout) still applies.
Example:
builder.AddNode("slow_analysis", analysisExecutor,
    graph.WithNodeTimeout(30 * time.Second),
)
func WithNodeTools ¶
func WithNodeTools(tools ...tool.GenericTool) NodeOption
WithNodeTools registers additional tools on the node for use during execution. The tools are stored on the node and supplement those already registered on the node's LLM client (whether the graph default or a per-node override set via WithNodeClient).
TODO: clarify — nodeTools are stored on the node struct but NodeInput currently exposes no Tools field; executor implementations must retrieve them via a node-level mechanism or custom client configuration.
Example:
builder.AddNode("search", searchExecutor,
    graph.WithNodeTools(webSearchTool, calculatorTool),
)
type NodeResult ¶
type NodeResult struct {
    // Output is the data produced by the node. It can be any type, but must be
    // JSON-serializable when using external state persistence.
    Output any

    // Error records the execution error, if the node failed.
    Error error

    // Duration is the wall-clock time the node took to execute.
    Duration time.Duration

    // Metadata contains arbitrary key-value pairs for additional information
    // such as token counts, model used, cost breakdown, etc.
    Metadata map[string]any
}
NodeResult contains the output produced by a node after execution. The Output field must be JSON-serializable when using an external StateProvider for persistence.
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the lifecycle status of a node during graph execution.
const (
    // NodePending indicates the node has not started execution yet.
    NodePending NodeStatus = "pending"

    // NodeRunning indicates the node is currently executing.
    NodeRunning NodeStatus = "running"

    // NodeCompleted indicates the node has finished execution successfully.
    NodeCompleted NodeStatus = "completed"

    // NodeFailed indicates the node encountered an error during execution.
    NodeFailed NodeStatus = "failed"

    // NodeSkipped indicates the node was skipped because a dependency failed
    // or an edge condition evaluated to false.
    NodeSkipped NodeStatus = "skipped"
)
type NodeStream ¶ added in v0.4.0
type NodeStream struct {
    // contains filtered or unexported fields
}
NodeStream represents the streaming output of a single node's execution. It wraps an iterator that yields GraphEvent values for content deltas, reasoning, tool calls, and tool results produced by the node.
The final NodeResult must be returned separately by the StreamExecutor as the accumulated output of the stream. NodeStream events should NOT include NodeStart, NodeComplete, or NodeError — those are managed by the graph executor.
func NewNodeStream ¶ added in v0.4.0
func NewNodeStream(iterator iter.Seq2[GraphEvent, error], finalResult *NodeResult) *NodeStream
NewNodeStream creates a NodeStream from a raw streaming iterator and the final result that will be available after the stream is consumed. The finalResult pointer may be nil at creation time if the result is accumulated during streaming; in that case, set it via SetFinalResult before the stream ends.
func (*NodeStream) FinalResult ¶ added in v0.4.0
func (stream *NodeStream) FinalResult() *NodeResult
FinalResult returns the accumulated result of the node after the stream has been fully consumed. Returns nil if the stream has not been consumed or if no result was set.
func (*NodeStream) Iter ¶ added in v0.4.0
func (stream *NodeStream) Iter() iter.Seq2[GraphEvent, error]
Iter returns the underlying iterator for range-over-func consumption.
func (*NodeStream) SetFinalResult ¶ added in v0.4.0
func (stream *NodeStream) SetFinalResult(result *NodeResult)
SetFinalResult sets the accumulated result of the node. This is intended for StreamExecutor implementations that build up the result incrementally during streaming and need to set it after the iterator completes.
type Option ¶
type Option func(*graphConfig)
Option is a functional option for configuring Graph behavior. Options are applied during GraphBuilder construction via NewGraphBuilder.
func WithErrorStrategy ¶
func WithErrorStrategy(strategy ErrorStrategy) Option
WithErrorStrategy sets the error handling strategy for the graph. The default is ErrorStrategyFailFast, which cancels all running nodes and stops execution as soon as any node fails.
Use ErrorStrategyContinueOnError to allow other nodes to finish even when one fails. Downstream nodes that depend on the failed node are automatically skipped.
Example:
graph.NewGraphBuilder[Result](defaultClient,
    graph.WithErrorStrategy(graph.ErrorStrategyContinueOnError),
)
func WithExecutionTimeout ¶
WithExecutionTimeout sets the maximum duration for the entire graph execution. If the timeout is exceeded, the context is canceled and all running nodes receive a cancellation signal. A value of 0 (default) means no timeout.
Example:
graph.NewGraphBuilder[Result](defaultClient,
    graph.WithExecutionTimeout(5 * time.Minute),
)
func WithMaxConcurrency ¶
WithMaxConcurrency limits the number of nodes that can execute in parallel within the same topological level. A value of 0 (default) means unlimited concurrency — all ready nodes at a level execute simultaneously.
Use this to control resource consumption when nodes are resource-intensive (e.g., each node makes expensive API calls).
Example:
graph.NewGraphBuilder[Result](defaultClient,
    graph.WithMaxConcurrency(3), // at most 3 nodes running at once
)
func WithOutputNode ¶
WithOutputNode designates which node produces the final typed output T. By default, the last node in topological order is used as the output node.
Use this when the graph has multiple terminal nodes (nodes with no outgoing edges) and you want to control which one provides the final result.
Example:
builder.AddNode("summary", summaryExecutor)
builder.AddNode("metrics", metricsExecutor) // side-effect node
// ... edges ...

graph.NewGraphBuilder[Summary](defaultClient,
    graph.WithOutputNode("summary"),
)
func WithStateProvider ¶
func WithStateProvider(provider StateProvider) Option
WithStateProvider sets a custom StateProvider for graph state persistence. By default, an InMemoryStateProvider is used.
Implement a custom StateProvider to persist state to a database, Redis, or other external storage. This enables resuming partially completed graphs after process restarts and distributed execution.
Example:
graph.NewGraphBuilder[Result](defaultClient,
    graph.WithStateProvider(myPostgresStateProvider),
)
func WithStreamBufferSize ¶ added in v0.4.0
WithStreamBufferSize sets the internal channel buffer size for streaming events in ExecuteStream. A larger buffer reduces goroutine blocking when the consumer is slower than the producers, at the cost of higher memory usage.
A value of 0 (default) uses the default buffer size of 64.
Example:
graph.NewGraphBuilder[Result](defaultClient,
    graph.WithStreamBufferSize(128), // larger buffer for many parallel nodes
)
type StateProvider ¶
type StateProvider interface {
    // Get retrieves a value from the shared state by key.
    // Returns the value, a boolean indicating whether the key exists, and any error.
    Get(ctx context.Context, key string) (any, bool, error)

    // Set writes a value to the shared state under the given key.
    // Overwrites any existing value for the same key.
    Set(ctx context.Context, key string, value any) error

    // GetAll retrieves the entire shared state as a map.
    // Returns a copy of the internal state to prevent external mutations.
    GetAll(ctx context.Context) (map[string]any, error)

    // GetNodeStatus retrieves the execution status of a node by its ID.
    // Returns NodePending if the node has not been registered yet.
    GetNodeStatus(ctx context.Context, nodeID string) (NodeStatus, error)

    // SetNodeStatus updates the execution status of a node.
    // Valid transitions: pending -> running -> completed|failed, pending -> skipped.
    SetNodeStatus(ctx context.Context, nodeID string, status NodeStatus) error

    // GetNodeResult retrieves the execution result of a node.
    // Returns nil if the node has not completed yet or if no result was stored.
    GetNodeResult(ctx context.Context, nodeID string) (*NodeResult, error)

    // SetNodeResult stores the execution result of a node.
    // The result is typically set when a node transitions to completed or failed.
    SetNodeResult(ctx context.Context, nodeID string, result *NodeResult) error
}
StateProvider defines the interface for graph state persistence. It manages both shared state (arbitrary key-value data accessible to all nodes) and graph execution state (node statuses and results).
The default implementation is InMemoryStateProvider, which stores everything in memory using sync.RWMutex for thread safety. State is lost when the process exits.
Users can implement this interface to persist state to databases (PostgreSQL, Redis), file systems, or any other storage backend. This enables:
- Resuming partially completed graphs after process crashes
- Distributed execution across multiple processes
- Audit trails and debugging of state transitions
- Long-running workflows that survive process restarts
All methods accept a context for cancellation and timeout support. Implementations must be safe for concurrent use by multiple goroutines, as nodes at the same level execute in parallel.
The NodeResult.Output field must be JSON-serializable for implementations that persist to external storage. The InMemoryStateProvider does not impose this restriction.
type StreamExecutor ¶ added in v0.4.0
type StreamExecutor interface {
    ExecuteStream(ctx context.Context, input *NodeInput) (*NodeStream, error)
}
StreamExecutor is an optional interface for nodes that support streaming output. If a node's executor implements StreamExecutor, ExecuteStream will use it to obtain token-by-token events. Nodes that only implement NodeExecutor have their full result delivered as a single GraphEventNodeComplete event (no content deltas).
The returned NodeStream must yield events with Type restricted to: GraphEventNodeContent, GraphEventNodeReasoning, GraphEventNodeToolCall, and GraphEventNodeToolResult. The graph executor handles NodeStart, NodeComplete, and NodeError events automatically.
Example:
type StreamingAnalyzer struct{}

func (a *StreamingAnalyzer) Execute(ctx context.Context, input *NodeInput) (*NodeResult, error) {
    // Non-streaming fallback
    response, err := input.Client.SendMessage(ctx, "Analyze this")
    if err != nil {
        return nil, err
    }
    return &NodeResult{Output: response.Content}, nil
}

func (a *StreamingAnalyzer) ExecuteStream(ctx context.Context, input *NodeInput) (*NodeStream, error) {
    // Streaming implementation
    result := &NodeResult{Output: "streaming..."}
    iterator := func(yield func(GraphEvent, error) bool) {
        yield(GraphEvent{Type: GraphEventNodeContent, Content: "streaming..."}, nil)
    }
    return NewNodeStream(iterator, result), nil
}
type StreamNodeExecutorFunc ¶ added in v0.4.0
type StreamNodeExecutorFunc func(ctx context.Context, input *NodeInput) (*NodeStream, error)
StreamNodeExecutorFunc is an adapter that allows using an ordinary function as a StreamExecutor — mirroring NodeExecutorFunc for the non-streaming case.
func (StreamNodeExecutorFunc) ExecuteStream ¶ added in v0.4.0
func (executorFunc StreamNodeExecutorFunc) ExecuteStream(ctx context.Context, input *NodeInput) (*NodeStream, error)
ExecuteStream calls the underlying function, satisfying the StreamExecutor interface.