Documentation ¶
Overview ¶
- **WorkflowHandler**: Multi-step workflow execution and orchestration
Core Operations ¶
The API layer provides unified access to all muster functionality:
## Service Management
- Service registry operations (register, get, list services)
- Service lifecycle management (start, stop, restart)
- Service state monitoring and health checking
- Dynamic service instance creation and management
- Both static services (from configuration) and ServiceClass-based instances
## ServiceClass Operations
- List available ServiceClasses with real-time availability status
- Get ServiceClass definitions, args, and lifecycle tool configuration
- Check tool availability for ServiceClasses and validate configurations
- Create and manage ServiceClass-based service instances with arg validation
- ServiceClass instance lifecycle, health monitoring, and event streaming
## MCP Server Management
- MCP server definition management (create, update, delete, validate)
- MCP server lifecycle management and health monitoring
- Tool aggregation, namespace management, and conflict resolution
- Dynamic MCP server registration and tool discovery
## Service Class System
- User-defined service class definition management with operation validation
- Service class operation execution with arg validation
- Dynamic service class availability checking based on underlying tools
- Integration with tool provider system for extensible operations
- Service class namespace management and conflict resolution
## Workflow Management
- Workflow definition management (create, update, delete, validate)
- Multi-step workflow execution with arg templating
- Conditional logic and step dependency management
- Tool integration for workflow steps with error handling
- Workflow input schema validation and documentation generation
## Tool Provider System
- Tool provider registration and discovery
- Tool metadata management and arg validation
- Tool execution abstraction for different implementation types
- Automatic tool aggregation and namespace management
## Request/Response Handling
- Structured request types for all operations (Create, Update, Validate patterns)
- Type-safe arg parsing and validation using ParseRequest
- Comprehensive error handling with contextual information
- Response mapping for ServiceClass tool integration
Tool Update Events ¶
The API layer provides a centralized event system for tool availability changes:
// Subscribe to tool updates
api.SubscribeToToolUpdates(mySubscriber)
// Publish tool update events
event := api.ToolUpdateEvent{
Type: "server_registered",
ServerName: "kubernetes",
Tools: []string{"kubectl_get_pods", "kubectl_describe"},
Timestamp: time.Now(),
}
api.PublishToolUpdateEvent(event)
Event types include:
- "server_registered": New MCP server registration
- "server_deregistered": MCP server removal
- "tools_updated": Tool availability changes
This enables managers to automatically refresh availability when the tool landscape changes, supporting real-time reactivity throughout the system.
API Registration Pattern ¶
**Critical**: All packages must follow the registration pattern:
**Implement Handler Interface** in adapter pattern:

```go
type ServiceAdapter struct {
	service *MyService
}

func (a *ServiceAdapter) SomeOperation(ctx context.Context) error {
	return a.service.performOperation(ctx)
}
```

**Register with API Layer**:

```go
func (a *ServiceAdapter) Register() {
	api.RegisterMyServiceHandler(a)
}
```

**Access through API Layer** (never direct imports):

```go
handler := api.GetMyServiceHandler()
if handler != nil {
	handler.SomeOperation(ctx)
}
```
Example Usage ¶
## Service Registration (Service Package)
// Service adapter implements handler interface
type RegistryAdapter struct {
registry *ServiceRegistry
}
func (r *RegistryAdapter) GetService(ctx context.Context, name string) (*ServiceInfo, error) {
return r.registry.Get(name)
}
func (r *RegistryAdapter) Register() {
api.RegisterServiceRegistry(r)
}
## ServiceClass Registration
type ServiceClassAdapter struct {
manager *ServiceClassManager
}
func (s *ServiceClassAdapter) ListServiceClasses(ctx context.Context) ([]*ServiceClassInfo, error) {
return s.manager.ListServiceClasses(ctx)
}
func (s *ServiceClassAdapter) Register() {
api.RegisterServiceClassManager(s)
}
## Tool Provider Registration
type MyToolProvider struct {
tools []ToolMetadata
}
func (p *MyToolProvider) GetTools() []ToolMetadata {
return p.tools
}
func (p *MyToolProvider) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*CallToolResult, error) {
	// Implementation-specific logic goes here; the value below is only a placeholder result.
	result := fmt.Sprintf("executed %s", toolName)
	return &CallToolResult{Content: []interface{}{result}, IsError: false}, nil
}
## API Usage (Consumer Package)
// Access through API layer (correct approach)
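serviceManager := api.GetServiceManager()
if serviceManager != nil {
	if err := serviceManager.StartService(ctx, "my-service"); err != nil {
		return fmt.Errorf("start service: %w", err)
	}
}
serviceClassMgr := api.GetServiceClassManager()
if serviceClassMgr != nil {
	classes, err := serviceClassMgr.ListServiceClasses(ctx)
	if err != nil {
		return fmt.Errorf("list ServiceClasses: %w", err)
	}
	fmt.Printf("found %d ServiceClasses\n", len(classes))
}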
// Execute workflows through convenience functions
result, err := api.ExecuteWorkflow(ctx, "deploy-app", args)
## Request Validation and Parsing
// Parse and validate request args
var req ServiceClassCreateRequest
if err := api.ParseRequest(args, &req); err != nil {
return fmt.Errorf("invalid request: %w", err)
}
// Validate without creating
var validateReq ServiceClassValidateRequest
if err := api.ParseRequest(args, &validateReq); err != nil {
	return fmt.Errorf("invalid validate request: %w", err)
}
// Perform validation logic...
Workflow Integration ¶
The API provides convenient functions for workflow operations:
// Get workflow information and input schemas
workflows := api.GetWorkflowInfo()
Health Check System ¶
The API defines standardized health checking for all components:
// Health status constants
const (
HealthUnknown = "unknown" // Status cannot be determined
HealthHealthy = "healthy" // Operating normally
HealthDegraded = "degraded" // Some issues but functional
HealthUnhealthy = "unhealthy" // Significant issues
HealthChecking = "checking" // Health check in progress
)
// Health check configuration
config := HealthCheckConfig{
Enabled: true,
Interval: 30 * time.Second,
FailureThreshold: 3,
SuccessThreshold: 1,
}
Orchestrator Integration ¶
The API provides orchestrator functionality for unified service operations:
// Create services through orchestrator
orchestrator := api.GetOrchestrator()
if orchestrator == nil {
	return api.ErrOrchestratorNotRegistered
}
instance, err := orchestrator.CreateServiceClassInstance(ctx, req)
if err != nil {
	return err
}
// List all services with unified status
services, err := orchestrator.ListServices(ctx)
Thread Safety ¶
All API components are fully thread-safe:
- Handler registry uses mutex protection for registration/access
- Handlers can be registered and accessed concurrently
- Thread-safe API method execution across all handlers
- Safe concurrent service operations and state management
- Tool update event broadcasting with goroutine safety
- Request parsing and validation thread safety
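The mutex-protected registry described above can be pictured with a small sketch. Everything here is illustrative: `GreeterHandler`, `RegisterGreeter`, and `GetGreeter` are hypothetical names standing in for the package's real handler interfaces and Register/Get functions, and the real implementation may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// GreeterHandler is a hypothetical handler interface used only in this sketch.
type GreeterHandler interface {
	Greet(name string) string
}

var (
	registryMu sync.RWMutex   // guards greeter
	greeter    GreeterHandler // nil until a handler is registered
)

// RegisterGreeter stores the handler; a later call replaces the earlier one.
func RegisterGreeter(h GreeterHandler) {
	registryMu.Lock()
	defer registryMu.Unlock()
	greeter = h
}

// GetGreeter returns the registered handler, or nil if none is registered.
func GetGreeter() GreeterHandler {
	registryMu.RLock()
	defer registryMu.RUnlock()
	return greeter
}

type englishGreeter struct{}

func (englishGreeter) Greet(name string) string { return "hello, " + name }

func main() {
	if GetGreeter() == nil {
		fmt.Println("no handler registered yet")
	}

	// Register and read from several goroutines at once.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			RegisterGreeter(englishGreeter{})
			if h := GetGreeter(); h != nil {
				_ = h.Greet("muster")
			}
		}()
	}
	wg.Wait()

	fmt.Println(GetGreeter().Greet("muster"))
}
```

A read-write mutex fits this access pattern because registration happens rarely (typically at startup) while lookups happen on every call, so readers do not block each other.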
Error Handling ¶
The API layer provides structured error handling:
- Handler availability checking with nil safety
- Service and ServiceClass validation with detailed error messages
- Tool execution error propagation with context preservation
- Workflow execution error handling
- Request parsing validation with field-level error reporting
- Comprehensive error context and recovery mechanisms
Performance Characteristics ¶
The Service Locator Pattern provides:
- **Minimal overhead**: Thin delegation layer to handlers
- **Lazy initialization**: Handlers registered as needed during startup
- **Concurrent access**: No bottlenecks in handler access patterns
- **Memory efficiency**: Single handler instances shared across requests
- **Event efficiency**: Asynchronous tool update broadcasting
- **Request efficiency**: JSON-based parsing with minimal allocations
Testing Support ¶
The API package provides testing utilities:
- **Mock handler registration**: SetServiceClassManagerForTesting and similar
- **Handler isolation**: Independent handler registration per test
- **Event testing**: Tool update event validation and mocking
- **Request validation testing**: ParseRequest error scenario testing
Design Principles ¶
1. **Single Point of Truth**: All inter-package communication through API
2. **No Direct Dependencies**: Packages never import each other directly
3. **Interface Segregation**: Small, focused handler interfaces
4. **Dependency Inversion**: Depend on abstractions, not implementations
5. **Open/Closed Principle**: Easy to extend with new handlers
6. **Event-Driven Architecture**: Reactive updates through tool events
7. **Type Safety**: Strong typing through request/response structures
8. **Validation by Default**: All requests validated before processing
**Critical Rule**: ALL inter-package communication MUST go through this API layer. Direct imports between internal packages are **forbidden** and violate the core architectural principle.
Index ¶
- Constants
- Variables
- func CollectRequiredAudiences() []string
- func GetClientSessionIDFromContext(ctx context.Context) (string, bool)
- func IsActiveState(state ServiceState) bool
- func ParseRequest[T any](args map[string]interface{}, request *T) error
- func PublishToolUpdateEvent(event ToolUpdateEvent)
- func RegisterAggregator(h AggregatorHandler)
- func RegisterAuthHandler(h AuthHandler)
- func RegisterConfig(h ConfigHandler) (deprecated)
- func RegisterConfigHandler(h ConfigHandler)
- func RegisterEventManager(h EventManagerHandler)
- func RegisterMCPServerManager(h MCPServerManagerHandler)
- func RegisterMetaTools(h MetaToolsHandler)
- func RegisterMetaToolsDataProvider(p MetaToolsDataProvider)
- func RegisterOAuthHandler(h OAuthHandler)
- func RegisterReconcileManager(h ReconcileManagerHandler)
- func RegisterSecretCredentialsHandler(h SecretCredentialsHandler)
- func RegisterServiceClassManager(h ServiceClassManagerHandler)
- func RegisterServiceManager(h ServiceManagerHandler)
- func RegisterServiceRegistry(h ServiceRegistryHandler)
- func RegisterSessionInitCallback(cb SessionInitCallback)
- func RegisterTeleportClient(h TeleportClientHandler)
- func RegisterWorkflow(h WorkflowHandler)
- func SubscribeToToolUpdates(subscriber ToolUpdateSubscriber)
- func UpdateMCPServerState(name string, state ServiceState, health HealthStatus, err error) error
- func WithClientSessionID(ctx context.Context, sessionID string) context.Context
- type AggregatorHandler
- type Arg
- type ArgDefinition
- type ArgMetadata
- type AuthChallenge
- type AuthCompletionCallback
- type AuthHandler
- type AuthInfo
- type AuthStatus
- type CallToolResult
- type ClientCredentials
- type ClientCredentialsSecretRef
- type ClientSessionIDContextKey
- type ConfigAPI
- type ConfigHandler
- type ConfigurableService
- type CreateServiceInstanceRequest
- type CreateWorkflowRequest
- type EventManagerHandler
- type EventQueryOptions
- type EventQueryResult
- type EventResult
- type GetWorkflowExecutionRequest
- type HealthCheckConfig
- type HealthCheckExpectation
- type HealthCheckToolCall
- type HealthStatus
- type LifecycleTools
- type ListWorkflowExecutionsRequest
- type ListWorkflowExecutionsResponse
- type MCPServer
- type MCPServerAuth
- type MCPServerCreateRequest
- type MCPServerInfo
- type MCPServerManagerHandler
- type MCPServerType
- type MCPServerUpdateRequest
- type MCPServerValidateRequest
- type MetaToolsDataProvider
- type MetaToolsHandler
- type NotFoundError
- type OAuthHandler
- type OAuthToken
- type ObjectReference
- type OperationDefinition
- type OrchestratorAPI
- type ReconcileManagerHandler
- type ReconcileOverview
- type ReconcileStatusInfo
- type ReconcileStatusSummary
- type SchemaProperty
- type SecretCredentialsHandler
- type ServerAuthInfo
- type ServiceClass
- type ServiceClassCreateRequest
- type ServiceClassManagerHandler
- type ServiceClassUpdateRequest
- type ServiceClassValidateRequest
- type ServiceConfig
- type ServiceInfo
- type ServiceInstance
- type ServiceInstanceEvent
- type ServiceListResponse
- type ServiceManagerHandler
- type ServiceRegistryHandler
- type ServiceState
- type ServiceStateChangedEvent
- type ServiceStatus
- type ServiceType
- type ServiceValidateRequest
- type SessionInitCallback
- type StateUpdater
- type TeleportAuth
- type TeleportClientConfig
- type TeleportClientHandler
- type TimeoutConfig
- type TokenExchangeConfig
- type ToolCall
- type ToolCaller
- func (atc *ToolCaller) CallTool(ctx context.Context, toolName string, args map[string]interface{}) (map[string]interface{}, error)
- func (atc *ToolCaller) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
- func (atc *ToolCaller) GetAvailableTools() []string
- func (atc *ToolCaller) IsToolAvailable(toolName string) bool
- type ToolChecker
- type ToolMetadata
- type ToolProvider
- type ToolUpdateEvent
- type ToolUpdateSubscriber
- type Workflow
- type WorkflowCondition
- type WorkflowConditionExpectation
- type WorkflowCreateRequest
- type WorkflowExecution
- type WorkflowExecutionStatus
- type WorkflowExecutionStep
- type WorkflowExecutionSummary
- type WorkflowHandler
- type WorkflowInputSchema
- type WorkflowStep
- type WorkflowUpdateRequest
- type WorkflowValidateRequest
Constants ¶
const (
	// AuthMsgAlreadyConnected indicates the user already has an active session connection
	// to the specific server they're trying to authenticate to.
	AuthMsgAlreadyConnected = "Already Connected"
	// AuthMsgSuccessfullyConnected indicates a successful connection was established,
	// either through SSO (Token Forwarding/Exchange) or direct authentication.
	AuthMsgSuccessfullyConnected = "Successfully connected"
	// AuthMsgAlreadyAuthenticated is an alternative marker for existing authentication.
	AuthMsgAlreadyAuthenticated = "already authenticated"
)
Auth tool response message markers. These constants define the standard message prefixes/markers used in auth tool responses. They enable consistent detection of connection status across CLI and aggregator components.
IMPORTANT: When modifying these values, ensure both the message generation (auth_tools.go) and message detection (auth_helpers.go) code remain synchronized.
const AuthTypeTeleport = "teleport"
AuthTypeTeleport is the auth type value for Teleport authentication.
const ClientSessionIDHeader = "X-Muster-Session-ID"
ClientSessionIDHeader is the HTTP header name for client-provided session IDs. This enables CLI tools to maintain persistent session identity across invocations.
When present, this header takes precedence over the random session ID generated by mcp-go for token storage. This is critical for CLI tools where each invocation creates a new connection - without it, MCP server tokens would be lost between CLI invocations because the mcp-go session ID changes on each connection.
Security: The client-provided session ID is trusted because:
- It's sent by the authenticated CLI client (aggregator auth validates the user)
- Token lookup still requires matching (sessionID, issuer, scope)
- A malicious client can only access tokens it previously stored with that session ID
Variables ¶
var (
	// NewWorkflowNotFoundError creates a workflow not found error.
	//
	// Args:
	//   - name: The name of the workflow that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified workflow
	NewWorkflowNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("workflow", name)
	}
	// NewServiceClassNotFoundError creates a service class not found error.
	//
	// Args:
	//   - name: The name of the service class that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified service class
	NewServiceClassNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("service class", name)
	}
	// NewServiceNotFoundError creates a service not found error.
	//
	// Args:
	//   - name: The name of the service that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified service
	NewServiceNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("service", name)
	}
	// NewMCPServerNotFoundError creates an MCP server not found error.
	//
	// Args:
	//   - name: The name of the MCP server that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified MCP server
	NewMCPServerNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("MCP server", name)
	}
	// NewToolNotFoundError creates a tool not found error.
	//
	// Args:
	//   - name: The name of the tool that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified tool
	NewToolNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("tool", name)
	}
	// NewResourceNotFoundError creates a resource not found error.
	//
	// Args:
	//   - name: The name of the resource that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified resource
	NewResourceNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("resource", name)
	}
	// NewPromptNotFoundError creates a prompt not found error.
	//
	// Args:
	//   - name: The name of the prompt that was not found
	//
	// Returns:
	//   - *NotFoundError: A NotFoundError for the specified prompt
	NewPromptNotFoundError = func(name string) *NotFoundError {
		return NewNotFoundError("prompt", name)
	}
)
Specific NotFoundError constructors for each resource type. These provide convenient, type-specific error creation with consistent naming.
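Typed not-found errors are most useful when callers can detect them after wrapping. The sketch below shows one way that could work with errors.As. The fields and message format of `notFoundError` are assumptions made for illustration (the real NotFoundError definition is not shown in this section), and `lookupWorkflow` and `isNotFound` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// notFoundError is a hypothetical shape for a typed not-found error.
type notFoundError struct {
	ResourceType string
	ResourceName string
}

func (e *notFoundError) Error() string {
	return fmt.Sprintf("%s %q not found", e.ResourceType, e.ResourceName)
}

func newNotFoundError(resourceType, name string) *notFoundError {
	return &notFoundError{ResourceType: resourceType, ResourceName: name}
}

// newWorkflowNotFoundError mirrors the constructor-variable style used above.
var newWorkflowNotFoundError = func(name string) *notFoundError {
	return newNotFoundError("workflow", name)
}

// lookupWorkflow always fails here, wrapping the typed error with context.
func lookupWorkflow(name string) error {
	return fmt.Errorf("execute workflow: %w", newWorkflowNotFoundError(name))
}

// isNotFound reports whether err, or any error it wraps, is a notFoundError.
func isNotFound(err error) bool {
	var nf *notFoundError
	return errors.As(err, &nf)
}

func main() {
	err := lookupWorkflow("deploy-app")
	fmt.Println(err)
	fmt.Println(isNotFound(err))
	fmt.Println(isNotFound(errors.New("connection refused")))
}
```

Compared with a sentinel such as ErrWorkflowNotFound, a typed error can carry the resource name, which is the extra context the deprecation note refers to.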
var (
	// ErrOrchestratorNotRegistered indicates the orchestrator handler is not registered
	ErrOrchestratorNotRegistered = errors.New("orchestrator handler not registered")
	// ErrMCPServiceNotRegistered indicates the MCP service handler is not registered
	ErrMCPServiceNotRegistered = errors.New("MCP service handler not registered")
	// ErrPortForwardNotRegistered indicates the port forward handler is not registered
	ErrPortForwardNotRegistered = errors.New("port forward handler not registered")
	// ErrK8sServiceNotRegistered indicates the Kubernetes service handler is not registered
	ErrK8sServiceNotRegistered = errors.New("K8s service handler not registered")
	// ErrConfigServiceNotRegistered indicates the config service handler is not registered
	ErrConfigServiceNotRegistered = errors.New("config service handler not registered")
	// ErrWorkflowNotRegistered indicates the workflow handler is not registered
	ErrWorkflowNotRegistered = errors.New("workflow handler not registered")
	// ErrAggregatorNotRegistered indicates the aggregator handler is not registered
	ErrAggregatorNotRegistered = errors.New("aggregator handler not registered")
	// Legacy workflow error (deprecated - use NewWorkflowNotFoundError instead)
	// This error is maintained for backward compatibility but should not be used in new code.
	//
	// Deprecated: Use NewWorkflowNotFoundError(name) instead for better error context.
	ErrWorkflowNotFound = errors.New("workflow not found")
)
Common errors for API operations. These predefined errors provide consistent error reporting for common failure scenarios related to handler registration in the Service Locator Pattern.
Functions ¶
func CollectRequiredAudiences ¶
func CollectRequiredAudiences() []string
CollectRequiredAudiences collects all unique required audiences from MCPServers that have forwardToken: true configured. This is used to determine which cross-client audiences to request from Dex during OAuth authentication.
When users authenticate to muster, the OAuth flow requests tokens with these audiences, allowing the tokens to be forwarded to downstream MCPServers that require specific audience claims (e.g., Kubernetes OIDC authentication).
Returns:
- []string: Unique list of required audiences from all SSO-enabled MCPServers
If no MCPServer manager is registered, returns an empty slice.
Thread-safe: Yes, uses registered MCPServerManager which is thread-safe.
Example:
audiences := api.CollectRequiredAudiences()
// Returns: ["dex-k8s-authenticator", "another-audience"]
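The collection step described above amounts to filtering servers by their token-forwarding setting and de-duplicating the audiences they declare. Here is a minimal sketch under stated assumptions: `serverDef` and its field names are hypothetical stand-ins for the real MCPServer definition, and the ordering shown (first occurrence wins) is a choice of this sketch, not a documented guarantee.

```go
package main

import "fmt"

// serverDef is a hypothetical, trimmed-down view of an MCP server definition.
type serverDef struct {
	Name              string
	ForwardToken      bool
	RequiredAudiences []string
}

// collectAudiences returns the unique audiences of servers that forward
// tokens, in first-seen order. It returns an empty, non-nil slice when
// nothing qualifies.
func collectAudiences(servers []serverDef) []string {
	seen := make(map[string]struct{})
	audiences := []string{}
	for _, s := range servers {
		if !s.ForwardToken {
			continue // only SSO-enabled servers contribute audiences
		}
		for _, a := range s.RequiredAudiences {
			if _, dup := seen[a]; dup {
				continue
			}
			seen[a] = struct{}{}
			audiences = append(audiences, a)
		}
	}
	return audiences
}

func main() {
	servers := []serverDef{
		{Name: "k8s-a", ForwardToken: true, RequiredAudiences: []string{"dex-k8s-authenticator"}},
		{Name: "k8s-b", ForwardToken: true, RequiredAudiences: []string{"dex-k8s-authenticator", "another-audience"}},
		{Name: "local", ForwardToken: false, RequiredAudiences: []string{"ignored"}},
	}
	fmt.Println(collectAudiences(servers))
}
```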
func GetClientSessionIDFromContext ¶
func GetClientSessionIDFromContext(ctx context.Context) (string, bool)
GetClientSessionIDFromContext extracts the client-provided session ID from context. Returns the session ID and true if found, or empty string and false if not present.
This is a convenience function that encapsulates the context key type assertion.
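The pattern behind this helper is the standard Go context-key idiom: a dedicated key type plus a pair of functions that hide the type assertion. The sketch below is illustrative only; `sessionIDKey`, `withSessionID`, `sessionIDFromContext`, `roundTrip`, and `missing` are hypothetical names, and the real package uses its own exported ClientSessionIDContextKey type.

```go
package main

import (
	"context"
	"fmt"
)

// sessionIDKey is a dedicated key type, so values stored under it cannot
// collide with keys defined by other packages.
type sessionIDKey struct{}

// withSessionID returns a child context carrying the session ID.
func withSessionID(ctx context.Context, sessionID string) context.Context {
	return context.WithValue(ctx, sessionIDKey{}, sessionID)
}

// sessionIDFromContext returns the stored ID and true, or "" and false
// when no session ID was attached.
func sessionIDFromContext(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(sessionIDKey{}).(string)
	return id, ok
}

// roundTrip stores id in a fresh context and reads it back.
func roundTrip(id string) (string, bool) {
	return sessionIDFromContext(withSessionID(context.Background(), id))
}

// missing reads from a context that never had a session ID attached.
func missing() (string, bool) {
	return sessionIDFromContext(context.Background())
}

func main() {
	id, ok := roundTrip("cli-session-42")
	fmt.Println(id, ok)

	id, ok = missing()
	fmt.Printf("%q %v\n", id, ok)
}
```

The comma-ok type assertion is what lets the lookup return false instead of panicking when the key is absent.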
func IsActiveState ¶
func IsActiveState(state ServiceState) bool
IsActiveState returns true if the given state indicates the service is actively running and operational. This includes both StateRunning (for local stdio servers) and StateConnected (for remote servers).
Use this helper when checking if a service is available for use, rather than checking for StateRunning directly, to properly handle remote servers.
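To illustrate why checking StateRunning alone is not enough, here is a small sketch. The constant names StateRunning and StateConnected come from the description above, but their string values, the extra StateStopped constant, and the `isActive` function are assumptions made for this example.

```go
package main

import "fmt"

// ServiceState values below are assumed for illustration only.
type ServiceState string

const (
	StateRunning   ServiceState = "running"   // local stdio servers
	StateConnected ServiceState = "connected" // remote servers
	StateStopped   ServiceState = "stopped"   // hypothetical inactive state
)

// isActive treats both running and connected services as usable.
func isActive(state ServiceState) bool {
	return state == StateRunning || state == StateConnected
}

func main() {
	remote := StateConnected

	// A direct comparison misses remote servers.
	fmt.Println(remote == StateRunning)

	// The helper-style check covers both local and remote servers.
	fmt.Println(isActive(remote))
}
```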
func ParseRequest ¶
func ParseRequest[T any](args map[string]interface{}, request *T) error
ParseRequest converts a map[string]interface{} to a typed request struct. This uses JSON marshaling/unmarshaling for type conversion and validation, providing strict arg checking and type safety.
The function validates that no unknown args are present and performs basic type validation according to the target struct's field types and tags.
Args:
- args: The input arguments to parse and validate
- request: Pointer to the target request struct to populate
Returns:
- error: Validation error if arguments are invalid or contain unknown fields
Example:
var req ServiceClassCreateRequest
args := map[string]interface{}{
"name": "auth",
"type": "authentication",
"operations": map[string]interface{}{...},
}
err := ParseRequest(args, &req)
if err != nil {
return fmt.Errorf("invalid request: %w", err)
}
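The JSON round trip with unknown-field rejection described above can be approximated with the standard library alone. This is a sketch of the technique, not the package's actual ParseRequest: `parseStrict` and `createRequest` are hypothetical names, and the real function may apply additional validation.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// createRequest is a hypothetical request type used only in this sketch.
type createRequest struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

// parseStrict marshals args to JSON and decodes them into request,
// rejecting keys that have no matching struct field.
func parseStrict[T any](args map[string]interface{}, request *T) error {
	raw, err := json.Marshal(args)
	if err != nil {
		return fmt.Errorf("marshal args: %w", err)
	}
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	if err := dec.Decode(request); err != nil {
		return fmt.Errorf("decode request: %w", err)
	}
	return nil
}

func main() {
	var good createRequest
	err := parseStrict(map[string]interface{}{"name": "auth", "type": "authentication"}, &good)
	fmt.Println(good.Name, good.Type, err)

	// A misspelled key is reported instead of being silently dropped.
	var bad createRequest
	err = parseStrict(map[string]interface{}{"name": "auth", "tpye": "authentication"}, &bad)
	fmt.Println(err != nil)
}
```

Rejecting unknown keys turns typos in argument names into immediate errors, and the decoder's type checks catch values such as a number supplied where a string field is expected.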
func PublishToolUpdateEvent ¶
func PublishToolUpdateEvent(event ToolUpdateEvent)
PublishToolUpdateEvent publishes a tool update event to all registered subscribers. This function is used to notify components about changes in tool availability, such as when MCP servers are registered/deregistered or their tools change.
The event is delivered asynchronously to all subscribers. Each subscriber receives the event in a separate goroutine to prevent blocking, ensuring that slow or failing subscribers don't affect other subscribers or the publisher.
Args:
- event: ToolUpdateEvent containing details about the tool update
Thread-safe: Yes, subscriber list is safely copied before notification.
Note: Panics in subscriber callbacks are recovered and logged as errors.
Example:
event := api.ToolUpdateEvent{
Type: "server_registered",
ServerName: "kubernetes",
Tools: []string{"kubectl_get_pods", "kubectl_describe"},
Timestamp: time.Now(),
}
api.PublishToolUpdateEvent(event)
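The delivery semantics described above (copy the subscriber list under a lock, notify each subscriber in its own goroutine, recover from panics) can be sketched as follows. The `bus`, `event`, and `subscriber` types are hypothetical stand-ins for the package's internals, and returning a WaitGroup from Publish is a convenience of this demo so the caller can wait; a real publisher would more likely fire and forget.

```go
package main

import (
	"fmt"
	"sync"
)

// event and subscriber are simplified stand-ins for ToolUpdateEvent and
// ToolUpdateSubscriber.
type event struct {
	Type       string
	ServerName string
}

type subscriber func(event)

type bus struct {
	mu   sync.RWMutex
	subs []subscriber
}

func (b *bus) Subscribe(s subscriber) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, s)
}

// Publish notifies every subscriber in its own goroutine and returns a
// WaitGroup that completes when all callbacks have finished.
func (b *bus) Publish(e event) *sync.WaitGroup {
	// Copy the list so callbacks run without holding the lock.
	b.mu.RLock()
	subs := make([]subscriber, len(b.subs))
	copy(subs, b.subs)
	b.mu.RUnlock()

	var wg sync.WaitGroup
	for _, s := range subs {
		wg.Add(1)
		go func(s subscriber) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("subscriber panicked:", r)
				}
			}()
			s(e)
		}(s)
	}
	return &wg
}

func main() {
	b := &bus{}
	b.Subscribe(func(e event) { panic("broken subscriber") })
	b.Subscribe(func(e event) { fmt.Println("received", e.Type, "from", e.ServerName) })

	b.Publish(event{Type: "server_registered", ServerName: "kubernetes"}).Wait()
}
```

Because each callback runs in its own goroutine with its own recover, the panicking subscriber does not prevent the second one from receiving the event.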
func RegisterAggregator ¶
func RegisterAggregator(h AggregatorHandler)
RegisterAggregator registers the aggregator handler implementation. This handler provides tool execution and MCP server aggregation functionality, serving as the central component for unified tool access across multiple MCP servers.
The registration is thread-safe and should be called during system initialization. Only one aggregator handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: AggregatorHandler implementation that manages tool execution and MCP server coordination
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := &aggregator.Adapter{aggregator: myAggregator}
api.RegisterAggregator(adapter)
func RegisterAuthHandler ¶
func RegisterAuthHandler(h AuthHandler)
RegisterAuthHandler registers the auth handler implementation. This handler provides OAuth authentication for CLI commands and agent clients.
The registration is thread-safe and should be called during system initialization. Only one auth handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: AuthHandler implementation that manages OAuth operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := cli.NewAuthAdapter()
api.RegisterAuthHandler(adapter)
func RegisterConfig (deprecated) ¶
func RegisterConfig(h ConfigHandler)
RegisterConfig is an alias for RegisterConfigHandler for backward compatibility. New code should prefer using RegisterConfigHandler for clarity.
Args:
- h: ConfigHandler implementation that manages configuration operations
Thread-safe: Yes, delegates to RegisterConfigHandler.
Deprecated: Use RegisterConfigHandler directly for better clarity.
func RegisterConfigHandler ¶
func RegisterConfigHandler(h ConfigHandler)
RegisterConfigHandler registers the configuration handler implementation. This handler provides runtime configuration management functionality, including configuration retrieval, updates, and persistence operations.
The registration is thread-safe and should be called during system initialization. Only one configuration handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: ConfigHandler implementation that manages configuration operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := &config.Adapter{manager: myConfigManager}
api.RegisterConfigHandler(adapter)
func RegisterEventManager ¶
func RegisterEventManager(h EventManagerHandler)
RegisterEventManager registers the event manager handler implementation. This handler provides Kubernetes Event generation functionality for CRD lifecycle operations, automatically adapting to both Kubernetes and filesystem modes through the unified client.
The registration is thread-safe and should be called during system initialization. Only one event manager handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: EventManagerHandler implementation that manages event generation operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := events.NewAdapter(musterClient)
adapter.Register()
func RegisterMCPServerManager ¶
func RegisterMCPServerManager(h MCPServerManagerHandler)
RegisterMCPServerManager registers the MCP server manager handler implementation. This handler provides MCP server definition management and lifecycle operations, enabling the management of MCP servers that provide tools to the aggregator.
The registration is thread-safe and should be called during system initialization. Only one MCP server manager handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: MCPServerManagerHandler implementation that manages MCP server operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := mcpserver.NewAdapter(configPath)
adapter.Register()
func RegisterMetaTools ¶
func RegisterMetaTools(h MetaToolsHandler)
RegisterMetaTools registers the meta-tools handler implementation. This handler provides server-side meta-tool functionality for AI assistants, enabling tool discovery, execution, and resource/prompt access.
The registration is thread-safe and should be called during system initialization. Only one meta-tools handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: MetaToolsHandler implementation that manages meta-tool operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := metatools.NewAdapter()
adapter.Register()
func RegisterMetaToolsDataProvider ¶
func RegisterMetaToolsDataProvider(p MetaToolsDataProvider)
RegisterMetaToolsDataProvider registers the data provider for meta-tools. This is typically the aggregator, which provides access to all tools, resources, and prompts from connected MCP servers.
The registration is thread-safe and should be called during system initialization after the aggregator is created but before the metatools adapter is wired up.
Args:
- p: MetaToolsDataProvider implementation (typically the aggregator)
Thread-safe: Yes, protected by handlerMutex.
func RegisterOAuthHandler ¶
func RegisterOAuthHandler(h OAuthHandler)
RegisterOAuthHandler registers the OAuth handler implementation. This handler provides OAuth proxy functionality for remote MCP server authentication.
The registration is thread-safe and should be called during system initialization. Only one OAuth handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: OAuthHandler implementation that manages OAuth operations
Thread-safe: Yes, protected by oauthMutex.
func RegisterReconcileManager ¶
func RegisterReconcileManager(h ReconcileManagerHandler)
RegisterReconcileManager registers the reconcile manager handler implementation. This handler provides reconciliation status and control functionality, enabling automatic synchronization of resource definitions with running services.
The registration is thread-safe and should be called during system initialization. Only one reconcile manager handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: ReconcileManagerHandler implementation that manages reconciliation operations
Thread-safe: Yes, protected by handlerMutex.
func RegisterSecretCredentialsHandler ¶
func RegisterSecretCredentialsHandler(h SecretCredentialsHandler)
RegisterSecretCredentialsHandler registers the secret credentials handler implementation. This handler loads OAuth client credentials from Kubernetes secrets.
The registration is thread-safe and should be called during system initialization. Only one handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: SecretCredentialsHandler implementation
Thread-safe: Yes, protected by secretCredentialsMutex.
func RegisterServiceClassManager ¶
func RegisterServiceClassManager(h ServiceClassManagerHandler)
RegisterServiceClassManager registers the service class manager handler implementation. This handler provides ServiceClass definition management and lifecycle tool access, enabling the creation of service instances from predefined templates.
The registration is thread-safe and should be called during system initialization. Only one service class manager handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: ServiceClassManagerHandler implementation that manages ServiceClass operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := serviceclass.NewAdapter(configPath)
adapter.Register()
func RegisterServiceManager ¶
func RegisterServiceManager(h ServiceManagerHandler)
RegisterServiceManager registers the service manager handler implementation. This handler provides unified service lifecycle management for both static services and ServiceClass-based service instances.
The registration is thread-safe and should be called during system initialization. Only one service manager handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: ServiceManagerHandler implementation that manages service lifecycle operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := &services.ManagerAdapter{manager: myManager}
api.RegisterServiceManager(adapter)
func RegisterServiceRegistry ¶
func RegisterServiceRegistry(h ServiceRegistryHandler)
RegisterServiceRegistry registers the service registry handler implementation. This handler provides access to all registered services in the system, including both static services (defined in configuration) and dynamic ServiceClass-based service instances.
The registration is thread-safe and should be called during system initialization. Only one service registry handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: ServiceRegistryHandler implementation that manages service discovery and information
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := &myservice.RegistryAdapter{registry: myRegistry}
api.RegisterServiceRegistry(adapter)
func RegisterSessionInitCallback ¶
func RegisterSessionInitCallback(cb SessionInitCallback)
RegisterSessionInitCallback registers a callback for session initialization. This callback is triggered on the first authenticated MCP request for a session, enabling proactive SSO connections to be established.
Thread-safe: Yes, protected by sessionInitMutex.
func RegisterTeleportClient ¶
func RegisterTeleportClient(h TeleportClientHandler)
RegisterTeleportClient registers the Teleport client handler implementation. This handler provides HTTP clients configured with Teleport Machine ID certificates for accessing MCP servers on private installations via Teleport Application Access.
The registration is thread-safe and should be called during system initialization. Only one Teleport client handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: TeleportClientHandler implementation that provides Teleport HTTP clients
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := teleport.NewAdapter()
adapter.Register()
func RegisterWorkflow ¶
func RegisterWorkflow(h WorkflowHandler)
RegisterWorkflow registers the workflow handler implementation. This handler provides workflow definition management and execution functionality, allowing components to define and execute multi-step processes through the system.
The registration is thread-safe and should be called during system initialization. Only one workflow handler can be registered at a time; subsequent registrations will replace the previous handler.
Args:
- h: WorkflowHandler implementation that manages workflow operations
Thread-safe: Yes, protected by handlerMutex.
Example:
adapter := workflow.NewAdapter(toolCaller, toolChecker)
adapter.Register()
func SubscribeToToolUpdates ¶
func SubscribeToToolUpdates(subscriber ToolUpdateSubscriber)
SubscribeToToolUpdates allows components to subscribe to tool availability change events. Subscribers will receive notifications when tools are added, removed, or updated across MCP servers. This enables components to react to changes in the tool landscape in real-time.
The subscription is thread-safe and can be called from any goroutine. Subscriber callbacks are executed in separate goroutines to prevent blocking the event publishing mechanism.
Args:
- subscriber: ToolUpdateSubscriber that will receive tool update notifications
Thread-safe: Yes, protected by toolUpdateMutex.
Note: Subscriber callbacks are executed asynchronously and should not block. Panics in subscriber callbacks are recovered and logged as errors.
Example:
type MySubscriber struct{}
func (s *MySubscriber) OnToolsUpdated(event api.ToolUpdateEvent) {
fmt.Printf("Tools updated: %s\n", event.Type)
}
subscriber := &MySubscriber{}
api.SubscribeToToolUpdates(subscriber)
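To make the asynchronous delivery and panic recovery described above concrete, here is a rough sketch of how a publisher of this kind could be written. The `publish` function, the returned `WaitGroup`, the `panicky` and `counter` types, and the `"tools_updated"` value are all hypothetical; the real event type has more fields than the single `Type` shown here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ToolUpdateEvent and ToolUpdateSubscriber are simplified stand-ins.
type ToolUpdateEvent struct{ Type string }

type ToolUpdateSubscriber interface {
	OnToolsUpdated(event ToolUpdateEvent)
}

var (
	toolUpdateMutex sync.RWMutex
	subscribers     []ToolUpdateSubscriber
)

func SubscribeToToolUpdates(s ToolUpdateSubscriber) {
	toolUpdateMutex.Lock()
	defer toolUpdateMutex.Unlock()
	subscribers = append(subscribers, s)
}

// publish (hypothetical) runs each callback in its own goroutine and recovers
// panics. The WaitGroup is returned only so the demo can wait for delivery.
func publish(event ToolUpdateEvent) *sync.WaitGroup {
	toolUpdateMutex.RLock()
	snapshot := append([]ToolUpdateSubscriber(nil), subscribers...)
	toolUpdateMutex.RUnlock()

	var wg sync.WaitGroup
	for _, s := range snapshot {
		wg.Add(1)
		go func(s ToolUpdateSubscriber) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					fmt.Printf("subscriber panicked: %v\n", r)
				}
			}()
			s.OnToolsUpdated(event)
		}(s)
	}
	return &wg
}

// panicky always panics; counter counts the events it receives.
type panicky struct{}

func (panicky) OnToolsUpdated(ToolUpdateEvent) { panic("boom") }

type counter struct{ n int32 }

func (c *counter) OnToolsUpdated(ToolUpdateEvent) { atomic.AddInt32(&c.n, 1) }
func (c *counter) Count() int32                   { return atomic.LoadInt32(&c.n) }

func main() {
	c := &counter{}
	SubscribeToToolUpdates(panicky{})
	SubscribeToToolUpdates(c)
	publish(ToolUpdateEvent{Type: "tools_updated"}).Wait()
	fmt.Println("delivered:", c.Count())
}
```

Copying the subscriber slice before spawning goroutines keeps the lock hold time short, and the per-goroutine `recover` means one failing subscriber does not stop the others from being notified.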
func UpdateMCPServerState ¶
func UpdateMCPServerState(name string, state ServiceState, health HealthStatus, err error) error
UpdateMCPServerState updates the state of an MCPServer service. This is used when external events (such as SSO authentication success) need to update the service state. The function retrieves the service from the registry, checks if it implements StateUpdater, and updates its state.
This function is typically called by the aggregator when:
- SSO token forwarding succeeds for a session
- SSO token exchange succeeds for a session
The state update will trigger the reconciler to sync the new state to the CRD status.
Args:
- name: The name of the MCPServer service to update
- state: The new service state (typically StateConnected for SSO success)
- health: The new health status (typically HealthHealthy for SSO success)
- err: Optional error to associate with the state (typically nil for success)
Returns:
- error: Error if the service doesn't exist, doesn't support state updates, or update fails
Thread-safe: Yes, operations are synchronized appropriately.
Example:
// Update MCPServer state after SSO token forwarding succeeds
if err := api.UpdateMCPServerState("gazelle-mcp-kubernetes", api.StateConnected, api.HealthHealthy, nil); err != nil {
logging.Warn("SSO", "Failed to update MCPServer state: %v", err)
}
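The lookup-then-type-assert flow described for UpdateMCPServerState can be sketched as follows. This is an approximation under assumptions: the `StateUpdater` method name and signature, the string values of the constants, the map-based `registry`, and the `mcpServer` type are hypothetical and stand in for the real registry and service types.

```go
package main

import "fmt"

type ServiceState string
type HealthStatus string

// The string values below are placeholders chosen for the demo.
const (
	StateConnected ServiceState = "connected"
	HealthHealthy  HealthStatus = "healthy"
)

// StateUpdater is assumed here; the real method set may differ.
type StateUpdater interface {
	UpdateState(state ServiceState, health HealthStatus, err error)
}

// registry is a toy stand-in for the service registry.
var registry = map[string]interface{}{}

// updateState looks up the service, checks that it supports state updates,
// and applies the new state.
func updateState(name string, state ServiceState, health HealthStatus, err error) error {
	svc, ok := registry[name]
	if !ok {
		return fmt.Errorf("service %q not found", name)
	}
	updater, ok := svc.(StateUpdater)
	if !ok {
		return fmt.Errorf("service %q does not support state updates", name)
	}
	updater.UpdateState(state, health, err)
	return nil
}

// mcpServer is a demo service that records the last state it was given.
type mcpServer struct {
	state  ServiceState
	health HealthStatus
	err    error
}

func (m *mcpServer) UpdateState(state ServiceState, health HealthStatus, err error) {
	m.state, m.health, m.err = state, health, err
}

func main() {
	registry["gazelle-mcp-kubernetes"] = &mcpServer{}
	if err := updateState("gazelle-mcp-kubernetes", StateConnected, HealthHealthy, nil); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Println(registry["gazelle-mcp-kubernetes"].(*mcpServer).state)
}
```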
Types ¶
type AggregatorHandler ¶
type AggregatorHandler interface {
// GetServiceData returns runtime data and configuration information about the aggregator service.
// This includes connection details, registered servers, and operational metrics.
//
// Returns:
// - map[string]interface{}: Service-specific data and metadata
GetServiceData() map[string]interface{}
// GetEndpoint returns the network endpoint where the aggregator is accessible.
// This is typically used for client connections and service discovery.
//
// Returns:
// - string: The network endpoint (e.g., "localhost" or IP address)
GetEndpoint() string
// GetPort returns the network port where the aggregator is listening.
// This is used in conjunction with GetEndpoint for complete connection information.
//
// Returns:
// - int: The port number where the aggregator service is accessible
GetPort() int
// CallTool executes a tool through the aggregator and returns a structured result.
// This method provides the primary interface for external tool execution with
// result formatting suitable for API responses.
//
// Args:
// - ctx: Context for request cancellation and timeout control
// - toolName: The name of the tool to execute
// - args: Arguments to pass to the tool as key-value pairs
//
// Returns:
// - *CallToolResult: Structured result containing tool output and metadata
// - error: nil on success, or an error if the tool call fails
CallTool(ctx context.Context, toolName string, args map[string]interface{}) (*CallToolResult, error)
// CallToolInternal executes a tool and returns the raw MCP result.
// This method is used internally by workflow and other subsystems that need
// direct access to the underlying MCP tool result format.
//
// Args:
// - ctx: Context for request cancellation and timeout control
// - toolName: The name of the tool to execute
// - args: Arguments to pass to the tool as key-value pairs
//
// Returns:
// - *mcp.CallToolResult: Raw MCP tool result
// - error: nil on success, or an error if the tool call fails
CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
// IsToolAvailable checks whether a specific tool is currently available for execution.
// This is used for validation before attempting tool calls.
//
// Args:
// - toolName: The name of the tool to check
//
// Returns:
// - bool: true if the tool is available, false otherwise
IsToolAvailable(toolName string) bool
// GetAvailableTools returns a list of all currently available tool names.
// This is used for tool discovery and validation.
//
// Returns:
// - []string: Slice of available tool names (empty if no tools available)
GetAvailableTools() []string
// UpdateCapabilities triggers a refresh of the aggregator's capability information.
// This should be called when MCP servers are added, removed, or their tools change.
UpdateCapabilities()
// RegisterServerPendingAuth registers a server that requires OAuth authentication.
// This creates a placeholder server in auth_required state with a synthetic
// authentication tool that users can call to initiate the OAuth flow.
//
// Args:
// - serverName: Unique name of the server
// - url: The server endpoint URL
// - toolPrefix: Server-specific prefix for tools
// - authInfo: OAuth information from the 401 response
//
// Returns an error if registration fails.
RegisterServerPendingAuth(serverName, url, toolPrefix string, authInfo *AuthInfo) error
// RegisterServerPendingAuthWithConfig registers a server that requires OAuth authentication
// with additional auth configuration for SSO token forwarding.
//
// Args:
// - serverName: Unique name of the server
// - url: The server endpoint URL
// - toolPrefix: Server-specific prefix for tools
// - authInfo: OAuth information from the 401 response
// - authConfig: Auth configuration for token forwarding (may be nil)
//
// Returns an error if registration fails.
RegisterServerPendingAuthWithConfig(serverName, url, toolPrefix string, authInfo *AuthInfo, authConfig *MCPServerAuth) error
}
AggregatorHandler provides aggregator-specific functionality for managing MCP server tools. It acts as the central coordinator for tool availability, execution, and tool management across all registered MCP servers.
func GetAggregator ¶
func GetAggregator() AggregatorHandler
GetAggregator returns the registered aggregator handler. This provides access to tool execution and MCP server aggregation functionality.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- AggregatorHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
aggregator := api.GetAggregator()
if aggregator == nil {
return fmt.Errorf("aggregator not available")
}
result, err := aggregator.CallTool(ctx, "my-tool", args)
type Arg ¶
type Arg struct {
// Type specifies the expected data type (string, number, boolean, object, array)
Type string `yaml:"type" json:"type"`
// Required indicates whether this argument must be provided
Required bool `yaml:"required" json:"required"`
// Description provides human-readable documentation for the argument
Description string `yaml:"description" json:"description"`
// Default specifies the default value used when the argument is not provided.
// Only applicable when Required is false.
Default interface{} `yaml:"default,omitempty" json:"default,omitempty"`
}
Arg defines an argument for operations and workflows. This provides a standardized way to define input validation and documentation for both workflow inputs and operation arguments.
type ArgDefinition ¶
type ArgDefinition struct {
// Type specifies the expected data type for this arg.
// Valid types are "string", "integer", "boolean", and "number".
// This is used for runtime type validation during service creation.
Type string `yaml:"type" json:"type"`
// Required indicates whether this arg must be provided when creating a service instance.
// Required args without default values will cause service creation to fail if not provided.
Required bool `yaml:"required" json:"required"`
// Default provides a default value to use if this arg is not provided during service creation.
// The default value must match the specified Type. If no default is provided and the arg
// is not required, the arg will be omitted from the service configuration.
Default interface{} `yaml:"default,omitempty" json:"default,omitempty"`
// Description provides human-readable documentation for this arg.
// This is used for generating help text, UI forms, and API documentation.
Description string `yaml:"description,omitempty" json:"description,omitempty"`
}
ArgDefinition defines validation rules and metadata for a service creation argument. This structure provides comprehensive argument validation capabilities for ServiceClass templates, ensuring that service instances are created with valid configuration values.
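A minimal sketch of how the rules above (type check, required check, default fill-in, omission of optional args without defaults) might be applied. The `resolveArgs` and `matchesType` helpers are hypothetical, and rejecting unknown arg names is an assumption of this sketch rather than documented behaviour.

```go
package main

import "fmt"

type ArgDefinition struct {
	Type        string
	Required    bool
	Default     interface{}
	Description string
}

// matchesType reports whether v fits one of the four documented type names.
func matchesType(v interface{}, typ string) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "boolean":
		_, ok := v.(bool)
		return ok
	case "integer":
		switch n := v.(type) {
		case int, int64:
			return true
		case float64:
			return n == float64(int64(n)) // JSON numbers decode as float64
		}
		return false
	case "number":
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	}
	return false
}

// resolveArgs validates provided values and fills in defaults.
func resolveArgs(defs map[string]ArgDefinition, provided map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{})
	for name, v := range provided {
		def, ok := defs[name]
		if !ok {
			return nil, fmt.Errorf("unknown arg %q", name) // assumption: unknown names are rejected
		}
		if !matchesType(v, def.Type) {
			return nil, fmt.Errorf("arg %q: expected %s", name, def.Type)
		}
		out[name] = v
	}
	for name, def := range defs {
		if _, ok := out[name]; ok {
			continue
		}
		switch {
		case def.Default != nil:
			out[name] = def.Default
		case def.Required:
			return nil, fmt.Errorf("missing required arg %q", name)
		}
		// Optional args without a default are simply omitted.
	}
	return out, nil
}

func main() {
	defs := map[string]ArgDefinition{
		"name":  {Type: "string", Required: true},
		"port":  {Type: "integer", Default: 8080},
		"debug": {Type: "boolean"},
	}
	args, err := resolveArgs(defs, map[string]interface{}{"name": "db"})
	fmt.Println(args, err)
}
```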
type ArgMetadata ¶
type ArgMetadata struct {
// Name is the argument identifier used in tool calls
Name string
// Type specifies the expected argument type for validation.
// Valid values: "string", "number", "boolean", "object", "array"
Type string
// Required indicates whether this argument must be provided in tool calls
Required bool
// Description provides human-readable documentation for this argument,
// explaining its purpose and expected format
Description string
// Default specifies the default value used when the argument is not provided.
// Only used when Required is false. Must match the specified Type.
Default interface{}
// Schema provides detailed JSON Schema definition for complex types.
// When specified, this takes precedence over the basic Type field for
// generating detailed MCP schemas. This is particularly useful for:
// - Object types that need property definitions and validation rules
// - Array types that need item type specifications
// - Advanced validation constraints (patterns, ranges, etc.)
//
// For simple types (string, number, boolean), this field can be omitted
// and the basic Type field will be used.
Schema map[string]interface{} `json:"schema,omitempty"`
}
ArgMetadata describes a single argument for a tool. This is used for validation, documentation, and UI generation for tool arguments in various interfaces.
Argument metadata enables automatic validation of tool calls and helps generate helpful error messages when arguments are invalid.
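The precedence rule for the Schema field can be illustrated with a short sketch that builds a JSON Schema fragment per argument. The `schemaFor` and `inputSchema` helpers are hypothetical, and the exact shape of the generated MCP schema is an assumption; only the "Schema wins over Type" rule comes from the field documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ArgMetadata struct {
	Name        string
	Type        string
	Required    bool
	Description string
	Default     interface{}
	Schema      map[string]interface{} `json:"schema,omitempty"`
}

// schemaFor returns the detailed Schema when present, otherwise a basic
// fragment derived from Type, Description, and Default.
func schemaFor(a ArgMetadata) map[string]interface{} {
	if len(a.Schema) > 0 {
		return a.Schema
	}
	s := map[string]interface{}{"type": a.Type}
	if a.Description != "" {
		s["description"] = a.Description
	}
	if a.Default != nil {
		s["default"] = a.Default
	}
	return s
}

// inputSchema assembles an object schema from a list of arguments.
func inputSchema(args []ArgMetadata) map[string]interface{} {
	props := map[string]interface{}{}
	required := []string{}
	for _, a := range args {
		props[a.Name] = schemaFor(a)
		if a.Required {
			required = append(required, a.Name)
		}
	}
	return map[string]interface{}{
		"type":       "object",
		"properties": props,
		"required":   required,
	}
}

func main() {
	args := []ArgMetadata{
		{Name: "namespace", Type: "string", Required: true, Description: "Target namespace"},
		{Name: "labels", Type: "array", Schema: map[string]interface{}{
			"type":  "array",
			"items": map[string]interface{}{"type": "string"},
		}},
	}
	out, err := json.MarshalIndent(inputSchema(args), "", "  ")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```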
type AuthChallenge ¶
type AuthChallenge struct {
// Status indicates this is an auth required response.
Status string `json:"status"` // "auth_required"
// AuthURL is the OAuth authorization URL the user should visit.
AuthURL string `json:"auth_url"`
// ServerName is the name of the MCP server requiring authentication.
ServerName string `json:"server_name,omitempty"`
// Message is a human-readable description of why auth is needed.
Message string `json:"message,omitempty"`
}
AuthChallenge represents an authentication challenge returned when a remote MCP server requires OAuth authentication.
type AuthCompletionCallback ¶
type AuthCompletionCallback func(ctx context.Context, sessionID, serverName, accessToken string) error
AuthCompletionCallback is called after successful OAuth authentication. The aggregator registers this callback to establish session connections when users authenticate to MCP servers via the browser OAuth flow.
Args:
- ctx: Context for the operation
- sessionID: The MCP session ID that authenticated
- serverName: The name of the MCP server that was authenticated to
- accessToken: The access token to use for the connection
Returns an error if the connection could not be established.
type AuthHandler ¶
type AuthHandler interface {
// CheckAuthRequired probes the endpoint to determine if OAuth is required.
// Returns true if 401 was received and OAuth flow should be initiated.
CheckAuthRequired(ctx context.Context, endpoint string) (bool, error)
// HasCredentials reports whether usable credentials exist for the
// endpoint: either a non-expired access token or an expired token with
// a refresh token that the mcp-go transport can use for automatic
// refresh.
HasCredentials(endpoint string) bool
// GetBearerToken returns a valid Bearer token for the endpoint.
// Returns an error if not authenticated.
GetBearerToken(endpoint string) (string, error)
// Login initiates the OAuth flow for the given endpoint.
// Opens browser and waits for callback completion.
Login(ctx context.Context, endpoint string) error
// LoginWithIssuer initiates the OAuth flow for the given endpoint with a known issuer.
// This is used when the issuer URL is already known (e.g., from a WWW-Authenticate header).
LoginWithIssuer(ctx context.Context, endpoint, issuerURL string) error
// Logout clears stored tokens for the endpoint.
Logout(endpoint string) error
// LogoutAll clears all stored tokens.
LogoutAll() error
// GetStatus returns authentication status for all known endpoints.
GetStatus() []AuthStatus
// GetStatusForEndpoint returns authentication status for a specific endpoint.
GetStatusForEndpoint(endpoint string) *AuthStatus
// InvalidateCache removes any cached state for the given endpoint.
// This forces the next status or token lookup to read fresh data from
// the persistent store. Call this after an external mechanism (e.g.
// mcp-go's transport) may have refreshed a token outside of this handler.
InvalidateCache(endpoint string)
// GetSessionID returns a persistent session ID for this CLI user.
// This is used for the X-Muster-Session-ID header to enable MCP server
// token persistence across CLI invocations. Returns empty string if not available.
GetSessionID() string
// Close cleans up any resources held by the auth handler.
Close() error
}
AuthHandler provides OAuth authentication for CLI and agent clients. This interface abstracts authentication operations, enabling consistent auth handling across all CLI commands while maintaining testability.
Following the project's service locator pattern, this interface is defined in the API layer and implemented by adapters in consuming packages.
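As a sketch of how a caller might combine these methods, the helper below probes the endpoint, logs in only when no usable credentials exist, and then fetches the token. The narrowed `tokenSource` interface, the `authHeader` and `run` helpers, the `fakeAuth` type, and the example URL are hypothetical. It also assumes GetBearerToken returns the raw token without a "Bearer " prefix, which the documentation does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// tokenSource is the subset of AuthHandler that this sketch needs.
type tokenSource interface {
	CheckAuthRequired(ctx context.Context, endpoint string) (bool, error)
	HasCredentials(endpoint string) bool
	GetBearerToken(endpoint string) (string, error)
	Login(ctx context.Context, endpoint string) error
}

// authHeader returns the Authorization header value for endpoint,
// or "" when the endpoint does not require OAuth.
func authHeader(ctx context.Context, h tokenSource, endpoint string) (string, error) {
	required, err := h.CheckAuthRequired(ctx, endpoint)
	if err != nil {
		return "", fmt.Errorf("auth probe failed: %w", err)
	}
	if !required {
		return "", nil
	}
	if !h.HasCredentials(endpoint) {
		if err := h.Login(ctx, endpoint); err != nil {
			return "", fmt.Errorf("login failed: %w", err)
		}
	}
	token, err := h.GetBearerToken(endpoint)
	if err != nil {
		return "", err
	}
	return "Bearer " + token, nil // assumption: token is returned without prefix
}

// fakeAuth is an in-memory test double; it always reports auth as required.
type fakeAuth struct {
	tokens map[string]string
	logins int
}

func (f *fakeAuth) CheckAuthRequired(context.Context, string) (bool, error) { return true, nil }

func (f *fakeAuth) HasCredentials(endpoint string) bool {
	_, ok := f.tokens[endpoint]
	return ok
}

func (f *fakeAuth) GetBearerToken(endpoint string) (string, error) {
	t, ok := f.tokens[endpoint]
	if !ok {
		return "", errors.New("not authenticated")
	}
	return t, nil
}

func (f *fakeAuth) Login(_ context.Context, endpoint string) error {
	f.logins++
	f.tokens[endpoint] = "demo-token"
	return nil
}

// run wraps authHeader with a background context for the demo.
func run(f *fakeAuth, endpoint string) (string, error) {
	return authHeader(context.Background(), f, endpoint)
}

func main() {
	f := &fakeAuth{tokens: map[string]string{}}
	header, err := run(f, "https://muster.example.com/mcp")
	fmt.Println(header, err, f.logins)
}
```

Depending on a narrow interface rather than the full AuthHandler keeps test doubles like `fakeAuth` small, which fits the testability goal mentioned above.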
func GetAuthHandler ¶
func GetAuthHandler() AuthHandler
GetAuthHandler returns the registered auth handler. This provides access to OAuth authentication functionality.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- AuthHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
handler := api.GetAuthHandler()
if handler == nil {
return fmt.Errorf("auth handler not available")
}
if err := handler.Login(ctx, endpoint); err != nil {
return fmt.Errorf("login failed: %w", err)
}
type AuthInfo ¶
type AuthInfo struct {
// Issuer is the OAuth issuer URL (from WWW-Authenticate realm)
Issuer string `json:"issuer,omitempty"`
// Scope is the OAuth scope required by the server
Scope string `json:"scope,omitempty"`
// ResourceMetadataURL is the URL to fetch OAuth metadata (MCP-specific)
ResourceMetadataURL string `json:"resource_metadata_url,omitempty"`
}
AuthInfo contains OAuth authentication information extracted from a 401 response during MCP server initialization.
type AuthStatus ¶
type AuthStatus struct {
// Endpoint is the URL of the authenticated endpoint.
Endpoint string
// Authenticated indicates whether there is a valid token.
Authenticated bool
// ExpiresAt is when the current token expires.
ExpiresAt time.Time
// IssuerURL is the OAuth issuer that issued this token.
IssuerURL string
// Subject is the authenticated user's subject (sub) claim from the token.
// This is typically a unique user identifier.
Subject string
// Email is the authenticated user's email address (if available in the token).
Email string
// HasRefreshToken indicates whether a refresh token is available for this endpoint.
// If false, the token cannot be refreshed and will require re-authentication when it expires.
HasRefreshToken bool
// RefreshExpiresAt is the estimated time when the refresh token (session) expires.
// This represents the muster-side refresh token expiry, calculated from the token's
// creation time plus the configured refresh token TTL. The actual session may end
// earlier if the upstream provider (e.g., Dex) has a shorter absolute lifetime.
RefreshExpiresAt time.Time
// Error is non-empty if the auth check failed.
Error string
}
AuthStatus represents authentication state for a single endpoint.
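One way a client could interpret these fields is sketched below. The `nextAction` helper and its return values are hypothetical, and treating a zero RefreshExpiresAt as "expiry unknown, refresh may still work" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// AuthStatus mirrors the fields of the documented type that this sketch reads.
type AuthStatus struct {
	Endpoint         string
	Authenticated    bool
	ExpiresAt        time.Time
	HasRefreshToken  bool
	RefreshExpiresAt time.Time
	Error            string
}

// nextAction suggests what a client should do for the given status.
func nextAction(s AuthStatus, now time.Time) string {
	switch {
	case s.Error != "":
		return "error"
	case s.Authenticated && now.Before(s.ExpiresAt):
		return "ok"
	case s.HasRefreshToken && (s.RefreshExpiresAt.IsZero() || now.Before(s.RefreshExpiresAt)):
		return "refresh"
	default:
		return "login"
	}
}

// Sample values used by the demo.
var (
	demoNow = time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	fresh   = AuthStatus{Authenticated: true, ExpiresAt: demoNow.Add(time.Hour)}
	stale   = AuthStatus{
		ExpiresAt:        demoNow.Add(-time.Hour),
		HasRefreshToken:  true,
		RefreshExpiresAt: demoNow.Add(24 * time.Hour),
	}
	lapsed = AuthStatus{ExpiresAt: demoNow.Add(-time.Hour)}
)

func main() {
	fmt.Println(nextAction(fresh, demoNow))
	fmt.Println(nextAction(stale, demoNow))
	fmt.Println(nextAction(lapsed, demoNow))
}
```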
type CallToolResult ¶
type CallToolResult struct {
// Content contains the actual result data from the tool execution.
// Can be strings, objects, or any other JSON-serializable data.
//
// For successful executions: contains the tool's output data
// For errors: contains error messages and diagnostic information
Content []interface{} `json:"content"`
// IsError indicates whether the tool execution resulted in an error.
// true: The execution failed and Content contains error information
// false: The execution succeeded and Content contains the result data
IsError bool `json:"isError,omitempty"`
}
CallToolResult represents the result of a tool execution or capability call. This standardized result format is used across all tool calling interfaces to provide consistent response handling throughout the muster system.
The result can represent either successful execution (IsError=false) or failure conditions (IsError=true), with Content containing the appropriate response data or error information.
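A caller will usually want to turn this result into an ordinary Go value and error. The sketch below shows one possible approach; the `unwrap` helper is hypothetical, and joining Content items with newlines is a choice made for this example rather than a documented convention.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

type CallToolResult struct {
	Content []interface{} `json:"content"`
	IsError bool          `json:"isError,omitempty"`
}

// unwrap flattens Content to text and converts IsError into a Go error.
// String items are used as-is; other items are rendered as JSON.
func unwrap(res *CallToolResult) (string, error) {
	if res == nil {
		return "", errors.New("nil result")
	}
	parts := make([]string, 0, len(res.Content))
	for _, item := range res.Content {
		if s, ok := item.(string); ok {
			parts = append(parts, s)
			continue
		}
		b, err := json.Marshal(item)
		if err != nil {
			return "", fmt.Errorf("encode content: %w", err)
		}
		parts = append(parts, string(b))
	}
	text := strings.Join(parts, "\n")
	if res.IsError {
		return "", errors.New(text)
	}
	return text, nil
}

func main() {
	ok := &CallToolResult{Content: []interface{}{"created", map[string]interface{}{"id": 7}}}
	fmt.Println(unwrap(ok))

	failed := &CallToolResult{Content: []interface{}{"service not found"}, IsError: true}
	fmt.Println(unwrap(failed))
}
```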
func HandleErrorWithPrefix ¶
func HandleErrorWithPrefix(err error, prefix string) *CallToolResult
HandleErrorWithPrefix creates an appropriate CallToolResult with a custom prefix. This function is similar to HandleError but allows customizing the error message prefix for more specific error context.
Args:
- err: The error to handle and format
- prefix: Custom prefix to prepend to the error message
Returns:
- *CallToolResult: A CallToolResult with prefixed error information and IsError set to true
Example:
if err != nil {
return api.HandleErrorWithPrefix(err, "Failed to create service")
}
type ClientCredentials ¶
type ClientCredentials struct {
// ClientID is the OAuth client ID.
ClientID string
// ClientSecret is the OAuth client secret.
ClientSecret string
}
ClientCredentials contains OAuth client credentials loaded from a secret.
type ClientCredentialsSecretRef ¶
type ClientCredentialsSecretRef struct {
// Name is the name of the Kubernetes Secret.
// Required.
Name string `yaml:"name" json:"name"`
// Namespace is the Kubernetes namespace where the secret is located.
// If not specified, defaults to the MCPServer's namespace.
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
// ClientIDKey is the key in the secret data that contains the client ID.
// Defaults to "client-id" if not specified.
ClientIDKey string `yaml:"clientIdKey,omitempty" json:"clientIdKey,omitempty"`
// ClientSecretKey is the key in the secret data that contains the client secret.
// Defaults to "client-secret" if not specified.
ClientSecretKey string `yaml:"clientSecretKey,omitempty" json:"clientSecretKey,omitempty"`
}
ClientCredentialsSecretRef references a Kubernetes Secret containing OAuth client credentials for token exchange authentication.
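The defaulting rules in the field comments (namespace falls back to the MCPServer's namespace, keys fall back to "client-id" and "client-secret") can be sketched as below. The `loadCredentials` helper, the getter function signature, and `fakeSecrets` are hypothetical; a real implementation would read the Secret through a Kubernetes client.

```go
package main

import (
	"errors"
	"fmt"
)

type ClientCredentials struct {
	ClientID     string
	ClientSecret string
}

type ClientCredentialsSecretRef struct {
	Name            string
	Namespace       string
	ClientIDKey     string
	ClientSecretKey string
}

// loadCredentials applies the documented defaults, then reads both keys
// from the secret data returned by get.
func loadCredentials(
	ref ClientCredentialsSecretRef,
	serverNamespace string,
	get func(namespace, name string) (map[string][]byte, error),
) (*ClientCredentials, error) {
	if ref.Name == "" {
		return nil, errors.New("secret name is required")
	}
	ns := ref.Namespace
	if ns == "" {
		ns = serverNamespace
	}
	idKey := ref.ClientIDKey
	if idKey == "" {
		idKey = "client-id"
	}
	secretKey := ref.ClientSecretKey
	if secretKey == "" {
		secretKey = "client-secret"
	}
	data, err := get(ns, ref.Name)
	if err != nil {
		return nil, err
	}
	id, ok := data[idKey]
	if !ok {
		return nil, fmt.Errorf("secret %s/%s has no key %q", ns, ref.Name, idKey)
	}
	secret, ok := data[secretKey]
	if !ok {
		return nil, fmt.Errorf("secret %s/%s has no key %q", ns, ref.Name, secretKey)
	}
	return &ClientCredentials{ClientID: string(id), ClientSecret: string(secret)}, nil
}

// fakeSecrets is an in-memory stand-in for a Kubernetes Secret lookup.
func fakeSecrets(namespace, name string) (map[string][]byte, error) {
	if namespace == "muster" && name == "oauth" {
		return map[string][]byte{
			"client-id":     []byte("abc"),
			"client-secret": []byte("s3cret"),
		}, nil
	}
	return nil, fmt.Errorf("secret %s/%s not found", namespace, name)
}

func main() {
	creds, err := loadCredentials(ClientCredentialsSecretRef{Name: "oauth"}, "muster", fakeSecrets)
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Println(creds.ClientID)
}
```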
type ClientSessionIDContextKey ¶
type ClientSessionIDContextKey struct{}
ClientSessionIDContextKey is the context key for storing client-provided session IDs. This type is defined in the api package to ensure both the aggregator and server packages use the same type identity when setting/getting context values.
Usage:
// Setting the value (in middleware)
ctx := context.WithValue(r.Context(), api.ClientSessionIDContextKey{}, sessionID)
// Getting the value
if sessionID, ok := ctx.Value(api.ClientSessionIDContextKey{}).(string); ok {
// use sessionID
}
type ConfigAPI ¶
type ConfigAPI interface {
// GetConfig returns the entire muster configuration including all sections.
// This provides access to the complete configuration state for comprehensive
// configuration management scenarios.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
//
// Returns:
// - *config.MusterConfig: The complete system configuration
// - error: Error if configuration cannot be retrieved or parsed
//
// Example:
//
// cfg, err := configAPI.GetConfig(ctx)
// if err != nil {
// return fmt.Errorf("configuration unavailable: %w", err)
// }
// fmt.Printf("System listening on port: %d\n", cfg.Aggregator.Port)
GetConfig(ctx context.Context) (*config.MusterConfig, error)
// GetAggregatorConfig returns only the aggregator configuration section.
// This is useful when only aggregator-specific settings are needed,
// avoiding the overhead of retrieving the entire configuration.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - *config.AggregatorConfig: The aggregator configuration section
// - error: Error if configuration cannot be retrieved
//
// Example:
//
// aggConfig, err := configAPI.GetAggregatorConfig(ctx)
// if err != nil {
// return err
// }
// fmt.Printf("Aggregator endpoint: %s:%d\n", aggConfig.Host, aggConfig.Port)
GetAggregatorConfig(ctx context.Context) (*config.AggregatorConfig, error)
// UpdateAggregatorConfig updates the aggregator configuration section.
// Changes take effect immediately but are not persisted until SaveConfig is called.
//
// Args:
// - ctx: Context for the operation
// - aggregator: The new aggregator configuration to apply
//
// Returns:
// - error: Error if the update fails or configuration is invalid
//
// Note: Changes are not persisted to disk until SaveConfig is called.
//
// Example:
//
// aggConfig.Port = 9090
// err := configAPI.UpdateAggregatorConfig(ctx, aggConfig)
// if err != nil {
// return fmt.Errorf("failed to update aggregator config: %w", err)
// }
// // Don't forget to save!
// err = configAPI.SaveConfig(ctx)
UpdateAggregatorConfig(ctx context.Context, aggregator config.AggregatorConfig) error
// SaveConfig persists the current in-memory configuration to disk.
// This makes all pending configuration changes permanent and updates
// the configuration files on disk.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - error: Error if the configuration cannot be saved to disk
//
// Note: This operation is atomic - either all changes are saved or none are.
//
// Example:
//
// // After making configuration changes...
// err := configAPI.SaveConfig(ctx)
// if err != nil {
// return fmt.Errorf("failed to persist configuration: %w", err)
// }
// fmt.Println("Configuration saved successfully")
SaveConfig(ctx context.Context) error
// ReloadConfig reloads the configuration from disk, discarding any unsaved changes.
// This is useful for reverting to the last saved configuration state or
// picking up external changes to configuration files.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - error: Error if the configuration cannot be reloaded from disk
//
// Warning: This operation discards all unsaved in-memory changes.
//
// Example:
//
// // Revert to last saved configuration
// err := configAPI.ReloadConfig(ctx)
// if err != nil {
// return fmt.Errorf("failed to reload configuration: %w", err)
// }
// fmt.Println("Configuration reloaded from disk")
ReloadConfig(ctx context.Context) error
}
ConfigAPI defines the interface for managing configuration at runtime. This interface provides a higher-level abstraction over the ConfigHandler for configuration management operations, including retrieval, updates, and persistence.
ConfigAPI is designed for direct use by components that need configuration management capabilities, while ConfigHandler is used in the Service Locator Pattern.
Example usage:
configAPI := api.NewConfigServiceAPI()
// Get current configuration
cfg, err := configAPI.GetConfig(ctx)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
// Update aggregator settings
cfg.Aggregator.Port = 8080
err = configAPI.UpdateAggregatorConfig(ctx, cfg.Aggregator)
if err != nil {
return fmt.Errorf("failed to update config: %w", err)
}
// Persist changes to disk
err = configAPI.SaveConfig(ctx)
if err != nil {
return fmt.Errorf("failed to save config: %w", err)
}
func NewConfigServiceAPI ¶
func NewConfigServiceAPI() ConfigAPI
NewConfigServiceAPI creates a new ConfigAPI instance. This function returns a ready-to-use configuration API that delegates operations to the registered ConfigHandler.
Returns:
- ConfigAPI: A new configuration API instance
Note: The returned API will only work if a ConfigHandler has been registered through the Service Locator Pattern. Operations will fail with appropriate error messages if no handler is available.
Example:
configAPI := api.NewConfigServiceAPI()
// Use the API for configuration operations...
type ConfigHandler ¶
type ConfigHandler interface {
// GetConfig returns the entire system configuration.
// This provides access to all configuration sections including
// aggregator settings, global settings, and any other configured components.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
//
// Returns:
// - *config.MusterConfig: The complete system configuration
// - error: Error if configuration cannot be retrieved or is corrupted
//
// Example:
//
// cfg, err := handler.GetConfig(ctx)
// if err != nil {
// return fmt.Errorf("configuration error: %w", err)
// }
// // Access any configuration section
// fmt.Printf("System has %d MCP servers configured\n", len(cfg.MCPServers))
GetConfig(ctx context.Context) (*config.MusterConfig, error)
// GetAggregatorConfig returns the aggregator-specific configuration section.
// This method is optimized for cases where only aggregator settings are needed.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - *config.AggregatorConfig: The aggregator configuration
// - error: Error if configuration cannot be retrieved
GetAggregatorConfig(ctx context.Context) (*config.AggregatorConfig, error)
// UpdateAggregatorConfig updates the aggregator configuration section.
// The new configuration is validated before being applied to ensure system stability.
//
// Args:
// - ctx: Context for the operation
// - aggregator: The new aggregator configuration to apply
//
// Returns:
// - error: Error if the update fails, validation fails, or configuration is invalid
//
// Note: Changes are applied immediately but not persisted until SaveConfig is called.
UpdateAggregatorConfig(ctx context.Context, aggregator config.AggregatorConfig) error
// SaveConfig persists the current configuration to disk.
// This operation is atomic - either all changes are saved successfully
// or no changes are made to the persistent storage.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - error: Error if the configuration cannot be saved, disk is full, or permissions are insufficient
//
// Example:
//
// err := handler.SaveConfig(ctx)
// if err != nil {
// return fmt.Errorf("configuration persistence failed: %w", err)
// }
SaveConfig(ctx context.Context) error
// ReloadConfig reloads the configuration from disk, discarding any unsaved changes.
// This is useful for reverting to the last known good configuration or
// picking up external configuration changes.
//
// Args:
// - ctx: Context for the operation
//
// Returns:
// - error: Error if the configuration cannot be reloaded, files are corrupted, or parsing fails
//
// Warning: This operation discards all unsaved in-memory configuration changes.
ReloadConfig(ctx context.Context) error
// ToolProvider integration for exposing configuration management as discoverable MCP tools.
// This allows configuration operations to be invoked through the standard
// tool discovery and execution mechanisms, enabling external configuration management.
ToolProvider
}
ConfigHandler provides configuration management functionality within the Service Locator Pattern. This handler is the primary interface for runtime configuration management, including configuration retrieval, updates, and persistence operations.
The ConfigHandler abstracts the underlying configuration storage and management, allowing components to manage configuration without knowing the specific implementation details of how configuration is stored or processed.
Key features:
- Runtime configuration updates without system restart
- Atomic configuration persistence to prevent partial updates
- Configuration validation to ensure system stability
- Section-specific access for performance optimization
Thread-safety: All methods are safe for concurrent use.
func GetConfig
deprecated
func GetConfig() ConfigHandler
GetConfig is an alias for GetConfigHandler for backward compatibility. New code should prefer using GetConfigHandler for clarity.
Returns:
- ConfigHandler: The registered handler, or nil if not registered
Thread-safe: Yes, delegates to GetConfigHandler.
Deprecated: Use GetConfigHandler directly for better clarity.
func GetConfigHandler ¶
func GetConfigHandler() ConfigHandler
GetConfigHandler returns the registered configuration handler. This provides access to runtime configuration management functionality.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- ConfigHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
configHandler := api.GetConfigHandler()
if configHandler == nil {
return fmt.Errorf("config handler not available")
}
cfg, err := configHandler.GetConfig(ctx)
type ConfigurableService ¶
type ConfigurableService interface {
ServiceInfo
// UpdateConfiguration updates the service's internal configuration.
// This should be called before restarting a service when its definition changes.
//
// Args:
// - config: The new configuration (type depends on service implementation)
//
// Returns:
// - error: Error if the configuration is invalid or update fails
UpdateConfiguration(config interface{}) error
}
ConfigurableService extends ServiceInfo with the ability to update configuration. Services that can have their configuration updated at runtime should implement this interface to allow reconcilers and managers to update configuration before restarting the service.
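The "update configuration, then restart" sequence a reconciler might follow can be sketched with a type assertion. Everything except the UpdateConfiguration signature is an assumption here: ServiceInfo is reduced to a hypothetical `GetName` method, and `reconcile`, `mcpService`, `serverConfig`, and `noopRestart` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// ServiceInfo is reduced to one hypothetical method for this sketch.
type ServiceInfo interface {
	GetName() string
}

type ConfigurableService interface {
	ServiceInfo
	UpdateConfiguration(config interface{}) error
}

// serverConfig is the concrete configuration type the demo service expects.
type serverConfig struct{ URL string }

type mcpService struct {
	name string
	cfg  serverConfig
}

func (s *mcpService) GetName() string { return s.name }

// UpdateConfiguration rejects values that are not a serverConfig.
func (s *mcpService) UpdateConfiguration(config interface{}) error {
	cfg, ok := config.(serverConfig)
	if !ok {
		return errors.New("expected serverConfig")
	}
	s.cfg = cfg
	return nil
}

// reconcile pushes the new configuration into services that support it,
// then restarts the service through the supplied restart function.
func reconcile(svc ServiceInfo, newConfig interface{}, restart func(name string) error) error {
	if c, ok := svc.(ConfigurableService); ok {
		if err := c.UpdateConfiguration(newConfig); err != nil {
			return fmt.Errorf("update %s: %w", svc.GetName(), err)
		}
	}
	return restart(svc.GetName())
}

// noopRestart stands in for a real restart call.
func noopRestart(name string) error { return nil }

func main() {
	svc := &mcpService{name: "kubernetes"}
	err := reconcile(svc, serverConfig{URL: "https://mcp.example.com"}, noopRestart)
	fmt.Println(svc.cfg.URL, err)
}
```

Because the configuration arrives as `interface{}`, each implementation has to assert the concrete type itself, as `mcpService` does above.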
type CreateServiceInstanceRequest ¶
type CreateServiceInstanceRequest struct {
// ServiceClassName specifies which ServiceClass template to use for creating the instance.
// The ServiceClass must exist and be available in the system.
ServiceClassName string `json:"serviceClassName"`
// Name provides a unique name for the new service instance.
// This name must be unique across all service instances in the system.
Name string `json:"name"`
// Args contains the configuration values for the new service instance.
// These args are validated against the ServiceClass arg definitions
// and used to customize the service behavior.
Args map[string]interface{} `json:"args"`
// Persist determines whether this service instance definition should be saved to YAML files.
// When true, the instance configuration will be persisted and survive system restarts.
// When false, the instance exists only in memory for the current session.
Persist bool `json:"persist,omitempty"`
// AutoStart specifies whether this service instance should be automatically started
// when the system boots up or when all dependencies become available.
AutoStart bool `json:"autoStart,omitempty"`
// CreateTimeout overrides the default timeout for service creation operations.
// If not specified, the system default timeout will be used.
CreateTimeout *time.Duration `json:"createTimeout,omitempty"`
// DeleteTimeout overrides the default timeout for service deletion operations.
// If not specified, the system default timeout will be used.
DeleteTimeout *time.Duration `json:"deleteTimeout,omitempty"`
}
CreateServiceInstanceRequest represents a request to create a new service instance from a ServiceClass template. This request contains all the information needed to instantiate and configure a new service based on a predefined ServiceClass blueprint.
type CreateWorkflowRequest ¶
type CreateWorkflowRequest struct {
// Name is the unique identifier for the new workflow
Name string `yaml:"name" json:"name"`
// Description provides human-readable documentation for the workflow
Description string `yaml:"description,omitempty" json:"description,omitempty"`
// Args defines the expected input arguments for workflow execution
Args map[string]ArgDefinition `yaml:"args,omitempty" json:"args,omitempty"`
// Steps defines the sequence of operations to be performed during workflow execution
Steps []WorkflowStep `yaml:"steps" json:"steps"`
}
CreateWorkflowRequest represents a request to create a new workflow. This is used for API-based workflow creation with validation and structured input.
type EventManagerHandler ¶
type EventManagerHandler interface {
// CreateEvent creates an event for a specific object reference.
// This method is used when you have the complete object reference information
// but not necessarily the actual Kubernetes object.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - objectRef: Reference to the object this event relates to
// - reason: Short, machine-readable reason for the event (e.g., "Created", "Failed")
// - message: Human-readable description of the event
// - eventType: Type of event ("Normal" or "Warning")
//
// Returns:
// - error: Error if event creation fails
//
// Example:
//
// objectRef := ObjectReference{
// Kind: "MCPServer",
// Name: "github-server",
// Namespace: "default",
// }
// err := handler.CreateEvent(ctx, objectRef, "Created", "MCPServer successfully created", "Normal")
CreateEvent(ctx context.Context, objectRef ObjectReference, reason, message, eventType string) error
// CreateEventForCRD creates an event for a CRD by type, name, and namespace.
// This method is used when you know the CRD details but don't have the full object reference.
//
// Args:
// - ctx: Context for the operation
// - crdType: Type of CRD ("MCPServer", "ServiceClass", "Workflow")
// - name: Name of the CRD instance
// - namespace: Namespace of the CRD instance
// - reason: Short, machine-readable reason for the event
// - message: Human-readable description of the event
// - eventType: Type of event ("Normal" or "Warning")
//
// Returns:
// - error: Error if event creation fails
//
// Example:
//
// err := handler.CreateEventForCRD(ctx, "MCPServer", "github-server", "default",
// "Started", "MCPServer service started successfully", "Normal")
CreateEventForCRD(ctx context.Context, crdType, name, namespace, reason, message, eventType string) error
// QueryEvents retrieves events based on the provided filtering options.
// This method works with both Kubernetes and filesystem modes:
// - Kubernetes mode: Queries the native Kubernetes Events API
// - Filesystem mode: Parses stored event files
//
// Args:
// - ctx: Context for the operation
// - options: Filtering options for the event query
//
// Returns:
// - *EventQueryResult: Query result containing matching events
// - error: Error if query fails
//
// Example:
//
// options := EventQueryOptions{
// ResourceType: "MCPServer",
// Namespace: "default",
// Limit: 50,
// }
// result, err := handler.QueryEvents(ctx, options)
QueryEvents(ctx context.Context, options EventQueryOptions) (*EventQueryResult, error)
// IsKubernetesMode returns true if the event manager is using Kubernetes mode.
// This can be useful for components that need to adapt their behavior
// based on the deployment environment.
//
// Returns:
// - bool: true if using Kubernetes mode, false if using filesystem mode
//
// Example:
//
// if handler.IsKubernetesMode() {
// // Can access events via kubectl get events
// } else {
// // Events are logged to console and events.log
// }
IsKubernetesMode() bool
}
EventManagerHandler provides Kubernetes Event generation functionality for muster CRD lifecycle operations and service management.
This interface abstracts the event generation system, allowing components to create events without knowing the specific implementation details of how events are stored or delivered (Kubernetes Events API vs filesystem logging).
The handler automatically adapts to the current client mode:
- Kubernetes mode: Creates actual Kubernetes Events API objects
- Filesystem mode: Logs events to console and events.log file
Key features:
- Unified event generation across both Kubernetes and filesystem modes
- Dynamic message templating with contextual data
- Automatic event type classification (Normal vs Warning)
- Support for both CRD objects and synthetic references
Thread-safety: All methods are safe for concurrent use.
func GetEventManager ¶
func GetEventManager() EventManagerHandler
GetEventManager returns the registered event manager handler. This provides access to Kubernetes Event generation functionality for CRD lifecycle operations.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- EventManagerHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
eventManager := api.GetEventManager()
if eventManager == nil {
return fmt.Errorf("event manager not available")
}
err := eventManager.CreateEvent(ctx, objectRef, "Created", "Object successfully created", "Normal")
type EventQueryOptions ¶
type EventQueryOptions struct {
// ResourceType filters events by object kind (MCPServer, ServiceClass, Workflow, ServiceInstance)
ResourceType string `json:"resourceType,omitempty"`
// ResourceName filters events by object name
ResourceName string `json:"resourceName,omitempty"`
// Namespace filters events by namespace
Namespace string `json:"namespace,omitempty"`
// EventType filters by event type (Normal, Warning)
EventType string `json:"eventType,omitempty"`
// Since filters events that occurred after this time
Since *time.Time `json:"since,omitempty"`
// Until filters events that occurred before this time
Until *time.Time `json:"until,omitempty"`
// Limit restricts the number of events returned
Limit int `json:"limit,omitempty"`
}
EventQueryOptions represents filtering options for event queries.
type EventQueryResult ¶
type EventQueryResult struct {
// Events is the list of events matching the query
Events []EventResult `json:"events"`
// TotalCount is the total number of events (before limit is applied)
TotalCount int `json:"totalCount"`
}
EventQueryResult represents the result of an event query.
type EventResult ¶
type EventResult struct {
// Timestamp when the event occurred
Timestamp time.Time `json:"timestamp"`
// Namespace of the involved object
Namespace string `json:"namespace"`
// InvolvedObject information
InvolvedObject ObjectReference `json:"involvedObject"`
// Reason for the event
Reason string `json:"reason"`
// Message describing the event
Message string `json:"message"`
// Type of event (Normal, Warning)
Type string `json:"type"`
// Source component that generated the event
Source string `json:"source"`
// Count for how many times this event occurred (Kubernetes mode)
Count int32 `json:"count,omitempty"`
}
EventResult represents a single event result.
type GetWorkflowExecutionRequest ¶
type GetWorkflowExecutionRequest struct {
// ExecutionID is the unique identifier of the execution to retrieve
ExecutionID string `json:"execution_id"`
// IncludeSteps controls whether detailed step information is included (default: true)
IncludeSteps bool `json:"include_steps,omitempty"`
// StepID specifies a specific step to retrieve (optional, returns only that step)
StepID string `json:"step_id,omitempty"`
}
GetWorkflowExecutionRequest represents a request to get detailed information about a specific workflow execution.
This request structure enables flexible querying of execution details with options to include/exclude step information and retrieve specific step results.
type HealthCheckConfig ¶
type HealthCheckConfig struct {
// Enabled determines whether health checks should be performed.
// When false, the component health status remains unknown.
Enabled bool `yaml:"enabled" json:"enabled"`
// Interval specifies how often to perform health checks.
// Shorter intervals provide faster failure detection but use more resources.
Interval time.Duration `yaml:"interval" json:"interval"`
// FailureThreshold is the number of consecutive failures before marking unhealthy.
// Higher values reduce spurious unhealthy transitions caused by transient failures but increase detection time.
FailureThreshold int `yaml:"failureThreshold" json:"failureThreshold"`
// SuccessThreshold is the number of consecutive successes before marking healthy.
// Higher values reduce premature healthy transitions after a single passing check but increase recovery time.
SuccessThreshold int `yaml:"successThreshold" json:"successThreshold"`
}
HealthCheckConfig defines health checking behavior for services and components. This configuration controls how often health checks are performed and when a component is considered unhealthy based on check results.
Health check configuration enables automated monitoring and helps maintain system reliability by detecting and responding to component failures.
type HealthCheckExpectation ¶
type HealthCheckExpectation struct {
// Success indicates whether the tool call itself should succeed (default: true)
Success *bool `yaml:"success,omitempty" json:"success,omitempty"`
// JsonPath defines specific field values that should match in the tool response.
// Format: fieldPath: expectedValue
JsonPath map[string]interface{} `yaml:"json_path,omitempty" json:"json_path,omitempty"`
}
HealthCheckExpectation defines the expected conditions for health check evaluation. This mirrors the structure used in workflow step conditions.
type HealthCheckToolCall ¶
type HealthCheckToolCall struct {
// Tool specifies the name of the tool to call.
// Must correspond to an available tool in the aggregator.
Tool string `yaml:"tool" json:"tool"`
// Args provides static arguments to pass to the tool.
// These can be combined with dynamic arguments from service args.
Args map[string]interface{} `yaml:"args" json:"args"`
// Expect defines conditions that must be met for the service to be considered healthy.
// Similar to workflow step conditions, this supports success checks and JSON path matching.
Expect *HealthCheckExpectation `yaml:"expect,omitempty" json:"expect,omitempty"`
// ExpectNot defines conditions that must NOT be met for the service to be considered healthy.
// If any of these conditions are met, the service is considered unhealthy.
ExpectNot *HealthCheckExpectation `yaml:"expect_not,omitempty" json:"expect_not,omitempty"`
}
HealthCheckToolCall defines how to call a health check tool with condition evaluation. This extends ToolCall with expectation matching similar to workflow conditions.
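One way to read the Expect and ExpectNot rules is sketched below. It rests on several assumptions that this section does not confirm: json_path keys are treated as plain top-level field names, values are compared with reflect.DeepEqual, a missing Expect defaults to "the tool call must succeed", and ExpectNot fails the check as soon as any one of its conditions matches. The function names are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// HealthCheckExpectation is a local copy of the documented struct (tags omitted).
type HealthCheckExpectation struct {
	Success  *bool
	JsonPath map[string]interface{}
}

// allMet reports whether every condition in e holds.
// Success defaults to true when unset, following the field comment.
func allMet(e HealthCheckExpectation, success bool, resp map[string]interface{}) bool {
	want := true
	if e.Success != nil {
		want = *e.Success
	}
	if success != want {
		return false
	}
	for field, expected := range e.JsonPath {
		if got, ok := resp[field]; !ok || !reflect.DeepEqual(got, expected) {
			return false
		}
	}
	return true
}

// anyMet reports whether at least one explicitly set condition in e holds.
func anyMet(e HealthCheckExpectation, success bool, resp map[string]interface{}) bool {
	if e.Success != nil && *e.Success == success {
		return true
	}
	for field, expected := range e.JsonPath {
		if got, ok := resp[field]; ok && reflect.DeepEqual(got, expected) {
			return true
		}
	}
	return false
}

// isHealthy combines both expectations for one tool call result.
func isHealthy(expect, expectNot *HealthCheckExpectation, success bool, resp map[string]interface{}) bool {
	exp := HealthCheckExpectation{}
	if expect != nil {
		exp = *expect
	}
	if !allMet(exp, success, resp) {
		return false
	}
	return expectNot == nil || !anyMet(*expectNot, success, resp)
}

func main() {
	expect := &HealthCheckExpectation{JsonPath: map[string]interface{}{"status": "running"}}
	expectNot := &HealthCheckExpectation{JsonPath: map[string]interface{}{"phase": "terminating"}}
	resp := map[string]interface{}{"status": "running", "phase": "ready"}
	fmt.Println(isHealthy(expect, expectNot, true, resp))
}
```

Note that values decoded from JSON arrive as float64 for numbers, so a DeepEqual comparison against an int expectation would fail; a real evaluator would need to normalize types first.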
type HealthStatus ¶
type HealthStatus string
HealthStatus represents the health status of a service, capability, or other component. This standardized status is used across all muster components for consistent health reporting.
Health status provides a unified way to represent component operational state, enabling consistent monitoring and alerting across the entire system.
const (
// HealthUnknown indicates the health status cannot be determined.
// This is the default state when no health check has been performed.
HealthUnknown HealthStatus = "unknown"
// HealthHealthy indicates the component is operating normally.
// All health checks are passing and the component is fully functional.
HealthHealthy HealthStatus = "healthy"
// HealthDegraded indicates the component has some issues but is still functional.
// Some non-critical features may be impaired but core functionality works.
HealthDegraded HealthStatus = "degraded"
// HealthUnhealthy indicates the component has significant issues.
// Core functionality may be impaired and manual intervention may be required.
HealthUnhealthy HealthStatus = "unhealthy"
// HealthChecking indicates a health check is currently in progress.
// This is a transient state during health check execution.
HealthChecking HealthStatus = "checking"
)
type LifecycleTools ¶
type LifecycleTools struct {
// Start specifies the tool to call when starting a service instance.
// This tool is responsible for initializing and launching the service.
Start ToolCall `yaml:"start" json:"start"`
// Stop specifies the tool to call when stopping a service instance.
// This tool should gracefully shut down the service and clean up resources.
Stop ToolCall `yaml:"stop" json:"stop"`
// Restart specifies the tool to call when restarting a service instance.
// If not provided, restart operations will use Stop followed by Start.
Restart *ToolCall `yaml:"restart,omitempty" json:"restart,omitempty"`
// HealthCheck specifies the tool to call for health monitoring with expectation evaluation.
// This tool supports condition matching similar to workflow step conditions.
HealthCheck *HealthCheckToolCall `yaml:"healthCheck,omitempty" json:"healthCheck,omitempty"`
// Status specifies the tool to call for retrieving detailed service status.
// This tool should return comprehensive information about the service's current state.
Status *ToolCall `yaml:"status,omitempty" json:"status,omitempty"`
}
LifecycleTools maps service lifecycle events to aggregator tools. This structure defines which tools should be called for each lifecycle operation, enabling the orchestrator to manage services without knowing about specific implementations.
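The documented restart fallback (use Restart when present, otherwise Stop followed by Start) can be sketched as a function that returns the tool calls to issue. The ToolCall type is not defined in this section, so the two-field shape used here is an assumption based on HealthCheckToolCall, and restartPlan is a hypothetical helper. The tool names are made up.

```go
package main

import "fmt"

// ToolCall is an assumed shape: a tool name plus static arguments.
type ToolCall struct {
	Tool string
	Args map[string]interface{}
}

// LifecycleTools is a trimmed local copy of the documented struct.
type LifecycleTools struct {
	Start   ToolCall
	Stop    ToolCall
	Restart *ToolCall
}

// restartPlan returns the ordered tool calls that perform a restart.
func restartPlan(lt LifecycleTools) []ToolCall {
	if lt.Restart != nil {
		return []ToolCall{*lt.Restart}
	}
	return []ToolCall{lt.Stop, lt.Start}
}

func main() {
	lt := LifecycleTools{
		Start: ToolCall{Tool: "x_service_start"}, // hypothetical tool names
		Stop:  ToolCall{Tool: "x_service_stop"},
	}
	for _, call := range restartPlan(lt) {
		fmt.Println(call.Tool)
	}
}
```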
type ListWorkflowExecutionsRequest ¶
type ListWorkflowExecutionsRequest struct {
// WorkflowName filters executions to only those from the specified workflow (optional)
WorkflowName string `json:"workflow_name,omitempty"`
// Status filters executions to only those with the specified status (optional)
Status WorkflowExecutionStatus `json:"status,omitempty"`
// Limit is the maximum number of executions to return (default: 50, max: 1000)
Limit int `json:"limit,omitempty"`
// Offset is the number of executions to skip for pagination (default: 0)
Offset int `json:"offset,omitempty"`
}
ListWorkflowExecutionsRequest represents a request to list workflow executions with optional filtering and pagination args.
This request structure enables efficient querying of execution history with support for filtering by workflow name and status, plus pagination for handling large execution datasets.
type ListWorkflowExecutionsResponse ¶
type ListWorkflowExecutionsResponse struct {
// Executions contains the list of execution records (summary information only)
Executions []WorkflowExecutionSummary `json:"executions"`
// Total is the total number of executions matching the filter criteria
Total int `json:"total"`
// Limit is the maximum number of executions returned in this response
Limit int `json:"limit"`
// Offset is the number of executions skipped in this response
Offset int `json:"offset"`
// HasMore indicates whether there are more executions available
HasMore bool `json:"has_more"`
}
ListWorkflowExecutionsResponse represents the response from listing workflow executions. This provides paginated execution results with metadata for navigation.
The response includes both the execution data and pagination metadata to enable efficient client-side handling of large execution datasets.
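The pagination fields can be related to each other with a little arithmetic. The sketch below applies the documented defaults (limit 50, maximum 1000, offset 0) and derives HasMore from Total. The page type and paginate function are hypothetical, and how muster treats out-of-range values is an assumption.

```go
package main

import "fmt"

const (
	defaultLimit = 50   // documented default
	maxLimit     = 1000 // documented maximum
)

// page describes which slice of the full result set a response covers.
type page struct {
	Start, End    int // half-open index range [Start, End) into the result set
	Total         int
	Limit, Offset int
	HasMore       bool
}

// paginate clamps limit and offset and computes the covered range.
func paginate(total, limit, offset int) page {
	if limit <= 0 {
		limit = defaultLimit
	}
	if limit > maxLimit {
		limit = maxLimit
	}
	if offset < 0 {
		offset = 0
	}
	start := offset
	if start > total {
		start = total
	}
	end := start + limit
	if end > total {
		end = total
	}
	return page{
		Start: start, End: end,
		Total: total, Limit: limit, Offset: offset,
		HasMore: end < total,
	}
}

func main() {
	fmt.Printf("%+v\n", paginate(120, 0, 0))
	fmt.Printf("%+v\n", paginate(120, 50, 100))
}
```

A client can page through all executions by adding Limit to Offset until HasMore is false.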
type MCPServer ¶
type MCPServer struct {
// Name is the unique identifier for this MCP server instance.
// This name is used for registration, lookup, and management operations.
Name string `yaml:"name" json:"name"`
// Type specifies how this MCP server should be executed.
// Supported values: "stdio" for local processes, "streamable-http" for HTTP-based servers, "sse" for Server-Sent Events
Type MCPServerType `yaml:"type" json:"type"`
// ToolPrefix is an optional prefix that will be prepended to all tool names
// provided by this MCP server. This helps avoid naming conflicts when multiple
// servers provide tools with similar names.
ToolPrefix string `yaml:"toolPrefix,omitempty" json:"toolPrefix,omitempty"`
// AutoStart determines whether this MCP server should be automatically started
// when the muster system initializes or when dependencies become available.
AutoStart bool `yaml:"autoStart,omitempty" json:"autoStart,omitempty"`
// Command specifies the executable path for stdio type servers.
// This field is required when Type is "stdio".
Command string `yaml:"command,omitempty" json:"command,omitempty"`
// Args specifies the command line arguments for stdio type servers.
// This field is only available when Type is "stdio".
Args []string `yaml:"args,omitempty" json:"args,omitempty"`
// URL is the endpoint where the remote MCP server can be reached
// This field is required when Type is "streamable-http" or "sse".
// Examples: http://mcp-server:8080/mcp, https://api.example.com/mcp
URL string `yaml:"url,omitempty" json:"url,omitempty"`
// Env contains environment variables to set for the MCP server.
// For stdio servers, these are passed to the process when it is started.
// For remote servers, these can be used for authentication or configuration.
Env map[string]string `yaml:"env,omitempty" json:"env,omitempty"`
// Headers contains HTTP headers to send with requests to remote MCP servers.
// This field is only relevant when Type is "streamable-http" or "sse".
Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"`
// Auth configures authentication behavior for this MCP server.
// This is only relevant for remote servers (streamable-http or sse).
Auth *MCPServerAuth `yaml:"auth,omitempty" json:"auth,omitempty"`
// Timeout specifies the connection timeout for remote operations (in seconds)
Timeout int `yaml:"timeout,omitempty" json:"timeout,omitempty"`
// Error contains any error message from the most recent server operation.
// This is runtime information and not persisted to YAML files.
Error string `json:"error,omitempty" yaml:"-"`
// Description provides a human-readable description of this MCP server's purpose.
// This is runtime information populated from server metadata and not persisted to YAML.
Description string `json:"description,omitempty" yaml:"-"`
}
MCPServer represents a single MCP (Model Context Protocol) server definition and runtime state. It consolidates MCPServerDefinition, MCPServerInfo, and MCPServerConfig into a unified type that can be used for both configuration persistence (YAML) and API responses (JSON).
MCP servers provide tools and capabilities to the muster system through the aggregator. They are configured as stdio processes or remote HTTP endpoints with their own specific configuration requirements and runtime characteristics.
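The per-type field requirements stated in the comments (Command for "stdio", URL for "streamable-http" and "sse") can be checked with a short validation routine. This is a sketch over a trimmed local copy of the struct; validateServer is a hypothetical function and muster's own validation may enforce more.

```go
package main

import (
	"errors"
	"fmt"
)

type MCPServerType string

const (
	MCPServerTypeStdio          MCPServerType = "stdio"
	MCPServerTypeStreamableHTTP MCPServerType = "streamable-http"
	MCPServerTypeSSE            MCPServerType = "sse"
)

// MCPServer is a trimmed local copy holding only the fields checked here.
type MCPServer struct {
	Name    string
	Type    MCPServerType
	Command string
	URL     string
}

// validateServer enforces the field requirements documented for each type.
func validateServer(s MCPServer) error {
	if s.Name == "" {
		return errors.New("name is required")
	}
	switch s.Type {
	case MCPServerTypeStdio:
		if s.Command == "" {
			return errors.New(`command is required when type is "stdio"`)
		}
	case MCPServerTypeStreamableHTTP, MCPServerTypeSSE:
		if s.URL == "" {
			return fmt.Errorf("url is required when type is %q", s.Type)
		}
	default:
		return fmt.Errorf("unsupported type %q", s.Type)
	}
	return nil
}

func main() {
	fmt.Println(validateServer(MCPServer{Name: "local", Type: MCPServerTypeStdio, Command: "my-mcp-server"}))
	fmt.Println(validateServer(MCPServer{Name: "remote", Type: MCPServerTypeSSE}))
}
```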
type MCPServerAuth ¶
type MCPServerAuth struct {
// Type specifies the authentication type.
// Supported values:
// - "oauth": OAuth 2.0/OIDC authentication
// - "teleport": Teleport Application Access with Machine ID certificates
// - "none": No authentication
Type string `yaml:"type,omitempty" json:"type,omitempty"`
// ForwardToken enables SSO via Token Forwarding.
// When true, muster forwards its own ID token (obtained when user authenticated
// TO muster) to this downstream server. The downstream server must be configured
// to trust muster's OAuth client ID in its TrustedAudiences configuration.
//
// Use ForwardToken when:
// - Muster itself is OAuth-protected (oauth_server enabled)
// - The downstream server trusts muster as an identity relay
// - You want users to authenticate once to muster for all downstream access
ForwardToken bool `yaml:"forwardToken,omitempty" json:"forwardToken,omitempty"`
// RequiredAudiences specifies additional audience(s) that the SSO token should contain.
// This is used with both Token Forwarding and Token Exchange SSO methods.
//
// When the downstream server requires tokens with specific audiences (e.g., Kubernetes
// OIDC authentication), specify them here:
// requiredAudiences:
// - "dex-k8s-authenticator"
//
// For Token Forwarding (forwardToken: true):
// - At session initialization, muster collects all requiredAudiences from MCPServers
// - These are requested from muster's IdP using cross-client scopes
// - The resulting multi-audience ID token is forwarded to downstream servers
//
// For Token Exchange (tokenExchange.enabled: true):
// - The audiences are appended as cross-client scopes to the token exchange request
// - The remote IdP issues a token containing the requested audiences
RequiredAudiences []string `yaml:"requiredAudiences,omitempty" json:"requiredAudiences,omitempty"`
// TokenExchange enables SSO via RFC 8693 Token Exchange for cross-cluster SSO.
// When configured, muster exchanges its local token for a token valid on the
// remote cluster's Identity Provider (e.g., Dex).
//
// Use TokenExchange when:
// - The remote cluster has its own Dex instance
// - The remote Dex is configured with an OIDC connector for muster's Dex
// - You need a token issued by the remote cluster's IdP (not just forwarded)
//
// Token exchange takes precedence over ForwardToken if both are configured.
TokenExchange *TokenExchangeConfig `yaml:"tokenExchange,omitempty" json:"tokenExchange,omitempty"`
// Teleport configures Teleport authentication for accessing private installations.
// This is only used when Type is "teleport".
//
// When configured, muster uses Teleport Machine ID certificates to establish
// mutual TLS connections to MCP servers accessible via Teleport Application Access.
//
// The Teleport identity files (tls.crt, tls.key, ca.crt) are typically:
// - In Kubernetes: Mounted from a Secret managed by tbot
// - In filesystem mode: Read directly from the tbot output directory
Teleport *TeleportAuth `yaml:"teleport,omitempty" json:"teleport,omitempty"`
}
MCPServerAuth configures authentication behavior for an MCP server.
Muster supports three distinct authentication mechanisms:
- SSO Token Forwarding: Muster forwards its own ID token to downstream servers. Enable with ForwardToken: true. Requires downstream to trust muster's client ID.
- SSO Token Exchange (RFC 8693): Muster exchanges its token for one valid on the remote cluster's Dex. Enable with TokenExchange config. Requires the remote Dex to have an OIDC connector configured for the local cluster's Dex.
- Teleport Authentication: Muster uses Teleport Machine ID certificates to access private installations via Teleport Application Access. Enable with Type: "teleport" and configure Teleport settings.
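The documented precedence rule (token exchange wins over ForwardToken when both are configured) can be expressed as a small selection function. This is a sketch under assumptions: TokenExchangeConfig is not defined in this section, so the Enabled field is inferred from the "tokenExchange.enabled: true" wording above; checking Type "teleport" first is a guess; and mechanismFor and its return labels are hypothetical.

```go
package main

import "fmt"

// TokenExchangeConfig is an assumed shape with only an Enabled flag.
type TokenExchangeConfig struct {
	Enabled bool
}

// MCPServerAuth is a trimmed local copy of the documented struct.
type MCPServerAuth struct {
	Type          string
	ForwardToken  bool
	TokenExchange *TokenExchangeConfig
}

// mechanismFor returns a label for the mechanism a configuration selects.
// An empty string means none of the three mechanisms applies.
func mechanismFor(a *MCPServerAuth) string {
	switch {
	case a == nil:
		return ""
	case a.Type == "teleport":
		return "teleport"
	case a.TokenExchange != nil && a.TokenExchange.Enabled:
		return "token-exchange" // takes precedence over ForwardToken
	case a.ForwardToken:
		return "token-forwarding"
	default:
		return ""
	}
}

func main() {
	both := &MCPServerAuth{
		Type:          "oauth",
		ForwardToken:  true,
		TokenExchange: &TokenExchangeConfig{Enabled: true},
	}
	fmt.Println(mechanismFor(both))
}
```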
type MCPServerCreateRequest ¶
type MCPServerCreateRequest struct {
// Name is the unique identifier for the MCP server (required).
Name string `json:"name" validate:"required"`
// Type specifies the MCP server type (required).
// Valid values: "stdio", "streamable-http", "sse"
Type string `json:"type" validate:"required"`
// ToolPrefix is prepended to all tool names from this server to avoid conflicts.
// Optional; if not specified, tools are exposed with their original names.
ToolPrefix string `json:"toolPrefix,omitempty"`
// Description for the MCP server
Description string `json:"description,omitempty"`
// AutoStart determines whether this MCP server should be automatically started
AutoStart bool `json:"autoStart,omitempty"`
// Command specifies the executable path for stdio type servers.
// This field is required when Type is "stdio".
Command string `json:"command,omitempty"`
// Args specifies the command line arguments for stdio type servers.
// This field is only available when Type is "stdio".
Args []string `json:"args,omitempty"`
// URL is the endpoint where the remote MCP server can be reached
// This field is required when Type is "streamable-http" or "sse".
URL string `json:"url,omitempty"`
// Env contains environment variables to set for the MCP server.
Env map[string]string `json:"env,omitempty"`
// Headers contains HTTP headers to send with requests to remote MCP servers.
// This field is only relevant when Type is "streamable-http" or "sse".
Headers map[string]string `json:"headers,omitempty"`
// Timeout specifies the connection timeout for remote operations (in seconds)
Timeout int `json:"timeout,omitempty"`
// Auth configures authentication behavior for this MCP server.
// This is only relevant for remote servers (streamable-http or sse).
Auth *MCPServerAuth `json:"auth,omitempty"`
}
type MCPServerInfo ¶
type MCPServerInfo struct {
// Name is the unique identifier for this MCP server instance.
Name string `json:"name"`
// Type indicates the execution model for this server (stdio, streamable-http, or sse).
Type string `json:"type"`
// Description provides a human-readable description of the server's purpose and capabilities.
Description string `json:"description,omitempty"`
// AutoStart determines whether this MCP server should be automatically started
AutoStart bool `json:"autoStart,omitempty"`
// Command specifies the executable path for stdio type servers.
Command string `json:"command,omitempty"`
// Args specifies the command line arguments for stdio type servers.
Args []string `json:"args,omitempty"`
// URL is the endpoint where the remote MCP server can be reached
URL string `json:"url,omitempty"`
// Env contains environment variables to set for the MCP server.
Env map[string]string `json:"env,omitempty"`
// Headers contains HTTP headers to send with requests to remote MCP servers.
Headers map[string]string `json:"headers,omitempty"`
// Auth configures authentication behavior for this MCP server.
Auth *MCPServerAuth `json:"auth,omitempty"`
// Timeout specifies the connection timeout for remote operations (in seconds)
Timeout int `json:"timeout,omitempty"`
// ToolPrefix is an optional prefix for tool names.
ToolPrefix string `json:"toolPrefix,omitempty"`
// Error contains any error message from recent server operations.
// This field is populated if the server is in an error state.
Error string `json:"error,omitempty"`
// State represents the high-level infrastructure state of the MCP server.
// This is the primary status indicator.
// Possible values for stdio servers: Running, Starting, Stopped, Failed
// Possible values for remote servers: Connected, Connecting, Disconnected, Failed
// Note: State reflects infrastructure availability only. Per-user session state
// (auth status, connection status) is tracked in the Session Registry.
State string `json:"state,omitempty"`
// StatusMessage provides a user-friendly, actionable message about the server's status.
// This field is populated based on the server's state and error information.
// Examples:
// - "Server is running normally"
// - "Authentication required - run: muster auth login --server <name>"
// - "Cannot reach server - check network connectivity"
// - "Certificate error - verify TLS configuration"
StatusMessage string `json:"statusMessage,omitempty"`
// ConsecutiveFailures tracks the number of consecutive connection failures.
// Used for exponential backoff and to identify unreachable servers.
ConsecutiveFailures int `json:"consecutiveFailures,omitempty"`
// LastAttempt indicates when the last connection attempt was made.
LastAttempt *time.Time `json:"lastAttempt,omitempty"`
// NextRetryAfter indicates the earliest time when the next retry should be attempted.
NextRetryAfter *time.Time `json:"nextRetryAfter,omitempty"`
// SessionStatus represents the per-user session connection status.
// This is only populated when the request includes a session context.
// Possible values: connected, disconnected, pending_auth, failed
// Empty if no session context is available.
SessionStatus string `json:"sessionStatus,omitempty"`
// SessionAuth represents the per-user authentication status for this server.
// This is only populated when the request includes a session context.
// Possible values: authenticated, auth_required, token_expired, unknown
// Empty if no session context is available or auth is not required.
SessionAuth string `json:"sessionAuth,omitempty"`
// ToolsCount is the number of tools available from this server for the current session.
// This is session-specific as OAuth-protected servers may expose different tools
// based on user permissions.
ToolsCount int `json:"toolsCount,omitempty"`
// ConnectedAt indicates when the current session connected to this server.
// Only populated if there is an active session connection.
ConnectedAt *time.Time `json:"connectedAt,omitempty"`
}
MCPServerInfo contains consolidated MCP server information for API responses. This type is used when returning server information through the API, providing a flattened view of server configuration and runtime state that is convenient for clients and user interfaces.
type MCPServerManagerHandler ¶
type MCPServerManagerHandler interface {
// ListMCPServers returns information about all registered MCP servers.
// This includes both configuration and runtime state information for each server.
//
// Returns:
// - []MCPServerInfo: Slice of server information (empty if no servers exist)
ListMCPServers() []MCPServerInfo
// GetMCPServer retrieves detailed information about a specific MCP server.
// This includes both configuration and runtime state for the requested server.
//
// Args:
// - name: The unique name of the MCP server to retrieve
//
// Returns:
// - *MCPServerInfo: Server information, or nil if server not found
// - error: nil on success, or an error if the server could not be retrieved
GetMCPServer(name string) (*MCPServerInfo, error)
// ToolProvider interface for exposing MCP server management tools.
// This allows MCP server operations to be performed through the aggregator
// tool system, enabling programmatic and user-driven server management.
ToolProvider
}
MCPServerManagerHandler defines the interface for MCP server management operations. This interface provides the core functionality for managing MCP server lifecycle, configuration, and tool availability. It also embeds the ToolProvider interface, so implementations expose MCP server management capabilities as tools that can be called through the aggregator.
func GetMCPServerManager ¶
func GetMCPServerManager() MCPServerManagerHandler
GetMCPServerManager returns the registered MCP server manager handler. This provides access to MCP server definition management and lifecycle operations.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- MCPServerManagerHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
manager := api.GetMCPServerManager()
if manager == nil {
return fmt.Errorf("MCP server manager not available")
}
servers := manager.ListMCPServers()
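The nil check in the example above follows from how a service locator typically stores its handlers. The following is a minimal sketch of that pattern, not the package's actual implementation: `Handler`, `RegisterHandler`, `GetHandler`, and `staticHandler` are hypothetical names, and only the use of a read lock named `handlerMutex` is taken from the documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is a hypothetical stand-in for an interface such as
// MCPServerManagerHandler.
type Handler interface {
	ListNames() []string
}

var (
	handlerMutex sync.RWMutex // guards handler
	handler      Handler      // nil until RegisterHandler is called
)

// RegisterHandler stores the handler under the write lock (hypothetical name).
func RegisterHandler(h Handler) {
	handlerMutex.Lock()
	defer handlerMutex.Unlock()
	handler = h
}

// GetHandler returns the registered handler under the read lock,
// or nil if nothing has been registered yet.
func GetHandler() Handler {
	handlerMutex.RLock()
	defer handlerMutex.RUnlock()
	return handler
}

// staticHandler is a trivial implementation used only for this demo.
type staticHandler struct{ names []string }

func (s staticHandler) ListNames() []string { return s.names }

func main() {
	if GetHandler() == nil {
		fmt.Println("handler not available yet")
	}
	RegisterHandler(staticHandler{names: []string{"kubernetes"}})
	fmt.Println(GetHandler().ListNames())
}
```

Because registration can happen later than the first lookup, callers that run during startup are the ones most likely to observe a nil handler.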
type MCPServerType ¶
type MCPServerType string
MCPServerType defines the execution model for an MCP server. This determines how the server process is managed and what configuration options are available for server deployment.
const (
// MCPServerTypeStdio indicates that the MCP server should be run as a local process.
// Stdio servers are started using the configured command and arguments,
// with communication typically happening over stdin/stdout.
MCPServerTypeStdio MCPServerType = "stdio"
// MCPServerTypeStreamableHTTP indicates that the MCP server should be accessed via HTTP.
// StreamableHTTP servers are accessed via HTTP/HTTPS endpoints with streaming support.
MCPServerTypeStreamableHTTP MCPServerType = "streamable-http"
// MCPServerTypeSSE indicates that the MCP server should be accessed via Server-Sent Events.
// SSE servers are accessed via HTTP/HTTPS endpoints using Server-Sent Events for communication.
MCPServerTypeSSE MCPServerType = "sse"
)
func (MCPServerType) IsRemote ¶
func (t MCPServerType) IsRemote() bool
IsRemote returns true if the server type is a remote (HTTP-based) server. Remote servers use connected/disconnected states rather than running/stopped.
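A minimal sketch of how IsRemote could be written, assuming that the two HTTP-based types (streamable-http and sse) are the remote ones. The method body and the `stateLabel` helper are illustrative assumptions, not the package's code; only the type, the constants, and the connected/disconnected versus running/stopped wording come from the documentation.

```go
package main

import "fmt"

// MCPServerType mirrors the documented string type.
type MCPServerType string

const (
	MCPServerTypeStdio          MCPServerType = "stdio"
	MCPServerTypeStreamableHTTP MCPServerType = "streamable-http"
	MCPServerTypeSSE            MCPServerType = "sse"
)

// IsRemote reports whether the type is HTTP-based (assumed implementation).
func (t MCPServerType) IsRemote() bool {
	switch t {
	case MCPServerTypeStreamableHTTP, MCPServerTypeSSE:
		return true
	default:
		return false
	}
}

// stateLabel is a hypothetical helper that picks the state vocabulary
// described in the docs: connected/disconnected for remote servers,
// running/stopped for local processes.
func stateLabel(t MCPServerType, up bool) string {
	if t.IsRemote() {
		if up {
			return "connected"
		}
		return "disconnected"
	}
	if up {
		return "running"
	}
	return "stopped"
}

func main() {
	for _, t := range []MCPServerType{MCPServerTypeStdio, MCPServerTypeStreamableHTTP, MCPServerTypeSSE} {
		fmt.Printf("%s remote=%v state=%s\n", t, t.IsRemote(), stateLabel(t, true))
	}
}
```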
type MCPServerUpdateRequest ¶
type MCPServerUpdateRequest struct {
// Name of the MCP server to update (required).
Name string `json:"name" validate:"required"`
// Type can be changed, but may require significant reconfiguration.
Type string `json:"type" validate:"required"`
// ToolPrefix can be updated, affecting tool naming.
ToolPrefix string `json:"toolPrefix,omitempty"`
// Description for the MCP server
Description string `json:"description,omitempty"`
// AutoStart determines whether this MCP server should be automatically started
AutoStart bool `json:"autoStart,omitempty"`
// Command specifies the executable path for stdio type servers.
Command string `json:"command,omitempty"`
// Args specifies the command line arguments for stdio type servers.
Args []string `json:"args,omitempty"`
// URL is the endpoint where the remote MCP server can be reached
URL string `json:"url,omitempty"`
// Env contains environment variables to set for the MCP server.
Env map[string]string `json:"env,omitempty"`
// Headers contains HTTP headers to send with requests to remote MCP servers.
Headers map[string]string `json:"headers,omitempty"`
// Timeout specifies the connection timeout for remote operations (in seconds)
Timeout int `json:"timeout,omitempty"`
// Auth configures authentication behavior for this MCP server.
Auth *MCPServerAuth `json:"auth,omitempty"`
}
MCPServerUpdateRequest represents a request to update an existing MCP server definition. All fields except Name can be modified. Changes may require server restart.
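To illustrate the wire format implied by the JSON tags, here is a small sketch that marshals a trimmed local copy of the request. The struct below keeps only a subset of the documented fields, and `encodeUpdate` plus the example URL are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MCPServerUpdateRequest is a trimmed local copy of the documented struct,
// keeping the same JSON tags for the fields shown.
type MCPServerUpdateRequest struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	ToolPrefix string `json:"toolPrefix,omitempty"`
	AutoStart  bool   `json:"autoStart,omitempty"`
	URL        string `json:"url,omitempty"`
	Timeout    int    `json:"timeout,omitempty"`
}

// encodeUpdate is a hypothetical helper that renders the request as JSON.
func encodeUpdate(req MCPServerUpdateRequest) (string, error) {
	b, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeUpdate(MCPServerUpdateRequest{
		Name:    "github",
		Type:    "streamable-http",
		URL:     "https://example.com/mcp", // placeholder endpoint
		Timeout: 30,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Note that with `omitempty`, a false AutoStart and a zero Timeout are dropped from the encoded JSON, so a receiver cannot tell "explicitly false" from "not set" by looking at the payload alone.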
type MCPServerValidateRequest ¶
type MCPServerValidateRequest struct {
// Name for validation (required).
Name string `json:"name" validate:"required"`
// Type for validation (required).
Type string `json:"type" validate:"required"`
// ToolPrefix for validation.
ToolPrefix string `json:"toolPrefix,omitempty"`
// AutoStart determines whether this MCP server should be automatically started
AutoStart bool `json:"autoStart,omitempty"`
// Command specifies the executable path for stdio type servers.
Command string `json:"command,omitempty"`
// Args specifies the command line arguments for stdio type servers.
Args []string `json:"args,omitempty"`
// URL is the endpoint where the remote MCP server can be reached
URL string `json:"url,omitempty"`
// Env contains environment variables to set for the MCP server.
Env map[string]string `json:"env,omitempty"`
// Headers contains HTTP headers to send with requests to remote MCP servers.
Headers map[string]string `json:"headers,omitempty"`
// Timeout specifies the connection timeout for remote operations (in seconds)
Timeout int `json:"timeout,omitempty"`
// Description for validation and documentation.
Description string `json:"description,omitempty"`
// Auth configures authentication behavior for this MCP server.
Auth *MCPServerAuth `json:"auth,omitempty"`
}
MCPServerValidateRequest represents a request to validate an MCP server definition without creating it. Validates configuration consistency and tool availability.
type MetaToolsDataProvider ¶
type MetaToolsDataProvider interface {
// ListToolsForContext returns all available tools for the current session context.
// The returned tools are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Tool: List of available tools for the session
ListToolsForContext(ctx context.Context) []mcp.Tool
// CallToolInternal executes a tool by name with the provided arguments.
// This bypasses the MCP protocol layer and calls tools directly.
//
// Args:
// - ctx: Context for the operation
// - name: Name of the tool to execute
// - args: Arguments to pass to the tool
//
// Returns:
// - *mcp.CallToolResult: The result of the tool execution
// - error: Error if execution fails
CallToolInternal(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error)
// ListResourcesForContext returns all available resources for the current session context.
// The returned resources are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Resource: List of available resources for the session
ListResourcesForContext(ctx context.Context) []mcp.Resource
// ReadResource retrieves the contents of a resource by URI.
//
// Args:
// - ctx: Context for the operation
// - uri: URI of the resource to retrieve
//
// Returns:
// - *mcp.ReadResourceResult: The resource contents
// - error: Error if retrieval fails
ReadResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)
// ListPromptsForContext returns all available prompts for the current session context.
// The returned prompts are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Prompt: List of available prompts for the session
ListPromptsForContext(ctx context.Context) []mcp.Prompt
// GetPrompt executes a prompt with the provided arguments.
//
// Args:
// - ctx: Context for the operation
// - name: Name of the prompt to execute
// - args: Arguments to pass to the prompt (as string values)
//
// Returns:
// - *mcp.GetPromptResult: The prompt result with messages
// - error: Error if execution fails
GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)
// ListServersRequiringAuth returns a list of servers that require authentication
// for the current session. This enables the list_tools meta-tool to inform users
// about servers that are available but require authentication before their tools
// become visible.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []ServerAuthInfo: List of servers requiring authentication
ListServersRequiringAuth(ctx context.Context) []ServerAuthInfo
}
MetaToolsDataProvider provides data access for meta-tools. This interface is implemented by the aggregator and used by the metatools adapter to access tools, resources, and prompts from the aggregated servers.
The interface is designed to support session-scoped visibility, where different sessions may see different tools based on their authentication state (OAuth). Context is used to pass session information for proper scoping.
func GetMetaToolsDataProvider ¶
func GetMetaToolsDataProvider() MetaToolsDataProvider
GetMetaToolsDataProvider returns the registered meta-tools data provider. This provides access to the underlying data layer (typically the aggregator) for listing and accessing tools, resources, and prompts.
Returns nil if no provider has been registered yet.
Returns:
- MetaToolsDataProvider: The registered provider, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
type MetaToolsHandler ¶
type MetaToolsHandler interface {
// ListTools returns all available tools for the current session.
// The returned tools are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Tool: List of available tools
// - error: Error if listing fails
ListTools(ctx context.Context) ([]mcp.Tool, error)
// CallTool executes a tool by name with the provided arguments.
// This is the primary interface for tool execution through meta-tools.
//
// Args:
// - ctx: Context for the operation
// - name: Name of the tool to execute
// - args: Arguments to pass to the tool
//
// Returns:
// - *mcp.CallToolResult: The result of the tool execution
// - error: Error if execution fails
CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error)
// ListResources returns all available resources for the current session.
// The returned resources are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Resource: List of available resources
// - error: Error if listing fails
ListResources(ctx context.Context) ([]mcp.Resource, error)
// GetResource retrieves the contents of a resource by URI.
//
// Args:
// - ctx: Context for the operation
// - uri: URI of the resource to retrieve
//
// Returns:
// - *mcp.ReadResourceResult: The resource contents
// - error: Error if retrieval fails
GetResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)
// ListPrompts returns all available prompts for the current session.
// The returned prompts are session-aware based on authentication state.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []mcp.Prompt: List of available prompts
// - error: Error if listing fails
ListPrompts(ctx context.Context) ([]mcp.Prompt, error)
// GetPrompt executes a prompt with the provided arguments.
//
// Args:
// - ctx: Context for the operation
// - name: Name of the prompt to execute
// - args: Arguments to pass to the prompt (as string values)
//
// Returns:
// - *mcp.GetPromptResult: The prompt result with messages
// - error: Error if execution fails
GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)
// ListServersRequiringAuth returns a list of servers that require authentication
// for the current session. This enables the list_tools meta-tool to inform users
// about servers that are available but require authentication.
//
// Args:
// - ctx: Context containing session information
//
// Returns:
// - []ServerAuthInfo: List of servers requiring authentication
ListServersRequiringAuth(ctx context.Context) []ServerAuthInfo
}
MetaToolsHandler provides server-side meta-tool functionality for AI assistants. It enables tool discovery, execution, and resource/prompt access through a unified interface that can be used by the metatools package.
The handler provides the data access layer that the metatools package handlers use to retrieve and manipulate tools, resources, and prompts. It abstracts the underlying aggregator implementation, following the API service locator pattern.
This interface will be implemented by the aggregator or a dedicated adapter in a future integration issue.
func GetMetaTools ¶
func GetMetaTools() MetaToolsHandler
GetMetaTools returns the registered meta-tools handler. This provides access to server-side meta-tool functionality for tool discovery, execution, and resource/prompt access.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- MetaToolsHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
metaToolsHandler := api.GetMetaTools()
if metaToolsHandler == nil {
return fmt.Errorf("meta-tools handler not available")
}
tools, err := metaToolsHandler.ListTools(ctx)
type NotFoundError ¶
type NotFoundError struct {
// ResourceType categorizes the type of resource that was not found
// (e.g., "workflow", "serviceclass", "service")
ResourceType string
// ResourceName is the specific identifier of the resource that was not found
ResourceName string
// Message provides a custom error message if the default format is insufficient
Message string
}
NotFoundError represents a resource not found error with contextual information. This standardized error type provides consistent error handling across all API operations for cases where requested resources don't exist in the system.
The error includes resource type and name for precise error reporting and supports custom error messages for specific use cases.
func NewNotFoundError ¶
func NewNotFoundError(resourceType, resourceName string) *NotFoundError
NewNotFoundError creates a new NotFoundError with the specified resource type and name. This is the standard way to create not found errors throughout the API.
Args:
- resourceType: The category of resource (e.g., "workflow", "service")
- resourceName: The specific identifier of the resource
Returns:
- *NotFoundError: A new NotFoundError instance
Example:
return api.NewNotFoundError("workflow", "deploy-app")
IsNotFound checks if an error is a NotFoundError using error unwrapping. This function provides a type-safe way to check for not found conditions in error handling code, supporting wrapped errors.
Args:
- err: The error to check
Returns:
- bool: true if the error is or wraps a NotFoundError, false otherwise
func (*NotFoundError) Error ¶
func (e *NotFoundError) Error() string
Error implements the error interface for NotFoundError. Returns either the custom message if provided, or a formatted default message using the resource type and name.
Returns:
- string: The error message describing the not found condition
type OAuthHandler ¶
type OAuthHandler interface {
// IsEnabled returns whether OAuth proxy functionality is active.
IsEnabled() bool
// GetToken retrieves a valid token for the given session and server.
// Returns nil if no valid token exists.
GetToken(sessionID, serverName string) *OAuthToken
// GetTokenByIssuer retrieves a valid token for the given session and issuer.
// This is used for SSO when we have the issuer from a 401 response.
GetTokenByIssuer(sessionID, issuer string) *OAuthToken
// GetFullTokenByIssuer retrieves the full token (including ID token if available)
// for the given session and issuer. Returns nil if no valid token exists.
// The IDToken field may be empty if the token was obtained without an ID token.
GetFullTokenByIssuer(sessionID, issuer string) *OAuthToken
// FindTokenWithIDToken searches for any token in the session that has an ID token.
// This is used as a fallback when the muster issuer is not explicitly configured.
// Returns the first token found with an ID token, or nil if none exists.
FindTokenWithIDToken(sessionID string) *OAuthToken
// StoreToken persists a token for the given session and issuer.
// This is the write path used by mcp-go's transport.TokenStore.SaveToken()
// after a successful token refresh.
StoreToken(sessionID, issuer string, token *OAuthToken)
// ClearTokenByIssuer removes all tokens for a given session and issuer.
// This is used to clear invalid/expired tokens before requesting fresh authentication.
ClearTokenByIssuer(sessionID, issuer string)
// CreateAuthChallenge creates an authentication challenge for a 401 response.
// Returns the challenge containing the auth URL for the user to visit.
CreateAuthChallenge(ctx context.Context, sessionID, serverName, issuer, scope string) (*AuthChallenge, error)
// GetHTTPHandler returns the HTTP handler for OAuth callback endpoints.
GetHTTPHandler() http.Handler
// GetCallbackPath returns the configured callback path (e.g., "/oauth/proxy/callback").
GetCallbackPath() string
// GetCIMDPath returns the path for serving the CIMD (e.g., "/.well-known/oauth-client.json").
GetCIMDPath() string
// ShouldServeCIMD returns true if muster should serve its own CIMD.
ShouldServeCIMD() bool
// GetCIMDHandler returns the HTTP handler for serving the CIMD.
GetCIMDHandler() http.HandlerFunc
// RegisterServer registers OAuth configuration for a remote MCP server.
RegisterServer(serverName, issuer, scope string)
// SetAuthCompletionCallback sets the callback to be called after successful authentication.
// The aggregator uses this to establish session connections after browser OAuth completes.
SetAuthCompletionCallback(callback AuthCompletionCallback)
// ExchangeTokenForRemoteCluster exchanges a local token for one valid on a remote cluster.
// This implements RFC 8693 Token Exchange for cross-cluster SSO scenarios.
//
// Args:
// - ctx: Context for the operation
// - localToken: The local ID token to exchange
// - userID: The user's unique identifier (from validated JWT 'sub' claim)
// - config: Token exchange configuration for the remote cluster
//
// Returns the exchanged access token, or an error if exchange fails.
ExchangeTokenForRemoteCluster(ctx context.Context, localToken, userID string, config *TokenExchangeConfig) (string, error)
// ExchangeTokenForRemoteClusterWithClient exchanges a local token for one valid on a remote cluster
// using a custom HTTP client. This is used when the token exchange endpoint is accessed via
// Teleport Application Access, which requires mutual TLS authentication.
//
// The httpClient parameter should be configured with the appropriate TLS certificates
// (e.g., Teleport Machine ID certificates). If nil, uses the default HTTP client.
//
// Args:
// - ctx: Context for the operation
// - localToken: The local ID token to exchange
// - userID: The user's unique identifier (from validated JWT 'sub' claim)
// - config: Token exchange configuration for the remote cluster
// - httpClient: Custom HTTP client with Teleport TLS certificates (or nil for default)
//
// Returns the exchanged access token, or an error if exchange fails.
ExchangeTokenForRemoteClusterWithClient(ctx context.Context, localToken, userID string, config *TokenExchangeConfig, httpClient *http.Client) (string, error)
// Stop stops the OAuth handler and cleans up resources.
Stop()
}
OAuthHandler defines the interface for OAuth proxy functionality. This handler manages OAuth authentication flows for remote MCP servers, including token storage, authentication challenges, and callback handling.
The OAuth handler acts as a proxy, managing OAuth flows on behalf of users without exposing sensitive tokens to the Muster Agent.
func GetOAuthHandler ¶
func GetOAuthHandler() OAuthHandler
GetOAuthHandler returns the registered OAuth handler. This provides access to OAuth proxy functionality for remote MCP server authentication.
Returns nil if no handler has been registered yet or if OAuth is disabled. Callers should always check for nil before using the returned handler.
Returns:
- OAuthHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by oauthMutex read lock.
type OAuthToken ¶
type OAuthToken struct {
// AccessToken is the bearer token used for authorization.
AccessToken string `json:"access_token"`
// TokenType is typically "Bearer".
TokenType string `json:"token_type"`
// RefreshToken is used to obtain new access tokens.
// Required by mcp-go's transport layer for automatic token refresh.
RefreshToken string `json:"refresh_token,omitempty"`
// ExpiresAt is the calculated expiration timestamp.
// Required by mcp-go's transport layer to decide when to refresh.
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Scope is the granted scope(s).
Scope string `json:"scope,omitempty"`
// IDToken is the OIDC ID token (if available).
// Used for SSO token forwarding to downstream MCP servers.
IDToken string `json:"id_token,omitempty"`
// Issuer is the token issuer (Identity Provider URL).
// Used for SSO detection and token forwarding.
Issuer string `json:"issuer,omitempty"`
}
OAuthToken represents an OAuth access token for use by handlers.
type ObjectReference ¶
type ObjectReference struct {
// APIVersion is the API version of the object (e.g., "muster.giantswarm.io/v1alpha1")
APIVersion string `json:"apiVersion,omitempty"`
// Kind is the kind of the object (e.g., "MCPServer", "ServiceClass", "Workflow")
Kind string `json:"kind"`
// Name is the name of the object
Name string `json:"name"`
// Namespace is the namespace of the object (required for namespaced objects)
Namespace string `json:"namespace"`
// UID is the unique identifier of the object (optional, helps with precision)
UID string `json:"uid,omitempty"`
}
ObjectReference represents a reference to a Kubernetes object for event creation. This structure is used to identify the object that an event relates to.
type OperationDefinition ¶
type OperationDefinition struct {
// Description provides human-readable documentation for the operation's purpose
Description string `yaml:"description" json:"description"`
// Args defines the input arguments accepted by this operation.
// Used for validation and documentation generation.
Args map[string]Arg `yaml:"args" json:"args"`
// Requires lists the tools or capabilities that must be available for this operation.
// This is used for availability checking and dependency validation.
Requires []string `yaml:"requires" json:"requires"`
}
OperationDefinition defines an operation that can be performed within a workflow. Operations represent discrete actions that can be invoked, with their own argument requirements and execution logic (either direct workflow calls or references).
type OrchestratorAPI ¶
type OrchestratorAPI interface {
// StartService initiates the startup process for the specified service.
// It returns an error if the service cannot be started or if the service manager
// is not available.
//
// Args:
// - name: The unique name of the service to start
//
// Returns:
// - error: nil on success, or an error describing why the service could not be started
StartService(name string) error
// StopService initiates the shutdown process for the specified service.
// It returns an error if the service cannot be stopped or if the service manager
// is not available.
//
// Args:
// - name: The unique name of the service to stop
//
// Returns:
// - error: nil on success, or an error describing why the service could not be stopped
StopService(name string) error
// RestartService performs a stop followed by a start operation on the specified service.
// This is equivalent to calling StopService followed by StartService, but may be
// implemented more efficiently by the underlying service manager.
//
// Args:
// - name: The unique name of the service to restart
//
// Returns:
// - error: nil on success, or an error describing why the service could not be restarted
RestartService(name string) error
// GetServiceStatus retrieves the current status and metadata for a specific service.
// The returned ServiceStatus includes state, health, error information, and metadata.
//
// Args:
// - name: The unique name of the service to query
//
// Returns:
// - *ServiceStatus: Current status of the service, or nil if service not found
// - error: nil on success, or an error if the service could not be found or queried
GetServiceStatus(name string) (*ServiceStatus, error)
// GetAllServices returns the status of all registered services in the system.
// This provides a snapshot of the current state of all services managed by
// the orchestrator.
//
// Returns:
// - []ServiceStatus: Slice containing status of all services (empty if none exist)
GetAllServices() []ServiceStatus
// SubscribeToStateChanges returns a channel that receives service state change events.
// Clients can listen to this channel to be notified when any service changes state.
// The channel will be closed if the service manager becomes unavailable.
//
// Returns:
// - <-chan ServiceStateChangedEvent: Read-only channel for receiving state change events
SubscribeToStateChanges() <-chan ServiceStateChangedEvent
}
OrchestratorAPI defines the interface for orchestrating service lifecycle management. It provides methods to start, stop, restart services, check their status, and subscribe to state change events. The orchestrator acts as a high-level controller that coordinates service operations through the underlying service management infrastructure.
func NewOrchestratorAPI ¶
func NewOrchestratorAPI() OrchestratorAPI
NewOrchestratorAPI creates a new orchestrator API instance. The returned instance uses the API service locator pattern to access service management functionality without direct coupling to implementations.
Returns:
- OrchestratorAPI: A new orchestrator API instance
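A consumer of SubscribeToStateChanges would typically range over the returned channel until it is closed. The sketch below shows that loop in isolation; the fields on `ServiceStateChangedEvent` and the `collectTransitions` helper are hypothetical, since the event's real shape is not shown in this section.

```go
package main

import "fmt"

// ServiceStateChangedEvent uses hypothetical fields for illustration.
type ServiceStateChangedEvent struct {
	Name     string
	OldState string
	NewState string
}

// collectTransitions drains the channel until it is closed and returns a
// human-readable line per event.
func collectTransitions(events <-chan ServiceStateChangedEvent) []string {
	var out []string
	for ev := range events {
		out = append(out, fmt.Sprintf("%s: %s -> %s", ev.Name, ev.OldState, ev.NewState))
	}
	return out
}

func main() {
	// A buffered channel stands in for the one returned by the orchestrator.
	ch := make(chan ServiceStateChangedEvent, 2)
	ch <- ServiceStateChangedEvent{Name: "prometheus", OldState: "stopped", NewState: "starting"}
	ch <- ServiceStateChangedEvent{Name: "prometheus", OldState: "starting", NewState: "running"}
	close(ch) // mirrors the documented close when the service manager goes away

	for _, line := range collectTransitions(ch) {
		fmt.Println(line)
	}
}
```

In a long-running client this loop would normally live in its own goroutine so that the subscription does not block other work.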
type ReconcileManagerHandler ¶
type ReconcileManagerHandler interface {
// IsRunning returns whether the reconciliation manager is active.
IsRunning() bool
// GetQueueLength returns the current number of items in the reconciliation queue.
GetQueueLength() int
// GetStatus returns the reconciliation status for a specific resource.
GetStatus(resourceType, name, namespace string) (*ReconcileStatusInfo, bool)
// GetAllStatuses returns all reconciliation statuses.
GetAllStatuses() []ReconcileStatusInfo
// TriggerReconcile manually triggers reconciliation for a resource.
TriggerReconcile(resourceType, name, namespace string)
// GetWatchMode returns the current watch mode (kubernetes/filesystem).
GetWatchMode() string
// GetEnabledResourceTypes returns the list of resource types with reconciliation enabled.
GetEnabledResourceTypes() []string
}
ReconcileManagerHandler provides access to reconciliation status and control. This handler enables monitoring and manual triggering of reconciliation operations for resources managed by the reconciliation system.
The reconciliation system ensures that resource definitions (CRDs or YAML files) are automatically synchronized with running services.
func GetReconcileManager ¶
func GetReconcileManager() ReconcileManagerHandler
GetReconcileManager returns the registered reconcile manager handler. This provides access to reconciliation status and control functionality.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- ReconcileManagerHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
type ReconcileOverview ¶
type ReconcileOverview struct {
// Running indicates whether the reconciliation system is active.
Running bool `json:"running"`
// WatchMode is the current mode (kubernetes or filesystem).
WatchMode string `json:"watch_mode"`
// QueueLength is the current number of items awaiting reconciliation.
QueueLength int `json:"queue_length"`
// EnabledResourceTypes lists the resource types being watched.
EnabledResourceTypes []string `json:"enabled_resource_types"`
// StatusSummary provides counts by state.
StatusSummary ReconcileStatusSummary `json:"status_summary"`
}
ReconcileOverview provides a summary of the reconciliation system status.
type ReconcileStatusInfo ¶
type ReconcileStatusInfo struct {
// ResourceType is the type of the resource (MCPServer, ServiceClass, Workflow).
ResourceType string `json:"resource_type"`
// Name is the name of the resource.
Name string `json:"name"`
// Namespace is the Kubernetes namespace (empty for filesystem mode).
Namespace string `json:"namespace,omitempty"`
// LastReconcileTime is when the resource was last successfully reconciled.
LastReconcileTime *string `json:"last_reconcile_time,omitempty"`
// LastError is the most recent error, if any.
LastError string `json:"last_error,omitempty"`
// RetryCount is the number of retry attempts.
RetryCount int `json:"retry_count"`
// State describes the current reconciliation state (Pending, Reconciling, Synced, Error, Failed).
State string `json:"state"`
}
ReconcileStatusInfo represents the reconciliation status for a resource. This is exposed via the API for observability.
type ReconcileStatusSummary ¶
type ReconcileStatusSummary struct {
// Total is the total number of tracked resources.
Total int `json:"total"`
// Synced is the count of successfully synced resources.
Synced int `json:"synced"`
// Pending is the count of resources awaiting reconciliation.
Pending int `json:"pending"`
// Reconciling is the count of resources currently being reconciled.
Reconciling int `json:"reconciling"`
// Error is the count of resources that failed but may be retried.
Error int `json:"error"`
// Failed is the count of resources that failed permanently.
Failed int `json:"failed"`
}
ReconcileStatusSummary provides aggregate counts of reconciliation states.
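As a rough illustration of how the aggregate counts relate to individual statuses, the sketch below tallies a slice of status entries by State. The `summarize` helper is hypothetical and the structs are trimmed copies; the state names are the ones listed for ReconcileStatusInfo.State.

```go
package main

import "fmt"

// ReconcileStatusInfo is a trimmed local copy with only the fields used here.
type ReconcileStatusInfo struct {
	Name  string
	State string
}

// ReconcileStatusSummary mirrors the documented counters.
type ReconcileStatusSummary struct {
	Total, Synced, Pending, Reconciling, Error, Failed int
}

// summarize counts statuses per state (hypothetical helper). Unknown states
// still count toward Total.
func summarize(statuses []ReconcileStatusInfo) ReconcileStatusSummary {
	var s ReconcileStatusSummary
	for _, st := range statuses {
		s.Total++
		switch st.State {
		case "Synced":
			s.Synced++
		case "Pending":
			s.Pending++
		case "Reconciling":
			s.Reconciling++
		case "Error":
			s.Error++
		case "Failed":
			s.Failed++
		}
	}
	return s
}

func main() {
	summary := summarize([]ReconcileStatusInfo{
		{Name: "github", State: "Synced"},
		{Name: "deploy-app", State: "Error"},
		{Name: "postgres", State: "Synced"},
	})
	fmt.Printf("%+v\n", summary)
}
```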
type SchemaProperty ¶
type SchemaProperty struct {
// Type specifies the JSON schema type for validation.
// Valid values: "string", "number", "boolean", "object", "array"
Type string `yaml:"type" json:"type"`
// Description provides human-readable documentation for this property,
// explaining its purpose and expected format
Description string `yaml:"description" json:"description"`
// Default specifies the default value used when the property is not provided.
// Must be compatible with the specified Type.
Default interface{} `yaml:"default,omitempty" json:"default,omitempty"`
}
SchemaProperty defines a single property in a JSON schema. This is used for input validation and documentation in workflows and capabilities, providing structured arg definition and validation rules.
Schema properties enable automatic validation of inputs and help generate helpful error messages and documentation for users.
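One way such validation could work is sketched below: defaults are applied for missing inputs and each value is checked against the declared Type. `matchesType` and `resolveInputs` are hypothetical helpers, and the accepted Go types per schema type are an assumption.

```go
package main

import "fmt"

// SchemaProperty mirrors the documented fields (tags omitted).
type SchemaProperty struct {
	Type        string
	Description string
	Default     interface{}
}

// matchesType reports whether v fits the named schema type.
func matchesType(typ string, v interface{}) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "number":
		switch v.(type) {
		case int, int64, float32, float64:
			return true
		}
		return false
	case "boolean":
		_, ok := v.(bool)
		return ok
	case "object":
		_, ok := v.(map[string]interface{})
		return ok
	case "array":
		_, ok := v.([]interface{})
		return ok
	}
	return false
}

// resolveInputs applies defaults for missing properties and rejects values
// whose type does not match the schema.
func resolveInputs(props map[string]SchemaProperty, input map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{}, len(props))
	for name, p := range props {
		v, ok := input[name]
		if !ok {
			if p.Default == nil {
				continue // nothing provided and no default to fall back on
			}
			v = p.Default
		}
		if !matchesType(p.Type, v) {
			return nil, fmt.Errorf("property %q: expected %s, got %T", name, p.Type, v)
		}
		out[name] = v
	}
	return out, nil
}

func main() {
	props := map[string]SchemaProperty{
		"image":    {Type: "string", Description: "Container image to run"},
		"replicas": {Type: "number", Description: "Instance count", Default: 1},
	}
	fmt.Println(resolveInputs(props, map[string]interface{}{"image": "nginx"}))
	fmt.Println(resolveInputs(props, map[string]interface{}{"image": 42}))
}
```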
type SecretCredentialsHandler ¶
type SecretCredentialsHandler interface {
// LoadClientCredentials loads OAuth client credentials from a Kubernetes secret.
//
// The secret should contain:
// - clientIdKey: The OAuth client ID (defaults to "client-id")
// - clientSecretKey: The OAuth client secret (defaults to "client-secret")
//
// Args:
// - ctx: Context for Kubernetes API calls
// - secretRef: Reference to the secret containing credentials
// - defaultNamespace: Namespace to use if not specified in secretRef
//
// Returns:
// - *ClientCredentials: The loaded credentials
// - error: Error if the secret cannot be found or is missing required keys
LoadClientCredentials(ctx context.Context, secretRef *ClientCredentialsSecretRef, defaultNamespace string) (*ClientCredentials, error)
}
SecretCredentialsHandler defines the interface for loading OAuth client credentials from Kubernetes secrets. This is used for token exchange authentication with remote Dex instances.
Implementations should:
- Load credentials from the specified Kubernetes secret
- Validate that required keys exist in the secret
- Handle missing secrets or keys gracefully with clear error messages
Thread-safe: All methods must be safe for concurrent use.
func GetSecretCredentialsHandler ¶
func GetSecretCredentialsHandler() SecretCredentialsHandler
GetSecretCredentialsHandler returns the registered secret credentials handler.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- SecretCredentialsHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by secretCredentialsMutex read lock.
type ServerAuthInfo ¶
type ServerAuthInfo struct {
// Name is the server name (e.g., "kubernetes", "github")
Name string `json:"name"`
// Status is the current auth status (typically "auth_required")
Status string `json:"status"`
// AuthTool is the tool to use for authentication (typically "core_auth_login")
AuthTool string `json:"auth_tool"`
}
ServerAuthInfo contains information about a server requiring authentication. This is used to inform users which servers need authentication via core_auth_login.
type ServiceClass ¶
type ServiceClass struct {
// Name is the unique identifier for this ServiceClass.
// This name is used when creating service instances and for ServiceClass management operations.
Name string `yaml:"name" json:"name"`
// Version specifies the version of this ServiceClass definition.
// This can be used for compatibility checks and configuration migration.
Version string `yaml:"version" json:"version"`
// Description provides a human-readable explanation of what this ServiceClass does
// and what kind of services can be created from it.
Description string `yaml:"description" json:"description"`
// Args defines the validation rules and metadata for service creation arguments.
// These definitions are used to validate arguments when creating service instances
// and to provide documentation for the service creation API.
Args map[string]ArgDefinition `yaml:"args,omitempty" json:"args,omitempty"`
// ServiceConfig contains the core configuration for service lifecycle management.
// This defines how services created from this class should be managed, including
// tool mappings, health checks, and operational args.
ServiceConfig ServiceConfig `yaml:"serviceConfig" json:"serviceConfig"`
// ServiceType indicates the general category of service that this class creates.
// This is runtime information derived from the ServiceConfig and used for categorization.
ServiceType string `json:"serviceType,omitempty" yaml:"-"`
// Available indicates whether this ServiceClass can currently be used to create service instances.
// This depends on the availability of required tools and other dependencies.
Available bool `json:"available,omitempty" yaml:"-"`
// State represents the current operational state of the ServiceClass itself.
// This is runtime information and not persisted to YAML files.
State string `json:"state,omitempty" yaml:"-"`
// Health indicates the health status of the ServiceClass.
// This reflects whether the class's dependencies and tools are functioning properly.
Health string `json:"health,omitempty" yaml:"-"`
// Error contains any error message related to this ServiceClass.
// This is populated if the class cannot be used due to missing tools or other issues.
Error string `json:"error,omitempty" yaml:"-"`
// CreateToolAvailable indicates whether the tool required for service creation is available.
// This is part of the tool availability assessment for the ServiceClass.
CreateToolAvailable bool `json:"createToolAvailable,omitempty" yaml:"-"`
// DeleteToolAvailable indicates whether the tool required for service deletion is available.
// This is part of the tool availability assessment for the ServiceClass.
DeleteToolAvailable bool `json:"deleteToolAvailable,omitempty" yaml:"-"`
// HealthCheckToolAvailable indicates whether the health check tool is available.
// This affects whether health monitoring can be performed for services created from this class.
HealthCheckToolAvailable bool `json:"healthCheckToolAvailable,omitempty" yaml:"-"`
// StatusToolAvailable indicates whether the status query tool is available.
// This affects whether detailed status information can be retrieved for services.
StatusToolAvailable bool `json:"statusToolAvailable,omitempty" yaml:"-"`
// RequiredTools lists all tools that must be available for this ServiceClass to function.
// This is computed from the ServiceConfig tool mappings and used for dependency checking.
RequiredTools []string `json:"requiredTools,omitempty" yaml:"-"`
// MissingTools lists any required tools that are currently not available.
// This is used to determine why a ServiceClass might not be available for use.
MissingTools []string `json:"missingTools,omitempty" yaml:"-"`
}
ServiceClass represents a service template that defines how to create and manage service instances. It consolidates ServiceClassDefinition, ServiceClassInfo, and ServiceClassConfig into a unified type that serves as both a configuration blueprint and runtime information container.
ServiceClasses are templates that define the structure, args, and lifecycle management for services. They specify which tools should be called for various lifecycle events and provide arg validation for service instance creation.
func (*ServiceClass) ValidateServiceArgs ¶
func (sc *ServiceClass) ValidateServiceArgs(args map[string]interface{}) error
ValidateServiceArgs validates service creation args against ServiceClass arg definitions. This method ensures that all required args are provided, that arg types are correct, and that no unknown args are specified. It also applies default values where appropriate.
The validation process:
1. Checks that all required args are provided
2. Applies default values for missing optional args
3. Validates arg types match their definitions
4. Rejects unknown args not defined in the ServiceClass
Args:
- args: The arg map to validate and potentially modify
Returns:
- error: nil if validation succeeds, or a descriptive error if validation fails
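The four validation steps can be sketched as follows. The fields of ArgDefinition are not shown in this section, so the argDef type below (Type, Required, Default) and the type names "string", "number", and "boolean" are assumptions made for illustration, not the package's actual definitions.

```go
package main

import "fmt"

// argDef is a hypothetical stand-in for ArgDefinition.
type argDef struct {
	Type     string // assumed type names: "string", "number", "boolean"
	Required bool
	Default  interface{}
}

// validateArgs follows the four documented steps. Like ValidateServiceArgs,
// it may modify args by filling in default values.
func validateArgs(defs map[string]argDef, args map[string]interface{}) error {
	// Step 1: every required arg must be present.
	for name, def := range defs {
		if _, ok := args[name]; !ok && def.Required {
			return fmt.Errorf("missing required arg %q", name)
		}
	}
	// Step 2: fill in defaults for optional args that were not supplied.
	for name, def := range defs {
		if _, ok := args[name]; !ok && def.Default != nil {
			args[name] = def.Default
		}
	}
	for name, value := range args {
		def, known := defs[name]
		// Step 4: reject args that have no definition.
		if !known {
			return fmt.Errorf("unknown arg %q", name)
		}
		// Step 3: the value must match the declared type.
		if !matchesType(def.Type, value) {
			return fmt.Errorf("arg %q: expected %s, got %T", name, def.Type, value)
		}
	}
	return nil
}

// matchesType reports whether v is acceptable for the declared type name.
func matchesType(typ string, v interface{}) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "number":
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	case "boolean":
		_, ok := v.(bool)
		return ok
	}
	return false
}

func main() {
	defs := map[string]argDef{
		"image": {Type: "string", Required: true},
		"port":  {Type: "number", Default: 5432},
	}
	args := map[string]interface{}{"image": "postgres:13"}
	fmt.Println(validateArgs(defs, args), args["port"])
}
```

Because defaults are written into the caller's map, the same map can be passed on to the lifecycle tool after validation succeeds.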
type ServiceClassCreateRequest ¶
type ServiceClassCreateRequest struct {
// Name is the unique identifier for the ServiceClass (required).
// Must be unique across all ServiceClasses. Should be descriptive
// and follow naming conventions.
Name string `json:"name" validate:"required"`
// Version indicates the ServiceClass version for compatibility tracking.
// Recommended to use semantic versioning for better change management.
Version string `json:"version,omitempty"`
// Description provides human-readable documentation for the ServiceClass.
// Should explain what type of service this class creates and its purpose.
Description string `json:"description,omitempty"`
// Args defines the validation rules and metadata for service creation arguments.
// These definitions are used to validate arguments when creating service instances
// and to provide documentation for the service creation API.
Args map[string]ArgDefinition `json:"args,omitempty"`
// ServiceConfig defines the service lifecycle management configuration (required).
// Specifies how services created from this class should be managed.
ServiceConfig ServiceConfig `json:"serviceConfig" validate:"required"`
}
ServiceClassCreateRequest represents a request to create a new ServiceClass definition. ServiceClasses serve as templates for creating service instances with predefined lifecycle tools, arg validation, and configuration.
Example:
request := ServiceClassCreateRequest{
Name: "postgres-database",
Version: "1.0",
Description: "PostgreSQL database service",
ServiceConfig: ServiceConfig{
ServiceType: "database",
DefaultName: "postgres",
LifecycleTools: LifecycleTools{
Start: ToolCall{
Tool: "docker_run",
Arguments: map[string]interface{}{
"image": "postgres:13",
},
},
},
},
}
type ServiceClassManagerHandler ¶
type ServiceClassManagerHandler interface {
// ListServiceClasses returns information about all available ServiceClasses.
// This includes both configuration and runtime availability information.
//
// Returns:
// - []ServiceClass: Slice of ServiceClass information (empty if none exist)
ListServiceClasses() []ServiceClass
// GetServiceClass retrieves detailed information about a specific ServiceClass.
// This includes configuration, runtime state, and tool availability information.
//
// Args:
// - name: The unique name of the ServiceClass to retrieve
//
// Returns:
// - *ServiceClass: ServiceClass information, or nil if not found
// - error: nil on success, or an error if the ServiceClass could not be retrieved
GetServiceClass(name string) (*ServiceClass, error)
// IsServiceClassAvailable checks whether a ServiceClass can currently be used.
// This verifies that all required tools are available and dependencies are met.
//
// Args:
// - name: The unique name of the ServiceClass to check
//
// Returns:
// - bool: true if the ServiceClass is available for use, false otherwise
IsServiceClassAvailable(name string) bool
// GetStartTool retrieves the tool configuration for starting services of this class.
// This provides the orchestrator with the information needed to start service instances.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - toolName: The name of the tool to call
// - args: Tool arguments to use
// - outputs: JSON path mappings to extract values from tool response
// - err: nil on success, or an error if the tool configuration could not be retrieved
GetStartTool(name string) (toolName string, args map[string]interface{}, outputs map[string]string, err error)
// GetStopTool retrieves the tool configuration for stopping services of this class.
// This provides the orchestrator with the information needed to stop service instances.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - toolName: The name of the tool to call
// - args: Tool arguments to use
// - outputs: JSON path mappings to extract values from tool response
// - err: nil on success, or an error if the tool configuration could not be retrieved
GetStopTool(name string) (toolName string, args map[string]interface{}, outputs map[string]string, err error)
// GetRestartTool retrieves the tool configuration for restarting services of this class.
// If no restart tool is configured, this may return an indication to use stop+start.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - toolName: The name of the tool to call
// - args: Tool arguments to use
// - outputs: JSON path mappings to extract values from tool response
// - err: nil on success, or an error if the tool configuration could not be retrieved
GetRestartTool(name string) (toolName string, args map[string]interface{}, outputs map[string]string, err error)
// GetHealthCheckTool retrieves the tool configuration for health checking services of this class.
// This provides the health monitoring system with the information needed to check service health.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - toolName: The name of the tool to call
// - args: Tool arguments to use
// - expectation: Health check expectation conditions for determining service health
// - err: nil on success, or an error if the tool configuration could not be retrieved
GetHealthCheckTool(name string) (toolName string, args map[string]interface{}, expectation *HealthCheckExpectation, err error)
// GetHealthCheckConfig retrieves the health check configuration for services of this class.
// This provides the health monitoring system with timing and threshold information.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - enabled: Whether health checking is enabled for this service class
// - interval: How often health checks should be performed
// - failureThreshold: Number of consecutive failures before marking unhealthy
// - successThreshold: Number of consecutive successes before marking healthy
// - err: nil on success, or an error if the configuration could not be retrieved
GetHealthCheckConfig(name string) (enabled bool, interval time.Duration, failureThreshold, successThreshold int, err error)
// GetServiceDependencies retrieves the list of dependencies for services of this class.
// This is used by the orchestrator to ensure proper startup ordering.
//
// Args:
// - name: The unique name of the ServiceClass
//
// Returns:
// - []string: List of dependency ServiceClass names
// - error: nil on success, or an error if dependencies could not be retrieved
GetServiceDependencies(name string) ([]string, error)
// ToolProvider interface for exposing ServiceClass management tools.
// This allows ServiceClass operations to be performed through the aggregator
// tool system, enabling programmatic and user-driven class management.
ToolProvider
}
ServiceClassManagerHandler defines the interface for service class management operations. This interface provides functionality for managing ServiceClass definitions, validating their availability, and accessing their configuration for service orchestration.
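GetHealthCheckConfig returns a failureThreshold and a successThreshold, both counted in consecutive check results. One plausible way a monitor could apply them is sketched below. The healthTracker type is hypothetical; the health monitoring code that actually consumes these values is not shown in this section.

```go
package main

import "fmt"

// healthTracker is a hypothetical helper, not part of the muster API.
type healthTracker struct {
	failureThreshold int
	successThreshold int
	failures         int // consecutive failed checks
	successes        int // consecutive successful checks
	healthy          bool
}

// record feeds one health check result into the tracker and returns the
// resulting verdict. A single result never flips the verdict unless the
// matching threshold is 1.
func (h *healthTracker) record(ok bool) bool {
	if ok {
		h.successes++
		h.failures = 0
		if h.successes >= h.successThreshold {
			h.healthy = true
		}
	} else {
		h.failures++
		h.successes = 0
		if h.failures >= h.failureThreshold {
			h.healthy = false
		}
	}
	return h.healthy
}

func main() {
	tracker := &healthTracker{failureThreshold: 3, successThreshold: 2, healthy: true}
	for _, ok := range []bool{false, false, false, true, true} {
		fmt.Printf("check ok=%v healthy=%v\n", ok, tracker.record(ok))
	}
}
```

Resetting the opposite counter on every result is what makes the thresholds "consecutive": one success in the middle of a failure streak restarts the failure count.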
func GetServiceClassManager ¶
func GetServiceClassManager() ServiceClassManagerHandler
GetServiceClassManager returns the registered service class manager handler. This provides access to ServiceClass definition management and lifecycle tool access.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- ServiceClassManagerHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
manager := api.GetServiceClassManager()
if manager == nil {
return fmt.Errorf("service class manager not available")
}
classes := manager.ListServiceClasses()
type ServiceClassUpdateRequest ¶
type ServiceClassUpdateRequest struct {
// Name is the unique identifier of the ServiceClass to update (required).
Name string `json:"name" validate:"required"`
// Version can be updated to reflect changes in the ServiceClass.
Version string `json:"version,omitempty"`
// Description can be updated to improve documentation.
Description string `json:"description,omitempty"`
// Args defines the validation rules and metadata for service creation arguments.
// These definitions are used to validate arguments when creating service instances
// and to provide documentation for the service creation API.
Args map[string]ArgDefinition `json:"args,omitempty"`
// ServiceConfig can be updated to modify lifecycle behavior.
// Changes may affect existing service instances.
ServiceConfig ServiceConfig `json:"serviceConfig,omitempty"`
}
ServiceClassUpdateRequest represents a request to update an existing ServiceClass. This allows modification of ServiceClass configuration and lifecycle tools.
type ServiceClassValidateRequest ¶
type ServiceClassValidateRequest struct {
// Name for validation (required).
Name string `json:"name" validate:"required"`
// Version for validation.
Version string `json:"version,omitempty"`
// Description for validation.
Description string `json:"description,omitempty"`
// Args defines the validation rules and metadata for service creation arguments.
// These definitions are used to validate arguments when creating service instances
// and to provide documentation for the service creation API.
Args map[string]ArgDefinition `json:"args,omitempty"`
// ServiceConfig to validate (required). All lifecycle tools will be checked
// for availability and proper configuration.
ServiceConfig ServiceConfig `json:"serviceConfig" validate:"required"`
}
ServiceClassValidateRequest represents a request to validate a ServiceClass definition without creating it. Useful for testing configurations before deployment.
type ServiceConfig ¶
type ServiceConfig struct {
// ServiceType categorizes the kind of service this configuration manages.
// This is used for grouping, filtering, and applying type-specific logic.
ServiceType string `yaml:"serviceType" json:"serviceType"`
// DefaultName provides a default name pattern for services created from this class.
// This can include templating placeholders that are replaced during service creation.
DefaultName string `yaml:"defaultName" json:"defaultName"`
// Dependencies lists other ServiceClasses that must be available before services
// of this type can be created. This ensures proper ordering and availability.
Dependencies []string `yaml:"dependencies" json:"dependencies"`
// LifecycleTools maps service lifecycle events to specific aggregator tools.
// These tools are called by the orchestrator to manage service instances.
LifecycleTools LifecycleTools `yaml:"lifecycleTools" json:"lifecycleTools"`
// HealthCheck configures health monitoring for services created from this class.
// This determines how often health checks are performed and what constitutes healthy/unhealthy states.
HealthCheck HealthCheckConfig `yaml:"healthCheck" json:"healthCheck"`
// Timeout specifies timeout values for various service operations.
// These timeouts help prevent operations from hanging indefinitely.
Timeout TimeoutConfig `yaml:"timeout" json:"timeout"`
// Outputs defines template-based outputs that should be generated when service instances are created.
// These templates are resolved using service instance arguments and runtime data (like sessionID).
// The resolved outputs are made available in service creation results and can be used by workflows.
Outputs map[string]interface{} `yaml:"outputs,omitempty" json:"outputs,omitempty"`
}
ServiceConfig defines the service lifecycle management configuration for a ServiceClass. This structure specifies how services should be managed, including tool mappings for lifecycle events, health check configuration, and arg handling.
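The Dependencies field implies an ordering constraint: a class can only be used once everything it depends on is available. A minimal sketch of deriving such an order with a depth-first topological sort follows. The startupOrder function is hypothetical, and the algorithm the orchestrator actually uses is not documented here.

```go
package main

import (
	"fmt"
	"sort"
)

// startupOrder returns names ordered so that every entry appears after its
// dependencies. deps maps a name to the names it depends on.
func startupOrder(deps map[string][]string) ([]string, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := make(map[string]int)
	var order []string

	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case done:
			return nil
		case visiting:
			// Reaching a node that is still on the stack means a cycle.
			return fmt.Errorf("dependency cycle involving %q", name)
		}
		state[name] = visiting
		for _, dep := range deps[name] {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	// Sort the keys so the result is deterministic across runs.
	names := make([]string, 0, len(deps))
	for name := range deps {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := visit(name); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	order, err := startupOrder(map[string][]string{
		"api":      {"database", "cache"},
		"cache":    nil,
		"database": {"network"},
	})
	fmt.Println(order, err)
}
```

Detecting cycles up front turns a configuration mistake into an immediate error instead of a service that waits forever.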
type ServiceInfo ¶
type ServiceInfo interface {
// GetName returns the unique name/identifier of the service
GetName() string
// GetType returns the service type (e.g., TypeMCPServer, TypeAggregator)
GetType() ServiceType
// GetState returns the current operational state of the service
GetState() ServiceState
// GetHealth returns the current health status of the service
GetHealth() HealthStatus
// GetLastError returns the last error encountered by the service, or nil if none
GetLastError() error
// GetServiceData returns additional metadata and runtime information about the service
GetServiceData() map[string]interface{}
}
ServiceInfo provides information about a service instance. This interface defines the contract for accessing service metadata and state regardless of whether the service is static or ServiceClass-based.
All service implementations must provide this interface to be managed by the service registry and orchestrator.
type ServiceInstance ¶
type ServiceInstance struct {
// Name is the human-readable name for this service instance.
// If not provided, it may be derived from the ServiceClass defaultName or ID.
Name string `json:"name,omitempty" yaml:"name"`
// ServiceClassName specifies which ServiceClass template this instance was created from.
// This establishes the relationship between the instance and its blueprint.
ServiceClassName string `json:"serviceClassName" yaml:"serviceClassName"`
// ServiceClassType indicates the type of service as defined in the ServiceClass.
// This is populated from the ServiceClass configuration and used for categorization.
ServiceClassType string `json:"serviceClassType,omitempty" yaml:"serviceClassType,omitempty"`
// Args contains the configuration values provided when creating this service instance.
// These values are validated against the ServiceClass arg definitions.
Args map[string]interface{} `yaml:"args" json:"args"`
// Dependencies lists other service instances that must be running before this instance can start.
// The orchestrator uses this for proper startup ordering.
Dependencies []string `yaml:"dependencies,omitempty" json:"dependencies,omitempty"`
// AutoStart determines whether this service instance should be automatically started
// when the system boots up or when dependencies become available.
AutoStart bool `yaml:"autoStart,omitempty" json:"autoStart,omitempty"`
// Enabled indicates whether this service instance is enabled for operation.
// Disabled instances will not be started even if AutoStart is true.
Enabled bool `yaml:"enabled" json:"enabled"`
// Description provides a human-readable description of this service instance's purpose.
Description string `yaml:"description,omitempty" json:"description,omitempty"`
// Version tracks the version of this service instance configuration.
// This can be used for configuration migration and compatibility checks.
Version string `yaml:"version,omitempty" json:"version,omitempty"`
// State represents the current operational state of the service instance.
// This is runtime information and not persisted to YAML files.
State ServiceState `json:"state,omitempty" yaml:"-"`
// Health indicates the current health status of the service instance.
// This is determined by health checks and not persisted to YAML files.
Health HealthStatus `json:"health,omitempty" yaml:"-"`
// LastError contains the most recent error message from service operations.
// This is runtime information and not persisted to YAML files.
LastError string `json:"lastError,omitempty" yaml:"-"`
// ServiceData contains additional runtime data specific to this service instance.
// This might include connection information, status details, or other service-specific data.
ServiceData map[string]interface{} `json:"serviceData,omitempty" yaml:"-"`
// Outputs contains the resolved outputs from the ServiceClass outputs definition.
// These are generated during service creation by resolving templates with service args and runtime data.
// Outputs are available for workflows and other consumers that need access to service-generated values.
Outputs map[string]interface{} `json:"outputs,omitempty" yaml:"-"`
// CreatedAt records when this service instance was initially created.
// This timestamp is persisted and used for auditing and lifecycle management.
CreatedAt time.Time `json:"createdAt,omitempty" yaml:"createdAt,omitempty"`
// UpdatedAt tracks when this service instance configuration was last modified.
// This is runtime information and not persisted to YAML files.
UpdatedAt time.Time `json:"updatedAt,omitempty" yaml:"-"`
// LastChecked records the timestamp of the most recent health check.
// This is runtime information and not persisted to YAML files.
LastChecked *time.Time `json:"lastChecked,omitempty" yaml:"-"`
// HealthCheckFailures counts consecutive health check failures.
// This is used by the health monitoring system to determine service health trends.
HealthCheckFailures int `json:"healthCheckFailures,omitempty" yaml:"-"`
// HealthCheckSuccesses counts consecutive health check successes.
// This is used by the health monitoring system to determine recovery patterns.
HealthCheckSuccesses int `json:"healthCheckSuccesses,omitempty" yaml:"-"`
}
ServiceInstance represents a single service instance that is created from a ServiceClass template. It combines both the configuration data (for persistence) and runtime state (for API responses). ServiceInstance consolidates ServiceInstance, ServiceInstanceDefinition, and ServiceClassInstanceInfo into a unified type that can be used across different layers of the system.
The struct is designed to support both YAML serialization for persistence and JSON serialization for API responses, with some fields excluded from YAML when they represent transient runtime state.
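The split between persisted and runtime-only fields is expressed entirely through struct tags: runtime fields carry yaml:"-". The sketch below inspects those tags with the reflect package to list what a YAML encoder honoring that convention would write. The instance type is a trimmed, hypothetical copy of a few ServiceInstance fields, and the lowercase fallback for untagged fields is an assumption about the encoder in use.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// instance is a trimmed, hypothetical copy of a few ServiceInstance fields.
type instance struct {
	Name             string `json:"name,omitempty" yaml:"name"`
	ServiceClassName string `json:"serviceClassName" yaml:"serviceClassName"`
	Enabled          bool   `json:"enabled" yaml:"enabled"`
	State            string `json:"state,omitempty" yaml:"-"`
	LastError        string `json:"lastError,omitempty" yaml:"-"`
}

// persistedFields returns the YAML keys of the struct fields that are not
// excluded with yaml:"-". v must be a struct value.
func persistedFields(v interface{}) []string {
	t := reflect.TypeOf(v)
	var keys []string
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		key := strings.Split(field.Tag.Get("yaml"), ",")[0]
		switch key {
		case "-":
			continue // runtime-only field, never written to YAML
		case "":
			key = strings.ToLower(field.Name) // assumed encoder default
		}
		keys = append(keys, key)
	}
	return keys
}

func main() {
	fmt.Println(persistedFields(instance{}))
}
```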
type ServiceInstanceEvent ¶
type ServiceInstanceEvent struct {
// Name is the unique name of the service instance that triggered this event.
Name string `json:"name"`
// ServiceType indicates the type of service as defined in the ServiceClass.
ServiceType string `json:"serviceType"`
// OldState represents the previous operational state before this event.
OldState string `json:"oldState"`
// NewState represents the current operational state after this event.
NewState string `json:"newState"`
// OldHealth represents the previous health status before this event.
OldHealth string `json:"oldHealth"`
// NewHealth represents the current health status after this event.
NewHealth string `json:"newHealth"`
// Error contains any error message associated with this state change event.
// This field is only populated if the state change was caused by an error condition.
Error string `json:"error,omitempty"`
// Timestamp records when this event occurred.
// This is used for event ordering and audit trails.
Timestamp time.Time `json:"timestamp"`
// Metadata contains additional context-specific information about this event.
// The content depends on the service type and the nature of the state change.
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
ServiceInstanceEvent represents a state change event for a ServiceClass-based service instance. These events are emitted when service instances change state, health status, or encounter errors. Clients can subscribe to these events to monitor service instance lifecycle and health.
type ServiceListResponse ¶
type ServiceListResponse struct {
// Services contains the list of service status information
Services []ServiceStatus `json:"services"`
}
ServiceListResponse represents a list of services in API responses. This is used by endpoints that return status information for multiple services.
type ServiceManagerHandler ¶
type ServiceManagerHandler interface {
// StartService starts a service by name.
// Works for both static services and ServiceClass-based service instances.
//
// Args:
// - name: The name of the service to start
//
// Returns:
// - error: Error if the service doesn't exist or fails to start
StartService(name string) error
// StopService stops a running service by name.
// Works for both static services and ServiceClass-based service instances.
//
// Args:
// - name: The name of the service to stop
//
// Returns:
// - error: Error if the service doesn't exist or fails to stop
StopService(name string) error
// RestartService restarts a service by name (stop followed by start).
// Works for both static services and ServiceClass-based service instances.
//
// Args:
// - name: The name of the service to restart
//
// Returns:
// - error: Error if the service doesn't exist or fails to restart
RestartService(name string) error
// GetServiceStatus returns the current status of a service.
//
// Args:
// - name: The name of the service to query
//
// Returns:
// - *ServiceStatus: Current status information including state, health, and metadata
// - error: Error if the service doesn't exist
GetServiceStatus(name string) (*ServiceStatus, error)
// GetAllServices returns the status of all services in the system.
//
// Returns:
// - []ServiceStatus: List of status information for all services
GetAllServices() []ServiceStatus
// GetService returns detailed information about a specific service.
// This provides more comprehensive information than GetServiceStatus.
//
// Args:
// - name: The name of the service to query
//
// Returns:
// - *ServiceInstance: Detailed service information including configuration
// - error: Error if the service doesn't exist
GetService(name string) (*ServiceInstance, error)
// CreateService creates a new service instance from a ServiceClass template.
// This operation is only applicable to ServiceClass-based services.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - req: Request containing ServiceClass name, instance name, and args
//
// Returns:
// - *ServiceInstance: The created service instance information
// - error: Error if the ServiceClass doesn't exist or creation fails
CreateService(ctx context.Context, req CreateServiceInstanceRequest) (*ServiceInstance, error)
// DeleteService removes a ServiceClass-based service instance.
// This operation is only applicable to ServiceClass-based services.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - name: The name of the service instance to delete
//
// Returns:
// - error: Error if the service doesn't exist or deletion fails
DeleteService(ctx context.Context, name string) error
// SubscribeToStateChanges returns a channel for receiving service state change events.
// This allows components to react to service lifecycle events.
//
// Returns:
// - <-chan ServiceStateChangedEvent: Channel that receives state change notifications
//
// Note: The returned channel should be consumed to prevent blocking the event system.
SubscribeToStateChanges() <-chan ServiceStateChangedEvent
// SubscribeToServiceInstanceEvents returns a channel for receiving ServiceClass instance events.
// This provides notifications specific to ServiceClass-based service instances.
//
// Returns:
// - <-chan ServiceInstanceEvent: Channel that receives instance-specific events
//
// Note: The returned channel should be consumed to prevent blocking the event system.
SubscribeToServiceInstanceEvents() <-chan ServiceInstanceEvent
// ToolProvider integration for exposing service management as MCP tools
ToolProvider
}
ServiceManagerHandler provides unified management for both static and ServiceClass-based services. This is the primary interface for service lifecycle operations in the Service Locator Pattern.
The handler abstracts the differences between static services (defined in configuration) and dynamic ServiceClass-based services (created at runtime), providing a unified API for service management operations.
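Both subscription methods note that the returned channel should be consumed. A minimal consumer loop might look like the sketch below. The stateChange type is a reduced, hypothetical form of ServiceStateChangedEvent, and the buffered channel in main stands in for the one SubscribeToStateChanges would return.

```go
package main

import "fmt"

// stateChange is a hypothetical, reduced form of ServiceStateChangedEvent.
type stateChange struct {
	Name     string
	OldState string
	NewState string
}

// consume drains events until done is closed or the events channel is
// closed, passing each event to handle. It returns the number of events
// handled.
func consume(done <-chan struct{}, events <-chan stateChange, handle func(stateChange)) int {
	n := 0
	for {
		select {
		case <-done:
			return n
		case ev, ok := <-events:
			if !ok {
				return n
			}
			handle(ev)
			n++
		}
	}
}

func main() {
	// In real code this channel would come from SubscribeToStateChanges().
	events := make(chan stateChange, 2)
	events <- stateChange{Name: "postgres", OldState: "starting", NewState: "running"}
	events <- stateChange{Name: "postgres", OldState: "running", NewState: "stopping"}
	close(events)

	done := make(chan struct{})
	n := consume(done, events, func(ev stateChange) {
		fmt.Printf("%s: %s -> %s\n", ev.Name, ev.OldState, ev.NewState)
	})
	fmt.Println("handled", n)
}
```

Running the loop in its own goroutine and keeping handle fast avoids stalling whatever is sending the events.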
func GetServiceManager ¶
func GetServiceManager() ServiceManagerHandler
GetServiceManager returns the registered service manager handler. This provides access to unified service lifecycle management for both static services and ServiceClass-based service instances.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- ServiceManagerHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
manager := api.GetServiceManager()
if manager == nil {
return fmt.Errorf("service manager not available")
}
err := manager.StartService("my-service")
type ServiceRegistryHandler ¶
type ServiceRegistryHandler interface {
// Get retrieves a service by name from the registry.
//
// Args:
// - name: The unique name of the service to retrieve
//
// Returns:
// - ServiceInfo: The service information if found
// - bool: true if the service exists, false otherwise
Get(name string) (ServiceInfo, bool)
// GetAll returns all services currently registered in the system.
//
// Returns:
// - []ServiceInfo: List of all registered services (both static and dynamic)
GetAll() []ServiceInfo
// GetByType returns all services of a specific type.
//
// Args:
// - serviceType: The type of services to retrieve (e.g., TypeMCPServer)
//
// Returns:
// - []ServiceInfo: List of services matching the specified type
GetByType(serviceType ServiceType) []ServiceInfo
}
ServiceRegistryHandler provides access to registered services in the system. This handler implements the service discovery aspect of the Service Locator Pattern, allowing components to find and access service information without direct coupling.
The registry maintains both static services (defined in configuration) and dynamic ServiceClass-based service instances.
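To make the lookup semantics concrete, here is a small in-memory registry with the same three lookups. It is a sketch only: memoryRegistry, serviceInfo, and serviceType are hypothetical simplifications (the real ServiceInfo is an interface), and the actual registry implementation is not shown in this section.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// serviceType and serviceInfo are simplified stand-ins for ServiceType and
// ServiceInfo.
type serviceType string

type serviceInfo struct {
	Name string
	Type serviceType
}

// memoryRegistry is a hypothetical registry guarded by an RWMutex so that
// lookups can run concurrently.
type memoryRegistry struct {
	mu       sync.RWMutex
	services map[string]serviceInfo
}

func newMemoryRegistry() *memoryRegistry {
	return &memoryRegistry{services: make(map[string]serviceInfo)}
}

// register adds or replaces a service under its name.
func (r *memoryRegistry) register(s serviceInfo) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.services[s.Name] = s
}

// Get returns the service and true if it exists, mirroring the comma-ok
// shape of ServiceRegistryHandler.Get.
func (r *memoryRegistry) Get(name string) (serviceInfo, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.services[name]
	return s, ok
}

// GetAll returns every registered service, sorted by name for stable output.
func (r *memoryRegistry) GetAll() []serviceInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	all := make([]serviceInfo, 0, len(r.services))
	for _, s := range r.services {
		all = append(all, s)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].Name < all[j].Name })
	return all
}

// GetByType filters the registered services down to one type.
func (r *memoryRegistry) GetByType(t serviceType) []serviceInfo {
	var matched []serviceInfo
	for _, s := range r.GetAll() {
		if s.Type == t {
			matched = append(matched, s)
		}
	}
	return matched
}

func main() {
	reg := newMemoryRegistry()
	reg.register(serviceInfo{Name: "kubernetes", Type: "MCPServer"})
	reg.register(serviceInfo{Name: "aggregator", Type: "Aggregator"})

	if s, ok := reg.Get("kubernetes"); ok {
		fmt.Println("found", s.Name)
	}
	fmt.Println(len(reg.GetAll()), len(reg.GetByType("MCPServer")))
}
```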
func GetServiceRegistry ¶
func GetServiceRegistry() ServiceRegistryHandler
GetServiceRegistry returns the registered service registry handler. This provides access to the service discovery and information interface.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- ServiceRegistryHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
registry := api.GetServiceRegistry()
if registry == nil {
return fmt.Errorf("service registry not available")
}
services := registry.GetAll()
type ServiceState ¶
type ServiceState string
ServiceState represents the current operational state of a service. This provides a standardized way to track service lifecycle across all service types.
const (
// StateStopped indicates the service is not running
StateStopped ServiceState = "stopped"
// StateStarting indicates the service is in the process of starting up
StateStarting ServiceState = "starting"
// StateRunning indicates the service is running and operational
StateRunning ServiceState = "running"
// StateStopping indicates the service is in the process of shutting down
StateStopping ServiceState = "stopping"
// StateError indicates the service encountered an error and may not be functional
StateError ServiceState = "error"
// StateFailed indicates the service failed to start or operate correctly
StateFailed ServiceState = "failed"
// StateUnknown indicates the service state cannot be determined
StateUnknown ServiceState = "unknown"
// StateWaiting indicates the service is waiting for dependencies or resources
StateWaiting ServiceState = "waiting"
// StateRetrying indicates the service is retrying a failed operation
StateRetrying ServiceState = "retrying"
// StateUnreachable indicates the service endpoint cannot be reached after multiple attempts.
// This state is used for remote MCP servers (streamable-http, sse) that fail to connect
// due to network issues, DNS failures, or decommissioned endpoints.
// Servers in this state will use exponential backoff for retry attempts.
//
// Related constants:
// - aggregator.StatusUnreachable (internal/aggregator/types.go)
// - pkgoauth.ServerStatusUnreachable (pkg/oauth/types.go)
StateUnreachable ServiceState = "unreachable"
// StateAuthRequired indicates the service requires OAuth authentication before connecting.
// This state is used for remote MCP servers that returned a 401 Unauthorized response
// during initialization, indicating they support OAuth and require authentication.
// Users should run `muster auth login --server <name>` to authenticate.
//
// This is distinct from StateUnreachable:
// - StateAuthRequired: Server is reachable but requires authentication
// - StateUnreachable: Server cannot be reached (network/connectivity issue)
//
// Related constants:
// - aggregator.StatusAuthRequired (internal/aggregator/types.go)
// - pkgoauth.ServerStatusAuthRequired (pkg/oauth/types.go)
StateAuthRequired ServiceState = "auth_required"
// StateConnected indicates the service is connected and authenticated.
// This is an alias for StateRunning for semantic clarity with remote servers.
// For remote MCP servers, "connected" is more intuitive than "running" since
// the server itself is running remotely - we are just connected to it.
//
// Related constants:
// - aggregator.StatusConnected (internal/aggregator/types.go)
// - pkgoauth.ServerStatusConnected (pkg/oauth/types.go)
StateConnected ServiceState = "connected"
// StateDisconnected indicates the service was previously connected but is now disconnected.
// This state is used for remote MCP servers that were successfully connected but
// the connection was lost (session ended, server restart, etc.).
//
// Related constants:
// - aggregator.StatusDisconnected (internal/aggregator/types.go)
StateDisconnected ServiceState = "disconnected"
)
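The distinction between the remote-server states can be illustrated with a small sketch. It is not part of the package: the type and a subset of the constants are mirrored locally so the example compiles on its own, and `isOperational` and `operatorHint` are hypothetical helpers introduced only for illustration.

```go
package main

import "fmt"

// ServiceState and the constants below mirror the documented values locally
// so that this sketch compiles without importing the real package.
type ServiceState string

const (
	StateRunning      ServiceState = "running"
	StateUnreachable  ServiceState = "unreachable"
	StateAuthRequired ServiceState = "auth_required"
	StateConnected    ServiceState = "connected"
	StateDisconnected ServiceState = "disconnected"
)

// isOperational is a hypothetical helper. It treats StateConnected like
// StateRunning, following the note that "connected" is the remote-server
// equivalent of "running".
func isOperational(s ServiceState) bool {
	return s == StateRunning || s == StateConnected
}

// operatorHint is a hypothetical helper that maps a state to a short hint
// taken from the constant documentation.
func operatorHint(server string, s ServiceState) string {
	switch s {
	case StateAuthRequired:
		return fmt.Sprintf("run `muster auth login --server %s`", server)
	case StateUnreachable:
		return "endpoint cannot be reached; retries use exponential backoff"
	case StateDisconnected:
		return "connection was lost after a successful connect"
	default:
		return ""
	}
}

func main() {
	for _, s := range []ServiceState{StateConnected, StateAuthRequired, StateUnreachable} {
		fmt.Printf("%-13s operational=%-5t hint=%q\n", s, isOperational(s), operatorHint("github", s))
	}
}
```

Comparing against both StateRunning and StateConnected in one place keeps callers from special-casing remote servers.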
type ServiceStateChangedEvent ¶
type ServiceStateChangedEvent struct {
// Name is the unique identifier of the service that changed state
Name string `json:"name"`
// ServiceType indicates the type of service (e.g., "MCPServer", "Aggregator")
ServiceType string `json:"service_type"`
// OldState is the previous state before the transition
OldState string `json:"old_state"`
// NewState is the current state after the transition
NewState string `json:"new_state"`
// Health is the current health status of the service
Health string `json:"health"`
// Error contains error information if the state change was due to an error
Error error `json:"error,omitempty"`
// Timestamp indicates when the state change occurred
Timestamp time.Time `json:"timestamp"`
}
ServiceStateChangedEvent represents a service state transition event. These events are published whenever a service changes state, allowing components to react to service lifecycle changes.
type ServiceStatus ¶
type ServiceStatus struct {
// Name is the unique identifier of the service
Name string `json:"name"`
// ServiceType indicates the type of service (e.g., "MCPServer", "Aggregator")
ServiceType string `json:"service_type"`
// State is the current operational state of the service
State ServiceState `json:"state"`
// Health is the current health status of the service
Health HealthStatus `json:"health"`
// Error contains error information if the service is in an error state
Error string `json:"error,omitempty"`
// Metadata contains additional runtime information about the service
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Outputs contains the resolved outputs from the ServiceClass outputs definition.
// Only populated for ServiceClass-based services that have outputs configured.
Outputs map[string]interface{} `json:"outputs,omitempty"`
}
ServiceStatus represents the current status of a service for API responses. This is a simplified view of service information suitable for status queries and monitoring dashboards.
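As a rough illustration of how the struct tags shape an API response, the sketch below mirrors ServiceStatus locally and encodes it with encoding/json. The health value "healthy" is an assumption (the HealthStatus constants are not listed in this section), and `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types so the sketch compiles on its own.
type ServiceState string
type HealthStatus string

type ServiceStatus struct {
	Name        string                 `json:"name"`
	ServiceType string                 `json:"service_type"`
	State       ServiceState           `json:"state"`
	Health      HealthStatus           `json:"health"`
	Error       string                 `json:"error,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
	Outputs     map[string]interface{} `json:"outputs,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of a status,
// or an empty string if marshalling fails.
func encode(s ServiceStatus) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// "healthy" is an assumed HealthStatus value used only for illustration.
	fmt.Println(encode(ServiceStatus{
		Name:        "my-database",
		ServiceType: "MCPServer",
		State:       "running",
		Health:      "healthy",
	}))
	// Prints: {"name":"my-database","service_type":"MCPServer","state":"running","health":"healthy"}
}
```

Because Error, Metadata, and Outputs use omitempty, a service with no error and no ServiceClass outputs produces a compact payload with only the four core fields.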
type ServiceType ¶
type ServiceType string
ServiceType represents the type/category of a service. This classification helps with service organization and type-specific operations.
const ( // TypeMCPServer represents MCP (Model Context Protocol) server services TypeMCPServer ServiceType = "MCPServer" // TypeAggregator represents aggregator services that coordinate multiple MCP servers TypeAggregator ServiceType = "Aggregator" )
type ServiceValidateRequest ¶
type ServiceValidateRequest struct {
// Name is the proposed name for the service instance (required).
// Must be unique across all services (both static and ServiceClass-based).
Name string `json:"name" validate:"required"`
// ServiceClassName specifies which ServiceClass to use as template (required).
// Must reference an existing ServiceClass.
ServiceClassName string `json:"serviceClassName" validate:"required"`
// Args provides the configuration for service creation.
// Must match the argument definitions in the ServiceClass.
Args map[string]interface{} `json:"args,omitempty"`
// AutoStart determines if the service should start automatically after creation.
AutoStart bool `json:"autoStart,omitempty"`
// Description provides optional documentation for this service instance.
Description string `json:"description,omitempty"`
}
ServiceValidateRequest represents a request to validate service creation args against a ServiceClass definition. This is useful for validating args before actually creating a service instance.
Example:
request := ServiceValidateRequest{
Name: "my-database",
ServiceClassName: "postgres-database",
Args: map[string]interface{}{
"database_name": "myapp",
"username": "dbuser",
"port": 5432,
},
}
type SessionInitCallback ¶
SessionInitCallback is called when a new session is first seen with a valid muster token. The aggregator registers this callback to trigger proactive SSO connections to all SSO-enabled servers (forwardToken: true) using muster's ID token.
This callback is triggered on the first authenticated MCP request for a session, enabling seamless SSO: users authenticate once to muster (via `muster auth login`) and automatically gain access to all SSO-enabled MCP servers.
Args:
- ctx: Context containing the ID token for forwarding
- sessionID: The MCP session ID
The callback should not return an error: SSO connection failures are logged but do not prevent the request from proceeding.
func GetSessionInitCallback ¶
func GetSessionInitCallback() SessionInitCallback
GetSessionInitCallback returns the registered session initialization callback. Returns nil if no callback has been registered.
Thread-safe: Yes, protected by sessionInitMutex read lock.
type StateUpdater ¶
type StateUpdater interface {
// UpdateState updates the service's operational state.
//
// Args:
// - state: The new service state
// - health: The new health status
// - err: Optional error associated with the state change
UpdateState(state ServiceState, health HealthStatus, err error)
}
StateUpdater is an optional interface for services that allow external state updates. This is used to update service state when external events occur, such as SSO authentication succeeding at the session level.
Not all services implement this interface; callers should type-assert before use.
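The type assertion mentioned above might look like the following sketch. The interface is mirrored locally; `remoteService`, `staticService`, and `tryUpdateState` are hypothetical names, and the health value "healthy" is an assumption.

```go
package main

import "fmt"

// Local mirrors of the documented types so the sketch compiles on its own.
type ServiceState string
type HealthStatus string

type StateUpdater interface {
	UpdateState(state ServiceState, health HealthStatus, err error)
}

// remoteService is a hypothetical service that accepts external state updates.
type remoteService struct {
	state  ServiceState
	health HealthStatus
	err    error
}

func (r *remoteService) UpdateState(state ServiceState, health HealthStatus, err error) {
	r.state, r.health, r.err = state, health, err
}

// staticService is a hypothetical service that does not implement StateUpdater.
type staticService struct{}

// tryUpdateState applies the update only when svc implements StateUpdater
// and reports whether the update was applied.
func tryUpdateState(svc interface{}, state ServiceState, health HealthStatus, err error) bool {
	updater, ok := svc.(StateUpdater)
	if !ok {
		return false
	}
	updater.UpdateState(state, health, err)
	return true
}

func main() {
	remote := &remoteService{state: "auth_required"}
	// "healthy" is an assumed HealthStatus value used only for illustration.
	fmt.Println(tryUpdateState(remote, "connected", "healthy", nil), remote.state)
	fmt.Println(tryUpdateState(staticService{}, "connected", "healthy", nil))
}
```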
type TeleportAuth ¶
type TeleportAuth struct {
// IdentityDir is the directory containing Teleport identity files.
// In filesystem mode, this is the tbot output directory.
// In Kubernetes mode, this is where the identity secret is mounted.
// Example: /var/run/tbot/identity
IdentityDir string `yaml:"identityDir,omitempty" json:"identityDir,omitempty"`
// IdentitySecretName is the name of the Kubernetes Secret containing
// tbot identity files. Used when running in Kubernetes mode.
// The secret should contain: tlscert, key, teleport-application-ca.pem
// Example: tbot-identity-output
IdentitySecretName string `yaml:"identitySecretName,omitempty" json:"identitySecretName,omitempty"`
// IdentitySecretNamespace is the Kubernetes namespace where the identity
// secret is located. Defaults to the MCPServer's namespace if not specified.
IdentitySecretNamespace string `yaml:"identitySecretNamespace,omitempty" json:"identitySecretNamespace,omitempty"`
// AppName is the Teleport application name for routing.
// This is used to identify which Teleport-protected application to connect to.
// Example: mcp-kubernetes
AppName string `yaml:"appName,omitempty" json:"appName,omitempty"`
}
TeleportAuth configures Teleport authentication for an MCP server. This enables access to MCP servers on private installations via Teleport Application Access using Machine ID certificates.
type TeleportClientConfig ¶
type TeleportClientConfig struct {
// IdentityDir is the filesystem path to the tbot identity directory.
// Used when running in filesystem mode (local development).
IdentityDir string
// IdentitySecretName is the name of the Kubernetes Secret containing
// tbot identity files. Used when running in Kubernetes mode.
IdentitySecretName string
// IdentitySecretNamespace is the namespace of the identity secret.
// Defaults to "default" if not specified.
IdentitySecretNamespace string
// AppName is the Teleport application name for routing.
// When specified, the HTTP client will include the appropriate Host header.
AppName string
}
TeleportClientConfig provides configuration for obtaining a Teleport HTTP client. It supports both filesystem-based identity directories and Kubernetes secrets.
type TeleportClientHandler ¶
type TeleportClientHandler interface {
// GetHTTPClientForIdentity returns an HTTP client configured with Teleport
// certificates from the specified identity directory.
//
// The identity directory should contain tbot application output files:
// - tlscert: Client certificate
// - key: Client private key
// - teleport-application-ca.pem: Teleport CA certificate
//
// The returned client uses mutual TLS and trusts the Teleport CA.
//
// Args:
// - identityDir: Path to the directory containing Teleport identity files
//
// Returns:
// - *http.Client: HTTP client configured with Teleport certificates
// - error: Error if certificates cannot be loaded or are invalid
GetHTTPClientForIdentity(identityDir string) (*http.Client, error)
// GetHTTPTransportForIdentity returns an HTTP transport configured with
// Teleport certificates. This is useful when you need to customize the
// HTTP client further or share the transport across multiple clients.
//
// Args:
// - identityDir: Path to the directory containing Teleport identity files
//
// Returns:
// - *http.Transport: HTTP transport configured with Teleport certificates
// - error: Error if certificates cannot be loaded or are invalid
GetHTTPTransportForIdentity(identityDir string) (*http.Transport, error)
// GetHTTPClientForConfig returns an HTTP client configured with Teleport
// certificates based on TeleportClientConfig, supporting both filesystem
// identity directories and Kubernetes secrets.
//
// When IdentitySecretName is specified, certificates are loaded from the
// Kubernetes secret. Otherwise, certificates are loaded from IdentityDir.
//
// Args:
// - ctx: Context for Kubernetes API calls
// - config: Configuration specifying identity source and options
//
// Returns:
// - *http.Client: HTTP client configured with Teleport certificates
// - error: Error if certificates cannot be loaded or are invalid
GetHTTPClientForConfig(ctx context.Context, config TeleportClientConfig) (*http.Client, error)
}
TeleportClientHandler defines the interface for providing HTTP clients configured with Teleport Machine ID certificates.
This handler enables access to MCP servers on private installations that are only reachable via Teleport Application Access. The handler manages TLS certificate loading, hot-reloading, and HTTP client lifecycle.
Implementations should:
- Load TLS certificates from the specified identity directory or Kubernetes secret
- Monitor certificates for changes and reload automatically
- Provide HTTP clients configured with mutual TLS
Thread-safe: All methods must be safe for concurrent use.
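One way an implementation could build a mutual-TLS client from the three identity files is sketched below, using only the standard library. This is not the package's implementation: `newTeleportHTTPClient` is a hypothetical name, the minimum TLS version is an assumption, and the sketch loads certificates once, so the hot-reloading described above is not covered.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
)

// newTeleportHTTPClient is a hypothetical helper. It reads tlscert, key, and
// teleport-application-ca.pem from identityDir and returns a client that
// presents the client certificate and trusts only the Teleport CA.
func newTeleportHTTPClient(identityDir string) (*http.Client, error) {
	cert, err := tls.LoadX509KeyPair(
		filepath.Join(identityDir, "tlscert"),
		filepath.Join(identityDir, "key"),
	)
	if err != nil {
		return nil, fmt.Errorf("load client certificate: %w", err)
	}

	caPEM, err := os.ReadFile(filepath.Join(identityDir, "teleport-application-ca.pem"))
	if err != nil {
		return nil, fmt.Errorf("read Teleport CA: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no certificates found in Teleport CA file")
	}

	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				Certificates: []tls.Certificate{cert},
				RootCAs:      pool,
				MinVersion:   tls.VersionTLS12, // assumption, not taken from the source
			},
		},
	}, nil
}

func main() {
	if _, err := newTeleportHTTPClient("/var/run/tbot/identity"); err != nil {
		fmt.Println("identity not usable:", err)
		return
	}
	fmt.Println("client ready")
}
```

A production handler would additionally watch the identity files and rebuild the transport when tbot rotates them.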
func GetTeleportClient ¶
func GetTeleportClient() TeleportClientHandler
GetTeleportClient returns the registered Teleport client handler. This provides access to HTTP clients configured with Teleport certificates for accessing private installations.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- TeleportClientHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
teleportHandler := api.GetTeleportClient()
if teleportHandler == nil {
return fmt.Errorf("Teleport client handler not available")
}
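httpClient, err := teleportHandler.GetHTTPClientForIdentity("/var/run/tbot/identity")
if err != nil {
return fmt.Errorf("failed to get Teleport HTTP client: %w", err)
}
// Use httpClient for requests to the Teleport-protected MCP server.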
type TimeoutConfig ¶
type TimeoutConfig struct {
// Create specifies the maximum time to wait for resource creation operations.
// Includes service instance creation, capability initialization, etc.
Create time.Duration `yaml:"create" json:"create"`
// Delete specifies the maximum time to wait for resource deletion operations.
// Includes service instance cleanup, resource deallocation, etc.
Delete time.Duration `yaml:"delete" json:"delete"`
// HealthCheck specifies the maximum time to wait for health check operations.
// Individual health checks should complete within this time limit.
HealthCheck time.Duration `yaml:"healthCheck" json:"healthCheck"`
}
TimeoutConfig defines timeout behavior for various operations. This ensures operations don't hang indefinitely and provides predictable behavior across different components and operations.
Timeouts are essential for maintaining system stability and preventing resource leaks from stuck operations.
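A typical way to apply such a timeout is to derive a context with a deadline and pass it to the operation. The sketch below assumes this approach; `runHealthCheck`, `probe`, and `timedOut` are hypothetical helpers, and the struct is mirrored locally without its tags.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// TimeoutConfig mirrors the documented fields locally.
type TimeoutConfig struct {
	Create      time.Duration
	Delete      time.Duration
	HealthCheck time.Duration
}

// runHealthCheck bounds a single check with cfg.HealthCheck. The check itself
// must honor ctx cancellation for the deadline to take effect.
func runHealthCheck(parent context.Context, cfg TimeoutConfig, check func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, cfg.HealthCheck)
	defer cancel()
	return check(ctx)
}

// probe simulates a health check that needs `work` to finish.
func probe(cfg TimeoutConfig, work time.Duration) error {
	return runHealthCheck(context.Background(), cfg, func(ctx context.Context) error {
		select {
		case <-time.After(work):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	cfg := TimeoutConfig{HealthCheck: 50 * time.Millisecond}
	fmt.Println("fast check timed out:", timedOut(probe(cfg, time.Millisecond)))
	fmt.Println("slow check timed out:", timedOut(probe(cfg, time.Second)))
}
```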
type TokenExchangeConfig ¶
type TokenExchangeConfig struct {
// Enabled determines whether token exchange should be attempted.
// Default: false
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"`
// DexTokenEndpoint is the URL used to connect to the remote cluster's Dex token endpoint.
// This may differ from the issuer URL when access goes through a proxy (e.g., Teleport).
// Required when Enabled is true.
// Example: https://dex.cluster-b.example.com/token (direct)
// Example: https://dex-cluster.proxy.example.com/token (via proxy)
DexTokenEndpoint string `yaml:"dexTokenEndpoint,omitempty" json:"dexTokenEndpoint,omitempty"`
// ExpectedIssuer is the expected issuer URL in the exchanged token's "iss" claim.
// This should match the remote Dex's configured issuer URL.
// When access goes through a proxy, this differs from DexTokenEndpoint.
// If not specified, the issuer is derived from DexTokenEndpoint (backward compatible).
// Example: https://dex.cluster-b.example.com
ExpectedIssuer string `yaml:"expectedIssuer,omitempty" json:"expectedIssuer,omitempty"`
// ConnectorID is the ID of the OIDC connector on the remote Dex that
// trusts the local cluster's Dex.
// Required when Enabled is true.
// Example: "cluster-a-dex"
ConnectorID string `yaml:"connectorId,omitempty" json:"connectorId,omitempty"`
// Scopes are the scopes to request for the exchanged token.
// Default: "openid profile email groups"
Scopes string `yaml:"scopes,omitempty" json:"scopes,omitempty"`
// ClientCredentialsSecretRef references a Kubernetes Secret containing
// client credentials for authenticating with the remote Dex's token endpoint.
// This is required when the remote Dex requires client authentication for
// token exchange (RFC 8693).
ClientCredentialsSecretRef *ClientCredentialsSecretRef `yaml:"clientCredentialsSecretRef,omitempty" json:"clientCredentialsSecretRef,omitempty"`
// ClientID is the resolved client ID from the secret (populated at runtime).
// This field is not persisted and is populated when loading credentials.
ClientID string `yaml:"-" json:"-"`
// ClientSecret is the resolved client secret from the secret (populated at runtime).
// This field is not persisted and is populated when loading credentials.
ClientSecret string `yaml:"-" json:"-"`
}
TokenExchangeConfig configures RFC 8693 Token Exchange for cross-cluster SSO. This enables muster to exchange its local token for a token valid on a remote cluster's Identity Provider (typically Dex).
The remote Dex must be configured with an OIDC connector that trusts the local cluster's Dex. For example:
# On remote cluster's Dex (cluster-b)
connectors:
- type: oidc
id: cluster-a-dex
name: "Cluster A"
config:
issuer: https://dex.cluster-a.example.com
getUserInfo: true
insecureEnableGroups: true
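The field requirements above can be sketched as a validation step plus the form body of an RFC 8693 request. This is an illustration, not muster's implementation: `validate` and `exchangeForm` are hypothetical, the struct is mirrored locally with a subset of fields, and the `connector_id` parameter name and the ID-token subject type are assumptions based on Dex-style token exchange.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// TokenExchangeConfig mirrors a subset of the documented fields locally.
type TokenExchangeConfig struct {
	Enabled          bool
	DexTokenEndpoint string
	ExpectedIssuer   string
	ConnectorID      string
	Scopes           string
}

// defaultScopes matches the documented default for Scopes.
const defaultScopes = "openid profile email groups"

// validate enforces the "Required when Enabled is true" rules.
func validate(cfg TokenExchangeConfig) error {
	if !cfg.Enabled {
		return nil
	}
	if cfg.DexTokenEndpoint == "" {
		return errors.New("dexTokenEndpoint is required when enabled is true")
	}
	if cfg.ConnectorID == "" {
		return errors.New("connectorId is required when enabled is true")
	}
	return nil
}

// exchangeForm builds the form body for a token exchange request.
func exchangeForm(cfg TokenExchangeConfig, subjectToken string) url.Values {
	scopes := cfg.Scopes
	if scopes == "" {
		scopes = defaultScopes
	}
	form := url.Values{}
	form.Set("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange")
	form.Set("subject_token", subjectToken)
	// Assumption: the local token is forwarded as an ID token.
	form.Set("subject_token_type", "urn:ietf:params:oauth:token-type:id_token")
	form.Set("scope", scopes)
	// Assumption: Dex-style parameter selecting the trusting connector.
	form.Set("connector_id", cfg.ConnectorID)
	return form
}

func main() {
	cfg := TokenExchangeConfig{
		Enabled:          true,
		DexTokenEndpoint: "https://dex.cluster-b.example.com/token",
		ConnectorID:      "cluster-a-dex",
	}
	if err := validate(cfg); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(exchangeForm(cfg, "<local-id-token>").Encode())
}
```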
type ToolCall ¶
type ToolCall struct {
// Tool specifies the name of the tool to call.
// Must correspond to an available tool in the aggregator.
Tool string `yaml:"tool" json:"tool"`
// Args provides static arguments to pass to the tool.
// These can be combined with dynamic arguments from service args.
Args map[string]interface{} `yaml:"args" json:"args"`
// Outputs defines how to extract values from tool responses using JSON paths.
// These outputs can be referenced in subsequent tool calls via templating.
// Format: outputName: "json.path.to.value"
Outputs map[string]string `yaml:"outputs,omitempty" json:"outputs,omitempty"`
}
ToolCall defines how to call an aggregator tool for a lifecycle event. This is used in ServiceClass definitions to specify which tools should be called for service lifecycle operations (start, stop, restart, etc.).
ToolCall provides the declarative configuration for how ServiceClass lifecycle operations map to actual tool executions, including argument preparation and response processing.
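To make the Outputs mapping concrete, the sketch below extracts values from a decoded tool response. The exact JSON path syntax is not specified in this section, so the example assumes simple dot-separated keys; `lookup` and `extractOutputs` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup walks a decoded JSON object along a dot-separated path.
// Assumption: paths are plain keys joined by dots (no arrays or filters).
func lookup(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = obj[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// extractOutputs resolves every outputName -> path pair against the response.
// Paths that do not resolve are skipped.
func extractOutputs(response map[string]interface{}, outputs map[string]string) map[string]interface{} {
	resolved := make(map[string]interface{}, len(outputs))
	for name, path := range outputs {
		if value, ok := lookup(response, path); ok {
			resolved[name] = value
		}
	}
	return resolved
}

func main() {
	response := map[string]interface{}{
		"container": map[string]interface{}{"id": "abc123", "port": 5432},
	}
	outputs := map[string]string{
		"containerID": "container.id",
		"missing":     "container.name",
	}
	fmt.Println(extractOutputs(response, outputs)) // map[containerID:abc123]
}
```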
type ToolCaller ¶
type ToolCaller struct{}
ToolCaller implements tool calling interfaces using the API layer. It serves as an adapter that allows different subsystems (serviceclasses, workflows) to call tools through the aggregator without directly coupling to the aggregator implementation. This follows the service locator pattern to maintain architectural boundaries.
func NewToolCaller ¶
func NewToolCaller() *ToolCaller
NewToolCaller creates a new API-based tool caller instance. The returned ToolCaller uses the API service locator pattern to access aggregator functionality without direct coupling to implementations.
Returns:
- *ToolCaller: A new tool caller instance
func (*ToolCaller) CallTool ¶
func (atc *ToolCaller) CallTool(ctx context.Context, toolName string, args map[string]interface{}) (map[string]interface{}, error)
CallTool implements the ToolCaller interface by delegating to the aggregator handler. It provides a standardized way to execute tools with proper error handling and result formatting. The method converts the aggregator's result format to the format expected by the workflow system.
func (*ToolCaller) CallToolInternal ¶
func (atc *ToolCaller) CallToolInternal(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
CallToolInternal implements the workflow ToolCaller interface by delegating to the aggregator handler. This method provides direct access to the raw MCP tool result format, which is used by workflow execution and other internal subsystems that need unmodified tool results.
func (*ToolCaller) GetAvailableTools ¶
func (atc *ToolCaller) GetAvailableTools() []string
GetAvailableTools returns all available tools from the aggregator. This method provides tool discovery functionality for clients that need to understand what tools are currently available in the system.
func (*ToolCaller) IsToolAvailable ¶
func (atc *ToolCaller) IsToolAvailable(toolName string) bool
IsToolAvailable checks if a tool is available through the aggregator. This method provides a convenient way to validate tool availability without attempting to execute the tool, which is useful for validation and UI purposes.
type ToolChecker ¶
type ToolChecker struct{}
ToolChecker implements config.ToolAvailabilityChecker using the API layer. It provides a way for the configuration system to validate tool availability without direct coupling to the aggregator implementation. This is particularly useful for ServiceClass validation where tools need to be checked before service instances can be created.
func NewToolChecker ¶
func NewToolChecker() *ToolChecker
NewToolChecker creates a new API-based tool checker instance. The returned ToolChecker uses the API service locator pattern to access aggregator functionality for tool availability checking.
Returns:
- *ToolChecker: A new tool checker instance
func (*ToolChecker) GetAvailableTools ¶
func (atc *ToolChecker) GetAvailableTools() []string
GetAvailableTools returns all available tools using the aggregator API handler. This method enables the config package to perform comprehensive tool availability checks and provide detailed validation feedback.
func (*ToolChecker) IsToolAvailable ¶
func (atc *ToolChecker) IsToolAvailable(toolName string) bool
IsToolAvailable checks if a tool is available using the aggregator API handler. This method provides the config package with a way to validate tool availability during configuration loading and validation without creating tight coupling.
type ToolMetadata ¶
type ToolMetadata struct {
// Name is the unique identifier for the tool (e.g., "workflow_list", "auth_login")
// Must be unique within the scope of the tool provider
Name string
// Description provides human-readable documentation about what the tool does
// and how to use it effectively
Description string
// Args defines the input arguments accepted by this tool,
// including validation rules and documentation
Args []ArgMetadata
}
ToolMetadata describes a tool that can be exposed through the MCP protocol. This metadata is used for tool discovery, documentation generation, and argument validation during tool execution.
Tools are the primary mechanism for exposing functionality through muster, allowing workflows, capabilities, and other components to be discoverable and executable through the standard MCP protocol.
type ToolProvider ¶
type ToolProvider interface {
// GetTools returns metadata for all tools this provider offers.
// This is used for tool discovery and documentation generation.
//
// The returned metadata should be stable and consistent across calls,
// as it's used for caching and tool registration purposes.
//
// Returns:
// - []ToolMetadata: List of all tools provided by this component
//
// Example:
//
// func (p *MyProvider) GetTools() []ToolMetadata {
// return []ToolMetadata{
// {
// Name: "my_operation",
// Description: "Performs my custom operation",
// Args: []ArgMetadata{
// {
// Name: "input",
// Type: "string",
// Required: true,
// Description: "Input data for processing",
// },
// },
// },
// }
// }
GetTools() []ToolMetadata
// ExecuteTool executes a specific tool by name with the provided arguments.
// This is the main entry point for tool execution and must handle
// arg validation, execution logic, and result formatting.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - toolName: The name of the tool to execute (must match a tool from GetTools)
// - args: Arguments for the tool execution, should be validated against tool metadata
//
// Returns:
// - *CallToolResult: The result of the tool execution
// - error: Error if the tool doesn't exist or execution fails
//
// Example:
//
// func (p *MyProvider) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*CallToolResult, error) {
// switch toolName {
// case "my_operation":
// input, ok := args["input"].(string)
// if !ok {
// return &CallToolResult{
// Content: []interface{}{"input arg must be a string"},
// IsError: true,
// }, nil
// }
// // Perform operation
// result := processInput(input)
// return &CallToolResult{
// Content: []interface{}{result},
// IsError: false,
// }, nil
// default:
// return nil, fmt.Errorf("unknown tool: %s", toolName)
// }
// }
ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (*CallToolResult, error)
}
ToolProvider interface defines the contract for components that can provide tools to the MCP ecosystem. This interface is implemented by workflow, serviceclass, and other tool-providing packages.
Components implementing this interface can expose their functionality as MCP tools that can be discovered and executed through the aggregator, making them available to external clients and internal orchestration.
All tool providers must implement both tool discovery (GetTools) and execution (ExecuteTool) to participate in the tool ecosystem.
type ToolUpdateEvent ¶
type ToolUpdateEvent struct {
// Type specifies the kind of tool update event.
// Valid values: "server_registered", "server_deregistered", "tools_updated"
Type string `json:"type"`
// ServerName identifies the MCP server that triggered this event
ServerName string `json:"server_name"`
// Tools contains the list of tool names affected by this event.
// For "server_registered": all tools provided by the server
// For "server_deregistered": all tools that were removed
// For "tools_updated": the current complete tool list
Tools []string `json:"tools"`
// Timestamp records when this event occurred
Timestamp time.Time `json:"timestamp"`
// Error contains error information if the event represents a failure condition.
// Only populated for error events, empty for successful operations.
Error string `json:"error,omitempty"`
}
ToolUpdateEvent represents a tool availability change event in the MCP ecosystem. These events are published when MCP servers are registered/deregistered or when their available tools change, allowing components to react to tool availability changes.
Events are delivered asynchronously through the tool update subscription system, enabling real-time reactivity to changes in the tool landscape.
Example event types:
- "server_registered": A new MCP server has been registered
- "server_deregistered": An MCP server has been removed
- "tools_updated": Tools available from a server have changed
type ToolUpdateSubscriber ¶
type ToolUpdateSubscriber interface {
// OnToolsUpdated is called when tool availability changes in the system.
// Implementations should be non-blocking as this is called from goroutines.
//
// This method will be called for various tool availability events:
// - MCP server registration/deregistration
// - Changes in available tools from existing servers
// - Tool configuration updates
//
// Args:
// - event: ToolUpdateEvent containing details about what changed
//
// Note: This method is called asynchronously and should not block.
// Panics in this method are recovered and logged as errors.
//
// Example:
//
// func (s *MySubscriber) OnToolsUpdated(event api.ToolUpdateEvent) {
// switch event.Type {
// case "server_registered":
// log.Printf("New server %s registered with %d tools",
// event.ServerName, len(event.Tools))
// s.refreshCapabilities()
// case "server_deregistered":
// log.Printf("Server %s deregistered", event.ServerName)
// s.markToolsUnavailable(event.Tools)
// case "tools_updated":
// log.Printf("Tools updated for server %s", event.ServerName)
// s.updateToolCache(event.ServerName, event.Tools)
// }
// }
OnToolsUpdated(event ToolUpdateEvent)
}
ToolUpdateSubscriber interface defines the contract for components that want to receive notifications about tool availability changes.
Components implementing this interface can react to changes in the tool landscape, such as updating their own availability status, refreshing cached tool lists, or triggering recalculation of dependent functionality.
Subscribers are called asynchronously and should implement non-blocking operations to prevent affecting the overall system performance.
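One common way to keep OnToolsUpdated non-blocking is to hand events to a buffered channel and drop them when the buffer is full. The sketch below assumes that design; `queueSubscriber` and its methods are hypothetical, and the event type is mirrored locally with a subset of fields.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ToolUpdateEvent mirrors a subset of the documented fields locally.
type ToolUpdateEvent struct {
	Type       string
	ServerName string
	Tools      []string
}

// queueSubscriber is a hypothetical subscriber that never blocks the caller.
type queueSubscriber struct {
	events  chan ToolUpdateEvent
	dropped int64
}

func newQueueSubscriber(size int) *queueSubscriber {
	return &queueSubscriber{events: make(chan ToolUpdateEvent, size)}
}

// OnToolsUpdated enqueues the event, or counts it as dropped when the buffer
// is full, so the publishing goroutine is never held up.
func (s *queueSubscriber) OnToolsUpdated(event ToolUpdateEvent) {
	select {
	case s.events <- event:
	default:
		atomic.AddInt64(&s.dropped, 1)
	}
}

// droppedCount returns how many events were discarded so far.
func (s *queueSubscriber) droppedCount() int64 {
	return atomic.LoadInt64(&s.dropped)
}

func main() {
	sub := newQueueSubscriber(1)
	sub.OnToolsUpdated(ToolUpdateEvent{Type: "server_registered", ServerName: "github", Tools: []string{"list_repos"}})
	sub.OnToolsUpdated(ToolUpdateEvent{Type: "tools_updated", ServerName: "github"}) // buffer full, dropped

	event := <-sub.events
	fmt.Println(event.Type, event.ServerName, len(event.Tools), "dropped:", sub.droppedCount())
}
```

Dropping is acceptable here only because a "tools_updated" event carries the complete current tool list; a subscriber that must see every event would need a different overflow strategy.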
type Workflow ¶
type Workflow struct {
// Name is the unique identifier for this workflow
Name string `yaml:"name" json:"name"`
// Description provides human-readable documentation for the workflow's purpose
Description string `yaml:"description" json:"description"`
// Args defines the validation rules and metadata for workflow execution arguments.
// These definitions are used to validate arguments when executing workflows
// and to provide documentation for the workflow execution API.
Args map[string]ArgDefinition `yaml:"args,omitempty" json:"args,omitempty"`
// Steps defines the sequence of operations to be performed during workflow execution.
// Each step represents a tool call with its arguments and processing logic.
Steps []WorkflowStep `yaml:"steps" json:"steps"`
// Available indicates whether this workflow is currently available for execution
Available bool `json:"available,omitempty" yaml:"-"`
// CreatedAt indicates when this workflow was created
CreatedAt time.Time `yaml:"createdAt,omitempty" json:"createdAt"`
// LastModified indicates when this workflow was last updated
LastModified time.Time `yaml:"lastModified,omitempty" json:"lastModified"`
}
Workflow represents a single workflow definition and runtime state. This consolidates WorkflowDefinition, WorkflowInfo, and WorkflowConfig into one type to provide a unified view of workflow information across configuration and runtime contexts.
Workflows define multi-step processes that can be executed by the system, orchestrating tool calls, arg templating, and conditional logic. They provide a way to compose complex operations from simpler tool calls.
type WorkflowCondition ¶
type WorkflowCondition struct {
// Tool specifies the name of the tool to execute for condition evaluation.
// Must correspond to an available tool in the aggregator.
// Optional when FromStep is used.
Tool string `yaml:"tool,omitempty" json:"tool,omitempty"`
// Args provides the arguments to pass to the condition tool.
// Can include templated values that are resolved at runtime.
Args map[string]interface{} `yaml:"args,omitempty" json:"args,omitempty"`
// FromStep specifies the step ID to reference for condition evaluation.
// When specified, the condition evaluates against the result of the referenced step
// instead of executing a new tool call.
FromStep string `yaml:"from_step,omitempty" json:"from_step,omitempty"`
// Expect defines the expected result for the condition to be considered true.
// If the condition tool result matches these expectations, the step will execute.
// If not, the step will be skipped.
Expect WorkflowConditionExpectation `yaml:"expect,omitempty" json:"expect,omitempty"`
// ExpectNot defines the negated expected result for the condition to be considered true.
// If the condition tool result does NOT match these expectations, the step will execute.
// If it matches, the step will be skipped.
ExpectNot WorkflowConditionExpectation `yaml:"expect_not,omitempty" json:"expect_not,omitempty"`
}
WorkflowCondition defines a condition that determines whether a workflow step should execute. Conditions allow for dynamic workflow execution based on runtime state evaluation.
type WorkflowConditionExpectation ¶
type WorkflowConditionExpectation struct {
// Success indicates whether the condition tool should succeed (true) or fail (false)
// for the condition to be met.
Success bool `yaml:"success" json:"success"`
// JsonPath defines optional JSON path expressions that must match specific values
// in the condition tool's response. All specified paths must match for the condition
// to be considered true. This allows for content-based condition validation beyond
// just success/failure status.
JsonPath map[string]interface{} `yaml:"json_path,omitempty" json:"json_path,omitempty"`
}
WorkflowConditionExpectation defines what result is expected from a condition tool for the condition to be considered true.
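The matching rule (success flag plus all JSON paths) can be sketched as follows. The path syntax and comparison semantics are assumptions: paths are treated as dot-separated keys and values are compared with reflect.DeepEqual, so numeric types must match exactly. `lookup` and `matches` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// WorkflowConditionExpectation mirrors the documented fields locally.
type WorkflowConditionExpectation struct {
	Success  bool
	JsonPath map[string]interface{}
}

// lookup walks a decoded JSON object along a dot-separated path.
// Assumption: paths are plain keys joined by dots.
func lookup(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = obj[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// matches reports whether a tool result satisfies the expectation: the
// success flag must agree and every listed path must hold the expected value.
func matches(exp WorkflowConditionExpectation, succeeded bool, response map[string]interface{}) bool {
	if exp.Success != succeeded {
		return false
	}
	for path, want := range exp.JsonPath {
		got, ok := lookup(response, path)
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}

func main() {
	response := map[string]interface{}{
		"status": map[string]interface{}{"phase": "Running"},
	}
	exp := WorkflowConditionExpectation{
		Success:  true,
		JsonPath: map[string]interface{}{"status.phase": "Running"},
	}
	fmt.Println("expect met:", matches(exp, true, response))      // true
	fmt.Println("expect_not met:", !matches(exp, true, response)) // false
}
```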
type WorkflowCreateRequest ¶
type WorkflowCreateRequest struct {
// Name is the unique identifier for the workflow (required).
// Must be unique across all workflows in the system.
Name string `json:"name" validate:"required"`
// Version indicates the workflow version for compatibility tracking.
// Recommended to use semantic versioning.
Version string `json:"version,omitempty"`
// Description provides human-readable documentation for the workflow.
// Should explain the workflow's purpose and expected outcomes.
Description string `json:"description,omitempty"`
// Args defines the expected input arguments for workflow execution.
// Used for arg validation and documentation generation.
// If not specified, the workflow accepts any args.
Args map[string]ArgDefinition `json:"args,omitempty"`
// Steps defines the sequence of operations to perform (required).
// Each step executes a tool with specified arguments and processing logic.
// Must contain at least one step for a valid workflow.
Steps []WorkflowStep `json:"steps" validate:"required"`
}
WorkflowCreateRequest represents a request to create a new workflow definition. Workflows define multi-step processes that orchestrate tool calls with arg templating and conditional logic.
Example:
request := WorkflowCreateRequest{
Name: "deploy-service",
Description: "Deploy a service to production",
Args: map[string]ArgDefinition{
"service_name": {
Type: "string",
Required: true,
Description: "Name of the service to deploy",
},
},
Steps: []WorkflowStep{
{
ID: "build",
Tool: "docker_build",
Args: map[string]interface{}{
"name": "{{service_name}}",
},
},
},
}
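The field comments above state that Name is required and that Steps must contain at least one step. A pre-flight check along those lines could look like the sketch below. `validateCreateRequest` is a hypothetical helper, the structs are trimmed to the fields it inspects, and the checks for non-empty, unique step IDs and a non-empty Tool are assumptions drawn from the WorkflowStep field descriptions rather than confirmed validation rules.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed copies of the documented types; only the fields checked here.
type WorkflowStep struct {
	ID   string
	Tool string
	Args map[string]interface{}
}

type WorkflowCreateRequest struct {
	Name  string
	Steps []WorkflowStep
}

// validateCreateRequest is a hypothetical client-side check.
func validateCreateRequest(req WorkflowCreateRequest) error {
	if req.Name == "" {
		return errors.New("name is required")
	}
	if len(req.Steps) == 0 {
		return errors.New("at least one step is required")
	}
	seen := make(map[string]bool, len(req.Steps))
	for i, step := range req.Steps {
		if step.ID == "" {
			return fmt.Errorf("step %d: id is required", i)
		}
		if seen[step.ID] {
			return fmt.Errorf("step %d: duplicate id %q", i, step.ID)
		}
		seen[step.ID] = true
		if step.Tool == "" {
			return fmt.Errorf("step %q: tool is required", step.ID)
		}
	}
	return nil
}

func main() {
	req := WorkflowCreateRequest{
		Name:  "deploy-service",
		Steps: []WorkflowStep{{ID: "build", Tool: "docker_build"}},
	}
	fmt.Println(validateCreateRequest(req))                     // valid request
	fmt.Println(validateCreateRequest(WorkflowCreateRequest{})) // missing name
}
```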
type WorkflowExecution ¶
type WorkflowExecution struct {
// ExecutionID is the unique identifier for this execution (UUID v4)
ExecutionID string `json:"execution_id"`
// WorkflowName is the name of the workflow that was executed
WorkflowName string `json:"workflow_name"`
// Status indicates the current state of the execution
Status WorkflowExecutionStatus `json:"status"`
// StartedAt is the timestamp when the execution began
StartedAt time.Time `json:"started_at"`
// CompletedAt is the timestamp when the execution finished (nil if still running)
CompletedAt *time.Time `json:"completed_at,omitempty"`
// DurationMs is the total execution duration in milliseconds
DurationMs int64 `json:"duration_ms"`
// Input contains the original arguments passed to the workflow
Input map[string]interface{} `json:"input"`
// Result contains the final result of the workflow execution (nil if failed or in progress)
Result interface{} `json:"result,omitempty"`
// Error contains error information if the execution failed (nil if successful)
Error *string `json:"error,omitempty"`
// Steps contains detailed information about each step execution
Steps []WorkflowExecutionStep `json:"steps"`
}
WorkflowExecution represents a complete workflow execution record. It covers timing, input, result, and error information, plus a step-by-step record of each tool call.
Workflow executions are automatically tracked when workflows are executed via the workflow_<name> tools, enabling debugging, auditing, and monitoring.
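To make the relationship between Status, CompletedAt, DurationMs, Result, and Error concrete, here is a sketch of how a tracker might close out a record. `finish` and `errBoom` are hypothetical names, the struct is trimmed to the fields involved, and the actual tracking code may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type WorkflowExecutionStatus string

const (
	WorkflowExecutionInProgress WorkflowExecutionStatus = "inprogress"
	WorkflowExecutionCompleted  WorkflowExecutionStatus = "completed"
	WorkflowExecutionFailed     WorkflowExecutionStatus = "failed"
)

// WorkflowExecution is trimmed to the fields this sketch touches.
type WorkflowExecution struct {
	Status      WorkflowExecutionStatus
	StartedAt   time.Time
	CompletedAt *time.Time
	DurationMs  int64
	Result      interface{}
	Error       *string
}

// errBoom is a sample error used only by this illustration.
var errBoom = errors.New("boom")

// finish is a hypothetical helper that closes out an execution record.
// On failure it sets Error and leaves Result nil; on success the reverse.
func finish(e *WorkflowExecution, now time.Time, result interface{}, err error) {
	e.CompletedAt = &now
	e.DurationMs = now.Sub(e.StartedAt).Milliseconds()
	if err != nil {
		msg := err.Error()
		e.Status = WorkflowExecutionFailed
		e.Error = &msg
		return
	}
	e.Status = WorkflowExecutionCompleted
	e.Result = result
}

func main() {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	ok := WorkflowExecution{Status: WorkflowExecutionInProgress, StartedAt: start}
	finish(&ok, start.Add(1500*time.Millisecond), "deployed", nil)
	fmt.Println(ok.Status, ok.DurationMs)

	bad := WorkflowExecution{Status: WorkflowExecutionInProgress, StartedAt: start}
	finish(&bad, start.Add(time.Second), nil, errBoom)
	fmt.Println(bad.Status, *bad.Error)
}
```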
type WorkflowExecutionStatus ¶
type WorkflowExecutionStatus string
WorkflowExecutionStatus represents the status of a workflow execution.
const (
// WorkflowExecutionInProgress indicates the execution is currently running
WorkflowExecutionInProgress WorkflowExecutionStatus = "inprogress"
// WorkflowExecutionCompleted indicates the execution finished successfully
WorkflowExecutionCompleted WorkflowExecutionStatus = "completed"
// WorkflowExecutionFailed indicates the execution failed with an error
WorkflowExecutionFailed WorkflowExecutionStatus = "failed"
)
type WorkflowExecutionStep ¶
type WorkflowExecutionStep struct {
// StepID is the unique identifier for this step within the workflow
StepID string `json:"step_id"`
// Tool is the name of the tool that was executed for this step
Tool string `json:"tool"`
// Status indicates the current state of the step execution
Status WorkflowExecutionStatus `json:"status"`
// StartedAt is the timestamp when the step execution began
StartedAt time.Time `json:"started_at"`
// CompletedAt is the timestamp when the step execution finished (nil if still running)
CompletedAt *time.Time `json:"completed_at,omitempty"`
// DurationMs is the step execution duration in milliseconds
DurationMs int64 `json:"duration_ms"`
// Input contains the resolved arguments passed to the tool for this step
Input map[string]interface{} `json:"input"`
// Result contains the result returned by the tool execution (nil if failed or in progress)
Result interface{} `json:"result,omitempty"`
// Error contains error information if the step failed (nil if successful)
Error *string `json:"error,omitempty"`
// StoredAs is the variable name where the step result was stored (from workflow definition)
StoredAs string `json:"stored_as,omitempty"`
}
WorkflowExecutionStep represents a single step execution within a workflow. This provides detailed information about individual step execution including timing, arguments, results, and any errors that occurred.
Step execution information enables granular debugging and understanding of workflow execution flow and data transformation.
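As an illustration of the debugging use case, the sketch below scans a step list for the first failure. `firstFailedStep` is a hypothetical helper and the struct is trimmed to the fields it reads.

```go
package main

import "fmt"

type WorkflowExecutionStatus string

const WorkflowExecutionFailed WorkflowExecutionStatus = "failed"

// WorkflowExecutionStep is trimmed to the fields this sketch reads.
type WorkflowExecutionStep struct {
	StepID string
	Tool   string
	Status WorkflowExecutionStatus
	Error  *string
}

// firstFailedStep returns the earliest step whose status is "failed".
// The boolean is false when no step failed.
func firstFailedStep(steps []WorkflowExecutionStep) (WorkflowExecutionStep, bool) {
	for _, s := range steps {
		if s.Status == WorkflowExecutionFailed {
			return s, true
		}
	}
	return WorkflowExecutionStep{}, false
}

func main() {
	msg := "image not found"
	steps := []WorkflowExecutionStep{
		{StepID: "build", Tool: "docker_build", Status: "completed"},
		{StepID: "push", Tool: "docker_push", Status: WorkflowExecutionFailed, Error: &msg},
	}
	if s, ok := firstFailedStep(steps); ok {
		fmt.Printf("step %s (%s) failed: %s\n", s.StepID, s.Tool, *s.Error)
	}
}
```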
type WorkflowExecutionSummary ¶
type WorkflowExecutionSummary struct {
// ExecutionID is the unique identifier for this execution
ExecutionID string `json:"execution_id"`
// WorkflowName is the name of the workflow that was executed
WorkflowName string `json:"workflow_name"`
// Status indicates the current state of the execution
Status WorkflowExecutionStatus `json:"status"`
// StartedAt is the timestamp when the execution began
StartedAt time.Time `json:"started_at"`
// CompletedAt is the timestamp when the execution finished (nil if still running)
CompletedAt *time.Time `json:"completed_at,omitempty"`
// DurationMs is the total execution duration in milliseconds
DurationMs int64 `json:"duration_ms"`
// StepCount is the total number of steps in the workflow
StepCount int `json:"step_count"`
// Error contains error information if the execution failed (nil if successful)
Error *string `json:"error,omitempty"`
}
WorkflowExecutionSummary represents a summary of a workflow execution for use in list responses. It carries the essential fields and omits the per-step execution data, which keeps list responses small.
Summary information is sufficient for most listing and overview use cases, with detailed information available via GetWorkflowExecution.
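The relationship between the full record and its summary can be sketched as a simple projection. `summarize` is a hypothetical helper, and deriving StepCount from the number of recorded steps is an assumption: the field is documented as the number of steps in the workflow, which may differ if an execution stops early.

```go
package main

import (
	"fmt"
	"time"
)

type WorkflowExecutionStatus string

type WorkflowExecutionStep struct {
	StepID string
}

// Trimmed copies of the documented types.
type WorkflowExecution struct {
	ExecutionID  string
	WorkflowName string
	Status       WorkflowExecutionStatus
	StartedAt    time.Time
	CompletedAt  *time.Time
	DurationMs   int64
	Error        *string
	Steps        []WorkflowExecutionStep
}

type WorkflowExecutionSummary struct {
	ExecutionID  string
	WorkflowName string
	Status       WorkflowExecutionStatus
	StartedAt    time.Time
	CompletedAt  *time.Time
	DurationMs   int64
	StepCount    int
	Error        *string
}

// summarize copies the shared fields and drops the step details.
// Assumption: StepCount is taken from the recorded steps.
func summarize(e WorkflowExecution) WorkflowExecutionSummary {
	return WorkflowExecutionSummary{
		ExecutionID:  e.ExecutionID,
		WorkflowName: e.WorkflowName,
		Status:       e.Status,
		StartedAt:    e.StartedAt,
		CompletedAt:  e.CompletedAt,
		DurationMs:   e.DurationMs,
		StepCount:    len(e.Steps),
		Error:        e.Error,
	}
}

func main() {
	e := WorkflowExecution{
		ExecutionID:  "example-id",
		WorkflowName: "deploy-service",
		Status:       "completed",
		Steps:        []WorkflowExecutionStep{{StepID: "build"}, {StepID: "push"}},
	}
	s := summarize(e)
	fmt.Println(s.WorkflowName, s.Status, s.StepCount)
}
```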
type WorkflowHandler ¶
type WorkflowHandler interface {
// ExecuteWorkflow executes a workflow with the provided arguments.
// This is the primary method for invoking workflow functionality.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - workflowName: The name of the workflow to execute
// - args: Arguments for the workflow execution, validated against input schema
//
// Returns:
// - *CallToolResult: The result of the workflow execution
// - error: Error if the workflow doesn't exist or execution fails
//
// Example:
//
// result, err := handler.ExecuteWorkflow(ctx, "deploy-service", map[string]interface{}{
// "service_name": "my-api",
// "environment": "production",
// "replicas": 3,
// })
ExecuteWorkflow(ctx context.Context, workflowName string, args map[string]interface{}) (*CallToolResult, error)
// ListWorkflowExecutions returns a paginated list of workflow executions with optional filtering.
// This enables users to view execution history and track workflow usage patterns.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - req: Request args for filtering, pagination, and sorting
//
// Returns:
// - *ListWorkflowExecutionsResponse: Paginated list of execution records
// - error: Error if the request is invalid or operation fails
ListWorkflowExecutions(ctx context.Context, req *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
// GetWorkflowExecution returns detailed information about a specific workflow execution.
// This enables users to examine execution results, step details, and debug failed executions.
//
// Args:
// - ctx: Context for the operation, including cancellation and timeout
// - req: Request args specifying execution ID and optional filtering
//
// Returns:
// - *WorkflowExecution: Complete execution record with step details
// - error: Error if the execution doesn't exist or operation fails
GetWorkflowExecution(ctx context.Context, req *GetWorkflowExecutionRequest) (*WorkflowExecution, error)
// GetWorkflows returns information about all available workflows in the system.
// This includes both static workflow definitions and runtime availability status.
//
// Returns:
// - []Workflow: List of all workflow definitions with runtime information
GetWorkflows() []Workflow
// GetWorkflow returns detailed information about a specific workflow.
// This provides comprehensive information including steps, input schema, and metadata.
//
// Args:
// - name: The name of the workflow to retrieve
//
// Returns:
// - *Workflow: Detailed workflow information including definition and runtime state
// - error: Error if the workflow doesn't exist
GetWorkflow(name string) (*Workflow, error)
// CreateWorkflowFromStructured creates a new workflow from structured args.
// This allows dynamic workflow creation at runtime.
//
// Args:
// - args: Structured workflow definition args
//
// Returns:
// - error: Error if the workflow definition is invalid or creation fails
CreateWorkflowFromStructured(args map[string]interface{}) error
// UpdateWorkflowFromStructured updates an existing workflow from structured args.
// This allows runtime modification of workflow definitions.
//
// Args:
// - name: The name of the workflow to update
// - args: Updated workflow definition args
//
// Returns:
// - error: Error if the workflow doesn't exist or update fails
UpdateWorkflowFromStructured(name string, args map[string]interface{}) error
// DeleteWorkflow removes a workflow definition from the system.
//
// Args:
// - name: The name of the workflow to delete
//
// Returns:
// - error: Error if the workflow doesn't exist or deletion fails
DeleteWorkflow(name string) error
// ValidateWorkflowFromStructured validates a workflow definition without creating it.
// This is useful for validation during workflow development and testing.
//
// Args:
// - args: Workflow definition args to validate
//
// Returns:
// - error: Error if the workflow definition is invalid
ValidateWorkflowFromStructured(args map[string]interface{}) error
// ToolProvider integration for exposing workflows as discoverable MCP tools.
// This allows workflows to be discovered and executed through the standard
// tool discovery and execution mechanisms.
ToolProvider
}
WorkflowHandler defines the interface for workflow operations within the Service Locator Pattern. This handler provides the primary interface for workflow management, execution, and discovery.
The handler abstracts workflow complexity behind a simple interface, allowing components to execute multi-step processes without knowing the underlying orchestration details. It integrates with the ToolProvider interface to expose workflows as discoverable MCP tools.
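Because ValidateWorkflowFromStructured and CreateWorkflowFromStructured take the same structured args, a caller can validate before creating. The sketch below shows that flow against a narrow consumer-side interface. `workflowDefiner`, `createValidated`, and `fakeHandler` are hypothetical names introduced for illustration; only the two method signatures come from the interface above.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowDefiner names only the two WorkflowHandler methods this sketch needs,
// so any WorkflowHandler would satisfy it.
type workflowDefiner interface {
	ValidateWorkflowFromStructured(args map[string]interface{}) error
	CreateWorkflowFromStructured(args map[string]interface{}) error
}

// createValidated validates the definition first and creates it only on success.
func createValidated(h workflowDefiner, def map[string]interface{}) error {
	if err := h.ValidateWorkflowFromStructured(def); err != nil {
		return fmt.Errorf("invalid workflow definition: %w", err)
	}
	if err := h.CreateWorkflowFromStructured(def); err != nil {
		return fmt.Errorf("create workflow: %w", err)
	}
	return nil
}

// fakeHandler is a stand-in used to exercise the flow without a real handler.
type fakeHandler struct {
	created []string
}

func (f *fakeHandler) ValidateWorkflowFromStructured(args map[string]interface{}) error {
	if name, _ := args["name"].(string); name == "" {
		return errors.New("name is required")
	}
	return nil
}

func (f *fakeHandler) CreateWorkflowFromStructured(args map[string]interface{}) error {
	name, _ := args["name"].(string)
	f.created = append(f.created, name)
	return nil
}

func main() {
	h := &fakeHandler{}
	fmt.Println(createValidated(h, map[string]interface{}{"name": "deploy-service"}))
	fmt.Println(createValidated(h, map[string]interface{}{}))
	fmt.Println(h.created)
}
```

Depending on a small interface rather than the full WorkflowHandler keeps callers easy to test with a fake like the one above.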
func GetWorkflow ¶
func GetWorkflow() WorkflowHandler
GetWorkflow returns the registered workflow handler. This provides access to workflow definition management and execution functionality.
Returns nil if no handler has been registered yet. Callers should always check for nil before using the returned handler.
Returns:
- WorkflowHandler: The registered handler, or nil if not registered
Thread-safe: Yes, protected by handlerMutex read lock.
Example:
workflow := api.GetWorkflow()
if workflow == nil {
return fmt.Errorf("workflow handler not available")
}
result, err := workflow.ExecuteWorkflow(ctx, "deploy-app", args)
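The thread-safety note above implies a package-level handler guarded by a read/write mutex. A minimal sketch of that pattern follows. Only `GetWorkflow` and `handlerMutex` are named in this documentation; `RegisterWorkflow`, `workflowHandler`, and `noopHandler` are hypothetical, and the interface is trimmed to one method.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkflowHandler is trimmed to one method to keep the sketch short.
type WorkflowHandler interface {
	DeleteWorkflow(name string) error
}

var (
	handlerMutex    sync.RWMutex
	workflowHandler WorkflowHandler // nil until a handler is registered
)

// RegisterWorkflow is a hypothetical setter; it takes the write lock.
func RegisterWorkflow(h WorkflowHandler) {
	handlerMutex.Lock()
	defer handlerMutex.Unlock()
	workflowHandler = h
}

// GetWorkflow takes only the read lock, so concurrent readers do not block each other.
func GetWorkflow() WorkflowHandler {
	handlerMutex.RLock()
	defer handlerMutex.RUnlock()
	return workflowHandler
}

// noopHandler is a placeholder implementation for the demonstration.
type noopHandler struct{}

func (noopHandler) DeleteWorkflow(name string) error { return nil }

func main() {
	fmt.Println(GetWorkflow() == nil) // nothing registered yet
	RegisterWorkflow(noopHandler{})
	fmt.Println(GetWorkflow() == nil) // handler is now available
}
```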
type WorkflowInputSchema ¶
type WorkflowInputSchema struct {
// Type specifies the overall schema type (typically "object" for workflow inputs)
Type string `yaml:"type" json:"type"`
// Args defines the individual input arguments and their schemas.
// Each property corresponds to a workflow input argument.
Args map[string]SchemaProperty `yaml:"args" json:"args"`
// Required lists the argument names that must be provided for workflow execution
Required []string `yaml:"required,omitempty" json:"required,omitempty"`
}
WorkflowInputSchema defines the input argument schema for a workflow. This provides structured validation and documentation for workflow inputs, following JSON Schema conventions for argument definition.
Deprecated: Use Args (map[string]ArgDefinition) instead.
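For callers migrating away from this type, the conversion to the Args form might look like the sketch below. `argsFromSchema` is a hypothetical helper. The SchemaProperty fields used here (Type, Description) are assumptions, since that type is defined elsewhere; the ArgDefinition fields are the ones shown in the WorkflowCreateRequest example.

```go
package main

import "fmt"

// SchemaProperty fields are assumed for illustration.
type SchemaProperty struct {
	Type        string
	Description string
}

type WorkflowInputSchema struct {
	Type     string
	Args     map[string]SchemaProperty
	Required []string
}

// ArgDefinition uses the fields shown in the WorkflowCreateRequest example.
type ArgDefinition struct {
	Type        string
	Required    bool
	Description string
}

// argsFromSchema folds the schema's Required list into per-argument flags.
func argsFromSchema(s WorkflowInputSchema) map[string]ArgDefinition {
	required := make(map[string]bool, len(s.Required))
	for _, name := range s.Required {
		required[name] = true
	}
	out := make(map[string]ArgDefinition, len(s.Args))
	for name, prop := range s.Args {
		out[name] = ArgDefinition{
			Type:        prop.Type,
			Required:    required[name],
			Description: prop.Description,
		}
	}
	return out
}

func main() {
	schema := WorkflowInputSchema{
		Type: "object",
		Args: map[string]SchemaProperty{
			"service_name": {Type: "string", Description: "Name of the service to deploy"},
			"replicas":     {Type: "number"},
		},
		Required: []string{"service_name"},
	}
	args := argsFromSchema(schema)
	fmt.Println(args["service_name"].Required, args["replicas"].Required)
}
```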
type WorkflowStep ¶
type WorkflowStep struct {
// ID is a unique identifier for this step within the workflow.
// Used for step referencing, error reporting, and execution flow control.
ID string `yaml:"id" json:"id"`
// Condition defines an optional condition that determines whether this step should execute.
// If specified, the condition tool is executed first. If the condition is not met,
// the step is skipped and marked as "skipped" in the execution results.
Condition *WorkflowCondition `yaml:"condition,omitempty" json:"condition,omitempty"`
// Tool specifies the name of the tool to execute for this step.
// Must correspond to an available tool in the aggregator.
Tool string `yaml:"tool" json:"tool"`
// Args provides the arguments to pass to the tool.
// Can include templated values that are resolved at runtime using previous step results.
Args map[string]interface{} `yaml:"args,omitempty" json:"args,omitempty"`
// AllowFailure indicates whether this step is allowed to fail without failing the workflow.
// When true, step failures are recorded but the workflow continues execution.
// The step result will be available for subsequent step conditions to reference.
AllowFailure bool `yaml:"allow_failure,omitempty" json:"allow_failure,omitempty"`
// Outputs defines how step results should be stored and made available to subsequent steps.
// Maps output variable names to result field paths.
Outputs map[string]interface{} `yaml:"outputs,omitempty" json:"outputs,omitempty"`
// Store indicates whether the step result should be stored in workflow results.
// When true, the step result is stored and accessible in subsequent steps and conditions.
Store bool `yaml:"store,omitempty" json:"store,omitempty"`
// Description provides human-readable documentation for this step's purpose
Description string `yaml:"description,omitempty" json:"description,omitempty"`
}
WorkflowStep defines a single step in a workflow execution. Each step represents a tool call with its arguments, result processing, and conditional execution logic.
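The interplay of AllowFailure and Store can be sketched as a small execution loop. This is an illustration under stated assumptions, not the package's executor: `runSteps`, `toolFunc`, and `errToolFailed` are hypothetical, results are keyed by step ID, a tolerated failure is stored as a map with an "error" entry, and conditions, templating, and Outputs are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// WorkflowStep is trimmed to the fields that drive control flow here.
type WorkflowStep struct {
	ID           string
	Tool         string
	AllowFailure bool
	Store        bool
}

// toolFunc stands in for the real tool-calling mechanism.
type toolFunc func(tool string) (interface{}, error)

// errToolFailed is a sample error used only by this illustration.
var errToolFailed = errors.New("tool failed")

// runSteps executes steps in order. A failing step aborts the workflow
// unless AllowFailure is set; results are kept only when Store is set.
func runSteps(steps []WorkflowStep, call toolFunc) (map[string]interface{}, error) {
	results := make(map[string]interface{})
	for _, step := range steps {
		out, err := call(step.Tool)
		if err != nil {
			if !step.AllowFailure {
				return results, fmt.Errorf("step %q failed: %w", step.ID, err)
			}
			// Tolerated failure: keep the error text so later steps can inspect it.
			out = map[string]interface{}{"error": err.Error()}
		}
		if step.Store {
			results[step.ID] = out
		}
	}
	return results, nil
}

func main() {
	call := func(tool string) (interface{}, error) {
		if tool == "flaky" {
			return nil, errToolFailed
		}
		return tool + "-ok", nil
	}
	steps := []WorkflowStep{
		{ID: "build", Tool: "docker_build", Store: true},
		{ID: "lint", Tool: "flaky", AllowFailure: true, Store: true},
		{ID: "deploy", Tool: "kubectl_apply"},
	}
	results, err := runSteps(steps, call)
	fmt.Println(len(results), err)
}
```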
type WorkflowUpdateRequest ¶
type WorkflowUpdateRequest struct {
// Name of the workflow to update (required).
Name string `json:"name" validate:"required"`
// Version can be updated to reflect changes.
Version string `json:"version,omitempty"`
// Description can be updated to improve documentation.
Description string `json:"description,omitempty"`
// Args can be modified to change arg requirements.
// Changes may affect existing callers of this workflow.
Args map[string]ArgDefinition `json:"args,omitempty"`
// Steps can be added, modified, or reordered.
// Changes affect workflow execution behavior.
Steps []WorkflowStep `json:"steps,omitempty"`
}
WorkflowUpdateRequest represents a request to update an existing workflow definition. This allows modification of workflow steps, input args, and metadata.
type WorkflowValidateRequest ¶
type WorkflowValidateRequest struct {
// Name for validation (required).
Name string `json:"name" validate:"required"`
// Version for validation.
Version string `json:"version,omitempty"`
// Description for validation.
Description string `json:"description,omitempty"`
// Args for validation.
Args map[string]ArgDefinition `json:"args,omitempty"`
// Steps for validation (required). All referenced tools will be checked for availability.
Steps []WorkflowStep `json:"steps" validate:"required"`
}
WorkflowValidateRequest represents a request to validate a workflow definition without creating it. Validates step configuration, tool availability, and arg schemas.
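The tool availability part of validation can be sketched as a set difference between the tools referenced by steps and the tools currently available. `missingTools` is a hypothetical helper, and the `available` map stands in for whatever the aggregator actually exposes.

```go
package main

import (
	"fmt"
	"sort"
)

// WorkflowStep is trimmed to the fields this check reads.
type WorkflowStep struct {
	ID   string
	Tool string
}

// missingTools returns the sorted, de-duplicated names of tools that
// steps reference but that are absent from the available set.
func missingTools(steps []WorkflowStep, available map[string]bool) []string {
	seen := make(map[string]bool)
	var missing []string
	for _, step := range steps {
		if available[step.Tool] || seen[step.Tool] {
			continue
		}
		seen[step.Tool] = true
		missing = append(missing, step.Tool)
	}
	sort.Strings(missing)
	return missing
}

func main() {
	steps := []WorkflowStep{
		{ID: "build", Tool: "docker_build"},
		{ID: "push", Tool: "docker_push"},
		{ID: "deploy", Tool: "kubectl_apply"},
	}
	available := map[string]bool{"docker_build": true}
	fmt.Println(missingTools(steps, available))
}
```

Reporting every missing tool at once, rather than stopping at the first, tends to make validation errors easier to act on.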