Documentation
¶
Overview ¶
Package corio provides structured concurrency primitives and batched I/O operations. It is designed to support coroutine-like task scheduling with a focus on efficient I/O operations through batching and concurrency management.
Key components:
Task: The core abstraction representing a coroutine-like unit of work. Tasks can spawn child tasks, perform I/O operations, and wait for completion.
Schedule: Manages the scheduling of tasks and I/O operations. It holds a reference to the I/O dispatcher and allocates resources.
IODispatch: Interface for implementing I/O dispatching strategies. Users can implement this interface to define how I/O requests are processed.
IORequest/IOResponse: Represent I/O operations with their input and output data.
IOBatch: A collection of I/O requests and their responses, allowing for batched processing.
Synchronization primitives: Mutex, WaitGroup, ErrGroup, and SingleFlight for synchronization and error handling.
Index ¶
- Constants
- type ErrGroup
- type IOAllocator
- func (ioa *IOAllocator[I, O]) AddBatchRequest(batch *IOBatch[I, O], requests ...*IORequest[I, O])
- func (ioa *IOAllocator[I, O]) NewBatch(requests ...*IORequest[I, O]) *IOBatch[I, O]
- func (ioa *IOAllocator[I, O]) SetBatchResponse(batch *IOBatch[I, O], i int, data O)
- func (ioa *IOAllocator[I, O]) SetBatchRetry(batch *IOBatch[I, O], i int, retry *IORequest[I, O])
- type IOBatch
- type IODispatch
- type IORequest
- type IOResponse
- type Mutex
- type Resumable
- type Schedule
- type Task
- func (t *Task[I, O]) Do(key any, fn func() (any, error)) (any, error, bool)
- func (t *Task[I, O]) Go(fn func(context.Context))
- func (t *Task[I, O]) Group() ErrGroup
- func (t *Task[I, O]) IO(in I) O
- func (t *Task[I, O]) Log(msg string)
- func (t *Task[I, O]) Logf(format string, args ...any)
- func (t *Task[I, O]) Run(fn func(context.Context, *Task[I, O]))
- func (t *Task[I, O]) Wait()
- type TaskBase
- type WaitGroup
Constants ¶
const ( // ScheduleIOConcurrencyLimit defines the maximum number of // concurrent I/O operations. ScheduleIOConcurrencyLimit = 128 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrGroup ¶
type ErrGroup interface {
// Go starts a new task with the default context.
Go(func(context.Context) error)
// GoWithContext starts a new task with the specified context.
GoWithContext(context.Context, func(context.Context) error)
// Wait blocks until all tasks have completed and returns the first
// error encountered.
Wait(TaskBase) error
}
ErrGroup manages a group of tasks and collects the first error that occurs. It provides methods to start new tasks and wait for all tasks to complete.
type IOAllocator ¶
type IOAllocator[I, O any] struct{}
IOAllocator provides methods for creating and manipulating I/O batches. It helps manage memory allocation for I/O operation processing.
func (*IOAllocator[I, O]) AddBatchRequest ¶
func (ioa *IOAllocator[I, O]) AddBatchRequest(batch *IOBatch[I, O], requests ...*IORequest[I, O])
AddBatchRequest adds the provided requests to an existing I/O batch. It appends the requests to the batch's request slice.
func (*IOAllocator[I, O]) NewBatch ¶
func (ioa *IOAllocator[I, O]) NewBatch(requests ...*IORequest[I, O]) *IOBatch[I, O]
NewBatch creates a new I/O batch with the provided requests. It initializes a new batch and adds the specified requests to it.
func (*IOAllocator[I, O]) SetBatchResponse ¶
func (ioa *IOAllocator[I, O]) SetBatchResponse(batch *IOBatch[I, O], i int, data O)
SetBatchResponse adds a response to a batch for the request at index i. It creates a new IOResponse with the provided output data and adds it to the batch.
func (*IOAllocator[I, O]) SetBatchRetry ¶
func (ioa *IOAllocator[I, O]) SetBatchRetry(batch *IOBatch[I, O], i int, retry *IORequest[I, O])
SetBatchRetry adds a request to the retry list of a batch. It appends the request to the batch's retries slice for later reprocessing.
type IOBatch ¶
type IOBatch[I, O any] struct { // contains filtered or unexported fields }
IOBatch represents a collection of I/O requests, their responses, and any requests that need to be retried. It provides a mechanism for batched processing of I/O operations.
type IODispatch ¶
type IODispatch[I, O any] interface { // Dispatch handles a batch of I/O requests and sends responses back // through the resp channel. It takes a context for cancellation, an // allocator for creating response objects, a semaphore channel for // concurrency limiting, a slice of requests to process, and a // response channel to send completed batches. Dispatch( ctx context.Context, alloc *IOAllocator[I, O], sema chan struct{}, reqs []*IORequest[I, O], resp chan *IOBatch[I, O], ) }
IODispatch is an interface that defines how I/O operations are dispatched. Implementers must provide a Dispatch method that processes batches of I/O requests.
type IORequest ¶
type IORequest[I, O any] struct { // contains filtered or unexported fields }
IORequest represents an I/O operation request with input data. It contains a reference to the task that initiated the request and the input data.
type IOResponse ¶
type IOResponse[I, O any] struct { // contains filtered or unexported fields }
IOResponse represents the result of a completed I/O operation. It contains a reference to the original request and the output data.
type Mutex ¶
type Mutex struct {
// contains filtered or unexported fields
}
Mutex provides mutual exclusion for tasks. It allows only one task to hold the lock at a time, suspending other tasks that attempt to acquire the lock until it's released.
func (*Mutex) Lock ¶
Lock acquires the mutex for the given task. If the mutex is already locked, the task will be suspended until the mutex is available.
type Resumable ¶
type Resumable[I, O any] struct { // contains filtered or unexported fields }
Resumable represents a function that can be resumed with a Schedule. It contains the function to be executed and a reference to the Schedule.
type Schedule ¶
type Schedule[I, O any] struct { // contains filtered or unexported fields }
Schedule manages the scheduling of tasks and I/O operations. It holds references to the I/O dispatcher, allocator, and channels for responses and concurrency control.
func IO ¶
func IO[I, O any](dispatch IODispatch[I, O]) *Schedule[I, O]
IO creates a new Schedule with the specified I/O dispatcher. It initializes the allocator, response channel, and semaphore for concurrency control.
func (*Schedule[I, O]) Fn ¶
Fn adapts a context-only function to the Task-based function signature. It creates a wrapper function that ignores the Task parameter and calls the original function.
type Task ¶
type Task[I, O any] struct { // contains filtered or unexported fields }
Task represents a coroutine-like unit of work that can perform I/O operations. It can be suspended, resumed, and can spawn child tasks.
func TaskFromContext ¶
TaskFromContext retrieves a Task from a context with the specified generic types. Returns the task and a boolean indicating whether the task was found and had the correct type.
func (*Task[I, O]) Do ¶
Do executes the given function with deduplication based on the key. If multiple tasks call Do with the same key concurrently, only one execution occurs. Returns the result, error (if any), and whether this was a shared result.
func (*Task[I, O]) Go ¶
Go spawns a child task with the given context function. It adapts the function to the task interface using Fn.
func (*Task[I, O]) Group ¶
Group creates a new error group associated with this task. The error group can be used to run functions that return errors and wait for their completion.
func (*Task[I, O]) IO ¶
func (t *Task[I, O]) IO(in I) O
IO performs an I/O operation with the given input. It queues the request, suspends the task, and returns the result when resumed.
func (*Task[I, O]) Log ¶
Log adds a log message to the runtime trace if tracing is enabled. The message is prefixed with the task's path in the task hierarchy.
func (*Task[I, O]) Logf ¶
Logf adds a formatted log message to the runtime trace if tracing is enabled. The message is prefixed with the task's path in the task hierarchy.
type TaskBase ¶
type TaskBase interface {
// Public methods
Do(any, func() (any, error)) (any, error, bool) // Execute with deduplication
Go(func(context.Context)) // Spawn a child task
Group() ErrGroup // Create an error group
Wait() // Wait for child tasks
// Logging methods
Log(string) // Log a message
Logf(string, ...any) // Log a formatted message
// contains filtered or unexported methods
}
TaskBase defines the common interface for all task types. It provides methods for task management, synchronization, and logging.
func MustTaskBaseFromContext ¶
MustTaskBaseFromContext retrieves a TaskBase from a context, panicking if not found. This function is useful when the caller expects the context to definitely contain a task.
type WaitGroup ¶
type WaitGroup struct {
// contains filtered or unexported fields
}
WaitGroup is used to wait for a collection of tasks to finish. Tasks call Add(1) when they start and Done() when they finish. Other tasks can call Wait() to block until all tasks have finished.
func (*WaitGroup) Add ¶
Add adds delta to the WaitGroup counter. If the counter becomes zero and there are tasks waiting, they will be resumed. If the counter goes negative, Add panics.