Documentation
¶
Overview ¶
Package topic implements a generic, buffering publish-subscribe messaging system with dynamic fanout.
Messages sent to a Topic are duplicated and delivered to all subscribed receivers. Incoming messages are queued in-memory when receivers are not ready, with configurable buffering behavior per receiver. Receivers can be added or removed dynamically, and the most recent message can be queried.
The Topic type is safe for concurrent use by multiple goroutines.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ReceiveCh ¶
ReceiveCh returns a select-friendly channel to receive messages from a topic's receiver. Inter-mixing Receive/All methods with receiver-channels will result in out-of-order delivery of values, but values are delivered only once. Also, using receive channels is more expensive than direct receive methods.
NOTE: A background helper goroutine is used to retrieve messages from the topic and publish them to the topic. At max one background goroutine is created on demand and it is private to the input receiver.
Returned channel will be closed if the topic is closed or receiver is unsubscribed.
func SendCh ¶
SendCh returns a select-friendly channel to send messages over to a topic. This is an alternative to the Send method, where a channel is more appropriate.
NOTE: A background helper goroutine is used to receive messages over the channel and publish them to the topic. At max one background goroutine is created on demand and it is private to the input topic.
Returned channel will block forever after the topic is closed.
Types ¶
type Receiver ¶
type Receiver[T any] struct { // contains filtered or unexported fields }
Receiver represents a subscriber to a Topic. It provides methods to manage subscription lifecycle, such as unsubscribing, and to receive messages.
func Subscribe ¶
Subscribe adds a new receiver to the Topic, returning a Receiver for consuming messages via Receive. The limit parameter controls the receiver's queue behavior:
- limit == 0: Unbounded queue, buffering all messages (memory-limited).
- limit > 0: Buffers the most recent limit messages, discarding older ones.
- limit < 0: Buffers the oldest limit messages, discarding newer ones.
If the Topic is closed or its context is canceled, Subscribe returns an error.
func SubscribeFunc ¶
func SubscribeFunc[T, U any](t *Topic[T], fn func(T) U, limit int, includeLast bool) (*Receiver[U], error)
SubscribeFunc is similar to Subscribe, but converts the received value from one type to another using the input helper function. Converter function is invoked under the topic's Send context, so it SHOULD NOT block for optimal performance.
func (*Receiver[T]) All ¶
All returns an iterator to process all incoming values over the topic. Iterator stops with nil if the topic or the receiver is closed.
func (*Receiver[T]) Close ¶
func (r *Receiver[T]) Close()
Close removes the receiver from the Topic, discarding pending messages. Unsubscribe is idempotent. After unsubscribing, the receiver cannot be reused.
func (*Receiver[T]) Receive ¶
Receive returns the next available message from the receiver's queue, blocking until a message is available or the topic/receiver is closed. If closed, it returns os.ErrClosed. If the topic's context is canceled, it returns the context's error.
type Topic ¶
type Topic[T any] struct { // contains filtered or unexported fields }
Topic represents a publish-subscribe channel that duplicates messages to all subscribed receivers. Messages are queued in-memory for slow receivers, and the buffering behavior is configured per receiver via Subscribe. Topics are created with New and support generic message types.
func New ¶
New creates a new Topic for messages of type T. The returned Topic is ready to accept messages via Send and subscribers via Subscribe. The provided context controls the topic's lifecycle; when it cancels, the topic closes.
Example:
topic := topic.New[string]()
topic.Send("Hello, world!")
func (*Topic[T]) Close ¶
Close shuts down the Topic, preventing further subscriptions or message sends. After closing, Send is a no-op, Subscribe returns an error, and Recent may return false. Close is idempotent.