event_bus_client

package module
v1.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: MIT Imports: 19 Imported by: 0

README

Event Bus Client

A high-performance Go client library and CLI for the Ambient Labs Event Bus, supporting publish/subscribe messaging with advanced filtering and load testing capabilities.

Go Version License

Features

  • 🚀 High Performance: HTTP POST publishing with WebSocket subscriptions achieving 596+ msg/s throughput
  • 🎯 Target Field Filtering: Filter messages by target_type and target_id for precise routing
  • 📊 Load Testing Suite: Comprehensive testing framework with configurable scenarios
  • 🔄 Auto-Reconnect: Built-in connection resilience and message offset management
  • 🛠️ Developer Friendly: Interactive shell mode and simple CLI commands
  • 📦 Consumer Groups: Support for scalable message processing with offset tracking

Table of Contents

Installation

Prerequisites
  • Go 1.24.3 or later
  • Access to an Event Bus server endpoint
Install as Library
go get github.com/ambientlabscomputing/event_bus_client
Build CLI and Tools
# Clone the repository
git clone https://github.com/ambientlabscomputing/event_bus_client.git
cd event_bus_client

# Build the CLI
go build -o bin/eventbus_cli cmd/eventbus_cli/main.go

# Build the load testing tool
go build -o bin/loadtest cmd/loadtest/main.go

Quick Start

Using the Library
package main

import (
    "context"
    "fmt"
    "log"
    
    client "github.com/ambientlabscomputing/event_bus_client"
)

func main() {
    // Initialize client
    opts := client.EventClientOpts{
        Endpoint:       "ws://localhost:9000/ws",
        AuthToken:      "your-jwt-token",
        CommitInterval: "5s",
        GroupID:        "my-consumer-group",
    }
    
    ec := client.NewEventClient(opts)
    
    // Subscribe to topics
    subs := []client.SubscriptionRequest{
        {
            GroupID: "my-consumer-group",
            Topic:   "orders",
        },
    }
    
    ctx := context.Background()
    if err := ec.Connect(ctx, &subs); err != nil {
        log.Fatal(err)
    }
    
    // Process messages
    msgChan := ec.IncomingMsgChannel()
    for msg := range msgChan {
        fmt.Printf("Received: %s\n", msg.Content)
    }
}
Using the CLI
# Configure credentials (one time)
export EVENT_BUS_ENDPOINT="ws://localhost:9000/ws"
export EVENT_BUS_TOKEN="your-jwt-token"
export EVENT_BUS_GROUP_ID="my-group"

# Publish a message
./bin/eventbus_cli publish my-topic "Hello, World!"

# Subscribe to messages
./bin/eventbus_cli subscribe my-topic

# Interactive shell mode
./bin/eventbus_cli shell

Usage

Library
Publishing Messages
// HTTP POST publishing (recommended for reliability)
err := ec.PublishHTTP(ctx, client.HTTPPublishRequest{
    Topic:      "orders",
    Content:    `{"order_id": "123", "amount": 99.99}`,
    TargetType: "user",
    TargetID:   "user-456",
    TraceID:    "trace-789",
})
Subscribing with Filters
// Helper function for string pointers
func stringPtr(s string) *string { return &s }

subs := []client.SubscriptionRequest{
    {
        GroupID:    "payment-processor",
        Topic:      "orders",
        TargetType: stringPtr("user"),      // Only receive user-targeted messages
        TargetID:   stringPtr("user-456"),  // Only for this specific user
    },
}
Processing Messages
msgChan := ec.IncomingMsgChannel()
for msg := range msgChan {
    // Access message fields
    fmt.Printf("Topic: %s\n", msg.Topic)
    fmt.Printf("Content: %s\n", msg.Content)
    
    // Access metadata
    if msg.TargetType != nil {
        fmt.Printf("Target Type: %s\n", *msg.TargetType)
    }
    if msg.TargetID != nil {
        fmt.Printf("Target ID: %s\n", *msg.TargetID)
    }
    
    // Offsets are automatically committed based on CommitInterval
}
CLI
Configuration

Create a config.yaml file (see config.yaml.example):

endpoint: ws://localhost:9000/ws
token: your-jwt-token
group_id: my-consumer-group

Or use environment variables:

export EVENT_BUS_ENDPOINT="ws://localhost:9000/ws"
export EVENT_BUS_TOKEN="your-jwt-token"
export EVENT_BUS_GROUP_ID="my-group"
Publishing
# Basic publish
./bin/eventbus_cli publish orders "New order received"

# With target fields
./bin/eventbus_cli publish orders "Payment required" \
  --target-type user \
  --target-id user-123

# With all metadata
./bin/eventbus_cli publish orders '{"order_id": "456"}' \
  --target-type user \
  --target-id user-789 \
  --trace-id trace-abc \
  --org-id org-xyz
Subscribing
# Basic subscribe
./bin/eventbus_cli subscribe orders

# Subscribe with filters (only receive matching messages)
./bin/eventbus_cli subscribe orders \
  --target-type user \
  --target-id user-123

# Subscribe with trace filtering
./bin/eventbus_cli subscribe orders \
  --trace-id trace-abc
Interactive Shell
./bin/eventbus_cli shell

# In shell mode:
> subscribe orders
> publish orders "Test message"
> help
> exit
Load Testing

The load testing suite allows you to simulate high-volume workloads and test various scenarios.

Quick Test
./bin/loadtest -config loadtest/examples/quick.json -cli bin/eventbus_cli
Stress Test
./bin/loadtest -config loadtest/examples/stress.json -cli bin/eventbus_cli
Accuracy Test (Target Filtering)
./bin/loadtest -config loadtest/examples/accuracy.json -cli bin/eventbus_cli

See loadtest/README.md for detailed documentation on creating custom test scenarios.

Architecture

Hybrid Publish/Subscribe Model

The client uses an optimized architecture:

  • Publishing: HTTP POST to /api/v1/publish for 100% reliability
  • Subscribing: WebSocket connection to /ws with long-polling (5s hold time)

This hybrid approach provides:

  • ✅ High publish success rate (100% vs 77% with WebSocket-only)
  • ✅ Efficient message delivery with long-polling
  • ✅ Automatic reconnection and offset management
  • ✅ Consumer group support for horizontal scaling
Message Flow
┌──────────────┐      HTTP POST       ┌──────────────┐
│   Publisher  │ ──────────────────> │  Event Bus   │
└──────────────┘    /api/v1/publish   │    Server    │
                                      └──────┬───────┘
                                             │
                    WebSocket + Long Poll    │
                                             ▼
┌──────────────┐                      ┌──────────────┐
│  Subscriber  │ <────────────────── │   Message    │
│   (Group A)  │      Filtered        │    Queue     │
└──────────────┘                      └──────────────┘
Target Field Filtering

Messages can be filtered on the server side based on:

  • target_type: Category of the target entity (e.g., "user", "organization", "order")
  • target_id: Specific identifier (e.g., "user-123", "org-456")

Filter Behavior:

  • Exact match on both type and ID if both specified
  • Type-only match if only target_type specified
  • Broadcast messages (no target fields) are delivered to all subscribers
  • No filter = receive all messages

Configuration

Client Options
type EventClientOpts struct {
    Endpoint       string  // WebSocket endpoint (e.g., "ws://localhost:9000/ws")
    AuthToken      string  // JWT authentication token
    CommitInterval string  // Offset commit frequency (e.g., "5s", "10s")
    GroupID        string  // Consumer group identifier
}
Subscription Request
type SubscriptionRequest struct {
    GroupID    string   // Consumer group ID
    Topic      string   // Topic name
    TargetType *string  // Optional: filter by target type
    TargetID   *string  // Optional: filter by target ID
    TraceID    *string  // Optional: filter by trace ID
    OrgID      *string  // Optional: filter by organization ID
    Limit      int      // Optional: max messages per fetch
}

Examples

Example 1: Simple Publisher
package main

import (
    "context"
    "fmt"
    "log"
    
    client "github.com/ambientlabscomputing/event_bus_client"
)

func main() {
    opts := client.EventClientOpts{
        Endpoint:  "ws://localhost:9000/ws",
        AuthToken: "your-token",
    }
    
    ec := client.NewEventClient(opts)
    ctx := context.Background()
    
    // Connect without subscriptions (publish-only)
    if err := ec.Connect(ctx, nil); err != nil {
        log.Fatal(err)
    }
    
    // Publish messages
    for i := 0; i < 100; i++ {
        err := ec.PublishHTTP(ctx, client.HTTPPublishRequest{
            Topic:   "events",
            Content: fmt.Sprintf("Message %d", i),
        })
        if err != nil {
            log.Printf("Publish error: %v", err)
        }
    }
}
Example 2: Filtered Subscriber
package main

import (
    "context"
    "fmt"
    "log"
    
    client "github.com/ambientlabscomputing/event_bus_client"
)

func main() {
    opts := client.EventClientOpts{
        Endpoint:       "ws://localhost:9000/ws",
        AuthToken:      "your-token",
        CommitInterval: "5s",
        GroupID:        "notification-service",
    }
    
    ec := client.NewEventClient(opts)
    
    // Subscribe with target filter
    userType := "user"
    userID := "user-123"
    
    subs := []client.SubscriptionRequest{
        {
            GroupID:    "notification-service",
            Topic:      "notifications",
            TargetType: &userType,
            TargetID:   &userID,
        },
    }
    
    ctx := context.Background()
    if err := ec.Connect(ctx, &subs); err != nil {
        log.Fatal(err)
    }
    
    // Process filtered messages
    msgChan := ec.IncomingMsgChannel()
    for msg := range msgChan {
        fmt.Printf("Notification for user-123: %s\n", msg.Content)
        // Automatically committed every 5s
    }
}
Example 3: Multi-Topic Subscriber
subs := []client.SubscriptionRequest{
    {
        GroupID: "analytics-service",
        Topic:   "orders",
    },
    {
        GroupID: "analytics-service",
        Topic:   "payments",
    },
    {
        GroupID: "analytics-service",
        Topic:   "shipments",
    },
}

if err := ec.Connect(ctx, &subs); err != nil {
    log.Fatal(err)
}

msgChan := ec.IncomingMsgChannel()
for msg := range msgChan {
    switch msg.Topic {
    case "orders":
        handleOrder(msg)
    case "payments":
        handlePayment(msg)
    case "shipments":
        handleShipment(msg)
    }
}

Developer Guide

Project Structure
event_bus_client/
├── cmd/
│   ├── eventbus_cli/       # CLI application entry point
│   │   └── main.go
│   └── loadtest/           # Load testing tool entry point
│       └── main.go
├── internal/
│   └── cli/                # CLI command implementations
│       ├── root.go         # Root command and setup
│       ├── publish.go      # Publish command
│       ├── subscribe.go    # Subscribe command
│       ├── shell.go        # Interactive shell
│       └── config.go       # Configuration management
├── loadtest/               # Load testing framework
│   ├── config.go           # Test configuration structures
│   ├── runner.go           # Test orchestration
│   ├── metrics.go          # Metrics collection
│   ├── README.md           # Load testing documentation
│   └── examples/           # Example test configurations
│       ├── quick.json
│       ├── basic.json
│       ├── stress.json
│       └── accuracy.json
├── docs/                   # Documentation and test results
├── bin/                    # Compiled binaries (gitignored)
├── main.go                 # Core client library
├── types.go                # Type definitions
├── go.mod                  # Go module definition
├── README.md               # This file
└── config.yaml.example     # Configuration template
Building from Source
# Install dependencies
go mod download

# Build CLI
go build -o bin/eventbus_cli cmd/eventbus_cli/main.go

# Build load tester
go build -o bin/loadtest cmd/loadtest/main.go

# Build for multiple platforms
GOOS=linux GOARCH=amd64 go build -o bin/eventbus_cli-linux-amd64 cmd/eventbus_cli/main.go
GOOS=darwin GOARCH=arm64 go build -o bin/eventbus_cli-darwin-arm64 cmd/eventbus_cli/main.go
GOOS=windows GOARCH=amd64 go build -o bin/eventbus_cli-windows-amd64.exe cmd/eventbus_cli/main.go
Running Tests
# Quick validation test (30 seconds)
./bin/loadtest -config loadtest/examples/quick.json -cli bin/eventbus_cli

# Full load test (2 minutes)
./bin/loadtest -config loadtest/examples/basic.json -cli bin/eventbus_cli

# Stress test (5 minutes)
./bin/loadtest -config loadtest/examples/stress.json -cli bin/eventbus_cli

# Accuracy test for filtering (1 minute)
./bin/loadtest -config loadtest/examples/accuracy.json -cli bin/eventbus_cli

Performance

Based on production testing with backend optimizations:

Metric Value Notes
Publish Rate 1,287 msg/s 77,240 msgs in 60s, 100% success via HTTP POST
Subscribe Rate 596 msg/s Per subscriber, with backend goroutine leak fixes
Message Fanout 5:1 ratio 5 subscribers × N messages = expected behavior
Target Filtering Server-side Reduces client-side processing overhead
Connection Model Hybrid HTTP POST publish + WebSocket subscribe
Before and After Optimizations
Component Before After Improvement
Publish Success 77% (WebSocket) 100% (HTTP POST) +23%
Subscribe Rate 1.4 msg/s 596 msg/s 423×
Target Filtering Not implemented Working

See docs/ for detailed test results and analysis.

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

Quick Contribution Guide
  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests if applicable
  5. Commit your changes (git commit -m 'Add amazing feature')
  6. Push to the branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

Acknowledgments

Built with:


Maintained by Ambient Labs ComputingWebsiteGitHub

Documentation

Index

Constants

View Source
const (
	MessageTypePing          = "ping"
	MessageTypePong          = "pong"
	MessageTypeSubscribe     = "subscribe"
	MessageTypeSubscribeResp = "subscribe_response"
	MessageTypeEvent         = "event"
	MessageTypeEventAck      = "event_ack"
	MessageTypeCommitOffset  = "commit_offset"
	MessageTypeFetch         = "fetch"
	MessageTypeFetchResp     = "fetch_response"
)

message types

Variables

This section is empty.

Functions

This section is empty.

Types

type AppendMessageResponse

type AppendMessageResponse struct {
	Offset      int    `json:"offset"`
	PartitionID string `json:"partition_id"`
	Topic       string `json:"topic"`
	Err         error
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewEventClient

func NewEventClient(opts EventClientOpts) (*Client, error)

func (*Client) Close added in v1.2.2

func (ec *Client) Close() error

Close gracefully shuts down the event bus client

func (*Client) CommitOffsets

func (ec *Client) CommitOffsets(ctx context.Context) error

func (*Client) Connect

func (ec *Client) Connect(ctx context.Context, startingSubs *[]SubscriptionRequest) error

Connect establishes the connection to the event bus and sets up initial subscriptions if provided.

func (*Client) HandleConnection

func (ec *Client) HandleConnection(ctx context.Context)

func (*Client) IncomingMsgChannel

func (ec *Client) IncomingMsgChannel() chan Message

func (*Client) OffsetManager

func (ec *Client) OffsetManager(ctx context.Context)

func (*Client) PollingLoop

func (ec *Client) PollingLoop(ctx context.Context)

PollingLoop continuously sends fetch requests using long polling The server holds each request for up to 5 seconds, providing near-instant message delivery when available while minimizing network overhead

func (*Client) Publish

func (ec *Client) Publish(
	ctx context.Context,
	topic, content string,
	targetType, targetID, traceID, orgID *string,
) (*AppendMessageResponse, error)

func (*Client) RawMsgChannel

func (ec *Client) RawMsgChannel() chan WebsocketFrame

func (*Client) SetVerbose

func (ec *Client) SetVerbose(verbose bool)

func (*Client) Stop added in v1.2.3

func (ec *Client) Stop() error

Stop is an alias for Close for backward compatibility

func (*Client) Subscribe

func (ec *Client) Subscribe(ctx context.Context, subReq SubscriptionRequest) error

Subscribe adds a new subscription to an existing connection

type ECDSASignature added in v1.2.0

type ECDSASignature struct {
	R, S *big.Int
}

ECDSASignature represents an ECDSA signature in ASN.1 DER format

type EventClient

type EventClient interface {
	Connect(ctx context.Context, startingSubs *[]SubscriptionRequest) error
	IncomingMsgChannel() chan Message
	Publish(
		ctx context.Context,
		topic, content string,
		targetType, targetID, traceID, orgID *string,
	) (*AppendMessageResponse, error)
	Subscribe(ctx context.Context, subReq SubscriptionRequest) error
}

type EventClientOpts

type EventClientOpts struct {
	Endpoint       string
	AuthToken      string // JWT token for authentication (optional if using mTLS)
	CommitInterval string
	GroupID        string
	// mTLS Authentication (optional, alternative to AuthToken)
	CertPath string // Path to client certificate PEM file
	KeyPath  string // Path to private key PEM file
}

type EventPayload

type EventPayload struct {
	Offset      int     `json:"offset"`
	PartitionID string  `json:"partition_id"`
	Message     Message `json:"message"`
}

EventPayload wraps a message with offset and partition metadata (for consuming)

type FetchPayload

type FetchPayload struct{}

FetchPayload is an empty payload for fetch requests

type FetchResponseEvent

type FetchResponseEvent struct {
	SubscriptionID string  `json:"subscription_id"`
	PartitionID    string  `json:"partition_id"`
	Offset         int     `json:"offset"`
	Message        Message `json:"message"`
}

FetchResponseEvent represents a single event in a fetch response

type FetchResponsePayload

type FetchResponsePayload struct {
	Count    int                  `json:"count"`
	Messages []FetchResponseEvent `json:"messages"`
}

FetchResponsePayload contains the array of events from a fetch request

type HTTPPublishRequest

type HTTPPublishRequest struct {
	Topic      string `json:"topic"`
	Content    string `json:"content"`
	OrgID      string `json:"org_id,omitempty"`
	TraceID    string `json:"trace_id,omitempty"`
	TargetType string `json:"target_type,omitempty"`
	TargetID   string `json:"target_id,omitempty"`
}

HTTP Publishing Types

type HTTPPublishResponse

type HTTPPublishResponse struct {
	Status  string `json:"status"`
	Message string `json:"message"`
	Topic   string `json:"topic"`
}

type MTLSTransport added in v1.2.0

type MTLSTransport struct {
	// contains filtered or unexported fields
}

MTLSTransport wraps http.RoundTripper to add mTLS headers

func (*MTLSTransport) RoundTrip added in v1.2.0

func (t *MTLSTransport) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper

type Message

type Message struct {
	ID          string  `bson:"_id,omitempty" json:"id"`
	Topic       string  `bson:"topic" json:"topic"`
	Content     string  `bson:"content" json:"content"`
	Offset      int     `bson:"offset,omitempty" json:"offset,omitempty"`
	PartitionID string  `bson:"partition_id,omitempty" json:"partition_id,omitempty"`
	TargetType  *string `bson:"target_type" json:"target_type"`
	TargetID    *string `bson:"target_id" json:"target_id"`
	TraceID     *string `bson:"trace_id,omitempty" json:"trace_id,omitempty"`
	OrgID       *string `bson:"org_id,omitempty" json:"org_id,omitempty"`
}

the main message structure used in the event bus

type OffsetCommitMsg

type OffsetCommitMsg struct {
	SubscriptionID string `json:"subscription_id"`
	PartitionID    string `json:"partition_id"`
	Offset         int    `json:"offset"`
	GroupID        string `json:"group_id"`
}

type Subscription

type Subscription struct {
	SubscriptionRequest `json:",inline"`
	ID                  string `json:"id"`
}

type SubscriptionRequest

type SubscriptionRequest struct {
	GroupID    string  `json:"group_id"`
	Topic      string  `json:"topic"`
	TargetType *string `json:"target_type"`
	TargetID   *string `json:"target_id"`
	TraceID    *string `json:"trace_id,omitempty"`
	OrgID      *string `json:"org_id,omitempty"`
	Limit      int     `json:"limit,omitempty"`
}

type SubscriptionResponse

type SubscriptionResponse struct {
	SubscriptionID string `json:"subscription_id"`
	Topic          string `json:"topic"`
	GroupID        string `json:"group_id"`
	Err            error  `json:"error,omitempty"`
}

type WFAppendMessageResp

type WFAppendMessageResp struct {
	WebsocketFrame `json:",inline"`
	Payload        AppendMessageResponse `json:"payload"`
}

type WFFetchMessage

type WFFetchMessage struct {
	WebsocketFrame `json:",inline"`
	Payload        FetchPayload `json:"payload"`
}

WFFetchMessage represents a fetch request to pull new messages

type WFFetchResponse

type WFFetchResponse struct {
	WebsocketFrame `json:",inline"`
	Payload        FetchResponsePayload `json:"payload"`
}

WFFetchResponse represents a fetch response with messages

type WFMessage

type WFMessage struct {
	WebsocketFrame `json:",inline"`
	Payload        EventPayload `json:"payload"`
}

WFMessage for consuming events (with offset and partition wrapper)

type WFOffsetCommitMsg

type WFOffsetCommitMsg struct {
	WebsocketFrame `json:",inline"`
	Payload        OffsetCommitMsg `json:"payload"`
}

type WFPingMessage

type WFPingMessage struct {
	WebsocketFrame `json:",inline"`
}

func NewPingPongMessage

func NewPingPongMessage(msgTyp string) WFPingMessage

type WFPongMessage

type WFPongMessage struct {
	WebsocketFrame `json:",inline"`
}

type WFPublishMessage

type WFPublishMessage struct {
	WebsocketFrame `json:",inline"`
	Payload        Message `json:"payload"`
}

WFPublishMessage for publishing events (direct message fields)

func NewWebsocketFramedMessage

func NewWebsocketFramedMessage(msg Message) WFPublishMessage

type WFSubscriptionReq

type WFSubscriptionReq struct {
	WebsocketFrame `json:",inline"`
	Payload        SubscriptionRequest `json:"payload"`
}

func NewWebsocketFramedSubscriptionReq

func NewWebsocketFramedSubscriptionReq(subReq SubscriptionRequest) WFSubscriptionReq

type WFSubscriptionResp

type WFSubscriptionResp struct {
	WebsocketFrame `json:",inline"`
	Payload        SubscriptionResponse `json:"payload"`
}

type WebsocketFrame

type WebsocketFrame struct {
	MessageType string `json:"message_type"`
	Version     string `json:"version"`
	Payload     any    `json:"payload"`
}

WebSocket Types

func (*WebsocketFrame) ToAppendMessageResp

func (wf *WebsocketFrame) ToAppendMessageResp() WFAppendMessageResp

func (*WebsocketFrame) ToFetchResponse

func (wf *WebsocketFrame) ToFetchResponse() WFFetchResponse

func (*WebsocketFrame) ToMessage

func (wf *WebsocketFrame) ToMessage() WFMessage

func (*WebsocketFrame) ToPing

func (wf *WebsocketFrame) ToPing() WFPingMessage

func (*WebsocketFrame) ToSubscriptionResp

func (wf *WebsocketFrame) ToSubscriptionResp() WFSubscriptionResp

Directories

Path Synopsis
cmd
eventbus_cli command
loadtest command
internal
cli

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL