agent

package
v0.1.90 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package agent provides comprehensive MCP (Model Context Protocol) client and server implementations for debugging, testing, and integrating with the muster aggregator.

The agent package enables multiple interaction modes with MCP servers, from interactive debugging to AI assistant integration, with built-in caching, notifications, and comprehensive error handling.

Quick Start

For immediate MCP server interaction:

logger := agent.NewLogger(true, true, false)
client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
repl := agent.NewREPL(client, logger)
ctx := context.Background()
repl.Run(ctx) // Interactive REPL

For programmatic tool execution:

client := agent.NewClient("http://localhost:8090/sse", nil, agent.TransportSSE)
defer client.Close()
if err := client.Connect(ctx); err != nil {
    log.Fatal(err)
}
result, err := client.CallToolSimple(ctx, "core_service_list", nil)

Architecture Overview

The agent package follows a modular architecture with clear separation of concerns:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   REPL Mode     │    │   Agent Mode    │    │ MCP Server Mode │
│  (Interactive)  │    │  (Monitoring)   │    │ (AI Assistant)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌─────────────────┐
                    │     Client      │
                    │   (Core MCP)    │
                    └─────────────────┘
                                 │
                    ┌─────────────────┐
                    │   Transport     │
                    │ (SSE/HTTP-S)    │
                    └─────────────────┘

Core Components

## Client

The foundational MCP client handles protocol communication, connection management, caching, and notification processing. It supports both SSE and Streamable HTTP transports with automatic selection and graceful degradation.

Key features:

  • Thread-safe caching with diff tracking
  • Configurable timeouts and retry logic
  • Multiple output formats (text, JSON, structured)
  • Real-time notification handling

## REPL

Interactive Read-Eval-Print Loop with a modular command system, tab completion, command history, and real-time notification display. Commands are self-contained and extensible through the Command interface.

Available commands:

  • help (?): Command documentation and usage
  • list (ls): List tools, resources, prompts with filtering
  • describe (desc): Detailed capability information
  • call: Execute tools with argument validation
  • get: Retrieve resources and execute prompts
  • filter: Advanced pattern-based tool filtering
  • notifications: Toggle and manage real-time updates
  • prompt: Template-based prompt execution
  • exit (quit): Graceful session termination

## MCP Server

Exposes all agent functionality as MCP tools for AI assistant integration. Uses stdio transport for seamless integration with AI systems like Claude Desktop.

Exposed tools:

  • list_tools, list_resources, list_prompts: Capability discovery
  • describe_tool, describe_resource, describe_prompt: Detailed schemas
  • call_tool: Execute any available tool with validation
  • get_resource, get_prompt: Content retrieval and template execution
  • filter_tools, list_core_tools: Advanced filtering and categorization

## Logger

Structured logging system with multiple output modes:

  • Simple mode: User-friendly status messages
  • JSON-RPC mode: Complete protocol debugging
  • Verbose mode: Detailed operation tracking
  • Custom writers: Flexible output routing

## Formatters

Consistent formatting utilities for MCP data with support for both human-readable console output and structured JSON responses.

Transport Support

## SSE (Server-Sent Events) - Recommended

Real-time bidirectional communication with full notification support:

  • Persistent connection for continuous monitoring

  • Event streaming for immediate capability updates

  • Ideal for interactive use and AI assistant integration

    client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)

## Streamable HTTP - Compatibility

Request-response pattern for restricted environments:

  • No persistent connection or real-time notifications

  • Stateless operations suitable for automation

  • Ideal for CLI scripts and batch processing

    client := agent.NewClient("http://localhost:8090/streamable-http", logger, agent.TransportStreamableHTTP)

Operation Modes

## Agent Mode (Monitoring)

Connects to an MCP aggregator and monitors for real-time changes:

logger := agent.NewLogger(true, true, false)
client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
if err := client.Run(ctx); err != nil {
    log.Fatal(err)
}

Use cases:

  • Debugging aggregator behavior
  • Monitoring tool availability changes
  • Development and testing workflows
  • Real-time capability tracking

## REPL Mode (Interactive)

Provides an interactive command-line interface:

logger := agent.NewLogger(true, true, false)
client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
repl := agent.NewREPL(client, logger)
if err := repl.Run(ctx); err != nil {
    log.Fatal(err)
}

Features:

  • Tab completion for commands and tool names
  • Persistent command history
  • Real-time notification display
  • Flexible output formats
  • Graceful error recovery

## MCP Server Mode (AI Assistant Integration)

Exposes agent functionality as MCP tools:

logger := agent.NewLogger(true, true, false)
client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
server, err := agent.NewMCPServer(client, logger, false)
if err != nil {
    log.Fatal(err)
}
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}

AI assistant configuration:

{
  "mcpServers": {
    "muster": {
      "command": "muster",
      "args": ["agent", "--mcp-server", "--endpoint", "http://localhost:8090/sse"]
    }
  }
}

## CLI Mode (Programmatic)

Direct client usage for automation:

client := agent.NewClient("http://localhost:8090/sse", nil, agent.TransportSSE)
defer client.Close()
if err := client.Connect(ctx); err != nil {
    return err
}

// Execute multiple tools
tools := []string{"core_service_list", "core_serviceclass_list"}
for _, tool := range tools {
    result, err := client.CallToolSimple(ctx, tool, nil)
    if err != nil {
        log.Printf("Tool %s failed: %v", tool, err)
        continue
    }
    fmt.Printf("Tool %s result: %s\n", tool, result)
}

Caching and Performance

## Intelligent Caching

The agent implements multi-level caching for optimal performance:

  • Tools, resources, and prompts cached after initial retrieval

  • Diff tracking shows exactly what changed during notifications

  • Thread-safe access with RWMutex protection

  • Optional disable for always-fresh data

    client := agent.NewClient(endpoint, logger, transport) client.SetCacheEnabled(false) // Disable caching for testing

## Notification System

Real-time updates with intelligent cache management:

  • Automatic refresh triggered by change notifications
  • Change visualization shows added/removed/modified items
  • Background processing keeps UI responsive
  • Configurable notification filtering

## Performance Characteristics

  • Lazy loading: Data fetched only when needed
  • Concurrent operations: Multiple requests handled simultaneously
  • Connection pooling: Efficient resource utilization
  • Configurable timeouts: Prevent hanging operations

Advanced Features

## Custom Logging

// Silent mode for automation
logger := agent.NewDevNullLogger()

// Custom output destination
var buffer bytes.Buffer
logger := agent.NewLoggerWithWriter(true, false, false, &buffer)

// JSON-RPC protocol debugging
logger := agent.NewLogger(true, true, true)

## Tool Filtering and Search

Advanced filtering capabilities:

// REPL pattern matching
filter_tools pattern:core_* description_filter:service

// Programmatic filtering
tools := client.GetToolCache()
filtered := filterTools(tools, "core_*", "service", false)

## Resource Management

Comprehensive resource handling:

// Automatic MIME type detection
resource, err := client.GetResource(ctx, "file://config.yaml")
if err != nil {
    return err
}

// Handle different content types
for _, content := range resource.Contents {
    switch content.MIMEType {
    case "application/json":
        // Process JSON
    case "text/plain":
        // Process text
    default:
        // Handle binary or unknown
    }
}

## Prompt Templating

Dynamic prompt execution:

result, err := client.GetPrompt(ctx, "code_review", map[string]string{
    "language": "go",
    "style":    "google",
    "file":     "client.go",
})

Error Handling and Recovery

## Connection Recovery

  • Automatic reconnection on connection failures
  • Graceful degradation when notifications unavailable
  • Configurable timeout handling
  • Proper resource cleanup

## Error Classification

  • Protocol errors: MCP handshake and communication failures
  • Tool errors: Tool execution failures with context
  • Network errors: Transport-level failures with retry logic
  • Validation errors: Arg and schema mismatches

## Debugging Support

// Enable comprehensive debugging
logger := agent.NewLogger(true, true, true) // verbose, color, jsonRPC
client := agent.NewClient(endpoint, logger, transport)

// Inspect client state
tools := client.GetToolCache()
resources := client.GetResourceCache()
prompts := client.GetPromptCache()

Testing Patterns

## Unit Testing

func TestToolExecution(t *testing.T) {
    logger := agent.NewDevNullLogger()
    client := agent.NewClient(testServerURL, logger, agent.TransportSSE)
    defer client.Close()

    if err := client.Connect(ctx); err != nil {
        t.Fatalf("Connection failed: %v", err)
    }

    result, err := client.CallToolSimple(ctx, "test_tool", nil)
    if err != nil {
        t.Fatalf("Tool execution failed: %v", err)
    }

    if result != "expected_result" {
        t.Errorf("Expected 'expected_result', got '%s'", result)
    }
}

## Integration Testing

func TestREPLIntegration(t *testing.T) {
    logger := agent.NewDevNullLogger()
    client := agent.NewClient(testServerURL, logger, agent.TransportSSE)
    repl := agent.NewREPL(client, logger)

    // Test command execution
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    go func() {
        if err := repl.Run(ctx); err != nil {
            t.Errorf("REPL failed: %v", err)
        }
    }()

    // Verify functionality
    tools := client.GetToolCache()
    if len(tools) == 0 {
        t.Error("No tools available")
    }
}

## Mock Testing

func TestWithMockServer(t *testing.T) {
    mockServer := startMockMCPServer(t)
    defer mockServer.Close()

    client := agent.NewClient(mockServer.URL, nil, agent.TransportSSE)
    defer client.Close()

    // Test with predictable responses
    result, err := client.CallToolSimple(ctx, "mock_tool", nil)
    assert.NoError(t, err)
    assert.Equal(t, "mock_result", result)
}

Performance Optimization

## Connection Management

// Reuse connections for multiple operations
client := agent.NewClient(endpoint, logger, transport)
defer client.Close()

if err := client.Connect(ctx); err != nil {
    return err
}

// Multiple operations on same connection
for _, operation := range operations {
    result, err := client.CallToolSimple(ctx, operation.Tool, operation.Args)
    // Process result
}

## Batch Operations

// Concurrent tool execution
var wg sync.WaitGroup
results := make(chan toolResult, len(tools))

for _, tool := range tools {
    wg.Add(1)
    go func(toolName string) {
        defer wg.Done()
        result, err := client.CallToolSimple(ctx, toolName, nil)
        results <- toolResult{tool: toolName, result: result, err: err}
    }(tool)
}

wg.Wait()
close(results)

## Memory Management

// Disable caching for memory-constrained environments
client.SetCacheEnabled(false)

// Use streaming for large resources
resource, err := client.GetResource(ctx, "large://dataset.json")
if err != nil {
    return err
}
// Process resource content in chunks

Thread Safety

All operations are designed for concurrent use:

## Cache Protection

  • RWMutex locks protect cache data during reads and writes
  • Atomic operations for simple state changes
  • Consistent lock ordering prevents deadlocks

## Notification Handling

  • Background goroutines process notifications without blocking
  • Channel communication for safe cross-goroutine data sharing
  • Context cancellation for graceful shutdown

## Resource Management

  • Proper cleanup ensures resources are released
  • Context propagation enables operation cancellation
  • Timeout handling prevents resource leaks

Integration Examples

## CI/CD Pipeline Integration

#!/bin/bash
# Deploy services using muster agent
muster agent --endpoint http://muster:8090/streamable-http --execute <<EOF
call core_service_create '{"name": "web-app", "serviceclass": "webapp", "args": {"replicas": 3}}'
call core_service_start '{"name": "web-app"}'
EOF

## Monitoring Script

func monitorServices(ctx context.Context) error {
    client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
    defer client.Close()

    if err := client.Connect(ctx); err != nil {
        return err
    }

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            services, err := client.CallToolJSON(ctx, "core_service_list", nil)
            if err != nil {
                log.Printf("Failed to list services: %v", err)
                continue
            }
            // Process services
        }
    }
}

## Custom AI Assistant Tool

func createCustomMCPTool() {
    client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
    server, err := agent.NewMCPServer(client, logger, true)
    if err != nil {
        log.Fatal(err)
    }

    // Add custom tools
    server.AddCustomTool("deploy_stack", func(args map[string]interface{}) (interface{}, error) {
        // Custom deployment logic
        return client.CallToolJSON(ctx, "core_service_create", args)
    })

    server.Start(context.Background())
}

This package provides a comprehensive foundation for MCP client development, testing, and integration within the muster ecosystem, with a focus on reliability, performance, and developer experience.

Index

Constants

View Source
const AuthMetaKey = "giantswarm.io/auth_required"

AuthMetaKey is the namespaced key for auth metadata in MCP _meta field. Following MCP specification for custom metadata keys.

View Source
const AuthPollInterval = 30 * time.Second

AuthPollInterval is the interval for polling auth status from the server.

View Source
const AuthStatusResourceURI = "auth://status"

AuthStatusResourceURI is the URI for the auth status MCP resource.

View Source
const StateAuthRequired = "[AUTH REQUIRED]"

StateAuthRequired is the indicator shown in the REPL prompt when servers require authentication. This is displayed prominently in uppercase because it requires user action (running 'auth login'). Exported for use by external tools that need to interpret REPL output.

Variables

This section is empty.

Functions

func RegisterClientToolsOnServer

func RegisterClientToolsOnServer(mcpServer *server.MCPServer, client *Client)

RegisterClientToolsOnServer registers all meta-tools from a connected client onto an MCP server. This is used to upgrade a pending auth server to a full server after authentication.

The tools use the transport bridge pattern (Issue #344) where each handler forwards to the corresponding server meta-tool via the client.

Types

type AuthRequiredInfo

type AuthRequiredInfo struct {
	Server   string
	Issuer   string
	Scope    string
	AuthTool string
}

AuthRequiredInfo contains information about a server requiring authentication.

type AuthenticateResponse

type AuthenticateResponse struct {
	Status       string `json:"status"`
	AuthURL      string `json:"auth_url"`
	ClickableURL string `json:"clickable_url"`
	Message      string `json:"message"`
}

AuthenticateResponse is the structured response from the authenticate_muster tool.

type Client

type Client struct {
	NotificationChan chan mcp.JSONRPCNotification
	// contains filtered or unexported fields
}

Client represents an MCP client that provides comprehensive Model Context Protocol support. It handles protocol communication, connection management, caching, and notification processing. The client supports multiple transport types (SSE and Streamable HTTP) and can operate in various modes: agent monitoring, interactive REPL, programmatic CLI, and MCP server backend.

Key features:

  • Protocol handshake and session management
  • Tool, resource, and prompt operations with caching
  • Real-time notification handling (SSE transport)
  • Thread-safe concurrent operations
  • Configurable timeouts and error handling
  • Multiple output formats (text, JSON)
  • OAuth 2.1 authentication support via mcp-go's transport layer

func NewClient

func NewClient(endpoint string, logger *Logger, transport TransportType) *Client

NewClient creates a new MCP client with the specified endpoint, logger, and transport type.

Args:

  • endpoint: The MCP server endpoint URL (e.g., "http://localhost:8090/sse")
  • logger: Logger instance for structured logging, or nil to disable logging
  • transport: Transport type (TransportSSE or TransportStreamableHTTP)

The client is created with default settings:

  • 30-second timeout for operations
  • Caching enabled for tools, resources, and prompts
  • 10-item notification channel buffer

Example:

logger := agent.NewLogger(true, true, false)
client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)

func (*Client) CallTool

func (c *Client) CallTool(ctx context.Context, name string, args map[string]interface{}) (*mcp.CallToolResult, error)

CallTool executes a tool on the MCP server with the provided arguments. This method transparently wraps all non-meta-tool calls through the call_tool meta-tool, implementing the server-side meta-tools pattern (Issue #343).

Meta-tools (list_tools, call_tool, describe_tool, etc.) are called directly. All other tools (core_*, workflow_*, x_*) are wrapped through call_tool.

Args:

  • ctx: Context for cancellation and timeout control
  • name: The exact name of the tool to execute
  • args: Tool arguments as a map of arg names to values

Returns:

  • CallToolResult: Complete tool execution result including content and metadata
  • error: Any execution or communication errors

The method performs connection validation, constructs the proper MCP request, applies timeout controls, and returns the raw result for further processing. Use CallToolSimple() or CallToolJSON() for more convenient result handling.

Example:

result, err := client.CallTool(ctx, "core_service_list", map[string]interface{}{
    "namespace": "default",
})
if err != nil {
    return fmt.Errorf("tool execution failed: %w", err)
}

func (*Client) CallToolJSON

func (c *Client) CallToolJSON(ctx context.Context, name string, args map[string]interface{}) (interface{}, error)

CallToolJSON executes a tool and returns the result as parsed JSON. This is a convenience method for tools that return structured data. If the result is not valid JSON, it returns the text content as-is.

Args:

  • ctx: Context for cancellation and timeout control
  • name: The exact name of the tool to execute
  • args: Tool arguments as a map of arg names to values

Returns:

  • interface{}: Parsed JSON data structure, or string if not valid JSON
  • error: Tool execution errors or parsing errors

The method handles JSON parsing gracefully - if the tool result is not valid JSON, it returns the raw text instead of failing. This makes it suitable for tools that may return either structured or unstructured data.

Example:

result, err := client.CallToolJSON(ctx, "core_service_get", map[string]interface{}{
    "name": "web-app",
})
if err != nil {
    return err
}
// result is now a parsed JSON structure
serviceData := result.(map[string]interface{})

func (*Client) CallToolSimple

func (c *Client) CallToolSimple(ctx context.Context, name string, args map[string]interface{}) (string, error)

CallToolSimple executes a tool and returns the first text content as a string. This is a convenience method that handles the most common use case of tool execution where you expect a simple text response.

Args:

  • ctx: Context for cancellation and timeout control
  • name: The exact name of the tool to execute
  • args: Tool arguments as a map of arg names to values

Returns:

  • string: The first text content from the tool result, or empty string if no text content
  • error: Tool execution errors or tool-reported errors

The method automatically handles:

  • Tool execution errors (network, timeout, etc.)
  • Tool-reported errors (IsError flag in result)
  • Content extraction (returns first text content found)
  • Empty result handling

Use CallTool() for full result access or CallToolJSON() for structured data.

Example:

result, err := client.CallToolSimple(ctx, "core_service_list", nil)
if err != nil {
    return fmt.Errorf("failed to list services: %w", err)
}
fmt.Println("Services:", result)

func (*Client) CallToolWithTimeout

func (c *Client) CallToolWithTimeout(ctx context.Context, name string, args map[string]interface{}, timeout time.Duration) (*mcp.CallToolResult, error)

CallToolWithTimeout executes a tool with a custom timeout. Like CallTool, this method transparently wraps non-meta-tool calls through call_tool.

func (*Client) Close

func (c *Client) Close() error

Close closes the MCP client connection and cleans up resources. This method should be called when the client is no longer needed to ensure proper cleanup of network connections and background goroutines.

It's safe to call Close multiple times; subsequent calls are no-ops.

Example:

client := agent.NewClient(endpoint, logger, transport)
defer client.Close()

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect establishes a connection to the MCP aggregator for programmatic CLI usage. Unlike Run(), this method performs only the connection and initialization steps without entering the monitoring loop, making it suitable for scripting and automation.

The method performs:

  • Transport-specific client creation and connection
  • MCP protocol handshake and session initialization
  • Connection validation and error handling

After successful connection, the client is ready for tool execution via CallTool, CallToolSimple, CallToolJSON methods, as well as resource and prompt operations.

Example:

client := agent.NewClient("http://localhost:8090/sse", nil, agent.TransportSSE)
defer client.Close()
if err := client.Connect(ctx); err != nil {
    return fmt.Errorf("connection failed: %w", err)
}
// Now ready for operations
result, err := client.CallToolSimple(ctx, "core_service_list", nil)

func (*Client) GetAgentTokenStore added in v0.0.231

func (c *Client) GetAgentTokenStore() *agentoauth.AgentTokenStore

GetAgentTokenStore returns the agent token store, if configured.

func (*Client) GetAuthRequired

func (c *Client) GetAuthRequired() []AuthRequiredInfo

GetAuthRequired fetches the auth://status resource and returns a list of servers requiring authentication. This method is used by the agent to detect which remote MCP servers need OAuth authentication.

Returns an empty slice if no servers require authentication or if the resource cannot be fetched.

func (*Client) GetEndpoint

func (c *Client) GetEndpoint() string

GetEndpoint returns the current endpoint URL.

func (*Client) GetFormatters

func (c *Client) GetFormatters() interface{}

GetFormatters returns the formatters instance used by this client. The formatters provide consistent formatting utilities for MCP data, supporting both human-readable console output and structured JSON responses.

This method is primarily used by the command system to access formatting capabilities for tools, resources, and prompts.

func (*Client) GetHeaders

func (c *Client) GetHeaders() map[string]string

GetHeaders returns a copy of the current headers.

func (*Client) GetPrompt

func (c *Client) GetPrompt(ctx context.Context, name string, args map[string]string) (*mcp.GetPromptResult, error)

GetPrompt retrieves a prompt template from the MCP server and executes it with the given arguments. Prompts are template-based text generation tools that can be argeterized for different contexts.

Args:

  • ctx: Context for cancellation and timeout control
  • name: The exact name of the prompt to retrieve
  • args: Template arguments as a map of arg names to string values

Returns:

  • GetPromptResult: Complete prompt result including generated messages and metadata
  • error: Any retrieval, templating, or communication errors

The method handles:

  • Connection validation
  • Template argument processing
  • Request logging with prompt name context
  • Timeout management
  • Error handling and logging

Prompt results contain a Messages field with generated content that can include text, images, or other media types depending on the prompt implementation.

Example:

result, err := client.GetPrompt(ctx, "code_review", map[string]string{
    "language": "go",
    "style":    "google",
    "file":     "client.go",
})
if err != nil {
    return fmt.Errorf("failed to get prompt: %w", err)
}
for _, message := range result.Messages {
    fmt.Printf("Role: %s, Content: %v\n", message.Role, message.Content)
}

func (*Client) GetPromptByName

func (c *Client) GetPromptByName(name string) *mcp.Prompt

GetPromptByName finds a specific prompt by name from the cached prompt list. This method does not refresh the cache; call ListPromptsFromServer first if you need fresh data.

Args:

  • name: The exact name of the prompt to find

Returns:

  • *mcp.Prompt: Pointer to the found prompt, or nil if not found

func (*Client) GetPromptCache

func (c *Client) GetPromptCache() []mcp.Prompt

GetPromptCache returns a copy of the currently cached prompts. This method is thread-safe and returns the prompts that were last retrieved from the MCP server. The cache is automatically updated when notifications are received (for SSE transport) or can be manually refreshed by calling prompt listing operations.

Returns an empty slice if no prompts have been cached yet or if caching is disabled.

func (*Client) GetResource

func (c *Client) GetResource(ctx context.Context, uri string) (*mcp.ReadResourceResult, error)

GetResource reads a resource from the MCP server and returns its content. Resources are identified by URI and can contain various types of content including text, binary data, or structured information.

Args:

  • ctx: Context for cancellation and timeout control
  • uri: The resource URI to retrieve (e.g., "file://config.yaml", "memory://cache/data")

Returns:

  • ReadResourceResult: Complete resource data including content and metadata
  • error: Any retrieval or communication errors

The method handles:

  • Connection validation
  • Request logging (if logger available)
  • Timeout management
  • Error handling and logging

Resource content can be accessed through the Contents field, which may contain multiple content items with different MIME types.

Example:

resource, err := client.GetResource(ctx, "file://config.yaml")
if err != nil {
    return fmt.Errorf("failed to read config: %w", err)
}
for _, content := range resource.Contents {
    if textContent, ok := mcp.AsTextResourceContents(content); ok {
        fmt.Println("Config:", textContent.Text)
    }
}

func (*Client) GetResourceByURI

func (c *Client) GetResourceByURI(uri string) *mcp.Resource

GetResourceByURI finds a specific resource by URI from the cached resource list. This method does not refresh the cache; call ListResourcesFromServer first if you need fresh data.

Args:

  • uri: The exact URI of the resource to find

Returns:

  • *mcp.Resource: Pointer to the found resource, or nil if not found

func (*Client) GetResourceCache

func (c *Client) GetResourceCache() []mcp.Resource

GetResourceCache returns a copy of the currently cached resources. This method is thread-safe and returns the resources that were last retrieved from the MCP server. The cache is automatically updated when notifications are received (for SSE transport) or can be manually refreshed by calling resource listing operations.

Returns an empty slice if no resources have been cached yet or if caching is disabled.

func (*Client) GetServerInfo

func (c *Client) GetServerInfo() *ServerInfo

GetServerInfo returns the server information obtained during MCP initialization. This includes the server's name and version as reported by the server.

Returns nil if the client has not been initialized yet.

Example:

client := agent.NewClient(endpoint, nil, agent.TransportStreamableHTTP)
if err := client.Connect(ctx); err != nil {
    return err
}
defer client.Close()
info := client.GetServerInfo()
fmt.Printf("Connected to %s version %s\n", info.Name, info.Version)

func (*Client) GetToolByName

func (c *Client) GetToolByName(name string) *mcp.Tool

GetToolByName finds a specific tool by name from the cached tool list. This method does not refresh the cache; call ListToolsFromServer first if you need fresh data.

Args:

  • name: The exact name of the tool to find

Returns:

  • *mcp.Tool: Pointer to the found tool, or nil if not found

func (*Client) GetToolCache

func (c *Client) GetToolCache() []mcp.Tool

GetToolCache returns a copy of the currently cached tools. This method is thread-safe and returns the tools that were last retrieved from the MCP server. The cache is automatically updated when notifications are received (for SSE transport) or can be manually refreshed by calling tool listing operations.

Returns an empty slice if no tools have been cached yet or if caching is disabled.

func (*Client) InitializeAndLoadData

func (c *Client) InitializeAndLoadData(ctx context.Context) error

InitializeAndLoadData performs the standard initialization and data loading sequence for agent mode operation. This method combines protocol initialization with initial cache population for tools, resources, and prompts.

The method performs these steps in sequence:

  1. MCP protocol handshake and session initialization
  2. Initial tool listing and cache population
  3. Initial resource listing and cache population
  4. Initial prompt listing and cache population

This method is typically used by Run() for agent mode operation, but can also be called directly when you need both connection and initial cache loading.

Use Connect() instead if you only need connection without cache pre-loading.

func (*Client) ListPromptsFromServer

func (c *Client) ListPromptsFromServer(ctx context.Context) ([]mcp.Prompt, error)

ListPromptsFromServer retrieves all prompts from the MCP server using the native MCP protocol. This method refreshes the cache and returns the complete list of prompts.

Args:

  • ctx: Context for cancellation and timeout control

Returns:

  • []mcp.Prompt: Slice of all available prompts from the server
  • error: Any connection or retrieval errors

func (*Client) ListResourcesFromServer

func (c *Client) ListResourcesFromServer(ctx context.Context) ([]mcp.Resource, error)

ListResourcesFromServer retrieves all resources from the MCP server using the native MCP protocol. This method refreshes the cache and returns the complete list of resources.

Args:

  • ctx: Context for cancellation and timeout control

Returns:

  • []mcp.Resource: Slice of all available resources from the server
  • error: Any connection or retrieval errors

func (*Client) ListToolsFromServer

func (c *Client) ListToolsFromServer(ctx context.Context) ([]mcp.Tool, error)

ListToolsFromServer retrieves all tools from the MCP server using the native MCP protocol. This method refreshes the cache and returns the complete list of tools.

Args:

  • ctx: Context for cancellation and timeout control

Returns:

  • []mcp.Tool: Slice of all available tools from the server
  • error: Any connection or retrieval errors

func (*Client) Reconnect

func (c *Client) Reconnect(ctx context.Context, newEndpoint string) error

Reconnect closes the current connection and reconnects to a new endpoint. This method is used when switching contexts to connect to a different muster aggregator without restarting the REPL.

Args:

  • ctx: Context for cancellation and timeout control
  • newEndpoint: The new MCP server endpoint URL to connect to

The method performs:

  • Closes the existing connection
  • Updates the endpoint
  • Creates a new connection
  • Reinitializes the session and reloads all cached data

Example:

if err := client.Reconnect(ctx, "https://new-server.example.com/mcp"); err != nil {
    return fmt.Errorf("reconnection failed: %w", err)
}

func (*Client) RefreshPromptCache

func (c *Client) RefreshPromptCache(ctx context.Context) error

RefreshPromptCache forces a refresh of the prompt cache from the MCP server.

func (*Client) RefreshResourceCache

func (c *Client) RefreshResourceCache(ctx context.Context) error

RefreshResourceCache forces a refresh of the resource cache from the MCP server.

func (*Client) RefreshToolCache

func (c *Client) RefreshToolCache(ctx context.Context) error

RefreshToolCache forces a refresh of the tool cache from the MCP server.

func (*Client) Run

func (c *Client) Run(ctx context.Context) error

Run executes the complete agent workflow for monitoring mode. This method establishes connection, performs initialization, loads initial data, and then enters a monitoring loop to handle real-time notifications.

The workflow consists of:

  1. Connect to the MCP aggregator using the configured transport
  2. Perform MCP protocol handshake
  3. Load initial tools, resources, and prompts into cache
  4. Enter notification listening loop (SSE transport only)
  5. Handle capability change notifications and update caches

For SSE transport, the method will block until the context is cancelled, continuously monitoring for server notifications. For Streamable HTTP transport, the method returns immediately after initial data loading since notifications are not supported.

Use Connect() instead for programmatic CLI usage without monitoring.

func (*Client) SetCacheEnabled

func (c *Client) SetCacheEnabled(enabled bool)

SetCacheEnabled enables or disables client-side caching of tools, resources, and prompts. When caching is disabled, every operation will fetch fresh data from the server, which is useful for testing scenarios or when you need always-current data.

Args:

  • enabled: Whether to enable caching (true) or disable it (false)

Disabling caching also disables diff tracking and change notifications since there's no previous state to compare against. This affects the behavior of notification handling in agent mode.

Example:

client := agent.NewClient(endpoint, logger, transport)
client.SetCacheEnabled(false) // Disable caching for testing
defer client.Close()

func (*Client) SetHeader

func (c *Client) SetHeader(key, value string)

SetHeader sets a custom HTTP header for requests.

func (*Client) SetOAuthConfig added in v0.0.231

func (c *Client) SetOAuthConfig(config transport.OAuthConfig, tokenStore *agentoauth.AgentTokenStore)

SetOAuthConfig configures the client to use mcp-go's built-in OAuth transport. When set, the transport automatically injects Bearer tokens and returns typed OAuthAuthorizationRequiredError on 401 instead of raw HTTP errors.

The AgentTokenStore provides tokens to mcp-go, and mcp-go handles refresh and 401 detection automatically.

func (*Client) SetTimeout

func (c *Client) SetTimeout(timeout time.Duration)

SetTimeout configures the timeout duration for MCP operations. This timeout applies to all network operations including tool calls, resource retrieval, prompt execution, and capability listing.

Args:

  • timeout: The timeout duration for operations (e.g., 30*time.Second)

The default timeout is 30 seconds. Setting a shorter timeout can help with responsive UX but may cause failures with slow operations. Setting a longer timeout is useful for complex tools or slow networks.

Example:

client := agent.NewClient(endpoint, logger, transport)
client.SetTimeout(60 * time.Second) // 1 minute timeout for slow operations
defer client.Close()

func (*Client) SetTimeoutForComplexOperations

func (c *Client) SetTimeoutForComplexOperations()

SetTimeoutForComplexOperations sets a longer timeout specifically for complex operations like workflow execution that may take longer than the default timeout.

func (*Client) SupportsNotifications

func (c *Client) SupportsNotifications() bool

SupportsNotifications returns whether the current transport supports real-time notifications. SSE transport supports notifications for real-time capability updates, while Streamable HTTP transport does not support notifications and operates in request-response mode only.

This method is used by the REPL and command system to determine whether to enable notification-dependent features like real-time updates and change monitoring.

type Formatters

type Formatters struct{}

Formatters provides utilities for formatting MCP data consistently. It supports both human-readable console output and structured JSON responses for tools, resources, and prompts. The formatters ensure consistent presentation across different output modes (REPL, CLI, MCP server).

Key features:

  • Console-friendly formatting with numbering and alignment
  • JSON formatting for structured data consumption
  • Search and lookup utilities for cached data
  • Consistent error handling and fallback formatting

func NewFormatters

func NewFormatters() *Formatters

NewFormatters creates a new formatters instance. The formatters instance is stateless and can be safely used concurrently.

func (*Formatters) FindPrompt

func (f *Formatters) FindPrompt(prompts []mcp.Prompt, name string) *mcp.Prompt

FindPrompt searches for a prompt by name in the provided prompt list. This is a utility method for command implementations and internal lookups.

Args:

  • prompts: Slice of prompts to search in
  • name: Exact name of the prompt to find

Returns:

  • Pointer to the found prompt, or nil if not found

The search is case-sensitive and requires exact name matching.

func (*Formatters) FindResource

func (f *Formatters) FindResource(resources []mcp.Resource, uri string) *mcp.Resource

FindResource searches for a resource by URI in the provided resource list. This is a utility method for command implementations and internal lookups.

Args:

  • resources: Slice of resources to search in
  • uri: Exact URI of the resource to find

Returns:

  • Pointer to the found resource, or nil if not found

The search is case-sensitive and requires exact URI matching.

func (*Formatters) FindTool

func (f *Formatters) FindTool(tools []mcp.Tool, name string) *mcp.Tool

FindTool searches for a tool by name in the provided tool list. This is a utility method for command implementations and internal lookups.

Args:

  • tools: Slice of tools to search in
  • name: Exact name of the tool to find

Returns:

  • Pointer to the found tool, or nil if not found

The search is case-sensitive and requires exact name matching.

func (*Formatters) FormatAuthChallenge

func (f *Formatters) FormatAuthChallenge(challenge *api.AuthChallenge) string

FormatAuthChallenge formats an authentication challenge for user display. This produces a user-friendly message with clear instructions on how to authenticate.

Args:

  • challenge: The authentication challenge to format

Returns:

  • Formatted string with authentication instructions

Output format:

[Authentication Required]
Server: mcp-kubernetes

Authentication is required to access this resource.
Please visit the following URL to authenticate:

https://auth.example.com/authorize?...

After authenticating, return here and retry your request.

func (*Formatters) FormatAuthChallengeWithBrowserStatus

func (f *Formatters) FormatAuthChallengeWithBrowserStatus(challenge *api.AuthChallenge, browserOpened bool) string

FormatAuthChallengeWithBrowserStatus formats an authentication challenge for user display. This produces a user-friendly message with clear instructions on how to authenticate, including information about whether the browser was opened.

Args:

  • challenge: The authentication challenge to format
  • browserOpened: Whether the browser was successfully opened

Returns:

  • Formatted string with authentication instructions

func (*Formatters) FormatPromptDetail

func (f *Formatters) FormatPromptDetail(prompt mcp.Prompt) string

FormatPromptDetail formats detailed information about a single prompt for console display. This provides comprehensive information including the prompt's arguments and their requirements, which is essential for understanding how to use the prompt.

Args:

  • prompt: The prompt to format detailed information for

Returns:

  • Multi-line formatted string with prompt details and arguments

Output format:

Prompt: code_review
Description: Review code for quality
Arguments:
  - language (required): Programming language
  - style: Code style to enforce

Arguments are listed with their names, requirements, and descriptions.

func (*Formatters) FormatPromptDetailJSON

func (f *Formatters) FormatPromptDetailJSON(prompt mcp.Prompt) (string, error)

FormatPromptDetailJSON formats detailed prompt information as structured JSON. This format includes argument specifications and is used for programmatic consumption and prompt introspection.

Args:

  • prompt: The prompt to format detailed information for

Returns:

  • JSON string containing complete prompt information including arguments
  • error: JSON marshaling errors (should be rare)

Output format:

{
  "name": "code_review",
  "description": "Review code for quality",
  "arguments": [
    {
      "name": "language",
      "description": "Programming language",
      "required": true
    }
  ]
}

The arguments field is only included if the prompt has arguments.

func (*Formatters) FormatPromptsList

func (f *Formatters) FormatPromptsList(prompts []mcp.Prompt) string

FormatPromptsList formats a list of prompts for human-readable console output. The output includes numbering, aligned columns, and descriptions to make it easy to browse available prompts interactively.

Args:

  • prompts: Slice of prompts to format

Returns:

  • Formatted string with numbered list of prompts and descriptions

Output format:

Available prompts (N):
  1. code_review                  - Review code for quality
  2. documentation                - Generate documentation

If no prompts are available, returns a user-friendly message.

func (*Formatters) FormatPromptsListJSON

func (f *Formatters) FormatPromptsListJSON(prompts []mcp.Prompt) (string, error)

FormatPromptsListJSON formats a list of prompts as structured JSON. This format is used for programmatic consumption, MCP server responses, and integration with external tools that expect structured data.

Args:

  • prompts: Slice of prompts to format

Returns:

  • JSON string containing array of prompt objects with name and description
  • error: JSON marshaling errors (should be rare)

Output format:

[
  {
    "name": "code_review",
    "description": "Review code for quality"
  }
]

If no prompts are available, returns a simple message string.

func (*Formatters) FormatResourceDetail

func (f *Formatters) FormatResourceDetail(resource mcp.Resource) string

FormatResourceDetail formats detailed information about a single resource for console display. This provides comprehensive metadata about the resource including URI, name, description, and MIME type when available.

Args:

  • resource: The resource to format detailed information for

Returns:

  • Multi-line formatted string with resource details

Output format:

Resource: file://config.yaml
Name: config.yaml
Description: Configuration file
MIME Type: application/yaml

Optional fields (description, MIME type) are only included if present.

func (*Formatters) FormatResourceDetailJSON

func (f *Formatters) FormatResourceDetailJSON(resource mcp.Resource) (string, error)

FormatResourceDetailJSON formats detailed resource information as structured JSON. This format includes all available resource metadata and is used for programmatic consumption and resource introspection.

Args:

  • resource: The resource to format detailed information for

Returns:

  • JSON string containing complete resource information
  • error: JSON marshaling errors (should be rare)

Output format:

{
  "uri": "file://config.yaml",
  "name": "config.yaml",
  "description": "Configuration file",
  "mimeType": "application/yaml"
}

func (*Formatters) FormatResourcesList

func (f *Formatters) FormatResourcesList(resources []mcp.Resource) string

FormatResourcesList formats a list of resources for human-readable console output. The output includes numbering, aligned columns for URIs, and descriptions to make it easy to browse available resources interactively.

Args:

  • resources: Slice of resources to format

Returns:

  • Formatted string with numbered list of resources and descriptions

Output format:

Available resources (N):
  1. file://config.yaml                    - Configuration file
  2. memory://cache/data                   - Cached data

If a resource has no description, the name field is used as a fallback. If no resources are available, returns a user-friendly message.

func (*Formatters) FormatResourcesListJSON

func (f *Formatters) FormatResourcesListJSON(resources []mcp.Resource) (string, error)

FormatResourcesListJSON formats a list of resources as structured JSON. This format is used for programmatic consumption, MCP server responses, and integration with external tools that expect structured data.

Args:

  • resources: Slice of resources to format

Returns:

  • JSON string containing array of resource objects with URI, name, description, and MIME type
  • error: JSON marshaling errors (should be rare)

Output format:

[
  {
    "uri": "file://config.yaml",
    "name": "config.yaml",
    "description": "Configuration file",
    "mimeType": "application/yaml"
  }
]

If a resource has no description, the name field is used as a fallback. If no resources are available, returns a simple message string.

func (*Formatters) FormatToolDetail

func (f *Formatters) FormatToolDetail(tool mcp.Tool) string

FormatToolDetail formats detailed information about a single tool for console display. This provides comprehensive information including the tool's input schema, which is essential for understanding how to use the tool.

Args:

  • tool: The tool to format detailed information for

Returns:

  • Multi-line formatted string with tool details and schema

Output format:

Tool: tool_name
Description: Tool description
Input Schema:
{
  "type": "object",
  "properties": { ... }
}

func (*Formatters) FormatToolDetailJSON

func (f *Formatters) FormatToolDetailJSON(tool mcp.Tool) (string, error)

FormatToolDetailJSON formats detailed tool information as structured JSON. This format includes the complete tool schema and is used for programmatic consumption and tool introspection.

Args:

  • tool: The tool to format detailed information for

Returns:

  • JSON string containing complete tool information including schema
  • error: JSON marshaling errors (should be rare)

Output format:

{
  "name": "tool_name",
  "description": "Tool description",
  "inputSchema": { ... }
}

func (*Formatters) FormatToolsList

func (f *Formatters) FormatToolsList(tools []mcp.Tool) string

FormatToolsList formats a list of tools for human-readable console output. The output includes numbering, aligned columns, and descriptive text to make it easy to browse available tools interactively.

Args:

  • tools: Slice of tools to format

Returns:

  • Formatted string with numbered list of tools and descriptions

Output format:

Available tools (N):
  1. tool_name                    - Tool description
  2. another_tool                 - Another description

If no tools are available, returns a user-friendly message.

func (*Formatters) FormatToolsListJSON

func (f *Formatters) FormatToolsListJSON(tools []mcp.Tool) (string, error)

FormatToolsListJSON formats a list of tools as structured JSON. This format is used for programmatic consumption, MCP server responses, and integration with external tools that expect structured data.

Args:

  • tools: Slice of tools to format

Returns:

  • JSON string containing array of tool objects with name and description
  • error: JSON marshaling errors (should be rare)

Output format:

[
  {
    "name": "tool_name",
    "description": "Tool description"
  }
]

If no tools are available, returns a simple message string.

func (*Formatters) IsAuthChallenge

func (f *Formatters) IsAuthChallenge(result *mcp.CallToolResult) *api.AuthChallenge

IsAuthChallenge checks if a tool result contains an authentication challenge. It parses the first text content as JSON and checks for the "auth_required" status.

Args:

  • result: The tool result to check

Returns:

  • *api.AuthChallenge: The parsed challenge if found, nil otherwise

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger provides structured logging for the agent with multiple output modes. It supports different logging levels, output formats, and destinations to accommodate various use cases from interactive debugging to automated testing.

Key features:

  • Multiple output modes: simple, verbose, JSON-RPC protocol debugging
  • Color-coded output for better readability in terminals
  • Configurable output destinations (stdout, files, custom writers)
  • MCP protocol-aware formatting for requests, responses, and notifications
  • User-facing output separation from system logging

Logging modes:

  • Simple mode: User-friendly status messages without technical details
  • Verbose mode: Detailed operation tracking and debug information
  • JSON-RPC mode: Complete protocol debugging with full message content

func NewDevNullLogger

func NewDevNullLogger() *Logger

NewDevNullLogger creates a logger that discards all output. This is useful for testing scenarios and automated scripts where logging output should be suppressed completely to avoid noise in test output or production logs.

Returns:

  • Logger instance that discards all output

Example:

logger := agent.NewDevNullLogger()
client := agent.NewClient(endpoint, logger, transport)
// All logging output will be discarded

func NewLogger

func NewLogger(verbose, useColor, jsonRPCMode bool) *Logger

NewLogger creates a new logger with the specified configuration. This is the primary constructor for logger instances with customizable behavior for different use cases.

Args:

  • verbose: Enable detailed debug output and operation tracking
  • useColor: Use ANSI color codes for enhanced terminal readability
  • jsonRPCMode: Enable complete JSON-RPC protocol message logging

Returns:

  • Configured logger instance writing to stdout by default

Example:

// Interactive debugging with colors and full protocol logging
logger := agent.NewLogger(true, true, true)

// Production mode with minimal output
logger := agent.NewLogger(false, false, false)

func (*Logger) Debug

func (l *Logger) Debug(format string, args ...interface{})

Debug logs a debug message that is only shown in verbose mode. This is used for detailed operation tracking and troubleshooting information that would be too noisy for normal operation.

Args:

  • format: Printf-style format string
  • args: Arguments for the format string

The message is colored gray when colors are enabled and only appears when verbose mode is active.

func (*Logger) Error

func (l *Logger) Error(format string, args ...interface{})

Error logs an error message with timestamp and red coloring. This is used for error conditions, failures, and other problems that need immediate attention.

Args:

  • format: Printf-style format string
  • args: Arguments for the format string

Output format: [timestamp] message (in red when colors enabled)

func (*Logger) Info

func (l *Logger) Info(format string, args ...interface{})

Info logs an informational message with timestamp. This is used for general status updates and operational information that should be visible in normal operation.

Args:

  • format: Printf-style format string
  • args: Arguments for the format string

Output format: [timestamp] message

func (*Logger) Notification

func (l *Logger) Notification(method string, params interface{})

Notification logs an incoming MCP notification with appropriate formatting. Notifications are typically sent by the server to indicate capability changes or other events that the client should be aware of.

Args:

  • method: The notification method name (e.g., "notifications/tools/list_changed")
  • params: The notification args (logged in JSON-RPC mode only)

Some notifications like keepalive are filtered in simple mode unless verbose output is enabled. JSON-RPC mode shows all notification details.

func (*Logger) Output

func (l *Logger) Output(format string, args ...interface{})

Output writes user-facing output directly to stdout without timestamps. This method is used for command results, formatted data, and other content that should be displayed to users without logging metadata.

Unlike other logging methods, Output always writes to stdout regardless of the configured writer, ensuring user-facing content is properly displayed.

Args:

  • format: Printf-style format string (or plain text if no args provided)
  • args: Arguments for the format string

This method is primarily used by command implementations to display results and formatted data to the user.

func (*Logger) OutputLine

func (l *Logger) OutputLine(format string, args ...interface{})

OutputLine writes user-facing output with a newline. This is a convenience wrapper around Output that automatically adds a newline.

Args:

  • format: Printf-style format string (or plain text if no args provided)
  • args: Arguments for the format string

func (*Logger) Request

func (l *Logger) Request(method string, params interface{})

Request logs an outgoing MCP request with appropriate formatting. The behavior depends on the logging mode: simple mode shows user-friendly messages, while JSON-RPC mode shows complete protocol details.

Args:

  • method: The MCP method name (e.g., "initialize", "resources/list")
  • params: The request args (logged in JSON-RPC mode only)

In simple mode, this maps method names to user-friendly messages. In JSON-RPC mode, this shows complete request details with proper formatting.

Note: tool listing is handled via the list_tools meta-tool and logs directly through Info/Success rather than through this method.

func (*Logger) Response

func (l *Logger) Response(method string, result interface{})

Response logs an incoming MCP response with appropriate formatting. The behavior depends on the logging mode: simple mode shows user-friendly summaries, while JSON-RPC mode shows complete response details.

Args:

  • method: The MCP method name this response corresponds to
  • result: The response result (logged in JSON-RPC mode only)

In simple mode, this extracts meaningful information like counts and status. In JSON-RPC mode, this shows complete response details.

func (*Logger) SetVerbose

func (l *Logger) SetVerbose(verbose bool)

SetVerbose enables or disables verbose logging mode. When verbose mode is enabled, the logger will output debug messages and more detailed information about operations, including protocol details and internal state changes.

Args:

  • verbose: Whether to enable verbose output

This is useful for debugging and development scenarios where you need detailed insight into the agent's operation.

func (*Logger) SetWriter

func (l *Logger) SetWriter(w io.Writer)

SetWriter sets a custom writer for the logger output. By default, the logger writes to stdout, but this can be changed to write to files, buffers, or other destinations for testing or log aggregation purposes.

Args:

  • w: The io.Writer to use for log output

Example:

var buffer bytes.Buffer
logger.SetWriter(&buffer)
// Now all log output goes to the buffer instead of stdout

func (*Logger) Success

func (l *Logger) Success(format string, args ...interface{})

Success logs a success message with timestamp and green coloring. This is used for successful operations, completed tasks, and positive status updates.

Args:

  • format: Printf-style format string
  • args: Arguments for the format string

Output format: [timestamp] message (in green when colors enabled)

func (*Logger) Write

func (l *Logger) Write(p []byte) (n int, err error)

Write implements io.Writer for compatibility with other systems. This allows the logger to be used as a writer destination for other components that expect an io.Writer interface.

All writes are treated as debug messages and subject to verbose mode filtering.

type MCPServer

type MCPServer struct {
	// contains filtered or unexported fields
}

MCPServer wraps the agent functionality and exposes it as MCP tools for AI assistants. It acts as a bridge between AI assistants and the muster aggregator, enabling programmatic access to all MCP capabilities through the standard MCP protocol.

The server exposes comprehensive tool operations including:

  • Listing and describing tools, resources, and prompts
  • Executing tools with argument validation
  • Retrieving resource contents and prompt templates
  • Advanced filtering and search capabilities
  • Core tool identification and categorization

Key features:

  • Stdio transport for AI assistant integration
  • JSON-formatted responses for structured data consumption
  • Error handling with detailed error messages
  • Optional client notification support
  • Tool availability caching and refresh
  • Automatic re-authentication when tokens expire
  • Proactive auth status notification in tool responses (ADR-008)

func NewMCPServer

func NewMCPServer(client *Client, logger *Logger, notifyClients bool) (*MCPServer, error)

NewMCPServer creates a new MCP server that exposes agent functionality as MCP tools. This enables AI assistants to interact with muster through the standard MCP protocol using stdio transport.

Args:

  • client: MCP client for aggregator communication
  • logger: Logger instance for structured logging
  • notifyClients: Whether to enable client notifications for tool changes

The server is initialized with:

  • Complete tool registry for agent operations
  • Stdio transport for AI assistant integration
  • Tool, resource, and prompt capabilities
  • Optional notification support for dynamic updates

Exposed tools include: list_tools, describe_tool, call_tool, get_resource, get_prompt, filter_tools, list_core_tools, and more.

Example:

client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
server, err := agent.NewMCPServer(client, logger, false)
if err != nil {
    log.Fatal(err)
}
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}

func (*MCPServer) SetAuthManager

func (m *MCPServer) SetAuthManager(authManager *oauth.AuthManager, endpoint string)

SetAuthManager sets the auth manager for re-authentication support. When set, the server can automatically trigger browser-based re-authentication when tokens expire during operations.

func (*MCPServer) Start

func (m *MCPServer) Start(ctx context.Context) error

Start starts the MCP server using stdio transport for AI assistant integration. This method blocks until the server is terminated, handling MCP protocol communication over stdin/stdout. It's designed to be used as the main entry point when running as an MCP server for AI assistants.

The server will continue running until the context is cancelled or the stdio connection is closed by the client.

type NoSpaceDynamicCompleter

type NoSpaceDynamicCompleter struct {
	Callback func(string) []string
	Children []readline.PrefixCompleterInterface
}

NoSpaceDynamicCompleter is a custom completer that doesn't add trailing spaces for completions ending with special characters like "=".

WHY THIS EXISTS (Complexity Justification): The readline library's built-in PcItemDynamic always appends a trailing space after completions. This is problematic for key=value completions where we want:

call tool_name param=|  (cursor immediately after =)

Instead of:

call tool_name param= |  (unwanted space before cursor)

The doNoSpaceInternal function duplicates readline's prefix completion logic because readline doesn't expose a way to customize the trailing space behavior. We checked readline's API and configuration options - this workaround is necessary.

If readline adds this feature in the future, this code can be simplified.

func (*NoSpaceDynamicCompleter) Do

func (n *NoSpaceDynamicCompleter) Do(line []rune, pos int) ([][]rune, int)

Do implements the AutoCompleter interface

func (*NoSpaceDynamicCompleter) GetChildren

GetChildren returns the child completers

func (*NoSpaceDynamicCompleter) GetDynamicNames

func (n *NoSpaceDynamicCompleter) GetDynamicNames(line []rune) [][]rune

GetDynamicNames returns completions WITHOUT trailing spaces for items ending with "="

func (*NoSpaceDynamicCompleter) GetName

func (n *NoSpaceDynamicCompleter) GetName() []rune

GetName returns an empty name since this is a dynamic completer

func (*NoSpaceDynamicCompleter) IsDynamic

func (n *NoSpaceDynamicCompleter) IsDynamic() bool

IsDynamic returns true since this is a dynamic completer

func (*NoSpaceDynamicCompleter) Print

func (n *NoSpaceDynamicCompleter) Print(prefix string, level int, buf *bytes.Buffer)

Print implements the PrefixCompleterInterface

func (*NoSpaceDynamicCompleter) SetChildren

func (n *NoSpaceDynamicCompleter) SetChildren(children []readline.PrefixCompleterInterface)

SetChildren sets the child completers

type NotificationHandler

type NotificationHandler func(notification mcp.JSONRPCNotification)

NotificationHandler defines a function type for handling MCP notifications. It receives JSON-RPC notifications from the MCP server and can be used to implement custom notification processing logic.

The handler is called asynchronously when notifications are received, typically for events like tool list changes, resource updates, or server status changes.

Example:

handler := func(notification mcp.JSONRPCNotification) {
    fmt.Printf("Received notification: %s\n", notification.Method)
}

type PendingAuthMCPServer

type PendingAuthMCPServer struct {
	// contains filtered or unexported fields
}

PendingAuthMCPServer is an MCP server that exposes only the authenticate_muster tool. This is used when the Agent cannot connect to the Muster Server because it requires OAuth authentication. The server provides a synthetic tool that initiates the auth flow.

Once authentication is complete, the caller should replace this server with the full MCPServer that exposes all tools.

func NewPendingAuthMCPServer

func NewPendingAuthMCPServer(logger *Logger, authManager *oauth.AuthManager, serverURL string) (*PendingAuthMCPServer, error)

NewPendingAuthMCPServer creates a new MCP server in pending auth state. It exposes only the authenticate_muster synthetic tool.

func (*PendingAuthMCPServer) GetMCPServer

func (p *PendingAuthMCPServer) GetMCPServer() *server.MCPServer

GetMCPServer returns the underlying MCP server. This is used to send notifications when auth completes.

func (*PendingAuthMCPServer) IsAuthComplete

func (p *PendingAuthMCPServer) IsAuthComplete() bool

IsAuthComplete returns true if authentication has completed successfully.

func (*PendingAuthMCPServer) Start

func (p *PendingAuthMCPServer) Start(ctx context.Context) error

Start starts the MCP server using stdio transport.

type REPL

type REPL struct {
	// contains filtered or unexported fields
}

REPL represents an interactive Read-Eval-Print Loop for MCP interaction. It provides a command-line interface for exploring and testing MCP capabilities with features like tab completion, command history, and real-time notifications.

The REPL uses a modular command system where each command implements the Command interface, enabling extensible functionality and consistent user experience. Commands support aliases, usage documentation, and context-aware tab completion.

Key features:

  • Interactive command execution with argument parsing
  • Tab completion for commands, tool names, and args
  • Persistent command history across sessions
  • Real-time notification display (SSE transport)
  • Graceful error handling and recovery
  • Transport-aware feature adaptation
  • Stylish prompt with current context display

func NewREPL

func NewREPL(client *Client, logger *Logger) *REPL

NewREPL creates a new REPL instance with the specified client and logger. It initializes the command registry and registers all available commands with their respective aliases and completion handlers.

Args:

  • client: MCP client for server communication
  • logger: Logger instance for structured output and debugging

The REPL is created with:

  • Pre-registered command set (help, list, describe, call, etc.)
  • Notification channel for real-time updates
  • Command registry with alias support
  • Transport adapter for command execution
  • Current context detection for stylish prompt display

Example:

client := agent.NewClient("http://localhost:8090/sse", logger, agent.TransportSSE)
repl := agent.NewREPL(client, logger)
if err := repl.Run(ctx); err != nil {
    log.Fatal(err)
}

func (*REPL) Run

func (r *REPL) Run(ctx context.Context) error

Run starts the REPL and enters the main interaction loop. This method initializes the REPL environment, sets up notification handling, configures readline with history and completion, and enters the main command processing loop.

The method performs complete REPL initialization:

  • Notification channel routing for supported transports
  • Readline configuration with history file and tab completion
  • Background notification listener for real-time updates
  • Main command processing loop with graceful shutdown

Key features:

  • Persistent command history across sessions
  • Context-aware tab completion for commands and args
  • Real-time notification display (transport dependent)
  • Graceful shutdown handling for Ctrl+C and EOF
  • Transport capability adaptation

The REPL continues running until:

  • Context cancellation (Ctrl+C or external signal)
  • EOF input (Ctrl+D)
  • Explicit exit command
  • Fatal readline errors

Returns:

  • nil for normal shutdown
  • Error for initialization or fatal runtime errors

type ServerInfo

type ServerInfo struct {
	// Name is the server's name as reported during initialization
	Name string
	// Version is the server's version as reported during initialization
	Version string
}

ServerInfo contains information about the connected MCP server. This is populated during the MCP protocol handshake.

type TestMCPServer

type TestMCPServer struct {
	// contains filtered or unexported fields
}

TestMCPServer wraps the test framework functionality and exposes it via MCP

func NewTestMCPServer

func NewTestMCPServer(endpoint string, logger *Logger, configPath string, debug bool) (*TestMCPServer, error)

NewTestMCPServer creates a new test MCP server that exposes test functionality

func (*TestMCPServer) Start

func (t *TestMCPServer) Start(ctx context.Context) error

Start starts the test MCP server using stdio transport

type TransportType

type TransportType string

TransportType defines the transport type for MCP connections. It determines how the client communicates with the MCP server.

const (
	// TransportSSE enables real-time bidirectional communication with notification support.
	// This transport maintains a persistent connection and provides immediate updates
	// when server capabilities change. Best for interactive use and monitoring.
	TransportSSE TransportType = "sse"

	// TransportStreamableHTTP uses request-response pattern for environments that don't support SSE.
	// This transport doesn't maintain persistent connections or provide real-time notifications.
	// Best for CLI scripts, automation, and restricted network environments.
	TransportStreamableHTTP TransportType = "streamable-http"
)

Directories

Path Synopsis
Package commands provides a shared interface for REPL command implementations.
Package commands provides a shared interface for REPL command implementations.
Package oauth implements OAuth 2.1 client authentication for the Muster Agent.
Package oauth implements OAuth 2.1 client authentication for the Muster Agent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL