batch

package
v1.1.2
Warning

This package is not in the latest version of its module.

Published: Jan 25, 2026 License: Apache-2.0, MIT Imports: 21 Imported by: 0

Documentation

Overview

Package batch provides batch processing for reading multiple entries from a blob archive.

Index

Constants

const (
	CompressionNone = blobtype.CompressionNone
	CompressionZstd = blobtype.CompressionZstd
)

These compression constants are re-exported from blobtype so that callers configuring a FileSink do not need to import blobtype directly.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferedSink

type BufferedSink interface {
	PutBuffered(entry *Entry, content []byte) error
}

BufferedSink allows sinks to handle decoded content without copying.

Implementations should not mutate the content slice.

type Committer

type Committer interface {
	io.Writer

	// Commit finalizes the write, making content available.
	// Must be called after successful hash verification.
	Commit() error

	// Discard aborts the write and cleans up any temporary resources.
	// Must be called if verification fails or an error occurs.
	Discard() error
}

Committer is a writer that can be committed or discarded.

Implementations should buffer or stage writes until Commit is called. For example, a file-based implementation might write to a temp file and rename it on Commit, or delete it on Discard.

type Compression

type Compression = blobtype.Compression

Compression is an alias for blobtype.Compression.

type Entry

type Entry = blobtype.Entry

Entry is an alias for blobtype.Entry.

type FileSink

type FileSink struct {
	// contains filtered or unexported fields
}

FileSink writes entries to the filesystem.

By default, files are written to a temporary file in the same directory and renamed to the final path on Commit. This ensures that partially written files are never visible at the final path.

func NewFileSink

func NewFileSink(destDir string, opts ...FileSinkOption) *FileSink

NewFileSink creates a FileSink that writes to destDir.

destDir may be an absolute path or a path relative to the current working directory. Parent directories are created automatically as needed.

func (*FileSink) ShouldProcess

func (s *FileSink) ShouldProcess(entry *Entry) bool

ShouldProcess returns false if the file already exists and overwrite is disabled.

func (*FileSink) Writer

func (s *FileSink) Writer(entry *Entry) (Committer, error)

Writer returns a Committer that writes to a temp file and renames on Commit.

type FileSinkOption

type FileSinkOption func(*FileSink)

FileSinkOption configures a FileSink.

func WithDirectWrites

func WithDirectWrites(enabled bool) FileSinkOption

WithDirectWrites disables temp files and writes directly to the final path.

func WithOverwrite

func WithOverwrite(overwrite bool) FileSinkOption

WithOverwrite allows overwriting existing files. By default, existing files are skipped.

func WithPreserveMode

func WithPreserveMode(preserve bool) FileSinkOption

WithPreserveMode preserves file permission modes from the archive. By default, modes are not preserved (files use umask defaults).

func WithPreserveTimes

func WithPreserveTimes(preserve bool) FileSinkOption

WithPreserveTimes preserves file modification times from the archive. By default, times are not preserved (files use current time).

type ProcessStats added in v1.1.0

type ProcessStats struct {
	// Processed is the number of entries successfully written to the sink.
	Processed int

	// Skipped is the number of entries skipped (ShouldProcess returned false).
	Skipped int

	// TotalBytes is the sum of OriginalSize for all processed entries.
	TotalBytes uint64
}

ProcessStats contains statistics from a batch processing operation.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles batch reading and processing of entries from a blob archive.

It provides efficient reading by grouping adjacent entries and processing them together, minimizing the number of read operations on the underlying source.

func NewProcessor

func NewProcessor(source file.ByteSource, pool *file.DecompressPool, maxFileSize uint64, opts ...ProcessorOption) *Processor

NewProcessor creates a new batch processor.

The source provides random access to the data blob. The pool provides reusable zstd decoders for compressed entries. maxFileSize limits the size of individual entries (0 for no limit).

func (*Processor) Process

func (p *Processor) Process(entries []*Entry, sink Sink) (ProcessStats, error)

Process reads and processes entries, writing results to the sink.

Entries are filtered through sink.ShouldProcess, sorted by offset, and grouped into contiguous ranges so each range can be fetched with a single read. For each entry, the content is decompressed (if needed), hash-verified, and written to the sink.

Processing stops on the first error encountered. The returned ProcessStats contains counts for processed and skipped entries, and total bytes written. On error, partial stats are returned reflecting work completed before the error.

type ProcessorOption

type ProcessorOption func(*Processor)

ProcessorOption configures a Processor.

func WithProcessorLogger

func WithProcessorLogger(logger *slog.Logger) ProcessorOption

WithProcessorLogger sets the logger for batch processing operations. If not set, logging is disabled.

func WithProcessorProgress added in v1.1.0

func WithProcessorProgress(fn blobtype.ProgressFunc) ProcessorOption

WithProcessorProgress sets a callback to receive progress updates during processing. The callback receives events for each file extracted.

func WithReadAheadBytes

func WithReadAheadBytes(limit uint64) ProcessorOption

WithReadAheadBytes caps the total size of buffered group data. A value of 0 disables the byte budget.

func WithReadConcurrency

func WithReadConcurrency(n int) ProcessorOption

WithReadConcurrency sets the number of concurrent range reads. Values < 1 force serial reads.

func WithWorkers

func WithWorkers(n int) ProcessorOption

WithWorkers sets the number of workers for parallel processing. Values < 0 force serial processing. Zero uses automatic heuristics. Values > 0 force a specific worker count.

type Sink

type Sink interface {
	// ShouldProcess returns false if this entry should be skipped.
	// This allows implementations to skip already-cached entries or existing files.
	ShouldProcess(entry *Entry) bool

	// Writer returns a writer for the entry's content.
	// The returned Committer must have Commit() called after successful
	// write and verification, or Discard() called on any error.
	//
	// The caller will:
	// 1. Write decompressed content to the Committer
	// 2. Verify the SHA256 hash matches entry.Hash
	// 3. Call Commit() if verification succeeds, Discard() otherwise
	Writer(entry *Entry) (Committer, error)
}

Sink receives decompressed and verified file content during batch processing.

Implementations determine where content is written (cache, filesystem, etc.) and can filter which entries to process.
