sse

package
v0.0.0-...-ba52af2
Published: Aug 25, 2025 | License: MIT | Imports: 8 | Imported by: 0

README

SSE Reconnection with State Recovery

Overview

This package implements a Server-Sent Events (SSE) system with automatic reconnection and state recovery. It keeps a client's event stream intact across network interruptions and browser refreshes. Sessions are held in memory, so they do not survive a server restart (see Limitations).

The Hard Problem We Solved

A basic SSE implementation loses every event sent while a client is disconnected. Our solution provides:

  • Persistent Sessions: Clients maintain identity across reconnections
  • Event Buffering: Missed events are stored and replayed on reconnection
  • Automatic Recovery: Reconnection replays missed events, with no data loss as long as the session's buffer has not overflowed or expired
  • Memory-Bounded: Ring buffers prevent unbounded memory growth
  • Production-Ready: Handles edge cases like rapid reconnects and buffer overflow

Architecture

Core Components
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Handler   │────▶│    Broker    │────▶│   Session   │
│  (HTTP/SSE) │     │  (Routing)   │     │  Manager    │
└─────────────┘     └──────────────┘     └─────────────┘
       │                    │                     │
       ▼                    ▼                     ▼
   [Clients]          [Active Conns]        [Buffers]
Key Features
  1. Session Management

    • Cryptographically secure session IDs
    • Configurable buffer size and TTL
    • Automatic cleanup of abandoned sessions
  2. Event Buffering

    • Ring buffer for memory-bounded storage
    • Event sequence numbering
    • Selective replay based on Last-Event-ID
  3. Connection Handling

    • Cookie and header-based session persistence
    • Graceful degradation when buffers disabled
    • Heartbeat system to maintain connections
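
The buffering and selective replay described above can be sketched with the standard library's container/ring. This is a minimal illustration, not the package's implementation: the bufferedEvent and eventBuffer types are hypothetical, and it assumes event IDs are integer sequence numbers.

```go
package main

import (
	"container/ring"
	"fmt"
	"strconv"
)

// bufferedEvent is a hypothetical stand-in for the package's Event type.
type bufferedEvent struct {
	Seq  int    // monotonically increasing sequence number
	Data string // event payload
}

// eventBuffer keeps at most size events; older ones are overwritten.
type eventBuffer struct {
	r *ring.Ring
}

func newEventBuffer(size int) *eventBuffer {
	return &eventBuffer{r: ring.New(size)}
}

// Add stores ev in the current slot and advances the ring, so the oldest
// entry is overwritten once the buffer is full.
func (b *eventBuffer) Add(ev bufferedEvent) {
	b.r.Value = ev
	b.r = b.r.Next()
}

// ReplayAfter returns the buffered events newer than lastEventID, oldest
// first. An empty or non-numeric ID replays everything still buffered.
func (b *eventBuffer) ReplayAfter(lastEventID string) []bufferedEvent {
	last, err := strconv.Atoi(lastEventID)
	if err != nil {
		last = -1
	}
	var out []bufferedEvent
	// b.r points at the next slot to write, which is also the oldest entry,
	// so Do visits events in insertion order.
	b.r.Do(func(v any) {
		if ev, ok := v.(bufferedEvent); ok && ev.Seq > last {
			out = append(out, ev)
		}
	})
	return out
}

func main() {
	buf := newEventBuffer(3)
	for seq := 1; seq <= 5; seq++ {
		buf.Add(bufferedEvent{Seq: seq, Data: fmt.Sprintf("event %d", seq)})
	}
	// Events 1 and 2 were overwritten; the client last saw 3, so 4 and 5 replay.
	for _, ev := range buf.ReplayAfter("3") {
		fmt.Println(ev.Seq, ev.Data)
	}
}
```

Note the overflow behavior: a client whose Last-Event-ID is older than the oldest buffered event receives only what is still in the ring, so some events are lost silently unless the server signals the gap.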

Usage

Basic Setup
import (
    "net/http"
    "time"

    "github.com/johnjansen/buffkit/sse"
)

// Configure SSE with reconnection support
config := sse.SessionConfig{
    BufferSize:         1000,              // Events per session
    BufferTTL:          30 * time.Second,  // Keep disconnected sessions
    EnableReconnection: true,
    CleanupInterval:    10 * time.Second,
}

// Create handler
handler := sse.NewHandler(config)

// Register routes
http.HandleFunc("/events", handler.ServeHTTP)

// Broadcast events
handler.Broadcast("update", `{"message":"Hello World"}`)

// Target specific clients (sessionID is the ID reported to the client
// in its "connected" event)
handler.SendToClient(sessionID, "notification", `{"alert":"Important"}`)
Client-Side Connection
const eventSource = new EventSource('/events', {
    withCredentials: true  // Include cookies for session persistence
});

eventSource.addEventListener('connected', (e) => {
    const data = JSON.parse(e.data);
    console.log('Connected with session:', data.sessionId);
});

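// Fires only for events of type "message"; named events such as "update"
// need their own addEventListener('update', ...) call
eventSource.addEventListener('message', (e) => {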
    const data = JSON.parse(e.data);
    if (data._replayed) {
        console.log('Replayed event from', new Date(data._originalTime * 1000));
    }
});

eventSource.onerror = (e) => {
    // EventSource automatically reconnects
    console.log('Connection lost, reconnecting...');
};

Configuration Options

Option              Default  Description
BufferSize          1000     Maximum events to buffer per session
BufferTTL           30s      How long to keep disconnected sessions
EnableReconnection  true     Enable session persistence and replay
CleanupInterval     10s      How often to clean expired sessions

Performance Characteristics

  • Memory Usage: O(clients × buffer_size × event_size)
  • Event Broadcasting: O(active_clients)
  • Reconnection: O(buffer_size) for replay
  • Cleanup: O(total_sessions) every cleanup interval
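
The memory bound can be turned into a rough capacity estimate. The figures below (10,000 sessions and 256-byte events) are illustrative assumptions, not measurements, and the estimate ignores per-event and per-session overhead.

```go
package main

import "fmt"

// estimateBufferBytes returns the worst-case payload memory when every
// session's buffer is full: sessions × bufferSize × avgEventBytes.
func estimateBufferBytes(sessions, bufferSize, avgEventBytes int64) int64 {
	return sessions * bufferSize * avgEventBytes
}

func main() {
	// Assumed workload: 10,000 sessions, the default BufferSize of 1000,
	// and an average event payload of 256 bytes.
	total := estimateBufferBytes(10_000, 1000, 256)
	fmt.Printf("%d bytes (about %.1f GiB)\n", total, float64(total)/(1<<30))
}
```

With these assumptions the worst case is 2,560,000,000 bytes, roughly 2.4 GiB, so BufferSize is the main lever for bounding memory on busy servers.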

Testing

Unit Tests
go test ./sse/...
BDD Scenarios
cd features
go test -v -run TestSSEReconnection
Load Testing
# See sse/benchmark_test.go for performance tests
go test -bench=. ./sse/...

Production Considerations

Scaling Horizontally

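Sessions live in process memory, so multi-server deployments need shared session storage such as Redis. This is not implemented yet; the call below is a placeholder for a planned API:

// TODO: Implement RedisSessionManager (not yet part of this package)
sessionManager := sse.NewRedisSessionManager(redisClient, config)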
Security
  1. Session Validation: Implement IP and user-agent checking
  2. Rate Limiting: Add exponential backoff for reconnections
  3. Authentication: Integrate with your auth system
  4. HTTPS Only: Always use TLS in production
Monitoring

Track these metrics:

  • Active connections
  • Buffer utilization
  • Reconnection rate
  • Memory usage per session
  • Event throughput

Limitations

  1. In-Memory Only: Current implementation doesn't persist across server restarts
  2. No Compression: Large events consume bandwidth
  3. No Binary Data: Text-only events (SSE limitation)
  4. Single Server: Multi-server setups need shared session storage (Redis support is planned but not implemented)

Future Enhancements

  • Redis session storage for horizontal scaling
  • Event compression for bandwidth optimization
  • Client SDKs for easier integration
  • Prometheus metrics integration
  • WebSocket fallback support
  • Event replay from persistent storage

Contributing

See CONTRIBUTING.md for guidelines.

License

Part of the Buffkit framework. See LICENSE for details.

Documentation

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker manages SSE connections with reconnection support

func NewBroker

func NewBroker(config SessionConfig) *Broker

NewBroker creates a new SSE broker with reconnection support

func (*Broker) Broadcast

func (broker *Broker) Broadcast(eventType, data string)

Broadcast sends an event to all connected clients

func (*Broker) GetClientCount

func (broker *Broker) GetClientCount() int

GetClientCount returns the number of connected clients

func (*Broker) GetSessionStats

func (broker *Broker) GetSessionStats() map[string]interface{}

GetSessionStats returns statistics about sessions

func (*Broker) RegisterClient

func (broker *Broker) RegisterClient(w http.ResponseWriter, r *http.Request) (*Client, error)

RegisterClient registers a new SSE client

func (*Broker) SendToClient

func (broker *Broker) SendToClient(sessionID, eventType, data string)

SendToClient sends an event to a specific client

func (*Broker) SendToClients

func (broker *Broker) SendToClients(sessionIDs []string, eventType, data string)

SendToClients sends an event to specific clients

func (*Broker) Stop

func (broker *Broker) Stop()

Stop gracefully shuts down the broker

func (*Broker) UnregisterClient

func (broker *Broker) UnregisterClient(sessionID string)

UnregisterClient unregisters an SSE client

type Client

type Client struct {
	SessionID    string        // Persistent session identifier
	EventChannel chan Event    // Channel for sending events to client
	Done         chan struct{} // Channel to signal client disconnection
	Request      *http.Request // Original HTTP request
	Writer       http.ResponseWriter
	Flusher      http.Flusher
}

Client represents a connected SSE client with session support

type ClientSession

type ClientSession struct {
	ID            string      // Unique session identifier
	LastEventID   string      // Last event ID received by client
	LastSeen      time.Time   // Last time client was connected
	Created       time.Time   // When session was created
	EventBuffer   *ring.Ring  // Circular buffer of missed events
	Metadata      SessionMeta // Custom session metadata
	Reconnections int         // Number of times client has reconnected
	Active        bool        // Whether client is currently connected
	// contains filtered or unexported fields
}

ClientSession represents a persistent SSE client session

type Event

type Event struct {
	ID        string    // Unique event ID for deduplication
	Type      string    // Event type (e.g., "message", "update", "ping")
	Data      string    // Event payload
	Timestamp time.Time // When the event was created
	Replayed  bool      // Whether this is a replayed event
}

Event represents an SSE event with metadata for replay

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler provides HTTP endpoints for SSE with reconnection support

func NewHandler

func NewHandler(config SessionConfig) *Handler

NewHandler creates a new SSE handler with the given configuration

func (*Handler) Broadcast

func (h *Handler) Broadcast(eventType, data string)

Broadcast sends an event to all connected clients

func (*Handler) GetStats

func (h *Handler) GetStats() map[string]interface{}

GetStats returns statistics about the SSE connections

func (*Handler) SendToClient

func (h *Handler) SendToClient(sessionID, eventType, data string)

SendToClient sends an event to a specific client

func (*Handler) SendToClients

func (h *Handler) SendToClients(sessionIDs []string, eventType, data string)

SendToClients sends an event to specific clients

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles SSE connections with reconnection support

func (*Handler) Stop

func (h *Handler) Stop()

Stop gracefully shuts down the SSE handler

type Message

type Message struct {
	Event     Event    // The event to broadcast
	TargetIDs []string // Specific session IDs to target (empty = broadcast to all)
}

Message represents a broadcast message with targeting capabilities
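
The targeting rule in the TargetIDs comment (empty means broadcast to all) can be sketched as a small selection function. The recipients helper is hypothetical and only illustrates the rule; the broker's real routing is not shown in this documentation.

```go
package main

import "fmt"

// recipients returns the connected session IDs that should receive a
// message. An empty targetIDs slice means every connected session.
func recipients(targetIDs, connected []string) []string {
	if len(targetIDs) == 0 {
		return connected
	}
	want := make(map[string]bool, len(targetIDs))
	for _, id := range targetIDs {
		want[id] = true
	}
	var out []string
	for _, id := range connected {
		if want[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	connected := []string{"a", "b", "c"}
	fmt.Println(recipients(nil, connected))                // broadcast
	fmt.Println(recipients([]string{"c", "x"}, connected)) // targeted
}
```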

type SessionConfig

type SessionConfig struct {
	// BufferSize is the maximum number of events to buffer per session
	BufferSize int
	// BufferTTL is how long to keep a disconnected session alive
	BufferTTL time.Duration
	// EnableReconnection enables session persistence and event replay
	EnableReconnection bool
	// CleanupInterval is how often to run the cleanup goroutine
	CleanupInterval time.Duration
}

SessionConfig defines configuration for SSE session management

func DefaultSessionConfig

func DefaultSessionConfig() SessionConfig

DefaultSessionConfig returns sensible defaults for session management

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages all SSE client sessions

func NewSessionManager

func NewSessionManager(config SessionConfig) *SessionManager

NewSessionManager creates a new session manager with the given configuration

func (*SessionManager) BufferEvent

func (sm *SessionManager) BufferEvent(sessionID string, event Event)

BufferEvent adds an event to a session's buffer if the session is disconnected

func (*SessionManager) CreateSession

func (sm *SessionManager) CreateSession(metadata SessionMeta) (*ClientSession, error)

CreateSession creates a new client session
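
The README lists cryptographically secure session IDs as a feature. A common way to generate one in Go is shown below; the 16-byte length and hex encoding are assumptions, and the package's actual ID format is not documented here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newSessionID returns 128 bits from the operating system's CSPRNG,
// hex-encoded into a 32-character string.
func newSessionID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newSessionID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), id)
}
```

IDs from math/rand would be guessable, which matters here because the session ID alone lets a client resume a stream.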

func (*SessionManager) DisconnectSession

func (sm *SessionManager) DisconnectSession(sessionID string)

DisconnectSession marks a session as disconnected

func (*SessionManager) GetActiveSessionCount

func (sm *SessionManager) GetActiveSessionCount() int

GetActiveSessionCount returns the number of active sessions

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(sessionID string) (*ClientSession, bool)

GetSession retrieves a session by ID

func (*SessionManager) GetTotalSessionCount

func (sm *SessionManager) GetTotalSessionCount() int

GetTotalSessionCount returns the total number of sessions (active and buffered)

func (*SessionManager) ReconnectSession

func (sm *SessionManager) ReconnectSession(sessionID string, lastEventID string) (*ClientSession, []Event, error)

ReconnectSession handles a client reconnection

func (*SessionManager) RemoveSession

func (sm *SessionManager) RemoveSession(sessionID string)

RemoveSession completely removes a session

func (*SessionManager) Stop

func (sm *SessionManager) Stop()

Stop gracefully shuts down the session manager

func (*SessionManager) ValidateSessionOwnership

func (sm *SessionManager) ValidateSessionOwnership(sessionID string, metadata SessionMeta) bool

ValidateSessionOwnership checks whether a session can be claimed by a connection. It guards against session hijacking by validating metadata.
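
The documentation does not say which metadata fields are compared. As one possible reading, the sketch below matches the user agent and the client IP with the port ignored, since the source port changes on every reconnect. The meta type and sameOwner function are hypothetical.

```go
package main

import (
	"fmt"
	"net"
)

// meta is a hypothetical, reduced version of SessionMeta.
type meta struct {
	UserAgent, RemoteAddr string
}

// hostOnly strips the port from a "host:port" address. Addresses without a
// port are returned unchanged.
func hostOnly(addr string) string {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return addr
	}
	return host
}

// sameOwner reports whether an incoming connection plausibly belongs to the
// client that created the session.
func sameOwner(stored, incoming meta) bool {
	return stored.UserAgent == incoming.UserAgent &&
		hostOnly(stored.RemoteAddr) == hostOnly(incoming.RemoteAddr)
}

func main() {
	stored := meta{UserAgent: "Mozilla/5.0", RemoteAddr: "203.0.113.5:51000"}
	fmt.Println(sameOwner(stored, meta{UserAgent: "Mozilla/5.0", RemoteAddr: "203.0.113.5:51877"}))
	fmt.Println(sameOwner(stored, meta{UserAgent: "curl/8.0", RemoteAddr: "203.0.113.5:51877"}))
}
```

A strict IP match rejects legitimate reconnects from clients that change networks, such as mobile devices, and both fields can be spoofed, so a check like this is a supplement to real authentication rather than a substitute.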

type SessionMeta

type SessionMeta struct {
	UserAgent     string            // Client user agent
	RemoteAddr    string            // Client IP address
	Headers       map[string]string // Custom headers from client
	QueryParams   map[string]string // Query parameters from connection
	Subscriptions []string          // Event types client is subscribed to
	ClientVersion string            // Client application version
}

SessionMeta holds custom metadata for a session

type TestEndpoints

type TestEndpoints struct {
	// contains filtered or unexported fields
}

TestEndpoints provides HTTP endpoints for testing SSE functionality

func NewTestEndpoints

func NewTestEndpoints(handler *Handler) *TestEndpoints

NewTestEndpoints creates test endpoints for SSE

func (*TestEndpoints) RegisterRoutes

func (te *TestEndpoints) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers test routes on an HTTP mux
