package engine

v0.0.15
Published: Jan 19, 2026 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Overview

Package engine implements the core vector database engine.

The engine orchestrates:

  • MemTable for hot writes (append-only, lock-free reads)
  • Flat segments for medium-sized data
  • DiskANN segments for large-scale ANN search
  • Background flush and compaction loops
  • MVCC snapshot isolation for concurrent readers
  • Tombstone-based deletion with COW semantics
  • WAL for durability and crash recovery

Index

Constants

This section is empty.

Variables

var (
	// ErrClosed is returned when an operation is attempted on a closed engine or segment.
	ErrClosed = errors.New("engine closed")

	// ErrInvalidArgument is returned when an argument is invalid (e.g. wrong dimension, k <= 0).
	ErrInvalidArgument = errors.New("invalid argument")

	// ErrCorrupt is returned when data corruption is detected (checksum mismatch, etc.).
	ErrCorrupt = errors.New("data corruption detected")

	// ErrIncompatibleFormat is returned when the on-disk format is not supported.
	ErrIncompatibleFormat = errors.New("incompatible format")

	// ErrBackpressure is returned when the system is under heavy load and rejects the operation.
	ErrBackpressure = errors.New("backpressure: resource limit exceeded")

	// ErrNotFound is returned when a requested ID is not found.
	ErrNotFound = errors.New("not found")

	// ErrReadOnly is returned when a write operation is attempted on a read-only engine.
	// Use ReadOnly() option for stateless serverless deployments.
	ErrReadOnly = errors.New("engine is read-only")
)

Functions

func GoSafe

func GoSafe(fn func())

GoSafe runs a function in a goroutine and recovers from panics. It logs the panic and stack trace instead of crashing the process.

func WithFilter

func WithFilter(filter *metadata.FilterSet) func(*model.SearchOptions)

WithFilter sets the metadata filter for the search.

func WithMetadata

func WithMetadata() func(*model.SearchOptions)

WithMetadata requests the metadata to be returned in the search results.

func WithNProbes

func WithNProbes(n int) func(*model.SearchOptions)

WithNProbes sets the number of probes for the search.

func WithPayload

func WithPayload() func(*model.SearchOptions)

WithPayload requests the payload to be returned in the search results.

func WithPreFilter

func WithPreFilter(preFilter bool) func(*model.SearchOptions)

WithPreFilter forces pre-filtering (or post-filtering if false).

func WithRefineFactor

func WithRefineFactor(factor float32) func(*model.SearchOptions)

WithRefineFactor sets the refine factor for the search.

func WithVector

func WithVector() func(*model.SearchOptions)

WithVector requests the vector to be returned in the search results.

Types

type BatchOp

type BatchOp struct {
	Type OpType
	// ID is used for Delete. Ignored for Insert (auto-generated).
	ID       model.ID
	Vector   []float32
	Metadata metadata.Document
	Payload  []byte
}

BatchOp represents a single operation in a WriteBatch.

type BoundedSizeTieredPolicy

type BoundedSizeTieredPolicy struct {
	Threshold int
}

BoundedSizeTieredPolicy implements a size-tiered compaction strategy with explicit bounds:

  • Segment size buckets: [0-10MB], [10-100MB], [100MB-1GB], [1GB+]
  • Compacts within a bucket only when the threshold is exceeded
  • Never compacts segments spanning multiple buckets in a single operation
  • Max compaction bytes: 2GB hard limit

func (*BoundedSizeTieredPolicy) Pick

func (p *BoundedSizeTieredPolicy) Pick(segments []SegmentStats) *CompactionTask

type CompactionConfig

type CompactionConfig struct {
	// DiskANNThreshold is the number of rows above which DiskANN is used.
	// If 0, defaults to 10000.
	DiskANNThreshold int

	// FlatQuantizationType is the quantization type for Flat segments.
	// 0=None, 1=SQ8, 2=PQ.
	FlatQuantizationType int

	// DiskANNOptions are the options for DiskANN segments.
	DiskANNOptions diskann.Options
}

CompactionConfig holds configuration for compaction.

type CompactionPolicy

type CompactionPolicy interface {
	// Pick selects segments to compact.
	// Returns a task or nil if no compaction is needed.
	Pick(segments []SegmentStats) *CompactionTask
}

CompactionPolicy determines which segments should be compacted.

type CompactionTask

type CompactionTask struct {
	Segments    []model.SegmentID
	TargetLevel int
}

CompactionTask describes a compaction unit of work.

type CursorFilterable

type CursorFilterable interface {
	FilterCursor(filter *metadata.FilterSet) imetadata.FilterCursor
}

CursorFilterable is the interface for segments that support cursor-based filtering.

type CursorSearchConfig

type CursorSearchConfig struct {
	// SelectivityCutoff: above this threshold, use HNSW instead of brute-force.
	// Default: 0.30 (30%)
	SelectivityCutoff float64

	// BatchSize: number of vectors to fetch per I/O batch.
	// Default: 64
	BatchSize int

	// ForcePreFilter: if true, always use pre-filtering even at high selectivity.
	ForcePreFilter bool
}

CursorSearchConfig holds configuration for cursor-based search.

func DefaultCursorSearchConfig

func DefaultCursorSearchConfig() CursorSearchConfig

DefaultCursorSearchConfig returns sensible defaults.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the main entry point for the vector database.

func OpenLocal

func OpenLocal(ctx context.Context, dir string, opts ...Option) (*Engine, error)

OpenLocal opens or creates an Engine using local storage. If dim/metric are not provided via options, they are loaded from an existing manifest. The ctx parameter is used for initialization I/O (loading manifest, segments, etc.).

func OpenRemote

func OpenRemote(ctx context.Context, store blobstore.BlobStore, opts ...Option) (*Engine, error)

OpenRemote opens an Engine backed by a remote store (e.g., S3, GCS). The store is the source of truth. A local cache directory is auto-created if not specified. Dim/metric are loaded from the remote manifest (use WithDimension/WithMetric for new indexes). The ctx parameter is used for initialization I/O (loading manifest, segments, etc.).

OpenRemote supports both reads and writes. Segments are built in memory and written directly to the remote store as immutable blobs via atomic Put operations, so no local filesystem is required.

For read-only search nodes (e.g., serverless/K8s replicas), use the ReadOnly() option:

vecgo.Open(ctx, vecgo.Remote(store), vecgo.ReadOnly())

This enables stateless, horizontally scalable search nodes.

func (*Engine) ApplyBatch

func (e *Engine) ApplyBatch(ctx context.Context, batch *WriteBatch) ([]model.ID, error)

ApplyBatch executes a batch of operations atomically. It returns the IDs of inserted records (0 for non-inserts) and any error.

func (*Engine) BatchDelete

func (e *Engine) BatchDelete(ctx context.Context, ids []model.ID) error

BatchDelete removes multiple vectors from the engine in a single operation. It is atomic and more efficient than calling Delete in a loop.

func (*Engine) BatchInsert

func (e *Engine) BatchInsert(ctx context.Context, vectors [][]float32, mds []metadata.Document, payloads [][]byte) ([]model.ID, error)

BatchInsert adds multiple vectors to the engine with immediate HNSW indexing.

All vectors are indexed into the in-memory HNSW graph and become searchable immediately. This amortizes lock overhead compared to individual Insert calls.

Performance: ~3,000 vectors/sec at batch=100 (768 dim).

Use this when you have batches of vectors that need immediate searchability (e.g., processing a batch of documents in a RAG pipeline).

For bulk loading where immediate search is not required, use BatchInsertDeferred which is ~1000x faster but vectors only become searchable after flush.

func (*Engine) BatchInsertDeferred

func (e *Engine) BatchInsertDeferred(ctx context.Context, vectors [][]float32, mds []metadata.Document, payloads [][]byte) ([]model.ID, error)

BatchInsertDeferred adds multiple vectors WITHOUT HNSW indexing (Bulk Load Mode).

This is the fastest insert path — vectors are stored in columnar format but NOT indexed into the in-memory HNSW graph. Vectors become searchable only after Commit() triggers a flush, which writes them to a DiskANN segment.

Performance: ~2,000,000 vectors/sec (768 dim) — approximately 1000x faster than BatchInsert because it skips HNSW graph construction entirely.

Use cases:

  • Initial data loading (embedding a corpus)
  • Database migration
  • Nightly reindex jobs
  • Any scenario where immediate searchability is not required

NOT suitable for:

  • Real-time RAG (vectors must be searchable immediately)
  • Interactive applications with instant feedback

Example:

// Bulk load 1M vectors
for batch := range batches {
    db.BatchInsertDeferred(ctx, batch.Vectors, batch.Metadata, nil)
}
db.Commit(ctx)  // Flush to DiskANN, now searchable

Implementation: Uses sequential processing (no goroutines) because the bottleneck is memory allocation and data copying, not CPU. Parallel execution with semaphores actually adds ~20% overhead from channel operations.

func (*Engine) BatchSearch

func (e *Engine) BatchSearch(ctx context.Context, queries [][]float32, k int, opts ...func(*model.SearchOptions)) ([][]model.Candidate, error)

BatchSearch performs multiple k-NN searches in parallel. Uses pooled result slices to minimize allocations under concurrent load.

func (*Engine) CacheStats

func (e *Engine) CacheStats() (hits, misses int64)

CacheStats returns the combined statistics of the block caches.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine.

func (*Engine) Commit

func (e *Engine) Commit(ctx context.Context) (err error)

Commit flushes the in-memory buffer to a durable immutable segment.

This is the durability boundary in commit-oriented mode. After Commit() returns successfully, all previously inserted/deleted data is guaranteed to survive crashes.

Usage pattern:

db.Insert(ctx, vec1, meta1, payload1)
db.Insert(ctx, vec2, meta2, payload2)  // Buffered, not yet durable
db.Commit(ctx)                          // Now durable

Commit is safe to call concurrently. It blocks until the flush completes. If the buffer is empty, Commit returns immediately with no error.

func (*Engine) Compact

func (e *Engine) Compact(segmentIDs []model.SegmentID, targetLevel int) error

Compact merges the specified segments into a new segment. It is designed to be concurrency-friendly:

 1. Hold the lock to snapshot state (segments + tombstones).
 2. Release the lock to perform the heavy I/O (the merge).
 3. Re-acquire the lock to commit changes (CAS on the PK index).

func (*Engine) CompactWithContext

func (e *Engine) CompactWithContext(ctx context.Context, segmentIDs []model.SegmentID, targetLevel int) (err error)

CompactWithContext performs compaction with a context for cancellation.

func (*Engine) DebugInfo

func (e *Engine) DebugInfo() string

DebugInfo returns a detailed string representation of the engine state.

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, id model.ID) (err error)

Delete removes a vector from the engine.

func (*Engine) Get

func (e *Engine) Get(ctx context.Context, id model.ID) (rec *model.Record, err error)

Get returns the full record (vector, metadata, payload) for the given primary key.

func (*Engine) HybridSearch

func (e *Engine) HybridSearch(ctx context.Context, q []float32, textQuery string, k int, rrfK int, opts ...func(*model.SearchOptions)) ([]model.Candidate, error)

HybridSearch performs a combination of vector search and lexical search. It uses Reciprocal Rank Fusion (RRF) to combine the results. rrfK is the constant k in the RRF formula: score = 1 / (k + rank). Typically rrfK is 60.

func (*Engine) Insert

func (e *Engine) Insert(ctx context.Context, vec []float32, md metadata.Document, payload []byte) (id model.ID, err error)

Insert adds a single vector to the engine with immediate HNSW indexing.

This is the real-time insert path — the vector is indexed into the in-memory HNSW graph and becomes searchable immediately. Use this when you need vectors to be searchable right after insertion (e.g., real-time RAG).

Performance: ~625 vectors/sec (768 dim), 2 allocations per insert.

For bulk loading where immediate search is not required, use BatchInsertDeferred which is ~1000x faster but vectors only become searchable after flush.

Thread-safe: the underlying HNSW is 16-way sharded for concurrent inserts.

func (*Engine) Scan

func (e *Engine) Scan(ctx context.Context, opts ...ScanOption) iter.Seq2[*model.Record, error]

Scan returns an iterator over all records matching the filter. Uses Go 1.23+ iter.Seq2, so results can be consumed with a plain range loop.

func (*Engine) Search

func (e *Engine) Search(ctx context.Context, q []float32, k int, opts ...func(*model.SearchOptions)) ([]model.Candidate, error)

Search performs a k-NN search and returns a slice of candidates. This is a convenience wrapper around SearchIter. If options.Stats is non-nil, it will be populated with query execution statistics.

func (*Engine) SearchIter

func (e *Engine) SearchIter(ctx context.Context, q []float32, k int, opts ...func(*model.SearchOptions)) iter.Seq2[model.Candidate, error]

SearchIter performs a k-NN search and yields results via an iterator. This allows for zero-copy streaming of results.

New in v1.0: Returns iter.Seq2 for high-performance streaming.

func (*Engine) SearchThreshold

func (e *Engine) SearchThreshold(ctx context.Context, q []float32, threshold float32, maxResults int, opts ...func(*model.SearchOptions)) ([]model.Candidate, error)

SearchThreshold returns all vectors within the given distance threshold. It uses maxResults to bound the search.

func (*Engine) SegmentInfo

func (e *Engine) SegmentInfo() []manifest.SegmentInfo

SegmentInfo returns information about all segments in the engine.

func (*Engine) Stats

func (e *Engine) Stats() EngineStats

Stats returns the current engine statistics.

func (*Engine) Vacuum

func (e *Engine) Vacuum(ctx context.Context) error

Vacuum cleans up old versions based on the retention policy. It deletes old manifest files and any segments that are no longer referenced by kept manifest versions.

type EngineStats

type EngineStats struct {
	ManifestID       uint64 // Current manifest version (for time-travel)
	SegmentCount     int
	RowCount         int
	TombstoneCount   int
	DiskUsageBytes   int64
	MemoryUsageBytes int64 // L0 + overhead
}

EngineStats contains runtime statistics for the engine.

type FlushConfig

type FlushConfig struct {
	// MaxMemTableSize is the maximum size of the MemTable in bytes before a flush is triggered.
	// If 0, defaults to 64MB.
	MaxMemTableSize int64
}

FlushConfig holds configuration for automatic flushing.

type GraphIndexSegment

type GraphIndexSegment interface {
	HasGraphIndex() bool
}

GraphIndexSegment is an optional interface for segments that have graph-based indexes. Used to distinguish between flat segments (brute-force) and graph segments (HNSW/DiskANN).

type LeveledCompactionPolicy

type LeveledCompactionPolicy struct {
	L0Threshold int   // Number of files in L0 to trigger compaction (default 4)
	LevelRatio  int   // Growth ratio between levels (default 10)
	BaseSize    int64 // Target size of L1 (default 100MB)
	MaxLevels   int   // Maximum number of levels (default 7)
}

LeveledCompactionPolicy implements a level-based compaction strategy:

  • L0: overlapping segments (flushed from the MemTable); size unconstrained.
  • L1..N: non-overlapping segments with a per-level target size.

Strategy:

  • If the L0 segment count exceeds the threshold (default 4), compact all of L0 into L1.
  • If size(L_i) > 10^i * BaseSize, select one segment from L_i plus overlapping segments from L_{i+1} and compact into L_{i+1}.

For simplicity in a vector database, "overlapping" is reduced to merging N segments from L_i into L_{i+1}.

func NewLeveledCompactionPolicy

func NewLeveledCompactionPolicy() *LeveledCompactionPolicy

func (*LeveledCompactionPolicy) Pick

func (p *LeveledCompactionPolicy) Pick(segments []SegmentStats) *CompactionTask

type MemTableFilterCursor

type MemTableFilterCursor interface {
	segment.Segment
	FilterCursor(filter *metadata.FilterSet) imetadata.FilterCursor
}

MemTableFilterCursor is the interface for memtables that support cursor filtering.

type MetricsObserver

type MetricsObserver interface {
	// Write path
	OnInsert(latency time.Duration, err error)
	OnDelete(latency time.Duration, err error)
	OnMemTableStatus(sizeBytes int64, percentFull float64)
	OnBackpressure(reason string)

	// Read path
	OnSearch(latency time.Duration, segmentType string, k int, retrieved int, err error)
	OnGet(latency time.Duration, err error)

	// Background operations
	OnFlush(duration time.Duration, rows int, bytes uint64, err error)
	OnCompaction(duration time.Duration, dropped int, created int, err error)
	OnBuild(duration time.Duration, indexType string, err error)

	// Generic counts/gauges
	OnQueueDepth(name string, depth int)
	OnThroughput(name string, bytes int64)
}

MetricsObserver defines the interface for observing engine events.

type NoopMetricsObserver

type NoopMetricsObserver struct{}

NoopMetricsObserver is a no-op implementation of MetricsObserver.

func (*NoopMetricsObserver) OnBackpressure

func (o *NoopMetricsObserver) OnBackpressure(string)

func (*NoopMetricsObserver) OnBuild

func (o *NoopMetricsObserver) OnBuild(time.Duration, string, error)

func (*NoopMetricsObserver) OnCompaction

func (o *NoopMetricsObserver) OnCompaction(time.Duration, int, int, error)

func (*NoopMetricsObserver) OnDelete

func (o *NoopMetricsObserver) OnDelete(time.Duration, error)

func (*NoopMetricsObserver) OnFlush

func (o *NoopMetricsObserver) OnFlush(time.Duration, int, uint64, error)

func (*NoopMetricsObserver) OnGet

func (o *NoopMetricsObserver) OnGet(time.Duration, error)

func (*NoopMetricsObserver) OnInsert

func (o *NoopMetricsObserver) OnInsert(time.Duration, error)

func (*NoopMetricsObserver) OnMemTableStatus

func (o *NoopMetricsObserver) OnMemTableStatus(int64, float64)

func (*NoopMetricsObserver) OnQueueDepth

func (o *NoopMetricsObserver) OnQueueDepth(string, int)

func (*NoopMetricsObserver) OnSearch

func (o *NoopMetricsObserver) OnSearch(time.Duration, string, int, int, error)

func (*NoopMetricsObserver) OnThroughput

func (o *NoopMetricsObserver) OnThroughput(string, int64)

type OpType

type OpType uint8

OpType represents the type of operation in a batch.

const (
	OpInsert OpType = iota
	OpDelete
)

type Option

type Option func(*Engine)

Option defines a configuration option for the Engine.

func ReadOnly

func ReadOnly() Option

ReadOnly puts the engine in read-only mode. In this mode:

  • No writes, purely stateless
  • Insert/Delete operations return ErrReadOnly
  • No local state is required (pure memory cache)
  • Ideal for stateless serverless search nodes

Use with OpenRemote() for horizontally scalable search replicas.

func WithBlockCacheBlockSize

func WithBlockCacheBlockSize(size int64) Option

WithBlockCacheBlockSize sets the size of blocks in the block cache. Defaults to 4KB (4096 bytes). For S3/Cloud stores, use 1MB+.

func WithBlockCacheSize

func WithBlockCacheSize(size int64) Option

WithBlockCacheSize sets the size of the block cache in bytes. If 0, defaults to 256MB.

func WithCacheDir

func WithCacheDir(dir string) Option

WithCacheDir sets the local directory for caching remote data. Only applicable when opening a remote store.

func WithCompactionConfig

func WithCompactionConfig(cfg CompactionConfig) Option

WithCompactionConfig sets the compaction configuration.

func WithCompactionPolicy

func WithCompactionPolicy(policy CompactionPolicy) Option

WithCompactionPolicy sets the compaction policy used by the background loop. If unset, the engine uses a default size-tiered policy.

func WithCompactionThreshold

func WithCompactionThreshold(threshold int) Option

WithCompactionThreshold sets the segment-count threshold for the default size-tiered compaction policy.

func WithDimension

func WithDimension(dim int) Option

WithDimension sets the vector dimension for the engine. Required when creating a new index.

func WithDiskANNThreshold

func WithDiskANNThreshold(threshold int) Option

WithDiskANNThreshold sets the number of vectors required to build a DiskANN segment. Segments smaller than this will use a Flat (IVF) index.

func WithDiskCache

func WithDiskCache(dir string, size, blockSize int64) Option

WithDiskCache enables a secondary disk-based cache. dir: directory to store cache files. size: maximum size in bytes. blockSize: size of blocks on disk (e.g. 1MB or 4MB).

func WithFileSystem

func WithFileSystem(fs fs.FileSystem) Option

WithFileSystem sets the file system for the engine. This is primarily used for testing and fault injection.

func WithFlushConfig

func WithFlushConfig(cfg FlushConfig) Option

WithFlushConfig sets the flush configuration.

func WithLexicalIndex

func WithLexicalIndex(idx lexical.Index, field string) Option

WithLexicalIndex sets the lexical index and the metadata field to index.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets the logger for the engine.

func WithMemoryLimit

func WithMemoryLimit(bytes int64) Option

WithMemoryLimit sets the memory limit for the engine in bytes. If set to 0, memory is unlimited.

func WithMetric

func WithMetric(m distance.Metric) Option

WithMetric sets the distance metric for the engine. Required when creating a new index.

func WithMetricsObserver

func WithMetricsObserver(observer MetricsObserver) Option

WithMetricsObserver sets the metrics observer for the engine.

func WithQuantization

func WithQuantization(t quantization.Type) Option

WithQuantization sets the vector quantization method (e.g. PQ, SQ8, INT4). This applies to new segments created during flush/compaction.

func WithResourceController

func WithResourceController(rc *resource.Controller) Option

WithResourceController sets the resource controller for the engine.

func WithRetentionPolicy

func WithRetentionPolicy(p RetentionPolicy) Option

WithRetentionPolicy sets the retention policy for vacuuming/compaction.

func WithSchema

func WithSchema(schema metadata.Schema) Option

WithSchema sets the metadata schema for the engine.

func WithTimestamp

func WithTimestamp(t time.Time) Option

WithTimestamp opens the database at the state closest to the given time (Time Travel). The engine will be in Read-Only mode.

func WithVersion

func WithVersion(v uint64) Option

WithVersion opens the database at a specific version ID (Time Travel). The engine will be in Read-Only mode.

type RefCountedSegment

type RefCountedSegment struct {
	segment.Segment
	// contains filtered or unexported fields
}

RefCountedSegment wraps a Segment with a reference count.

func NewRefCountedSegment

func NewRefCountedSegment(seg segment.Segment) *RefCountedSegment

func (*RefCountedSegment) DecRef

func (r *RefCountedSegment) DecRef()

func (*RefCountedSegment) IncRef

func (r *RefCountedSegment) IncRef()

func (*RefCountedSegment) SetOnClose

func (r *RefCountedSegment) SetOnClose(f func())

SetOnClose sets a callback function to be executed when the segment is closed. This is typically used to delete the underlying file.

func (*RefCountedSegment) UnwrapSegment

func (r *RefCountedSegment) UnwrapSegment() segment.Segment

UnwrapSegment returns the underlying segment, implementing SegmentUnwrapper. This is used by segmentHasGraphIndex to access the real segment type.

type RetentionPolicy

type RetentionPolicy struct {
	// KeepVersions is the minimum number of recent versions to keep.
	KeepVersions int
	// KeepDuration is the minimum duration of history to keep.
	KeepDuration time.Duration
}

RetentionPolicy defines rules for retaining old versions.

type ScanConfig

type ScanConfig struct {
	BatchSize int
	Filter    *metadata.Filter
}

ScanConfig holds configuration for Scan.

type ScanOption

type ScanOption func(*ScanConfig)

ScanOption configures a Scan operation.

func WithScanBatchSize

func WithScanBatchSize(n int) ScanOption

WithScanBatchSize sets the batch size for prefetching (hint).

func WithScanFilter

func WithScanFilter(f *metadata.Filter) ScanOption

WithScanFilter sets a metadata filter for the scan.

type SeekableBuffer

type SeekableBuffer struct {
	// contains filtered or unexported fields
}

SeekableBuffer is an in-memory buffer that implements io.Writer, io.Seeker, and io.Reader. It's used for building segments in memory before uploading to cloud storage.

func NewSeekableBuffer

func NewSeekableBuffer() *SeekableBuffer

NewSeekableBuffer creates a new SeekableBuffer.

func (*SeekableBuffer) Bytes

func (b *SeekableBuffer) Bytes() []byte

Bytes returns the underlying byte slice.

func (*SeekableBuffer) Len

func (b *SeekableBuffer) Len() int

Len returns the length of the buffer.

func (*SeekableBuffer) Read

func (b *SeekableBuffer) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*SeekableBuffer) Reset

func (b *SeekableBuffer) Reset()

Reset clears the buffer.

func (*SeekableBuffer) Seek

func (b *SeekableBuffer) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker.

func (*SeekableBuffer) Sync

func (b *SeekableBuffer) Sync() error

Sync is a no-op for in-memory buffers.

func (*SeekableBuffer) Write

func (b *SeekableBuffer) Write(p []byte) (n int, err error)

Write implements io.Writer.

type SegmentStats

type SegmentStats struct {
	ID    model.SegmentID
	Size  int64
	Level int
	MinID model.ID
	MaxID model.ID
}

SegmentStats holds metadata about a segment needed for compaction decisions.

type SegmentUnwrapper

type SegmentUnwrapper interface {
	UnwrapSegment() segment.Segment
}

SegmentUnwrapper is an interface for wrappers (like RefCountedSegment) that embed a segment.

type SegmentWithFilterCursor

type SegmentWithFilterCursor interface {
	segment.Segment
	FilterCursor(filter *metadata.FilterSet) imetadata.FilterCursor
}

SegmentWithFilterCursor is the interface for segments that support cursor-based filtering. This is the preferred interface for zero-allocation filter evaluation.

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

Snapshot represents a consistent view of the database.

func NewSnapshot

func NewSnapshot(active *memtable.MemTable, lsn uint64) *Snapshot

func (*Snapshot) Clone

func (s *Snapshot) Clone() *Snapshot

Clone creates a new Snapshot sharing the same segments (incrementing their reference counts) and the active memtable. Note: sortedSegments is NOT cloned; the caller must call RebuildSorted() after modifications.

func (*Snapshot) CloneShared

func (s *Snapshot) CloneShared() *Snapshot

CloneShared creates a new Snapshot sharing the same segments (incrementing their reference counts) and the active memtable.

func (*Snapshot) DecRef

func (s *Snapshot) DecRef()

func (*Snapshot) IncRef

func (s *Snapshot) IncRef()

func (*Snapshot) RebuildSorted

func (s *Snapshot) RebuildSorted()

RebuildSorted rebuilds the sortedSegments slice. Must be called before the snapshot is published or used for search.

func (*Snapshot) TryIncRef

func (s *Snapshot) TryIncRef() bool

TryIncRef attempts to increment the reference count. Returns true if successful, false if the snapshot is already destroyed (refs == 0).

type TieredCompactionPolicy

type TieredCompactionPolicy struct {
	Threshold int
}

TieredCompactionPolicy implements a simple size-tiered compaction strategy. It triggers compaction when there are at least Threshold segments.

func (*TieredCompactionPolicy) Pick

func (p *TieredCompactionPolicy) Pick(segments []SegmentStats) *CompactionTask

type TombstoneFilter

type TombstoneFilter struct {
	// contains filtered or unexported fields
}

TombstoneFilter adapts VersionedTombstones to the segment.Filter interface.

func NewTombstoneFilter

func NewTombstoneFilter(vt *VersionedTombstones, snapshotLSN uint64) *TombstoneFilter

NewTombstoneFilter creates a new filter with a snapshot of the current state.

func (*TombstoneFilter) AsBitmap

func (tf *TombstoneFilter) AsBitmap() segment.Bitmap

func (*TombstoneFilter) Matches

func (tf *TombstoneFilter) Matches(rowID uint32) bool

func (*TombstoneFilter) MatchesBatch

func (tf *TombstoneFilter) MatchesBatch(ids []uint32, out []bool)

func (*TombstoneFilter) MatchesBlock

func (tf *TombstoneFilter) MatchesBlock(stats map[string]segment.FieldStats) bool

type VacuumStats

type VacuumStats struct {
	ManifestsDeleted int
	SegmentsDeleted  int
	BytesReclaimed   int64
}

VacuumStats holds results of the vacuum operation.

type VersionedTombstones

type VersionedTombstones struct {
	// contains filtered or unexported fields
}

VersionedTombstones tracks deletion LSNs using a sharded-mutex + COW architecture.

Architecture:

  • Reads are lock-free via atomic.Pointer (O(1) wait-free)
  • Writes use sharded mutexes (by rowID) for zero-allocation updates
  • Chunk data is copy-on-write for snapshot isolation

This design provides:

  • O(1) reads without locks
  • O(1) writes with minimal contention (64 shards)
  • Zero allocations on successful writes to existing chunks
  • Memory efficiency via COW (only modified chunks are copied)

func NewVersionedTombstones

func NewVersionedTombstones(initialCapacity int) *VersionedTombstones

func (*VersionedTombstones) Count

func (vt *VersionedTombstones) Count(snapshotLSN uint64) int

Count returns the number of deleted rows visible at the given snapshot LSN.

func (*VersionedTombstones) IsDeleted

func (vt *VersionedTombstones) IsDeleted(rowID uint32, snapshotLSN uint64) bool

IsDeleted returns true if the row is deleted at the snapshot LSN. This operation is wait-free.

func (*VersionedTombstones) LoadFromBitmap

func (vt *VersionedTombstones) LoadFromBitmap(bm *imetadata.LocalBitmap)

LoadFromBitmap populates tombstones from a bitmap with LSN 1 (Legacy/Restart). This is a bulk operation used during recovery; not a hot path.

func (*VersionedTombstones) MarkDeleted

func (vt *VersionedTombstones) MarkDeleted(rowID uint32, lsn uint64)

MarkDeleted marks a row as deleted at the given LSN. Uses sharded mutexes for zero-allocation updates with minimal contention. Reads remain lock-free via atomic state pointer.

func (*VersionedTombstones) ToBitmap

func (vt *VersionedTombstones) ToBitmap(snapshotLSN uint64) *imetadata.LocalBitmap

ToBitmap converts the tombstones to a LocalBitmap visible at snapshotLSN.

type WriteBatch

type WriteBatch struct {
	// contains filtered or unexported fields
}

WriteBatch accumulates a batch of operations to be executed atomically. It is not thread-safe and should be used by a single goroutine.

func NewWriteBatch

func NewWriteBatch() *WriteBatch

NewWriteBatch creates a new empty WriteBatch.

func (*WriteBatch) AddDelete

func (b *WriteBatch) AddDelete(id model.ID)

AddDelete adds a delete operation to the batch.

func (*WriteBatch) AddInsert

func (b *WriteBatch) AddInsert(vector []float32, md metadata.Document, payload []byte)

AddInsert adds an insert operation to the batch.

func (*WriteBatch) Clear

func (b *WriteBatch) Clear()

Clear resets the batch for reuse.

func (*WriteBatch) Len

func (b *WriteBatch) Len() int

Len returns the number of operations in the batch.
