websocket

package
v1.0.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: MIT Imports: 24 Imported by: 0

README

WebSocket Input

Receives federated data from remote SemStreams instances over WebSocket connections.

Purpose

The WebSocket Input component enables SemStreams federation by receiving event data from remote instances over WebSocket connections. It complements the WebSocket Output component to create edge-to-cloud, multi-region, and hierarchical processing topologies. The component handles authentication, backpressure, automatic reconnection, and bidirectional communication patterns.

Operation Modes

Server Mode

Listen for incoming WebSocket connections from multiple remote instances. Operates as a data hub receiving streams from distributed edge devices or regional aggregators.

flowchart LR
    A[Edge Instance A] -->|ws://| C[WebSocket Input<br/>Server Mode]
    B[Edge Instance B] -->|ws://| C
    C --> D[NATS JetStream]
Client Mode

Connect to a remote WebSocket server and receive data. Operates as a data consumer pulling streams from a central hub or upstream service.

flowchart LR
    A[Cloud Hub<br/>WebSocket Server] -->|ws://| B[WebSocket Input<br/>Client Mode]
    B --> C[NATS JetStream]

Configuration

Server Mode Example
type: input
name: websocket_input
config:
  mode: server
  server:
    http_port: 8081
    path: /ingest
    max_connections: 100
    read_buffer_size: 4096
    write_buffer_size: 4096
    enable_compression: true
    allowed_origins:
      - https://edge-device-01.example.com
      - https://edge-device-02.example.com
  auth:
    type: bearer
    bearer_token_env: WS_INGEST_TOKEN
  backpressure:
    enabled: true
    queue_size: 1000
    on_full: drop_oldest
  ports:
    outputs:
      - name: ws_data
        subject: federated.data
        type: nats
      - name: ws_control
        subject: federated.control
        type: nats
Client Mode Example
type: input
name: websocket_input
config:
  mode: client
  client:
    url: ws://hub.example.com:8080/stream
    reconnect:
      enabled: true
      max_retries: 10
      initial_interval: 1s
      max_interval: 60s
      multiplier: 2.0
  auth:
    type: bearer
    bearer_token_env: WS_CLIENT_TOKEN
  backpressure:
    enabled: true
    queue_size: 1000
    on_full: drop_oldest
  ports:
    outputs:
      - name: ws_data
        subject: federated.data
        type: nats

Authentication Options

Use environment variable for secure token storage.

export WS_INGEST_TOKEN="sk-1234567890abcdef"

Configuration:

auth:
  type: bearer
  bearer_token_env: WS_INGEST_TOKEN
Basic Authentication

Use environment variables for username and password.

export WS_USERNAME="semstreams"
export WS_PASSWORD="secret123"

Configuration:

auth:
  type: basic
  basic_username_env: WS_USERNAME
  basic_password_env: WS_PASSWORD
No Authentication
auth:
  type: none

Bidirectional Communication

Enable request/reply patterns for control messages. The component supports backpressure signaling, selective subscription, historical queries, status requests, and dynamic routing announcements.

bidirectional:
  enabled: true
  request_timeout: 5s
  max_concurrent_requests: 10
Message Protocol

All WebSocket messages use a JSON envelope with type discrimination:

{
  "type": "data",
  "id": "data-001",
  "timestamp": 1704844800000,
  "payload": {"sensor_id": "temp-01", "value": 23.5}
}

Supported message types:

  • data: Application data published to NATS
  • request: Control plane request
  • reply: Control plane reply
  • ack: Successful receipt acknowledgment
  • nack: Delivery failure notification
  • slow: Backpressure signal indicating overload

Backpressure Handling

The component maintains an internal message queue to decouple WebSocket reception from NATS publishing. When the queue fills up, configurable policies determine message handling.

Drop Oldest (Default)

Discard oldest message to make room for new arrivals.

backpressure:
  enabled: true
  queue_size: 1000
  on_full: drop_oldest
Drop Newest

Discard incoming message when queue is full.

backpressure:
  enabled: true
  queue_size: 1000
  on_full: drop_newest
Block

Wait until queue has space before accepting new messages.

backpressure:
  enabled: true
  queue_size: 1000
  on_full: block

When queue utilization exceeds 80%, the component automatically sends slow signals to upstream connections, enabling adaptive rate limiting.

Reconnection Logic (Client Mode)

Automatic reconnection with exponential backoff when connection is lost.

flowchart TD
    A[Connection Lost] --> B{Reconnect Enabled?}
    B -->|No| C[Stop]
    B -->|Yes| D{Retries < Max?}
    D -->|No| C
    D -->|Yes| E[Calculate Backoff Delay]
    E --> F[Wait]
    F --> G[Attempt Connection]
    G -->|Success| H[Reset Retry Count]
    G -->|Failure| D

Backoff calculation: delay = initial_interval * (multiplier ^ attempts) capped at max_interval.

Example progression with default settings:

  1. Attempt 1: Wait 1 second
  2. Attempt 2: Wait 2 seconds
  3. Attempt 3: Wait 4 seconds
  4. Attempt 4: Wait 8 seconds
  5. Attempt 5+: Wait 60 seconds (max)

Input/Output Ports

Input Ports

None. This is an input component that receives data from external WebSocket connections.

Output Ports
Port Subject Type Description
ws_data federated.data nats Data messages received via WebSocket
ws_control federated.control nats Control messages (requests/replies)

Both ports support NATS and JetStream types. Configure type: jetstream for durable message delivery.

Example Use Cases

Edge-to-Cloud Data Collection

Deploy WebSocket Input in server mode on cloud infrastructure to receive sensor data from distributed edge devices.

# Cloud instance
mode: server
server:
  http_port: 8081
  path: /ingest
  max_connections: 500
auth:
  type: bearer
  bearer_token_env: CLOUD_INGEST_TOKEN
Multi-Region Replication

Connect regional instances bidirectionally for data replication and disaster recovery.

# Region A (server)
mode: server
server:
  http_port: 8081

# Region B (client)
mode: client
client:
  url: ws://region-a.example.com:8081/stream
Hierarchical Processing Pipeline

Receive pre-processed data from regional aggregators for final analysis and storage.

flowchart TD
    A[Edge: Raw Data] --> B[Edge: Pre-processing]
    B --> C[Regional: WebSocket Output]
    C -->|ws://| D[Regional: WebSocket Input]
    D --> E[Regional: Aggregation]
    E --> F[Regional: WebSocket Output]
    F -->|ws://| G[Cloud: WebSocket Input]
    G --> H[Cloud: Analytics]
Real-Time Event Streaming

Subscribe to live event streams from external services or third-party providers.

mode: client
client:
  url: wss://events.provider.com/live
  reconnect:
    enabled: true
    max_retries: 0  # Unlimited retries
auth:
  type: bearer
  bearer_token_env: PROVIDER_API_KEY

Metrics

The component exposes comprehensive Prometheus metrics for observability:

# Message throughput
websocket_input_messages_received_total{component,type}
websocket_input_messages_published_total{component,subject}
websocket_input_messages_dropped_total{component,reason}

# Connection state
websocket_input_connections_active{component}
websocket_input_connections_total{component}
websocket_input_reconnect_attempts_total{component}

# Request/Reply (bidirectional mode)
websocket_input_requests_sent_total{component,method}
websocket_input_replies_received_total{component,status}
websocket_input_request_timeouts_total{component}
websocket_input_request_duration_seconds{component,method}

# Queue state
websocket_input_queue_depth{component}
websocket_input_queue_utilization{component}

# Errors
websocket_input_errors_total{component,type}

Health Checks

Component health status varies by operation mode:

Server Mode

Healthy if running and accepting connections. Reports connection count and queue utilization.

{
  "healthy": true,
  "status": "listening",
  "details": {
    "mode": "server",
    "connections": {
      "active": 5,
      "total": 37
    },
    "queue": {
      "depth": 45,
      "utilization": 0.045
    }
  }
}
Client Mode

Healthy if connected to remote server. Unhealthy if disconnected and max retries exceeded.

{
  "healthy": true,
  "status": "connected",
  "details": {
    "mode": "client",
    "remote_url": "ws://hub.example.com:8080/stream",
    "queue": {
      "depth": 12,
      "utilization": 0.012
    }
  }
}

Security

TLS/SSL Support

Use reverse proxy (nginx, Caddy) for TLS termination in production environments.

server {
    listen 443 ssl;
    server_name ingest.example.com;

    ssl_certificate /etc/ssl/certs/ingest.crt;
    ssl_certificate_key /etc/ssl/private/ingest.key;

    location /ingest {
        proxy_pass http://localhost:8081;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
CORS Configuration

Restrict cross-origin requests by specifying allowed origins:

server:
  allowed_origins:
    - https://app.example.com
    - https://dashboard.example.com

Empty allowed_origins enforces same-origin policy. Use ["*"] to allow all origins (development only).

Error Handling

Errors are classified using the SemStreams error framework:

  • Fatal: Invalid mode, missing required configuration (prevents startup)
  • Transient: Connection errors, read errors, publish errors (triggers reconnection)
  • Invalid: Message parse errors, unknown message types (dropped and counted)

All errors increment the websocket_input_errors_total metric with appropriate type labels.

Thread Safety

All public methods are safe for concurrent use. Message processing uses dedicated goroutines:

  • Server mode: One goroutine per client connection
  • Client mode: One read goroutine + one reconnect goroutine
  • Common: One message processor goroutine

Internal state is protected by appropriate mutexes and atomic operations.

Documentation

Overview

Package websocket provides WebSocket input component for receiving federated data

Package websocket provides WebSocket input component for StreamKit federation.

Overview

The WebSocket Input component enables StreamKit instances to receive data over WebSocket connections, completing the federation loop started by the WebSocket Output component. This unlocks edge-to-cloud, multi-region, and hierarchical processing topologies.

Key Features

  • Dual-mode operation: Server (listen) or Client (connect)
  • Bidirectional communication: Request/reply patterns
  • Authentication: Bearer token or Basic auth
  • Automatic reconnection: Exponential backoff for client mode
  • Backpressure handling: Drop oldest/newest or block
  • Prometheus metrics: Comprehensive observability

Modes of Operation

Server Mode (Listen):

Component acts as WebSocket server, accepting incoming connections from multiple remote StreamKit instances.

┌──────────────────┐          ┌──────────────────┐
│   Instance A     │          │   Instance B     │
│  (Edge Device)   │          │  (THIS COMPONENT)│
│                  │          │                  │
│  WS Output       ├─ ws:// ─►│  WS Input        │
│  (client)        │          │  (server)        │
│                  │          │  :8081/ingest    │
└──────────────────┘          └──────────────────┘

Use case: Cloud hub receiving data from edge devices

Client Mode (Connect):

Component acts as WebSocket client, connecting to a remote WebSocket server.

┌──────────────────┐          ┌──────────────────┐
│   Instance B     │          │   Instance A     │
│  (Edge Device)   │          │  (Cloud Hub)     │
│  (THIS COMPONENT)│          │                  │
│  WS Input        ├─ ws:// ─►│  WS Output       │
│  (client)        │          │  (server)        │
│                  │          │  :8080/stream    │
└──────────────────┘          └──────────────────┘

Use case: Edge device pulling data from cloud hub

Message Protocol

All WebSocket messages use a JSON envelope to distinguish between data and control messages:

type MessageEnvelope struct {
    Type      string          // "data", "request", "reply", "ack", "nack", "slow"
    ID        string          // Unique message ID
    Timestamp int64           // Unix milliseconds
    Payload   json.RawMessage // Actual message content (optional for ack/nack)
}

Data Message:

{
  "type": "data",
  "id": "data-001",
  "timestamp": 1704844800000,
  "payload": {"sensor_id": "temp-01", "value": 23.5}
}

Request Message:

{
  "type": "request",
  "id": "req-001",
  "timestamp": 1704844800000,
  "payload": {
    "method": "backpressure",
    "params": {"rate_limit": 100, "unit": "msg/sec"}
  }
}

Reply Message:

{
  "type": "reply",
  "id": "req-001",  // Matches request ID
  "timestamp": 1704844800050,
  "payload": {"status": "ok", "result": {...}}
}

Ack Message (Reliable Delivery):

{
  "type": "ack",
  "id": "data-001",  // Matches data message ID
  "timestamp": 1704844800100
}

Nack Message (Delivery Failed):

{
  "type": "nack",
  "id": "data-001",  // Matches data message ID
  "timestamp": 1704844800100,
  "payload": {"reason": "publish_failed", "error": "..."}
}

Slow Message (Backpressure):

{
  "type": "slow",
  "id": "bp-001",
  "timestamp": 1704844800200,
  "payload": {"queue_depth": 850, "queue_capacity": 1000, "utilization": 0.85}
}

Configuration

Server Mode Example:

{
  "type": "input",
  "name": "websocket_input",
  "config": {
    "mode": "server",
    "server": {
      "http_port": 8081,
      "path": "/ingest",
      "max_connections": 100,
      "enable_compression": true
    },
    "auth": {
      "type": "bearer",
      "bearer_token_env": "WS_INGEST_TOKEN"
    },
    "backpressure": {
      "enabled": true,
      "queue_size": 1000,
      "on_full": "drop_oldest"
    },
    "ports": {
      "outputs": [
        {
          "name": "ws_data",
          "subject": "federated.data",
          "type": "nats"
        }
      ]
    }
  }
}

Client Mode Example:

{
  "type": "input",
  "name": "websocket_input",
  "config": {
    "mode": "client",
    "client": {
      "url": "ws://edge-instance:8080/stream",
      "reconnect": {
        "enabled": true,
        "max_retries": 10,
        "initial_interval": "1s",
        "max_interval": "60s",
        "multiplier": 2.0
      }
    },
    "ports": {
      "outputs": [
        {
          "name": "ws_data",
          "subject": "federated.data",
          "type": "nats"
        }
      ]
    }
  }
}

Bidirectional Communication

When enabled, the component supports request/reply patterns over the WebSocket connection. This allows the receiving instance to send control messages back to the sender.

Supported Request Methods:

  1. Backpressure Control: Adjust upstream send rate
  2. Selective Subscription: Filter data at source
  3. Historical Query: Request replay of buffered messages
  4. Status Query: Request observability metrics
  5. Dynamic Routing: Announce capabilities for routing

Example: Backpressure Request

// Instance B sends request to Instance A
request := MessageEnvelope{
    Type: "request",
    ID: "req-bp-001",
    Payload: json.RawMessage(`{
        "method": "backpressure",
        "params": {"rate_limit": 100, "unit": "msg/sec"}
    }`),
}

// Instance A replies
reply := MessageEnvelope{
    Type: "reply",
    ID: "req-bp-001",
    Payload: json.RawMessage(`{
        "status": "ok",
        "result": {"adjusted_to": 100}
    }`),
}

Backpressure Handling

The component maintains an internal message queue to decouple WebSocket reception from NATS publishing. When the queue fills up, backpressure policies are applied:

Drop Oldest (default):

Queue: [msg1, msg2, msg3, msg4, msg5]  ← FULL
New:   msg6
Result: [msg2, msg3, msg4, msg5, msg6]
Lost:   msg1

Drop Newest:

Queue: [msg1, msg2, msg3, msg4, msg5]  ← FULL
New:   msg6
Result: [msg1, msg2, msg3, msg4, msg5]
Lost:   msg6

Block:

Queue: [msg1, msg2, msg3, msg4, msg5]  ← FULL
New:   msg6
Wait until queue has space...

Reconnection Logic (Client Mode)

When operating in client mode, the component automatically reconnects if the connection is lost:

Attempt 1: Wait 1 second
Attempt 2: Wait 2 seconds (1 * 2.0)
Attempt 3: Wait 4 seconds (2 * 2.0)
Attempt 4: Wait 8 seconds (4 * 2.0)
...
Attempt N: Wait up to max_interval (60s)

Reconnection stops when:

  • max_retries reached (if configured)
  • component stopped
  • connection succeeds

Metrics

Prometheus metrics exposed:

# Message throughput
websocket_input_messages_received_total{component,type} - Total received
websocket_input_messages_published_total{component,subject} - Total published
websocket_input_messages_dropped_total{component,reason} - Total dropped

# Connection state
websocket_input_connections_active{component} - Active connections
websocket_input_connections_total{component} - Total connections
websocket_input_reconnect_attempts_total{component} - Reconnection attempts

# Request/Reply (bidirectional mode)
websocket_input_requests_sent_total{component,method} - Requests sent
websocket_input_replies_received_total{component,status} - Replies received
websocket_input_request_timeouts_total{component} - Request timeouts
websocket_input_request_duration_seconds{component,method} - Round-trip time

# Queue state
websocket_input_queue_depth{component} - Current queue depth
websocket_input_queue_utilization{component} - Queue utilization (0.0-1.0)

# Errors
websocket_input_errors_total{component,type} - Errors by type

Health Checks

Component health response:

{
  "healthy": true,
  "status": "connected",
  "details": {
    "mode": "server",
    "connections": {
      "active": 5,
      "total": 37
    },
    "queue": {
      "depth": 45,
      "utilization": 0.045
    },
    "throughput": {
      "messages_per_second": 250
    }
  }
}

Unhealthy states:

  • Server mode: No active connections for > 5 minutes
  • Client mode: Not connected and max retries exceeded
  • Queue full for > 30 seconds (backpressure issue)

Security

Authentication:

Bearer Token (recommended):

export WS_INGEST_TOKEN="sk-1234567890abcdef"

Basic Auth (legacy):

export WS_USERNAME="semstreams"
export WS_PASSWORD="secret123"

TLS/SSL:

Recommendation: Use reverse proxy (nginx, Caddy) for TLS termination.

Client ───HTTPS───► Nginx ───HTTP───► StreamKit
        (TLS)               (localhost)

Error Handling

Errors are classified using the StreamKit error framework:

  • Fatal: Invalid mode, missing required config
  • Transient: Connection errors, read errors, publish errors
  • Invalid: Message parse errors, unknown message types

Fatal errors prevent component startup. Transient errors trigger reconnection (client mode) or are logged (server mode). Invalid messages are dropped and counted in metrics.

Thread Safety

All public methods are safe for concurrent use:

  • Start(): Protected by lifecycleMu
  • Stop(): Protected by lifecycleMu
  • Internal state: Protected by appropriate mutexes

Message processing is handled by dedicated goroutines:

  • Server mode: One goroutine per client connection
  • Client mode: One read goroutine + one reconnect goroutine
  • Common: One message processor goroutine

Federation Use Cases

Edge-to-Cloud:

Edge Instance: UDP → Processors → WebSocket Output
Cloud Instance: WebSocket Input → Storage/Analytics

Multi-Region Replication:

Region A: WebSocket Output → Region B: WebSocket Input
Region B: WebSocket Output → Region A: WebSocket Input

Hierarchical Processing:

Edge: Raw data → Pre-processing → WebSocket Output
Regional: WebSocket Input → Aggregation → WebSocket Output
Cloud: WebSocket Input → Analytics/Storage

Integration Example

import (
    "github.com/c360studio/semstreams/input/websocket_input"
    "github.com/c360studio/semstreams/component"
    "github.com/c360studio/semstreams/natsclient"
    "github.com/c360studio/semstreams/metric"
)

// Create NATS client
nats, _ := natsclient.NewClient("nats://localhost:4222")
nats.Connect(ctx)

// Create metrics registry
metrics := metric.NewMetricsRegistry()

// Create component
config := websocket_input.DefaultConfig()
config.Mode = websocket_input.ModeServer
config.ServerConfig.HTTPPort = 8081

input, _ := websocket_input.NewInput("ws_in", nats, config, metrics)

// Start component
input.Start(ctx)
defer input.Stop(ctx)

// Messages automatically published to NATS subject

See Also

  • output/websocket: WebSocket Output component (sends data)
  • component: Component lifecycle interfaces
  • natsclient: NATS client wrapper
  • metric: Metrics registry and Prometheus integration

Package websocket provides component registration for WebSocket input

Package websocket provides WebSocket input component for receiving federated data

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateInput

func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateInput is the factory function for creating WebSocket input components

func Register

func Register(registry *component.Registry) error

Register registers the WebSocket input component with the registry

Types

type AuthConfig

type AuthConfig struct {
	Type             string `json:"type" schema:"type:string,description:Authentication type (none bearer basic),category:basic"`
	BearerTokenEnv   string `json:"bearer_token_env,omitempty" schema:"type:string,description:Environment variable for bearer token,category:security"`
	BasicUsernameEnv string `` /* 131-byte string literal not displayed */
	BasicPasswordEnv string `` /* 131-byte string literal not displayed */
}

AuthConfig holds authentication configuration

type BackpressureConfig

type BackpressureConfig struct {
	Enabled   bool   `json:"enabled" schema:"type:bool,description:Enable backpressure handling,category:basic"`
	QueueSize int    `json:"queue_size" schema:"type:int,description:Internal message queue size,category:limits"`
	OnFull    string `json:"on_full" schema:"type:string,description:Action when queue full (drop_oldest drop_newest block),category:policy"`
}

BackpressureConfig holds backpressure handling configuration

type BidirectionalConfig

type BidirectionalConfig struct {
	Enabled               bool          `json:"enabled" schema:"type:bool,description:Enable request/reply patterns,category:basic"`
	RequestTimeout        time.Duration `json:"request_timeout" schema:"type:duration,description:Timeout for request/reply,category:timing"`
	MaxConcurrentRequests int           `json:"max_concurrent_requests" schema:"type:int,description:Maximum concurrent requests,category:limits"`
}

BidirectionalConfig holds bidirectional communication configuration

type ClientConfig

type ClientConfig struct {
	URL       string           `json:"url" schema:"type:string,description:WebSocket server URL to connect to,category:basic"`
	Reconnect *ReconnectConfig `json:"reconnect,omitempty" schema:"type:object,description:Reconnection configuration,category:reliability"`
}

ClientConfig holds client mode configuration

type Config

type Config struct {
	// Mode determines if component acts as server (listen) or client (connect)
	Mode Mode `json:"mode" schema:"type:string,description:Operation mode (server or client),category:basic"`

	// Server mode configuration
	ServerConfig *ServerConfig `json:"server,omitempty" schema:"type:object,description:Server mode configuration,category:server"`

	// Client mode configuration
	ClientConfig *ClientConfig `json:"client,omitempty" schema:"type:object,description:Client mode configuration,category:client"`

	// Authentication configuration
	Auth *AuthConfig `json:"auth,omitempty" schema:"type:object,description:Authentication configuration,category:security"`

	// Bidirectional communication configuration
	Bidirectional *BidirectionalConfig `json:"bidirectional,omitempty" schema:"type:object,description:Bidirectional request/reply configuration,category:advanced"`

	// Backpressure configuration
	Backpressure *BackpressureConfig `json:"backpressure,omitempty" schema:"type:object,description:Backpressure handling configuration,category:advanced"`

	// Port configuration for inputs and outputs
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
}

Config holds configuration for WebSocket input component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for WebSocket input

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input implements a WebSocket input component that receives federated data

func NewInput

func NewInput(
	name string,
	natsClient *natsclient.Client,
	config Config,
	metricsRegistry *metric.MetricsRegistry,
	securityCfg security.Config,
) (*Input, error)

NewInput creates a new WebSocket input component

func (*Input) ConfigSchema

func (i *Input) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Input) DataFlow

func (i *Input) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Input) Health

func (i *Input) Health() component.HealthStatus

Health returns current health status

func (*Input) Initialize

func (i *Input) Initialize() error

Initialize initializes the component (no-op for WebSocket input)

func (*Input) InputPorts

func (i *Input) InputPorts() []component.Port

InputPorts returns the input ports (none for input components)

func (*Input) Meta

func (i *Input) Meta() component.Metadata

Meta returns component metadata

func (*Input) OutputPorts

func (i *Input) OutputPorts() []component.Port

OutputPorts returns the output ports

func (*Input) Process

func (i *Input) Process(_ any) error

Process implements component.LifecycleComponent (not used for input components)

func (*Input) Start

func (i *Input) Start(ctx context.Context) error

Start starts the WebSocket input component

func (*Input) Stop

func (i *Input) Stop(timeout time.Duration) error

Stop stops the WebSocket input component

type MessageEnvelope

type MessageEnvelope struct {
	Type      string          `json:"type"`              // Message type (see above)
	ID        string          `json:"id"`                // Unique message ID (for correlation)
	Timestamp int64           `json:"timestamp"`         // Unix milliseconds
	Payload   json.RawMessage `json:"payload,omitempty"` // Optional payload (required for data/nack/slow)
}

MessageEnvelope wraps all WebSocket messages with type discrimination Supported types:

  • "data": Application data to be published to NATS
  • "request": Control plane request (future use)
  • "reply": Control plane reply (future use)
  • "ack": Acknowledge successful receipt/processing of data message
  • "nack": Negative acknowledgment (processing failed, may retry)
  • "slow": Backpressure signal indicating receiver is overloaded

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for Input component

type Mode

type Mode string

Mode defines the operation mode for WebSocket input

const (
	// ModeServer listens for incoming WebSocket connections
	ModeServer Mode = "server"
	// ModeClient connects to remote WebSocket server
	ModeClient Mode = "client"
)

type ReconnectConfig

type ReconnectConfig struct {
	Enabled         bool          `json:"enabled" schema:"type:bool,description:Enable automatic reconnection,category:basic"`
	MaxRetries      int           `json:"max_retries" schema:"type:int,description:Maximum reconnection attempts (0=unlimited),category:limits"`
	InitialInterval time.Duration `json:"initial_interval" schema:"type:duration,description:Initial retry interval,category:timing"`
	MaxInterval     time.Duration `json:"max_interval" schema:"type:duration,description:Maximum retry interval,category:timing"`
	Multiplier      float64       `json:"multiplier" schema:"type:float,description:Backoff multiplier,category:advanced"`
}

ReconnectConfig holds reconnection configuration for client mode

type ServerConfig

type ServerConfig struct {
	HTTPPort          int      `json:"http_port" schema:"type:int,description:HTTP port to listen on,category:basic"`
	Path              string   `json:"path" schema:"type:string,description:WebSocket endpoint path,category:basic"`
	MaxConnections    int      `json:"max_connections" schema:"type:int,description:Maximum concurrent connections,category:limits"`
	ReadBufferSize    int      `json:"read_buffer_size" schema:"type:int,description:WebSocket read buffer size,category:advanced"`
	WriteBufferSize   int      `json:"write_buffer_size" schema:"type:int,description:WebSocket write buffer size,category:advanced"`
	EnableCompression bool     `json:"enable_compression" schema:"type:bool,description:Enable per-message compression,category:advanced"`
	AllowedOrigins    []string `json:"allowed_origins" schema:"type:array,description:Allowed origins for CORS (empty=same-origin only),category:security"`
}

ServerConfig holds server mode configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL