corio

package module
v0.1.0-rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2025 License: MIT Imports: 6 Imported by: 0

README

corio

Go Reference Go Report Card

Structured concurrency and batched I/O operations for Go.

Features

  • Coroutine-like task scheduling with suspendable functions
  • Batched I/O operations for improved performance
  • Concurrent task management with parent-child relationships
  • Full generic support for handling different input/output types
  • Synchronization primitives:
    • Mutex for mutual exclusion
    • WaitGroup for synchronized task completion
    • ErrGroup for handling errors from concurrent tasks
    • SingleFlight for deduplicating identical in-flight requests

Installation

go get github.com/webriots/corio

Quick Start

package main

import (
	"context"
	"fmt"

	"github.com/webriots/corio"
)

type batchIO struct{}

func (*batchIO) Dispatch(
	ctx context.Context,
	alloc *corio.IOAllocator[string, int],
	sema chan struct{},
	reqs []*corio.IORequest[string, int],
	resp chan *corio.IOBatch[string, int],
) {
	go func() {
		fmt.Printf("%d IO requests batched\n", len(reqs))

		sema <- struct{}{}
		defer func() { <-sema }()

		batch := alloc.NewBatch(reqs...)
		for i := range reqs {
			alloc.SetBatchResponse(batch, i, i)
		}

		resp <- batch
	}()
}

func main() {
	prog := func(_ context.Context, task *corio.Task[string, int]) {
		for i := 0; i < 10; i++ {
			task.Run(func(_ context.Context, task *corio.Task[string, int]) {
				_ = task.IO(fmt.Sprintf("create %v", i))
				_ = task.IO(fmt.Sprintf("read %v", i))
				_ = task.IO(fmt.Sprintf("update %v", i))
				_ = task.IO(fmt.Sprintf("delete %v", i))
			})
		}
	}

	corio.IO(new(batchIO)).Run(prog).Resume(context.Background())
}

Playground

Core Concepts

Tasks

Tasks are the core unit of work in corio. They can:

  • Perform I/O operations with task.IO()
  • Spawn child tasks with task.Go() or task.Run()
  • Wait for child tasks with task.Wait()
  • Use synchronization primitives like Mutex, WaitGroup, and ErrGroup
  • Deduplicate identical operations with task.Do()
task.Run(func(ctx context.Context, task *corio.Task[string, int]) {
    // This is a child task
    result := task.IO("some input")
    // Process result...
})
I/O Operations

I/O operations are processed through a custom dispatcher that can batch related requests:

// Define a custom dispatcher
type myDispatcher struct{}

func (d *myDispatcher) Dispatch(
    ctx context.Context,
    alloc *corio.IOAllocator[string, int],
    sema chan struct{},
    reqs []*corio.IORequest[string, int],
    resp chan *corio.IOBatch[string, int],
) {
    // Implementation that processes batches of I/O requests
    batch := alloc.NewBatch(reqs...)

    // Process in a goroutine with concurrency limiting
    go func() {
        sema <- struct{}{} // Acquire semaphore
        defer func() { <-sema }() // Release semaphore

        // Set responses for each request
        for i, req := range reqs {
            // Process req.GetData() and create response
            alloc.SetBatchResponse(batch, i, i)
        }

        // Send completed batch back through response channel
        resp <- batch.validate()
    }()
}

// Create a schedule with the dispatcher
sched := corio.IO[string, int](new(myDispatcher))
Synchronization

corio provides several synchronization primitives:

// Mutex for mutual exclusion
var mutex corio.Mutex
mutex.Lock(task)
// Critical section
mutex.Unlock()

// WaitGroup for waiting on multiple tasks
var wg corio.WaitGroup
wg.Add(1)
task.Go(func(ctx context.Context) {
    defer wg.Done()
    // Task work...
})
wg.Wait(task)

// ErrGroup for handling errors
group := task.Group()
group.Go(func(ctx context.Context) error {
    // Task that can return an error
    return nil
})
err := group.Wait(task)
SingleFlight Pattern

Deduplicate identical in-flight requests:

value, err, shared := task.Do("cache-key", func() (any, error) {
    // Expensive operation that will only be executed once
    // for concurrent requests with the same key
    return task.IO("expensive-operation"), nil
})

Use Cases

  • HTTP/API servers processing batched requests
  • Database operations that can be optimized by batching
  • Task orchestration with complex dependencies
  • File and network I/O with efficient resource utilization
  • Stateful workflows where tasks need to be suspended/resumed

Performance Considerations

  • Use batched I/O operations for better performance with high-volume I/O
  • The dispatcher controls how I/O requests are processed (sequentially, parallel, or a hybrid approach)
  • Default semaphore limit is 128 concurrent I/O operations
  • Task scheduling is cooperative; tasks suspend themselves during I/O

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License.

Documentation

Overview

Package corio provides structured concurrency primitives and batched I/O operations. It is designed to support coroutine-like task scheduling with a focus on efficient I/O operations through batching and concurrency management.

Key components:

  • Task: The core abstraction representing a coroutine-like unit of work. Tasks can spawn child tasks, perform I/O operations, and wait for completion.

  • Schedule: Manages the scheduling of tasks and I/O operations. It holds a reference to the I/O dispatcher and allocates resources.

  • IODispatch: Interface for implementing I/O dispatching strategies. Users can implement this interface to define how I/O requests are processed.

  • IORequest/IOResponse: Represent I/O operations with their input and output data.

  • IOBatch: A collection of I/O requests and their responses, allowing for batched processing.

  • Synchronization primitives: Mutex, WaitGroup, ErrGroup, and SingleFlight for synchronization and error handling.

Index

Constants

View Source
const (
	// ScheduleIOConcurrencyLimit defines the maximum number of
	// concurrent I/O operations.
	ScheduleIOConcurrencyLimit = 128
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrGroup

type ErrGroup interface {
	// Go starts a new task with the default context.
	Go(func(context.Context) error)
	// GoWithContext starts a new task with the specified context.
	GoWithContext(context.Context, func(context.Context) error)
	// Wait blocks until all tasks have completed and returns the first
	// error encountered.
	Wait(TaskBase) error
}

ErrGroup manages a group of tasks and collects the first error that occurs. It provides methods to start new tasks and wait for all tasks to complete.

type IOAllocator

type IOAllocator[I, O any] struct{}

IOAllocator provides methods for creating and manipulating I/O batches. It helps manage memory allocation for I/O operation processing.

func (*IOAllocator[I, O]) AddBatchRequest

func (ioa *IOAllocator[I, O]) AddBatchRequest(batch *IOBatch[I, O], requests ...*IORequest[I, O])

AddBatchRequest adds the provided requests to an existing I/O batch. It appends the requests to the batch's request slice.

func (*IOAllocator[I, O]) NewBatch

func (ioa *IOAllocator[I, O]) NewBatch(requests ...*IORequest[I, O]) *IOBatch[I, O]

NewBatch creates a new I/O batch with the provided requests. It initializes a new batch and adds the specified requests to it.

func (*IOAllocator[I, O]) SetBatchResponse

func (ioa *IOAllocator[I, O]) SetBatchResponse(batch *IOBatch[I, O], i int, data O)

SetBatchResponse adds a response to a batch for the request at index i. It creates a new IOResponse with the provided output data and adds it to the batch.

func (*IOAllocator[I, O]) SetBatchRetry

func (ioa *IOAllocator[I, O]) SetBatchRetry(batch *IOBatch[I, O], i int, retry *IORequest[I, O])

SetBatchRetry adds a request to the retry list of a batch. It appends the request to the batch's retries slice for later reprocessing.

type IOBatch

type IOBatch[I, O any] struct {
	// contains filtered or unexported fields
}

IOBatch represents a collection of I/O requests, their responses, and any requests that need to be retried. It provides a mechanism for batched processing of I/O operations.

func (*IOBatch[I, O]) Len

func (iob *IOBatch[I, O]) Len() int

Len returns the number of I/O requests in this batch.

func (*IOBatch[I, O]) Requests

func (iob *IOBatch[I, O]) Requests() []*IORequest[I, O]

Requests returns the slice of I/O requests in this batch.

type IODispatch

type IODispatch[I, O any] interface {
	// Dispatch handles a batch of I/O requests and sends responses back
	// through the resp channel. It takes a context for cancellation, an
	// allocator for creating response objects, a semaphore channel for
	// concurrency limiting, a slice of requests to process, and a
	// response channel to send completed batches.
	Dispatch(
		ctx context.Context,
		alloc *IOAllocator[I, O],
		sema chan struct{},
		reqs []*IORequest[I, O],
		resp chan *IOBatch[I, O],
	)
}

IODispatch is an interface that defines how I/O operations are dispatched. Implementers must provide a Dispatch method that processes batches of I/O requests.

type IORequest

type IORequest[I, O any] struct {
	// contains filtered or unexported fields
}

IORequest represents an I/O operation request with input data. It contains a reference to the task that initiated the request and the input data.

func (*IORequest[I, O]) GetData

func (ior *IORequest[I, O]) GetData() I

GetData returns the input data associated with this I/O request.

type IOResponse

type IOResponse[I, O any] struct {
	// contains filtered or unexported fields
}

IOResponse represents the result of a completed I/O operation. It contains a reference to the original request and the output data.

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex provides mutual exclusion for tasks. It allows only one task to hold the lock at a time, suspending other tasks that attempt to acquire the lock until it's released.

func (*Mutex) Lock

func (m *Mutex) Lock(task TaskBase)

Lock acquires the mutex for the given task. If the mutex is already locked, the task will be suspended until the mutex is available.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock releases the mutex. If there are tasks waiting to acquire the mutex, one of them will be resumed.

func (*Mutex) WaitCount

func (m *Mutex) WaitCount() int

WaitCount returns the number of tasks waiting to acquire the mutex.

type Resumable

type Resumable[I, O any] struct {
	// contains filtered or unexported fields
}

Resumable represents a function that can be resumed with a Schedule. It contains the function to be executed and a reference to the Schedule.

func (*Resumable[I, O]) Resume

func (r *Resumable[I, O]) Resume(ctx context.Context)

Resume starts the execution of a Resumable with the provided context. It creates a cancellable context and starts the main event loop with the function and Schedule.

type Schedule

type Schedule[I, O any] struct {
	// contains filtered or unexported fields
}

Schedule manages the scheduling of tasks and I/O operations. It holds references to the I/O dispatcher, allocator, and channels for responses and concurrency control.

func IO

func IO[I, O any](dispatch IODispatch[I, O]) *Schedule[I, O]

IO creates a new Schedule with the specified I/O dispatcher. It initializes the allocator, response channel, and semaphore for concurrency control.

func (*Schedule[I, O]) Fn

func (s *Schedule[I, O]) Fn(fn func(context.Context)) func(context.Context, *Task[I, O])

Fn adapts a context-only function to the Task-based function signature. It creates a wrapper function that ignores the Task parameter and calls the original function.

func (*Schedule[I, O]) Go

func (s *Schedule[I, O]) Go(fn func(context.Context)) *Resumable[I, O]

Go creates a Resumable from a function that only takes a context. It wraps the function with Fn to adapt it to the Task-based interface.

func (*Schedule[I, O]) Run

func (s *Schedule[I, O]) Run(fn func(context.Context, *Task[I, O])) *Resumable[I, O]

Run creates a Resumable from a function that takes a context and a Task. The function will be executed when the Resumable is resumed.

type Task

type Task[I, O any] struct {
	// contains filtered or unexported fields
}

Task represents a coroutine-like unit of work that can perform I/O operations. It can be suspended, resumed, and can spawn child tasks.

func TaskFromContext

func TaskFromContext[In, Out any](ctx context.Context) (*Task[In, Out], bool)

TaskFromContext retrieves a Task from a context with the specified generic types. Returns the task and a boolean indicating whether the task was found and had the correct type.

func (*Task[I, O]) Do

func (t *Task[I, O]) Do(key any, fn func() (any, error)) (any, error, bool)

Do executes the given function with deduplication based on the key. If multiple tasks call Do with the same key concurrently, only one execution occurs. Returns the result, error (if any), and whether this was a shared result.

func (*Task[I, O]) Go

func (t *Task[I, O]) Go(fn func(context.Context))

Go spawns a child task with the given context function. It adapts the function to the task interface using Fn.

func (*Task[I, O]) Group

func (t *Task[I, O]) Group() ErrGroup

Group creates a new error group associated with this task. The error group can be used to run functions that return errors and wait for their completion.

func (*Task[I, O]) IO

func (t *Task[I, O]) IO(in I) O

IO performs an I/O operation with the given input. It queues the request, suspends the task, and returns the result when resumed.

func (*Task[I, O]) Log

func (t *Task[I, O]) Log(msg string)

Log adds a log message to the runtime trace if tracing is enabled. The message is prefixed with the task's path in the task hierarchy.

func (*Task[I, O]) Logf

func (t *Task[I, O]) Logf(format string, args ...any)

Logf adds a formatted log message to the runtime trace if tracing is enabled. The message is prefixed with the task's path in the task hierarchy.

func (*Task[I, O]) Run

func (t *Task[I, O]) Run(fn func(context.Context, *Task[I, O]))

Run spawns a child task with the given task function using the current context.

func (*Task[I, O]) Wait

func (t *Task[I, O]) Wait()

Wait suspends the current task until all child tasks complete. If there are no child tasks, it returns immediately.

type TaskBase

type TaskBase interface {
	// Public methods
	Do(any, func() (any, error)) (any, error, bool) // Execute with deduplication
	Go(func(context.Context))                       // Spawn a child task
	Group() ErrGroup                                // Create an error group
	Wait()                                          // Wait for child tasks

	// Logging methods
	Log(string)          // Log a message
	Logf(string, ...any) // Log a formatted message
	// contains filtered or unexported methods
}

TaskBase defines the common interface for all task types. It provides methods for task management, synchronization, and logging.

func MustTaskBaseFromContext

func MustTaskBaseFromContext(ctx context.Context) TaskBase

MustTaskBaseFromContext retrieves a TaskBase from a context, panicking if not found. This function is useful when the caller expects the context to definitely contain a task.

func TaskBaseFromContext

func TaskBaseFromContext(ctx context.Context) (TaskBase, bool)

TaskBaseFromContext retrieves a TaskBase from a context. Returns the task base and a boolean indicating whether a task was found in the context.

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup is used to wait for a collection of tasks to finish. Tasks call Add(1) when they start and Done() when they finish. Other tasks can call Wait() to block until all tasks have finished.

func (*WaitGroup) Add

func (wg *WaitGroup) Add(delta int)

Add adds delta to the WaitGroup counter. If the counter becomes zero and there are tasks waiting, they will be resumed. If the counter goes negative, Add panics.

func (*WaitGroup) Done

func (wg *WaitGroup) Done()

Done decrements the WaitGroup counter by one. It's a convenience method equivalent to Add(-1).

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait(task TaskBase)

Wait blocks the calling task until the WaitGroup counter is zero. If the counter is already zero, it returns immediately.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL