Documentation ¶
Overview ¶
Package websocket provides the WebSocket input component, which receives federated data for StreamKit federation.
The WebSocket Input component enables StreamKit instances to receive data over WebSocket connections, completing the federation loop started by the WebSocket Output component. This unlocks edge-to-cloud, multi-region, and hierarchical processing topologies.
Key Features ¶
- Dual-mode operation: Server (listen) or Client (connect)
- Bidirectional communication: Request/reply patterns
- Authentication: Bearer token or Basic auth
- Automatic reconnection: Exponential backoff for client mode
- Backpressure handling: Drop oldest/newest or block
- Prometheus metrics: Comprehensive observability
Modes of Operation ¶
Server Mode (Listen):
The component acts as a WebSocket server, accepting incoming connections from multiple remote StreamKit instances.
┌──────────────────┐          ┌──────────────────┐
│ Instance A       │          │ Instance B       │
│ (Edge Device)    │          │ (THIS COMPONENT) │
│                  │          │                  │
│ WS Output        ├─ ws:// ─►│ WS Input         │
│ (client)         │          │ (server)         │
│                  │          │ :8081/ingest     │
└──────────────────┘          └──────────────────┘
Use case: Cloud hub receiving data from edge devices
Client Mode (Connect):
The component acts as a WebSocket client, connecting to a remote WebSocket server.
┌──────────────────┐          ┌──────────────────┐
│ Instance B       │          │ Instance A       │
│ (Edge Device)    │          │ (Cloud Hub)      │
│ (THIS COMPONENT) │          │                  │
│ WS Input         ├─ ws:// ─►│ WS Output        │
│ (client)         │          │ (server)         │
│                  │          │ :8080/stream     │
└──────────────────┘          └──────────────────┘
Use case: Edge device pulling data from cloud hub
Message Protocol ¶
All WebSocket messages use a JSON envelope to distinguish between data and control messages:
type MessageEnvelope struct {
	Type      string          // "data", "request", "reply", "ack", "nack", "slow"
	ID        string          // Unique message ID
	Timestamp int64           // Unix milliseconds
	Payload   json.RawMessage // Actual message content (optional for ack/nack)
}
Data Message:
{
  "type": "data",
  "id": "data-001",
  "timestamp": 1704844800000,
  "payload": {"sensor_id": "temp-01", "value": 23.5}
}
Request Message:
{
  "type": "request",
  "id": "req-001",
  "timestamp": 1704844800000,
  "payload": {
    "method": "backpressure",
    "params": {"rate_limit": 100, "unit": "msg/sec"}
  }
}
Reply Message:
{
  "type": "reply",
  "id": "req-001", // Matches request ID
  "timestamp": 1704844800050,
  "payload": {"status": "ok", "result": {...}}
}
Ack Message (Reliable Delivery):
{
  "type": "ack",
  "id": "data-001", // Matches data message ID
  "timestamp": 1704844800100
}
Nack Message (Delivery Failed):
{
  "type": "nack",
  "id": "data-001", // Matches data message ID
  "timestamp": 1704844800100,
  "payload": {"reason": "publish_failed", "error": "..."}
}
Slow Message (Backpressure):
{
  "type": "slow",
  "id": "bp-001",
  "timestamp": 1704844800200,
  "payload": {"queue_depth": 850, "queue_capacity": 1000, "utilization": 0.85}
}
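The envelope handling described above can be sketched as follows. This is a minimal illustration, not the package's actual read loop: `decodeEnvelope` is a hypothetical helper, and the local `MessageEnvelope` type only mirrors the documented fields and JSON keys.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageEnvelope mirrors the documented envelope fields.
type MessageEnvelope struct {
	Type      string          `json:"type"`
	ID        string          `json:"id"`
	Timestamp int64           `json:"timestamp"`
	Payload   json.RawMessage `json:"payload,omitempty"`
}

// decodeEnvelope (hypothetical helper) parses one WebSocket text frame and
// rejects message types that the protocol does not define.
func decodeEnvelope(frame []byte) (MessageEnvelope, error) {
	var env MessageEnvelope
	if err := json.Unmarshal(frame, &env); err != nil {
		return MessageEnvelope{}, fmt.Errorf("parse envelope: %w", err)
	}
	switch env.Type {
	case "data", "request", "reply", "ack", "nack", "slow":
		return env, nil
	default:
		return MessageEnvelope{}, fmt.Errorf("unknown message type %q", env.Type)
	}
}

func main() {
	frame := []byte(`{"type":"data","id":"data-001","timestamp":1704844800000,"payload":{"sensor_id":"temp-01","value":23.5}}`)
	env, err := decodeEnvelope(frame)
	if err != nil {
		fmt.Println("invalid message:", err)
		return
	}
	// The payload stays raw so it can be forwarded without re-encoding.
	fmt.Println(env.Type, env.ID, string(env.Payload))
}
```

Keeping `Payload` as `json.RawMessage` defers parsing of the application data, so only the envelope fields need to be understood at this layer.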
Configuration ¶
Server Mode Example:
{
  "type": "input",
  "name": "websocket_input",
  "config": {
    "mode": "server",
    "server": {
      "http_port": 8081,
      "path": "/ingest",
      "max_connections": 100,
      "enable_compression": true
    },
    "auth": {
      "type": "bearer",
      "bearer_token_env": "WS_INGEST_TOKEN"
    },
    "backpressure": {
      "enabled": true,
      "queue_size": 1000,
      "on_full": "drop_oldest"
    },
    "ports": {
      "outputs": [
        {
          "name": "ws_data",
          "subject": "federated.data",
          "type": "nats"
        }
      ]
    }
  }
}
Client Mode Example:
{
  "type": "input",
  "name": "websocket_input",
  "config": {
    "mode": "client",
    "client": {
      "url": "ws://edge-instance:8080/stream",
      "reconnect": {
        "enabled": true,
        "max_retries": 10,
        "initial_interval": "1s",
        "max_interval": "60s",
        "multiplier": 2.0
      }
    },
    "ports": {
      "outputs": [
        {
          "name": "ws_data",
          "subject": "federated.data",
          "type": "nats"
        }
      ]
    }
  }
}
Bidirectional Communication ¶
When bidirectional.enabled is set, the component supports request/reply patterns over the WebSocket connection. This lets the receiving instance send control messages back to the sender.
Supported Request Methods:
- Backpressure Control: Adjust upstream send rate
- Selective Subscription: Filter data at source
- Historical Query: Request replay of buffered messages
- Status Query: Request observability metrics
- Dynamic Routing: Announce capabilities for routing
Example: Backpressure Request
// Instance B sends request to Instance A
request := MessageEnvelope{
	Type: "request",
	ID:   "req-bp-001",
	Payload: json.RawMessage(`{
		"method": "backpressure",
		"params": {"rate_limit": 100, "unit": "msg/sec"}
	}`),
}
// Instance A replies
reply := MessageEnvelope{
	Type: "reply",
	ID:   "req-bp-001",
	Payload: json.RawMessage(`{
		"status": "ok",
		"result": {"adjusted_to": 100}
	}`),
}
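Because a reply carries the same ID as its request, the sender can match replies to waiting callers with a map keyed by ID. The sketch below shows one possible way to do that; `pendingRequests` and its methods are hypothetical names, not part of this package's API.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingRequests (hypothetical) tracks in-flight requests by envelope ID.
type pendingRequests struct {
	mu      sync.Mutex
	waiters map[string]chan string
}

func newPendingRequests() *pendingRequests {
	return &pendingRequests{waiters: make(map[string]chan string)}
}

// register records a request ID and returns the channel its reply arrives on.
func (p *pendingRequests) register(id string) <-chan string {
	ch := make(chan string, 1) // buffered so resolve never blocks
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// resolve hands a reply payload to the caller waiting on that ID.
// It returns false when no request with that ID is pending.
func (p *pendingRequests) resolve(id, payload string) bool {
	p.mu.Lock()
	ch, ok := p.waiters[id]
	delete(p.waiters, id)
	p.mu.Unlock()
	if !ok {
		return false
	}
	ch <- payload
	return true
}

func main() {
	pending := newPendingRequests()
	replyCh := pending.register("req-bp-001")

	// Simulates the read loop receiving the reply envelope.
	pending.resolve("req-bp-001", `{"status":"ok","result":{"adjusted_to":100}}`)

	fmt.Println("reply:", <-replyCh)
}
```

A real implementation would also need a timeout on the waiting side, in line with the request_timeout setting in BidirectionalConfig.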
Backpressure Handling ¶
The component maintains an internal message queue to decouple WebSocket reception from NATS publishing. When the queue is full, the policy configured in backpressure.on_full is applied:
Drop Oldest (default):
Queue:  [msg1, msg2, msg3, msg4, msg5] ← FULL
New:    msg6
Result: [msg2, msg3, msg4, msg5, msg6]
Lost:   msg1
Drop Newest:
Queue:  [msg1, msg2, msg3, msg4, msg5] ← FULL
New:    msg6
Result: [msg1, msg2, msg3, msg4, msg5]
Lost:   msg6
Block:
Queue:  [msg1, msg2, msg3, msg4, msg5] ← FULL
New:    msg6
Wait until queue has space...
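The two dropping policies can be illustrated with a small bounded FIFO. This is a sketch under the assumption of a slice-backed queue; `boundedQueue` is a hypothetical type and the component's real queue may be implemented differently (for example with channels).

```go
package main

import "fmt"

// boundedQueue (hypothetical) is a fixed-capacity FIFO with an on-full policy.
type boundedQueue struct {
	items    []string
	capacity int
	onFull   string // "drop_oldest" or "drop_newest"
}

// push adds msg to the queue. When the queue is full it applies the policy
// and returns the message that was lost.
func (q *boundedQueue) push(msg string) (lost string, dropped bool) {
	if q.capacity <= 0 {
		return msg, true // nothing can be stored
	}
	if len(q.items) < q.capacity {
		q.items = append(q.items, msg)
		return "", false
	}
	if q.onFull == "drop_newest" {
		return msg, true // queue unchanged, incoming message is lost
	}
	// drop_oldest: evict the head, then append the new message.
	lost = q.items[0]
	q.items = append(q.items[1:], msg)
	return lost, true
}

func main() {
	q := &boundedQueue{capacity: 5, onFull: "drop_oldest"}
	for i := 1; i <= 6; i++ {
		if lost, dropped := q.push(fmt.Sprintf("msg%d", i)); dropped {
			fmt.Println("lost:", lost)
		}
	}
	fmt.Println("queue:", q.items)
}
```

The block policy is not shown here; in Go it maps naturally onto a send to a buffered channel, which waits until the consumer frees a slot.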
Reconnection Logic (Client Mode) ¶
When operating in client mode, the component automatically reconnects if the connection is lost:
Attempt 1: Wait 1 second
Attempt 2: Wait 2 seconds (1 * 2.0)
Attempt 3: Wait 4 seconds (2 * 2.0)
Attempt 4: Wait 8 seconds (4 * 2.0)
...
Attempt N: Wait up to max_interval (60s)
Reconnection stops when:
- max_retries is reached (0 means unlimited)
- component stopped
- connection succeeds
Metrics ¶
Prometheus metrics exposed:
# Message throughput
websocket_input_messages_received_total{component,type} - Total received
websocket_input_messages_published_total{component,subject} - Total published
websocket_input_messages_dropped_total{component,reason} - Total dropped
# Connection state
websocket_input_connections_active{component} - Active connections
websocket_input_connections_total{component} - Total connections
websocket_input_reconnect_attempts_total{component} - Reconnection attempts
# Request/Reply (bidirectional mode)
websocket_input_requests_sent_total{component,method} - Requests sent
websocket_input_replies_received_total{component,status} - Replies received
websocket_input_request_timeouts_total{component} - Request timeouts
websocket_input_request_duration_seconds{component,method} - Round-trip time
# Queue state
websocket_input_queue_depth{component} - Current queue depth
websocket_input_queue_utilization{component} - Queue utilization (0.0-1.0)
# Errors
websocket_input_errors_total{component,type} - Errors by type
Health Checks ¶
Component health response:
{
  "healthy": true,
  "status": "connected",
  "details": {
    "mode": "server",
    "connections": {
      "active": 5,
      "total": 37
    },
    "queue": {
      "depth": 45,
      "utilization": 0.045
    },
    "throughput": {
      "messages_per_second": 250
    }
  }
}
Unhealthy states:
- Server mode: No active connections for > 5 minutes
- Client mode: Not connected and max retries exceeded
- Queue full for > 30 seconds (backpressure issue)
Security ¶
Authentication:
Bearer Token (recommended):
export WS_INGEST_TOKEN="sk-1234567890abcdef"
Basic Auth (legacy):
export WS_USERNAME="semstreams"
export WS_PASSWORD="secret123"
TLS/SSL:
Recommendation: Use a reverse proxy (nginx, Caddy) for TLS termination.
Client ───HTTPS───► Nginx ───HTTP───► StreamKit
        (TLS)              (localhost)
Error Handling ¶
Errors are classified using the StreamKit error framework:
- Fatal: Invalid mode, missing required config
- Transient: Connection errors, read errors, publish errors
- Invalid: Message parse errors, unknown message types
Fatal errors prevent component startup. Transient errors trigger reconnection (client mode) or are logged (server mode). Invalid messages are dropped and counted in metrics.
Thread Safety ¶
All public methods are safe for concurrent use:
- Start(): Protected by lifecycleMu
- Stop(): Protected by lifecycleMu
- Internal state: Protected by appropriate mutexes
Message processing is handled by dedicated goroutines:
- Server mode: One goroutine per client connection
- Client mode: One read goroutine + one reconnect goroutine
- Common: One message processor goroutine
Federation Use Cases ¶
Edge-to-Cloud:
Edge Instance:  UDP → Processors → WebSocket Output
Cloud Instance: WebSocket Input → Storage/Analytics
Multi-Region Replication:
Region A: WebSocket Output → Region B: WebSocket Input
Region B: WebSocket Output → Region A: WebSocket Input
Hierarchical Processing:
Edge:     Raw data → Pre-processing → WebSocket Output
Regional: WebSocket Input → Aggregation → WebSocket Output
Cloud:    WebSocket Input → Analytics/Storage
Integration Example ¶
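import (
	"context"
	"log"
	"time"

	"github.com/c360studio/semstreams/input/websocket_input"
	"github.com/c360studio/semstreams/component"
	"github.com/c360studio/semstreams/natsclient"
	"github.com/c360studio/semstreams/metric"
)

ctx := context.Background()

// Create NATS client
nats, err := natsclient.NewClient("nats://localhost:4222")
if err != nil {
	log.Fatal(err)
}
nats.Connect(ctx)

// Create metrics registry
metrics := metric.NewMetricsRegistry()

// Create component
config := websocket_input.DefaultConfig()
config.Mode = websocket_input.ModeServer
config.ServerConfig.HTTPPort = 8081

// NewInput takes a security.Config as its final argument (see NewInput below).
// The zero value is used here; the security package import is omitted.
var securityCfg security.Config
input, err := websocket_input.NewInput("ws_in", nats, config, metrics, securityCfg)
if err != nil {
	log.Fatal(err)
}

// Start component. Stop takes a shutdown timeout (5s is an example value).
if err := input.Start(ctx); err != nil {
	log.Fatal(err)
}
defer input.Stop(5 * time.Second)

// Messages automatically published to NATS subject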
See Also ¶
- output/websocket: WebSocket Output component (sends data)
- component: Component lifecycle interfaces
- natsclient: NATS client wrapper
- metric: Metrics registry and Prometheus integration
Package websocket provides component registration for WebSocket input ¶
Index ¶
- func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type AuthConfig
- type BackpressureConfig
- type BidirectionalConfig
- type ClientConfig
- type Config
- type Input
- func (i *Input) ConfigSchema() component.ConfigSchema
- func (i *Input) DataFlow() component.FlowMetrics
- func (i *Input) Health() component.HealthStatus
- func (i *Input) Initialize() error
- func (i *Input) InputPorts() []component.Port
- func (i *Input) Meta() component.Metadata
- func (i *Input) OutputPorts() []component.Port
- func (i *Input) Process(_ any) error
- func (i *Input) Start(ctx context.Context) error
- func (i *Input) Stop(timeout time.Duration) error
- type MessageEnvelope
- type Metrics
- type Mode
- type ReconnectConfig
- type ServerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateInput ¶
func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateInput is the factory function for creating WebSocket input components
Types ¶
type AuthConfig ¶
type AuthConfig struct {
Type string `json:"type" schema:"type:string,description:Authentication type (none bearer basic),category:basic"`
BearerTokenEnv string `json:"bearer_token_env,omitempty" schema:"type:string,description:Environment variable for bearer token,category:security"`
BasicUsernameEnv string `` /* 131-byte string literal not displayed */
BasicPasswordEnv string `` /* 131-byte string literal not displayed */
}
AuthConfig holds authentication configuration
type BackpressureConfig ¶
type BackpressureConfig struct {
Enabled bool `json:"enabled" schema:"type:bool,description:Enable backpressure handling,category:basic"`
QueueSize int `json:"queue_size" schema:"type:int,description:Internal message queue size,category:limits"`
OnFull string `json:"on_full" schema:"type:string,description:Action when queue full (drop_oldest drop_newest block),category:policy"`
}
BackpressureConfig holds backpressure handling configuration
type BidirectionalConfig ¶
type BidirectionalConfig struct {
Enabled bool `json:"enabled" schema:"type:bool,description:Enable request/reply patterns,category:basic"`
RequestTimeout time.Duration `json:"request_timeout" schema:"type:duration,description:Timeout for request/reply,category:timing"`
MaxConcurrentRequests int `json:"max_concurrent_requests" schema:"type:int,description:Maximum concurrent requests,category:limits"`
}
BidirectionalConfig holds bidirectional communication configuration
type ClientConfig ¶
type ClientConfig struct {
URL string `json:"url" schema:"type:string,description:WebSocket server URL to connect to,category:basic"`
Reconnect *ReconnectConfig `json:"reconnect,omitempty" schema:"type:object,description:Reconnection configuration,category:reliability"`
}
ClientConfig holds client mode configuration
type Config ¶
type Config struct {
// Mode determines if component acts as server (listen) or client (connect)
Mode Mode `json:"mode" schema:"type:string,description:Operation mode (server or client),category:basic"`
// Server mode configuration
ServerConfig *ServerConfig `json:"server,omitempty" schema:"type:object,description:Server mode configuration,category:server"`
// Client mode configuration
ClientConfig *ClientConfig `json:"client,omitempty" schema:"type:object,description:Client mode configuration,category:client"`
// Authentication configuration
Auth *AuthConfig `json:"auth,omitempty" schema:"type:object,description:Authentication configuration,category:security"`
// Bidirectional communication configuration
Bidirectional *BidirectionalConfig `json:"bidirectional,omitempty" schema:"type:object,description:Bidirectional request/reply configuration,category:advanced"`
// Backpressure configuration
Backpressure *BackpressureConfig `json:"backpressure,omitempty" schema:"type:object,description:Backpressure handling configuration,category:advanced"`
// Port configuration for inputs and outputs
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
}
Config holds configuration for WebSocket input component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default configuration for WebSocket input
type Input ¶
type Input struct {
// contains filtered or unexported fields
}
Input implements a WebSocket input component that receives federated data
func NewInput ¶
func NewInput(
	name string,
	natsClient *natsclient.Client,
	config Config,
	metricsRegistry *metric.MetricsRegistry,
	securityCfg security.Config,
) (*Input, error)
NewInput creates a new WebSocket input component
func (*Input) ConfigSchema ¶
func (i *Input) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Input) DataFlow ¶
func (i *Input) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Input) Health ¶
func (i *Input) Health() component.HealthStatus
Health returns current health status
func (*Input) Initialize ¶
func (i *Input) Initialize() error
Initialize initializes the component (no-op for WebSocket input)
func (*Input) InputPorts ¶
func (i *Input) InputPorts() []component.Port
InputPorts returns the input ports (none for input components)
func (*Input) OutputPorts ¶
func (i *Input) OutputPorts() []component.Port
OutputPorts returns the output ports
func (*Input) Process ¶
func (i *Input) Process(_ any) error
Process implements component.LifecycleComponent (not used for input components)
type MessageEnvelope ¶
type MessageEnvelope struct {
Type string `json:"type"` // Message type (see above)
ID string `json:"id"` // Unique message ID (for correlation)
Timestamp int64 `json:"timestamp"` // Unix milliseconds
Payload json.RawMessage `json:"payload,omitempty"` // Optional payload (required for data/nack/slow)
}
MessageEnvelope wraps all WebSocket messages with type discrimination.
Supported types:
- "data": Application data to be published to NATS
- "request": Control plane request (future use)
- "reply": Control plane reply (future use)
- "ack": Acknowledge successful receipt/processing of data message
- "nack": Negative acknowledgment (processing failed, may retry)
- "slow": Backpressure signal indicating receiver is overloaded
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds Prometheus metrics for Input component
type ReconnectConfig ¶
type ReconnectConfig struct {
Enabled bool `json:"enabled" schema:"type:bool,description:Enable automatic reconnection,category:basic"`
MaxRetries int `json:"max_retries" schema:"type:int,description:Maximum reconnection attempts (0=unlimited),category:limits"`
InitialInterval time.Duration `json:"initial_interval" schema:"type:duration,description:Initial retry interval,category:timing"`
MaxInterval time.Duration `json:"max_interval" schema:"type:duration,description:Maximum retry interval,category:timing"`
Multiplier float64 `json:"multiplier" schema:"type:float,description:Backoff multiplier,category:advanced"`
}
ReconnectConfig holds reconnection configuration for client mode
type ServerConfig ¶
type ServerConfig struct {
HTTPPort int `json:"http_port" schema:"type:int,description:HTTP port to listen on,category:basic"`
Path string `json:"path" schema:"type:string,description:WebSocket endpoint path,category:basic"`
MaxConnections int `json:"max_connections" schema:"type:int,description:Maximum concurrent connections,category:limits"`
ReadBufferSize int `json:"read_buffer_size" schema:"type:int,description:WebSocket read buffer size,category:advanced"`
WriteBufferSize int `json:"write_buffer_size" schema:"type:int,description:WebSocket write buffer size,category:advanced"`
EnableCompression bool `json:"enable_compression" schema:"type:bool,description:Enable per-message compression,category:advanced"`
AllowedOrigins []string `json:"allowed_origins" schema:"type:array,description:Allowed origins for CORS (empty=same-origin only),category:security"`
}
ServerConfig holds server mode configuration
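The allowed_origins description ("empty=same-origin only") suggests an origin check along the following lines. This is a sketch under assumptions: `originAllowed` is a hypothetical function, entries are assumed to be full origins such as "https://app.example.com", and requests without an Origin header (typical for non-browser clients) are assumed to be accepted.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// originAllowed (hypothetical) decides whether a WebSocket upgrade request
// with the given Origin header may connect to a server reached as requestHost.
func originAllowed(origin, requestHost string, allowed []string) bool {
	if origin == "" {
		return true // assumption: non-browser clients send no Origin header
	}
	if len(allowed) == 0 {
		// Same-origin only: the Origin host must match the request host.
		u, err := url.Parse(origin)
		return err == nil && strings.EqualFold(u.Host, requestHost)
	}
	for _, a := range allowed {
		if strings.EqualFold(a, origin) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(originAllowed("https://hub.example.com", "hub.example.com", nil))
	fmt.Println(originAllowed("https://other.example.com", "hub.example.com", nil))
	fmt.Println(originAllowed("https://other.example.com", "hub.example.com",
		[]string{"https://other.example.com"}))
}
```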