topic

v0.0.0-...-6dbbdfa
Warning

This package is not in the latest version of its module.

Published: May 28, 2025 License: Apache-2.0 Imports: 7 Imported by: 4

Documentation

Overview

Package topic implements a generic, buffering publish-subscribe messaging system with dynamic fanout.

Messages sent to a Topic are duplicated and delivered to all subscribed receivers. Incoming messages are queued in-memory when receivers are not ready, with configurable buffering behavior per receiver. Receivers can be added or removed dynamically, and the most recent message can be queried.

The Topic type is safe for concurrent use by multiple goroutines.
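The fanout behavior described above can be illustrated with a small, self-contained sketch. This is not the package's implementation: miniTopic, subscribe, send, and lastValue are hypothetical names, every queue is unbounded, and there is no blocking receive. It only shows the idea of copying each message into one queue per subscriber while remembering the most recent value.

```go
package main

import (
	"fmt"
	"sync"
)

// miniTopic is a hypothetical, simplified stand-in for a fanout topic.
// It is an illustration only, not the package's Topic type.
type miniTopic[T any] struct {
	mu     sync.Mutex
	queues []*[]T // one unbounded queue per subscriber
	last   T
	sent   bool
}

// subscribe registers a new queue and returns a function that drains it.
func (t *miniTopic[T]) subscribe() (drain func() []T) {
	t.mu.Lock()
	defer t.mu.Unlock()
	q := new([]T)
	t.queues = append(t.queues, q)
	return func() []T {
		t.mu.Lock()
		defer t.mu.Unlock()
		out := *q
		*q = nil
		return out
	}
}

// send copies v into every subscriber's queue and records it as the last value.
func (t *miniTopic[T]) send(v T) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, q := range t.queues {
		*q = append(*q, v)
	}
	t.last, t.sent = v, true
}

// lastValue reports the most recent value, if any value was sent.
func (t *miniTopic[T]) lastValue() (T, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.last, t.sent
}

func main() {
	var t miniTopic[string]
	a := t.subscribe()
	t.send("one")
	b := t.subscribe() // joins late, so it misses "one"
	t.send("two")
	fmt.Println(a(), b()) // prints "[one two] [two]"
}
```

A single mutex keeps the sketch safe for concurrent use; the real package may use a different synchronization strategy.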

Functions

func ReceiveCh

func ReceiveCh[T any](r *Receiver[T]) (<-chan T, error)

ReceiveCh returns a select-friendly channel for receiving messages from a topic's receiver. Mixing the Receive or All methods with a receiver channel results in out-of-order delivery of values, although each value is still delivered only once. Receive channels are also more expensive than the direct receive methods.

NOTE: A background helper goroutine retrieves messages from the receiver and delivers them on the returned channel. At most one background goroutine is created, on demand, and it is private to the input receiver.

The returned channel is closed if the topic is closed or the receiver is unsubscribed.

func SendCh

func SendCh[T any](t *Topic[T]) (chan<- T, error)

SendCh returns a select-friendly channel for sending messages to a topic. It is an alternative to the Send method for cases where a channel is more appropriate.

NOTE: A background helper goroutine receives messages from the channel and publishes them to the topic. At most one background goroutine is created, on demand, and it is private to the input topic.

Sends on the returned channel block forever after the topic is closed.

Types

type Receiver

type Receiver[T any] struct {
	// contains filtered or unexported fields
}

Receiver represents a subscriber to a Topic. It provides methods to manage subscription lifecycle, such as unsubscribing, and to receive messages.

func Subscribe

func Subscribe[T any](t *Topic[T], limit int, includeLast bool) (*Receiver[T], error)

Subscribe adds a new receiver to the Topic, returning a Receiver for consuming messages via Receive. The limit parameter controls the receiver's queue behavior:

  • limit == 0: Unbounded queue, buffering all messages (memory-limited).
  • limit > 0: Buffers the most recent limit messages, discarding older ones.
  • limit < 0: Buffers the oldest |limit| messages, discarding newer ones.

If the Topic is closed or its context is canceled, Subscribe returns an error.
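The three limit modes can be made concrete with a small sketch of a queue policy. The enqueue helper below is hypothetical and operates on a plain slice; it assumes that a negative limit keeps at most |limit| of the oldest messages, which is an interpretation of the list above rather than confirmed behavior.

```go
package main

import "fmt"

// enqueue adds v to q according to the limit policy sketched above:
//   limit == 0: keep everything
//   limit > 0:  keep only the newest limit values
//   limit < 0:  keep only the oldest -limit values (assumed meaning)
// Hypothetical helper, not part of the documented package.
func enqueue[T any](q []T, v T, limit int) []T {
	switch {
	case limit == 0:
		return append(q, v)
	case limit > 0:
		q = append(q, v)
		if len(q) > limit {
			q = q[len(q)-limit:] // drop the oldest values
		}
		return q
	default:
		if len(q) >= -limit {
			return q // queue is full, drop the new value
		}
		return append(q, v)
	}
}

func main() {
	for _, limit := range []int{0, 2, -2} {
		var q []int
		for v := 1; v <= 4; v++ {
			q = enqueue(q, v, limit)
		}
		fmt.Println(limit, q)
	}
	// Prints:
	// 0 [1 2 3 4]
	// 2 [3 4]
	// -2 [1 2]
}
```

A positive limit suits consumers that only care about fresh state, while a negative limit suits consumers that must process the earliest events first.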

func SubscribeFunc

func SubscribeFunc[T, U any](t *Topic[T], fn func(T) U, limit int, includeLast bool) (*Receiver[U], error)

SubscribeFunc is similar to Subscribe, but converts each received value from type T to type U using the supplied function fn. The converter is invoked as part of the topic's Send call, so it should not block; a slow converter slows down every Send.

func (*Receiver[T]) All

func (r *Receiver[T]) All(ctx context.Context, errp *error) iter.Seq[T]

All returns an iterator over all incoming values on the topic. The iterator stops with a nil error if the topic or the receiver is closed.

func (*Receiver[T]) Close

func (r *Receiver[T]) Close()

Close removes the receiver from the Topic, discarding pending messages. Close is idempotent. After Close, the receiver cannot be reused.

func (*Receiver[T]) Receive

func (r *Receiver[T]) Receive() (T, error)

Receive returns the next available message from the receiver's queue, blocking until a message is available or the topic/receiver is closed. If closed, it returns os.ErrClosed. If the topic's context is canceled, it returns the context's error.

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic represents a publish-subscribe channel that duplicates messages to all subscribed receivers. Messages are queued in-memory for slow receivers, and the buffering behavior is configured per receiver via Subscribe. Topics are created with New and support generic message types.

func New

func New[T any]() *Topic[T]

New creates a new Topic for messages of type T. The returned Topic is ready to accept messages via Send and subscribers via Subscribe. New takes no context argument; call Close to shut the topic down.

Example:

t := topic.New[string]() // named t to avoid shadowing the package
_ = t.Send("Hello, world!") // Send returns an error; check it in real code

func (*Topic[T]) Close

func (t *Topic[T]) Close() error

Close shuts down the Topic, preventing further subscriptions or message sends. After closing, Send returns os.ErrClosed, Subscribe returns an error, and Last may return false. Close is idempotent.

func (*Topic[T]) Last

func (t *Topic[T]) Last() (v T, ok bool)

Last returns the most recent message sent over the Topic. It returns ok == false if no message has been sent over the topic.

func (*Topic[T]) Send

func (t *Topic[T]) Send(v T) error

Send publishes a message to the Topic. The message is duplicated and delivered to all subscribed receivers. Returns os.ErrClosed if the Topic is closed.
